From 9a1b9b99d7d21a65280fc68c82f5bb6152fdf9dd Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Sat, 31 May 2025 08:34:52 -0700 Subject: [PATCH] [BugFix] Fix multi-node offline data-parallel (#18981) Signed-off-by: Nick Hill Co-authored-by: Yizhou Liu --- examples/offline_inference/data_parallel.py | 12 ++++++++---- vllm/v1/engine/core_client.py | 7 +++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/offline_inference/data_parallel.py b/examples/offline_inference/data_parallel.py index bf60d883c410e..15906e1a2768d 100644 --- a/examples/offline_inference/data_parallel.py +++ b/examples/offline_inference/data_parallel.py @@ -97,10 +97,14 @@ def main( # with DP, each rank should process different prompts. # usually all the DP ranks process a full dataset, # and each rank processes a different part of the dataset. - promts_per_rank = len(prompts) // dp_size - start = global_dp_rank * promts_per_rank - end = start + promts_per_rank - prompts = prompts[start:end] + floor = len(prompts) // dp_size + remainder = len(prompts) % dp_size + + # Distribute prompts into even groups. + def start(rank): + return rank * floor + min(rank, remainder) + + prompts = prompts[start(global_dp_rank) : start(global_dp_rank + 1)] if len(prompts) == 0: # if any rank has no prompts to process, # we need to set a placeholder prompt diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index e9e2d2d8d1e98..232d6742b7718 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -363,6 +363,7 @@ class MPClient(EngineCoreClient): local_engine_count = parallel_config.data_parallel_size_local local_start_index = parallel_config.data_parallel_rank_local dp_size = parallel_config.data_parallel_size + dp_rank = parallel_config.data_parallel_rank # SPMD mode is where there is an LLM instance per DP rank and # one core engine per LLM, see @@ -370,11 +371,9 @@ class MPClient(EngineCoreClient): spmd_mode = local_start_index is not None if spmd_mode: assert local_engine_count == 1 - self.core_engines = [ - CoreEngine(index=local_start_index, local=True) - ] + self.core_engines = [CoreEngine(index=dp_rank, local=True)] else: - assert parallel_config.data_parallel_rank == 0 + assert dp_rank == 0 local_start_index = 0 self.core_engines = [ CoreEngine(index=i, local=(i < local_engine_count))