From 9dfbeb41e510ad04e90b03ef2f437f476c5abe28 Mon Sep 17 00:00:00 2001
From: Shiyan Deng
Date: Fri, 5 Sep 2025 14:14:18 -0700
Subject: [PATCH] [RFC] allow cancelation after shutdown in blocking collective_rpc (#23390)

Signed-off-by: Shiyan Deng
---
 vllm/v1/executor/multiproc_executor.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index 12e79ff165f4e..84eb956b5c254 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -253,7 +253,8 @@ class MultiprocExecutor(Executor):
                     if not non_block:
                         result = result.result()
                 elif not non_block:
-                    result = get_response(w, dequeue_timeout)
+                    result = get_response(w, dequeue_timeout,
+                                          self.shutdown_event)
                 else:
                     raise RuntimeError("non_block can only be used when"
                                        " max_concurrent_batches > 1")
@@ -295,12 +296,8 @@ class MultiprocExecutor(Executor):
         """Properly shut down the executor and its workers"""
         if not getattr(self, 'shutting_down', False):
             self.shutting_down = True
-            self.shutdown_event.set()
-
-            if self.io_thread_pool is not None:
-                self.io_thread_pool.shutdown(wait=False, cancel_futures=True)
-                self.io_thread_pool = None
 
+            # Make sure all the worker processes are terminated first.
             if workers := getattr(self, 'workers', None):
                 for w in workers:
                     # Close death_writer to signal child processes to exit
@@ -310,6 +307,11 @@ class MultiprocExecutor(Executor):
                     w.worker_response_mq = None
                 self._ensure_worker_termination([w.proc for w in workers])
 
+            self.shutdown_event.set()
+            if self.io_thread_pool is not None:
+                self.io_thread_pool.shutdown(wait=False, cancel_futures=True)
+                del self.io_thread_pool
+
         self.rpc_broadcast_mq = None
 
     def check_health(self) -> None: