Signed-off-by: Robert Shaw <robshaw@redhat.com>
This commit is contained in:
Robert Shaw 2025-07-20 23:10:16 +00:00
parent fe68027a08
commit 2d32c2849f
5 changed files with 19 additions and 15 deletions

View File

@ -1893,6 +1893,8 @@ class ParallelConfig:
"""Whether to use "external" DP LB mode. Applies only to online serving
and when data_parallel_size > 0. Set implicitly when
data_parallel_rank is provided explicitly to vllm serve."""
data_parallel_rank_0_manage_all: bool = False
"""XXX"""
enable_expert_parallel: bool = False
"""Use expert parallelism instead of tensor parallelism for MoE layers."""
enable_eplb: bool = False

View File

@ -1107,8 +1107,10 @@ class EngineArgs:
self.data_parallel_rank = self.data_parallel_start_rank
# Validate External LB.
data_parallel_external_lb = True
if data_parallel_external_lb:
data_parallel_hybrid_lb = True
if data_parallel_hybrid_lb:
if self.data_parallel_size_local is None:
raise ValueError(
"With external LB, --data-parallel-size-local must be set."
@ -1171,11 +1173,12 @@ class EngineArgs:
tensor_parallel_size=self.tensor_parallel_size,
data_parallel_size=self.data_parallel_size,
data_parallel_rank=self.data_parallel_rank or 0,
data_parallel_external_lb=data_parallel_external_lb,
data_parallel_external_lb=False,
data_parallel_size_local=data_parallel_size_local,
data_parallel_master_ip=data_parallel_address,
data_parallel_rpc_port=data_parallel_rpc_port,
data_parallel_backend=self.data_parallel_backend,
data_parallel_rank_0_manage_all=False,
enable_expert_parallel=self.enable_expert_parallel,
enable_eplb=self.enable_eplb,
num_redundant_experts=self.num_redundant_experts,

View File

@ -422,8 +422,8 @@ class EngineCoreProc(EngineCore):
addresses.frontend_stats_publish_address)
# Only publish request queue stats to coordinator for "internal"
# LB mode.
self.publish_dp_lb_stats = (self.has_coordinator and True)
# and not vllm_config.parallel_config.data_parallel_external_lb)
self.publish_dp_lb_stats = (self.has_coordinator
and not vllm_config.parallel_config.data_parallel_external_lb)
self._init_data_parallel(vllm_config)

View File

@ -90,8 +90,7 @@ class EngineCoreClient(ABC):
client_args = (vllm_config, executor_class, log_stats,
client_addresses, client_index)
if parallel_config.data_parallel_size > 1:
# if parallel_config.data_parallel_external_lb:
if False:
if parallel_config.data_parallel_external_lb:
# External load balancer - client per DP rank.
return DPAsyncMPClient(*client_args)
# Internal load balancer - client balances to all DP ranks.
@ -432,12 +431,11 @@ class MPClient(EngineCoreClient):
dp_rank = parallel_config.data_parallel_rank
dp_local_size = parallel_config.data_parallel_size_local
offline_mode = parallel_config.data_parallel_rank_local is not None
manage_only_local = not (parallel_config.data_parallel_rank_0_manage_all)
# If External DPLB, Client manages local EngineCores.
# If Internal DPLB, Client manages local+remote EngineCores.
num_ranks = (dp_local_size
if parallel_config.data_parallel_external_lb else
dp_size)
num_ranks = dp_local_size if manage_only_local else dp_size
self.engine_ranks_managed = ([dp_rank] if offline_mode else range(
dp_rank, dp_rank + num_ranks))
assert parallel_config.data_parallel_size_local <= len(

View File

@ -544,7 +544,8 @@ def launch_core_engines(
local_start_index = parallel_config.data_parallel_rank_local
dp_rank = parallel_config.data_parallel_rank
host = parallel_config.data_parallel_master_ip
external_dp_lb = parallel_config.data_parallel_external_lb
# external_dp_lb = parallel_config.data_parallel_external_lb
rank_0_local_only = (not parallel_config.data_parallel_rank_0_manage_all)
# In offline mode there is an LLM instance per DP rank and
# one core engine per LLM, see
@ -553,8 +554,8 @@ def launch_core_engines(
# client_local_only = True for cases where this front-end
# sends requests only to colocated engines.
client_local_only = offline_mode or external_dp_lb or (local_engine_count
== dp_size)
client_local_only = (offline_mode or rank_0_local_only or
(local_engine_count == dp_size))
# Set up input and output addresses.
addresses = EngineZmqAddresses(
@ -610,7 +611,7 @@ def launch_core_engines(
]
else:
# Rank > 0 handshakes with just the local cores it is managing.
assert vllm_config.parallel_config.data_parallel_external_lb, (
assert rank_0_local_only, (
"Attempting to launch core_engines from dp_rank > 0, but "
"found internal DPLB, which is incompatible.")
engines_to_handshake = [
@ -627,7 +628,7 @@ def launch_core_engines(
handshake_address = get_engine_client_zmq_addr(
handshake_local_only, host, parallel_config.data_parallel_rpc_port)
if external_dp_lb and dp_rank > 0:
if rank_0_local_only and dp_rank > 0:
assert not handshake_local_only
local_handshake_address = get_open_zmq_ipc_path()
client_handshake_address = local_handshake_address