# Mirror of https://git.datalinker.icu/vllm-project/vllm.git
# (synced 2025-12-10 06:25:01 +08:00; 125 lines, 3.5 KiB, Python)
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
|
|
import asyncio
|
|
import os
|
|
from typing import Any, Callable, Optional, Union
|
|
|
|
import pytest
|
|
|
|
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
|
|
from vllm.sampling_params import SamplingParams
|
|
from vllm.v1.engine.async_llm import AsyncLLM
|
|
from vllm.v1.engine.llm_engine import LLMEngine
|
|
from vllm.v1.executor.multiproc_executor import MultiprocExecutor
|
|
|
|
|
|
class Mock:
    """Empty stand-in class with no executor behavior.

    The type-checking test in this module passes it as
    ``distributed_executor_backend`` and expects a ``ValueError``.
    """
|
|
|
|
|
|
class CustomMultiprocExecutor(MultiprocExecutor):
    """``MultiprocExecutor`` subclass that records when an RPC is issued.

    Each call to :meth:`collective_rpc` creates (or truncates) an empty
    ``.marker`` file in the current working directory before delegating to
    the parent implementation. The tests in this module check for that file
    to confirm the custom executor class was actually used.
    """

    def collective_rpc(
        self,
        method: Union[str, Callable],
        timeout: Optional[float] = None,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        non_block: bool = False,
        unique_reply_rank: Optional[int] = None,
    ) -> list[Any]:
        """Write the ``.marker`` file, then delegate to the parent executor.

        All arguments are passed through unchanged to
        ``MultiprocExecutor.collective_rpc``; see that method for their
        meaning.

        Returns:
            Whatever the parent implementation returns.
        """
        # Drop marker to show that this was run
        with open(".marker", "w"):
            pass
        # Forward every argument. Previously ``non_block`` and
        # ``unique_reply_rank`` were accepted but silently dropped, so the
        # parent always ran with its defaults for those two options.
        return super().collective_rpc(
            method,
            timeout,
            args,
            kwargs,
            non_block=non_block,
            unique_reply_rank=unique_reply_rank,
        )
|
|
|
|
|
|
# Model name passed to every EngineArgs / AsyncEngineArgs built in this module.
MODEL = "Qwen/Qwen3-0.6B"

# Second name for the same executor class; the async test refers to it both
# directly and through its dotted import path.
CustomMultiprocExecutorAsync = CustomMultiprocExecutor
|
|
|
|
|
|
def test_custom_executor_type_checking():
    """An invalid ``distributed_executor_backend`` class must raise ValueError.

    ``Mock`` is an empty class, not an executor. Both the synchronous
    ``LLMEngine`` path and the asynchronous ``AsyncLLM`` path are expected to
    reject it, either while the engine args are built or while the engine is
    created from them.
    """
    invalid_backend_kwargs = {
        "model": MODEL,
        "gpu_memory_utilization": 0.2,
        "max_model_len": 8192,
        "distributed_executor_backend": Mock,
    }

    # Synchronous engine path.
    with pytest.raises(ValueError):
        sync_args = EngineArgs(**invalid_backend_kwargs)
        LLMEngine.from_engine_args(sync_args)

    # Asynchronous engine path.
    with pytest.raises(ValueError):
        async_args = AsyncEngineArgs(**invalid_backend_kwargs)
        AsyncLLM.from_engine_args(async_args)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "distributed_executor_backend",
    [
        CustomMultiprocExecutor,
        "tests.v1.executor.test_executor.CustomMultiprocExecutor",
    ],
)
def test_custom_executor(distributed_executor_backend, tmp_path):
    """Check that ``LLMEngine`` runs through the custom executor.

    The backend is given once as a class object and once as a dotted import
    path. The test runs inside ``tmp_path`` so that the ``.marker`` file
    written by ``CustomMultiprocExecutor.collective_rpc`` lands in a fresh
    directory; the file must be absent before the engine is built and present
    after one step.
    """
    original_dir = os.path.abspath(".")
    os.chdir(tmp_path)
    try:
        assert not os.path.exists(".marker")

        engine = LLMEngine.from_engine_args(
            EngineArgs(
                model=MODEL,
                gpu_memory_utilization=0.2,
                max_model_len=8192,
                distributed_executor_backend=distributed_executor_backend,
                enforce_eager=True,  # reduce test time
            )
        )

        # A single one-token request is enough to exercise the executor.
        engine.add_request("0", "foo", SamplingParams(max_tokens=1))
        engine.step()

        assert os.path.exists(".marker")
    finally:
        # Always restore the working directory for subsequent tests.
        os.chdir(original_dir)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "distributed_executor_backend",
    [
        CustomMultiprocExecutorAsync,
        "tests.v1.executor.test_executor.CustomMultiprocExecutorAsync",
    ],
)
def test_custom_executor_async(distributed_executor_backend, tmp_path):
    """Check that ``AsyncLLM`` runs through the custom executor.

    The backend is given once as a class object and once as a dotted import
    path. The test runs inside ``tmp_path`` so that the ``.marker`` file
    written by the executor's ``collective_rpc`` lands in a fresh directory;
    the file must be absent before the engine is built and present after one
    generation request has been fully consumed.
    """
    original_dir = os.path.abspath(".")
    os.chdir(tmp_path)
    try:
        assert not os.path.exists(".marker")

        engine = AsyncLLM.from_engine_args(
            AsyncEngineArgs(
                model=MODEL,
                gpu_memory_utilization=0.2,
                max_model_len=8192,
                distributed_executor_backend=distributed_executor_backend,
                enforce_eager=True,  # reduce test time
            )
        )
        one_token_params = SamplingParams(max_tokens=1)

        async def consume_stream():
            # Drain the output stream; the produced values are not inspected.
            outputs = engine.generate(
                request_id="0", prompt="foo", sampling_params=one_token_params
            )
            async for _ in outputs:
                pass

        asyncio.run(consume_stream())

        assert os.path.exists(".marker")
    finally:
        # Always restore the working directory for subsequent tests.
        os.chdir(original_dir)
|