From a403d0fa41cc68e3b6da4e1097dc896fde2f1a6a Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Wed, 27 Aug 2025 05:50:47 -0700 Subject: [PATCH] [Misc] Remove unnecessary `_send_reconfig_message()` in `core_client.py` (#23127) Signed-off-by: Nick Hill --- vllm/v1/engine/core_client.py | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 079dd9a7d38d..65f7abc97110 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1190,21 +1190,6 @@ class DPLBAsyncMPClient(DPAsyncMPClient): await self._send_input(EngineCoreRequestType.ABORT, request_ids, engine) - async def _send_reconfig_message( - self, reconfig_request: ReconfigureDistributedRequest, - engine: EngineIdentity) -> asyncio.Future: - """Send reconfiguration message and return the result future without - waiting for completion.""" - call_id = uuid.uuid1().int >> 64 - future = asyncio.get_running_loop().create_future() - self.utility_results[call_id] = future - message = (EngineCoreRequestType.UTILITY.value, *self.encoder.encode( - (self.client_index, call_id, "reinitialize_distributed", - (reconfig_request, )))) - await self._send_input_message(message, engine, reconfig_request) - self._ensure_output_queue_task() - return future - async def scale_elastic_ep(self, new_data_parallel_size: int) -> None: """Scale elastic EP data parallel size""" cur_data_parallel_size = len(self.core_engines) @@ -1214,7 +1199,7 @@ class DPLBAsyncMPClient(DPAsyncMPClient): f"different from cur_data_parallel_size {cur_data_parallel_size}") assert self.vllm_config.parallel_config.data_parallel_backend == \ - "ray", ("Only ray DP backend supports scaling elastic EP") + "ray", "Only ray DP backend supports scaling elastic EP" scale_up = new_data_parallel_size > cur_data_parallel_size @@ -1246,9 +1231,10 @@ class DPLBAsyncMPClient(DPAsyncMPClient): data_parallel_master_ip, new_data_parallel_master_port=self.vllm_config.parallel_config. data_parallel_master_port) - future = await self._send_reconfig_message(reconfig_request, - engine) - reconfig_futures.append(future) + coro = self._call_utility_async("reinitialize_distributed", + reconfig_request, + engine=engine) + reconfig_futures.append(asyncio.create_task(coro)) logger.info("All reconfigure messages sent, starting engine creation") @@ -1318,9 +1304,10 @@ class DPLBAsyncMPClient(DPAsyncMPClient): if cur_dp_rank >= new_data_parallel_size: reconfig_request.new_data_parallel_rank = \ ReconfigureRankType.SHUTDOWN_CURRENT_RANK - future = await self._send_reconfig_message(reconfig_request, - engine) - reconfig_futures.append(future) + coro = self._call_utility_async("reinitialize_distributed", + reconfig_request, + engine=engine) + reconfig_futures.append(asyncio.create_task(coro)) for _ in range(new_data_parallel_size, cur_data_parallel_size): self.core_engines.pop()