From 02a58803948e6b493a9bde6d38b69423a638ae49 Mon Sep 17 00:00:00 2001 From: Wentao Ye <44945378+yewentao256@users.noreply.github.com> Date: Fri, 12 Dec 2025 13:05:34 -0500 Subject: [PATCH] [CI] Fix mypy for vllm/v1/executor (#30517) Signed-off-by: yewentao256 --- tools/pre_commit/mypy.py | 2 +- vllm/v1/executor/abstract.py | 2 +- vllm/v1/executor/multiproc_executor.py | 10 +++++++--- vllm/v1/executor/ray_executor.py | 6 +++--- vllm/v1/executor/uniproc_executor.py | 13 ++++++++----- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/tools/pre_commit/mypy.py b/tools/pre_commit/mypy.py index 724b393044266..3f7e0a069f869 100755 --- a/tools/pre_commit/mypy.py +++ b/tools/pre_commit/mypy.py @@ -43,6 +43,7 @@ FILES = [ "vllm/worker", "vllm/v1/core", "vllm/v1/engine", + "vllm/v1/executor", "vllm/v1/metrics", "vllm/v1/pool", "vllm/v1/sample", @@ -60,7 +61,6 @@ SEPARATE_GROUPS = [ "vllm/model_executor", # v1 related "vllm/v1/attention", - "vllm/v1/executor", "vllm/v1/kv_offload", "vllm/v1/spec_decode", "vllm/v1/structured_output", diff --git a/vllm/v1/executor/abstract.py b/vllm/v1/executor/abstract.py index db8303fcec501..8ada52435edae 100644 --- a/vllm/v1/executor/abstract.py +++ b/vllm/v1/executor/abstract.py @@ -219,7 +219,7 @@ class Executor(ABC): def sample_tokens( self, grammar_output: GrammarOutput | None, non_block: bool = False - ) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]: + ) -> ModelRunnerOutput | Future[ModelRunnerOutput]: output = self.collective_rpc( # type: ignore[call-overload] "sample_tokens", args=(grammar_output,), non_block=non_block ) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 7e8ebe25c4603..b42d026a3e15b 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -294,8 +294,8 @@ class MultiprocExecutor(Executor): kwargs: dict | None = None, non_block: bool = False, unique_reply_rank: int | None = None, - kv_output_aggregator: KVOutputAggregator = None, - ) -> Any | list[Any] | Future[Any | list[Any]]: + kv_output_aggregator: KVOutputAggregator | None = None, + ) -> Any: """Returns single result if unique_reply_rank and/or kv_output_aggregator is provided, otherwise list.""" assert self.rpc_broadcast_mq is not None, ( @@ -476,6 +476,8 @@ class WorkerProc: """Wrapper that runs one Worker in a separate process.""" READY_STR = "READY" + rpc_broadcast_mq: MessageQueue | None + worker_response_mq: MessageQueue | None def _init_message_queues( self, input_shm_handle: Handle, vllm_config: VllmConfig @@ -487,7 +489,7 @@ class WorkerProc: ) # Initializes a message queue for sending the model output - self.worker_response_mq: MessageQueue = MessageQueue(1, 1) + self.worker_response_mq = MessageQueue(1, 1) self.peer_response_handles = [] else: # Initialize remote MessageQueue for receiving SchedulerOutput across nodes @@ -720,6 +722,7 @@ class WorkerProc: try: reader.close() worker = WorkerProc(*args, **kwargs) + assert worker.worker_response_mq is not None # Send READY once we know everything is loaded ready_writer.send( @@ -804,6 +807,7 @@ class WorkerProc: def worker_busy_loop(self, cancel: threading.Event | None = None): """Main busy loop for Multiprocessing Workers""" + assert self.rpc_broadcast_mq is not None while True: method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue( cancel=cancel, indefinite=True diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index 406eafcd339b0..2fd64e5c2277c 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -413,7 +413,7 @@ class RayDistributedExecutor(Executor): self, grammar_output: "GrammarOutput | None", non_block: bool = False, - ) -> ModelRunnerOutput | Future[ModelRunnerOutput]: + ) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]: """Execute the model on the Ray workers. The scheduler output to use should have been provided in @@ -428,7 +428,7 @@ class RayDistributedExecutor(Executor): """ scheduler_output = self.scheduler_output if scheduler_output is None: - return COMPLETED_NONE_FUTURE if non_block else None # noqa + return COMPLETED_NONE_FUTURE if non_block else None self.scheduler_output = None @@ -439,7 +439,7 @@ class RayDistributedExecutor(Executor): scheduler_output: SchedulerOutput, grammar_output: "GrammarOutput | None", non_block: bool = False, - ) -> ModelRunnerOutput | Future[ModelRunnerOutput]: + ) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]: # Build the compiled DAG for the first time. if self.forward_dag is None: # type: ignore self.forward_dag = self._compiled_ray_dag(enable_asyncio=False) diff --git a/vllm/v1/executor/uniproc_executor.py b/vllm/v1/executor/uniproc_executor.py index 095d3d1dac21b..b8ca922554304 100644 --- a/vllm/v1/executor/uniproc_executor.py +++ b/vllm/v1/executor/uniproc_executor.py @@ -67,7 +67,7 @@ class UniProcExecutor(Executor): kwargs: dict | None = None, non_block: bool = False, single_value: bool = False, - ) -> Any | list[Any] | Future[Any | list[Any]]: + ) -> Any: if kwargs is None: kwargs = {} @@ -79,10 +79,13 @@ class UniProcExecutor(Executor): result = run_method(self.driver_worker, method, args, kwargs) if isinstance(result, AsyncModelRunnerOutput): if (async_thread := self.async_output_thread) is not None: - get_output = result.get_output - if not single_value: - get_output = lambda go=result.get_output: [go()] - return async_thread.submit(get_output) + if single_value: + return async_thread.submit(result.get_output) + + def get_output_list() -> list[Any]: + return [result.get_output()] + + return async_thread.submit(get_output_list) result = result.get_output() future = Future[Any]() future.set_result(result if single_value else [result])