[Misc] Remove unnecessary _send_reconfig_message() in core_client.py (#23127)
Signed-off-by: Nick Hill <nhill@redhat.com>
commit a403d0fa41
parent 8c13820f0b
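In short: the deleted _send_reconfig_message hand-rolled the UTILITY-request plumbing (call id, pending future in utility_results, encoded message, output-queue task) just for reinitialize_distributed. The call sites now issue the same request through the client's generic _call_utility_async and wrap the coroutine in asyncio.create_task, so all reconfigure requests are still fired off before any result is awaited. Below is a minimal, self-contained sketch of that collect-tasks-then-gather pattern; fake_reinitialize is an illustrative stand-in for the real utility call, not the vLLM API.

    import asyncio

    # Illustrative stand-in for _call_utility_async("reinitialize_distributed", ...);
    # this fake is NOT the vLLM implementation.
    async def fake_reinitialize(engine: int, new_dp_size: int) -> str:
        await asyncio.sleep(0.01)  # pretend to round-trip a UTILITY message
        return f"engine {engine} reconfigured to DP size {new_dp_size}"

    async def scale(new_dp_size: int, engines: list[int]) -> None:
        reconfig_futures = []
        for engine in engines:
            coro = fake_reinitialize(engine, new_dp_size)
            # create_task schedules each coroutine right away; the results are
            # collected and awaited together later, as in the updated code.
            reconfig_futures.append(asyncio.create_task(coro))
        # ... engine creation / teardown would happen in between ...
        print(await asyncio.gather(*reconfig_futures))

    asyncio.run(scale(4, engines=[0, 1, 2, 3]))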
core_client.py
@@ -1190,21 +1190,6 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
         await self._send_input(EngineCoreRequestType.ABORT, request_ids,
                                engine)
 
-    async def _send_reconfig_message(
-            self, reconfig_request: ReconfigureDistributedRequest,
-            engine: EngineIdentity) -> asyncio.Future:
-        """Send reconfiguration message and return the result future without
-        waiting for completion."""
-        call_id = uuid.uuid1().int >> 64
-        future = asyncio.get_running_loop().create_future()
-        self.utility_results[call_id] = future
-        message = (EngineCoreRequestType.UTILITY.value, *self.encoder.encode(
-            (self.client_index, call_id, "reinitialize_distributed",
-             (reconfig_request, ))))
-        await self._send_input_message(message, engine, reconfig_request)
-        self._ensure_output_queue_task()
-        return future
-
     async def scale_elastic_ep(self, new_data_parallel_size: int) -> None:
         """Scale elastic EP data parallel size"""
         cur_data_parallel_size = len(self.core_engines)
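(For context: the removed helper duplicated by hand what, per the commit title, the client's generic utility-call path already provides: allocating a call id, registering a pending future in utility_results, encoding a UTILITY message, and kicking the output-queue task. That duplication is what makes it unnecessary; the call sites below switch to _call_utility_async instead.)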
@@ -1214,7 +1199,7 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
             f"different from cur_data_parallel_size {cur_data_parallel_size}")
 
         assert self.vllm_config.parallel_config.data_parallel_backend == \
-            "ray", ("Only ray DP backend supports scaling elastic EP")
+            "ray", "Only ray DP backend supports scaling elastic EP"
 
         scale_up = new_data_parallel_size > cur_data_parallel_size
 
@@ -1246,9 +1231,10 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
                 data_parallel_master_ip,
                 new_data_parallel_master_port=self.vllm_config.parallel_config.
                 data_parallel_master_port)
-            future = await self._send_reconfig_message(reconfig_request,
-                                                       engine)
-            reconfig_futures.append(future)
+            coro = self._call_utility_async("reinitialize_distributed",
+                                            reconfig_request,
+                                            engine=engine)
+            reconfig_futures.append(asyncio.create_task(coro))
 
         logger.info("All reconfigure messages sent, starting engine creation")
 
@@ -1318,9 +1304,10 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
             if cur_dp_rank >= new_data_parallel_size:
                 reconfig_request.new_data_parallel_rank = \
                     ReconfigureRankType.SHUTDOWN_CURRENT_RANK
-            future = await self._send_reconfig_message(reconfig_request,
-                                                       engine)
-            reconfig_futures.append(future)
+            coro = self._call_utility_async("reinitialize_distributed",
+                                            reconfig_request,
+                                            engine=engine)
+            reconfig_futures.append(asyncio.create_task(coro))
 
         for _ in range(new_data_parallel_size, cur_data_parallel_size):
             self.core_engines.pop()
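The hunks above only collect the tasks in reconfig_futures; the code that eventually awaits them does not appear in this diff, so the removed helper's "return a future now, wait for it later" contract is presumably preserved there. One nuance worth noting: asyncio.create_task defers the actual send to the event loop instead of awaiting it inline as _send_reconfig_message did, so the "All reconfigure messages sent" log line now effectively marks the point where all requests have been scheduled rather than already written out.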