[Bugfix] Fix flaky failure when getting DP ports (#20151)

Signed-off-by: mgoin <mgoin64@gmail.com>
This commit is contained in:
Michael Goin 2025-06-27 16:30:53 +09:00 committed by GitHub
parent d1c956dc0f
commit 4ab3ac285e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1878,18 +1878,41 @@ class ParallelConfig:
return answer return answer
def stateless_init_dp_group(self) -> "ProcessGroup":
    """Create the data-parallel process group, retrying on port races.

    Returns:
        A gloo-backend ``ProcessGroup`` spanning the data-parallel ranks.

    Raises:
        DistNetworkError: if initialization keeps failing with
            ``EADDRINUSE`` after all retries, or fails for any other
            network reason on the first occurrence.
    """
    # NOTE: In high-concurrency scenarios multiple processes
    # can pick the same (currently free) port through a race
    # condition when calling `get_open_port()`. When the first
    # process binds the port the others will subsequently fail
    # with `torch.distributed.DistNetworkError: EADDRINUSE`.
    # To make the initialization more robust we retry a few times
    # with a fresh port whenever this specific error is observed.
    from torch.distributed import DistNetworkError

    from vllm.distributed.utils import (
        stateless_init_torch_distributed_process_group)

    max_retries = 5
    last_exc: Optional[Exception] = None
    for _ in range(max_retries):
        try:
            # use gloo since the engine process might not have cuda device
            return stateless_init_torch_distributed_process_group(
                self.data_parallel_master_ip,
                self.get_next_dp_init_port(),
                self.data_parallel_rank,
                self.data_parallel_size,
                backend="gloo")
        except DistNetworkError as e:
            # We only want to retry when the root cause is EADDRINUSE.
            if "EADDRINUSE" in str(e):
                logger.warning(
                    "Address already in use. Retrying with a new port.")
                last_exc = e
                continue  # try again with a new port
            # Any other network error is not retryable; bare `raise`
            # re-raises the active exception with its traceback intact.
            raise
    # If we get here all retries have failed. Use an explicit guard
    # instead of `assert` so this still behaves correctly under
    # `python -O` (asserts are stripped there, which would otherwise
    # lead to `raise None`).
    if last_exc is None:
        raise RuntimeError(
            "stateless_init_dp_group exhausted retries without "
            "recording an exception")
    raise last_exc
@staticmethod @staticmethod
def has_unfinished_dp(dp_group: "ProcessGroup", def has_unfinished_dp(dp_group: "ProcessGroup",