[Bugfix][DP] DP distribution does not require ray[default] (#23822)

Signed-off-by: Kebe <mail@kebe7jun.com>
This commit is contained in:
Kebe 2025-09-04 05:21:36 +09:00 committed by GitHub
parent 6adaed42f4
commit a43a3f1770
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -315,7 +315,6 @@ class CoreEngineActorManager:
import ray import ray
from ray._private.state import available_resources_per_node from ray._private.state import available_resources_per_node
from ray.util.state import list_nodes
logger.info("Creating placement groups for data parallel") logger.info("Creating placement groups for data parallel")
dp_master_ip = \ dp_master_ip = \
@ -324,31 +323,28 @@ class CoreEngineActorManager:
local_engine_count = \ local_engine_count = \
vllm_config.parallel_config.data_parallel_size_local vllm_config.parallel_config.data_parallel_size_local
nodes = sorted(list_nodes(filters=[("state", "=", "ALIVE")]),
key=lambda node: node.node_ip != dp_master_ip)
assert nodes[0].node_ip == dp_master_ip, (
"The head node is missing or dead")
assert len(nodes) == 1 or nodes[1].node_ip != dp_master_ip, (
"There can only be one head node")
available_resources = available_resources_per_node() available_resources = available_resources_per_node()
world_size = vllm_config.parallel_config.world_size world_size = vllm_config.parallel_config.world_size
placement_groups: list[PlacementGroup] = [] placement_groups: list[PlacementGroup] = []
local_dp_ranks: list[int] = [] local_dp_ranks: list[int] = []
dp_master_ip_key = f'node:{dp_master_ip}'
for node in nodes: nodes = sorted(available_resources.values(),
node_ip = node.node_ip key=lambda x: dp_master_ip_key not in x)
node_resources = available_resources[node.node_id] assert len(nodes) > 0, (
"No nodes with resources found in Ray cluster.")
assert dp_master_ip_key in nodes[0], (
"The DP master node (ip: %s) is missing or dead", dp_master_ip)
for node_resources in nodes:
if "GPU" not in node_resources: if "GPU" not in node_resources:
continue continue
# For now, each DP rank can only be assigned to one node # For now, each DP rank can only be assigned to one node
# TODO(rui): support allocating a single DP rank # TODO(rui): support allocating a single DP rank
# to multiple nodes # to multiple nodes
available_engine_count = int(node_resources["GPU"]) // world_size available_engine_count = int(node_resources["GPU"]) // world_size
if node_ip == dp_master_ip: if dp_master_ip_key in node_resources:
assert available_engine_count >= local_engine_count, ( assert available_engine_count >= local_engine_count, (
"Not enough resources to allocate DP ranks " "Not enough resources to allocate DP ranks "
f"on DP master node {node_ip}") f"on DP master node {dp_master_ip}")
for i in range(local_engine_count): for i in range(local_engine_count):
bundles = [{ bundles = [{
"GPU": 1.0, "GPU": 1.0,