diff --git a/vllm/distributed/device_communicators/all2all.py b/vllm/distributed/device_communicators/all2all.py index 10db1c2c4e67..013ef3c1f5c3 100644 --- a/vllm/distributed/device_communicators/all2all.py +++ b/vllm/distributed/device_communicators/all2all.py @@ -277,7 +277,7 @@ class DeepEPHTAll2AllManager(DeepEPAll2AllManagerBase): num_rdma_bytes = None num_qps_per_rank = None - if self.internode: + if self.internode and not envs.VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODE: num_rdma_bytes = envs.VLLM_DEEPEP_BUFFER_SIZE_MB * 1024 * 1024 num_qps_per_rank = self.num_sms // 2 else: @@ -363,6 +363,8 @@ class DeepEPLLAll2AllManager(DeepEPAll2AllManagerBase): num_rdma_bytes=num_rdma_bytes, low_latency_mode=True, num_qps_per_rank=num_qps_per_rank, + allow_nvlink_for_low_latency_mode=envs.VLLM_DEEPEP_LOW_LATENCY_ALLOW_NVLINK, + allow_mnnvl=envs.VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL, ) def get_handle(self, kwargs): diff --git a/vllm/envs.py b/vllm/envs.py index e6cef075528f..0c45f93ec057 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -204,6 +204,9 @@ if TYPE_CHECKING: VLLM_KV_EVENTS_USE_INT_BLOCK_HASHES: bool = True VLLM_OBJECT_STORAGE_SHM_BUFFER_NAME: str = "VLLM_OBJECT_STORAGE_SHM_BUFFER" VLLM_DEEPEP_BUFFER_SIZE_MB: int = 1024 + VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODE: bool = False + VLLM_DEEPEP_LOW_LATENCY_ALLOW_NVLINK: bool = False + VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL: bool = False VLLM_DBO_COMM_SMS: int = 20 GPT_OSS_SYSTEM_TOOL_MCP_LABELS: list[str] = [] VLLM_PATTERN_MATCH_DEBUG: str | None = None @@ -1350,6 +1353,22 @@ environment_variables: dict[str, Callable[[], Any]] = { "VLLM_DEEPEP_BUFFER_SIZE_MB": lambda: int( os.getenv("VLLM_DEEPEP_BUFFER_SIZE_MB", "1024") ), + # Force DeepEP to use intranode kernel for inter-node communication in + # high throughput mode. This is useful archive higher prefill throuhgput + # on system supports multi-node nvlink (e.g GB200). + "VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODE": lambda: bool( + int(os.getenv("VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODE", "0")) + ), + # Allow DeepEP to use nvlink for internode_ll kernel, turn this on for + # better latency on GB200 like system + "VLLM_DEEPEP_LOW_LATENCY_ALLOW_NVLINK": lambda: bool( + int(os.getenv("VLLM_DEEPEP_LOW_LATENCY_ALLOW_NVLINK", "0")) + ), + # Allow DeepEP to use MNNVL (multi-node nvlink) for internode_ll kernel, + # turn this for better latency on GB200 like system + "VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL": lambda: bool( + int(os.getenv("VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL", "0")) + ), # The number of SMs to allocate for communication kernels when running DBO # the rest of the SMs on the device will be allocated to compute "VLLM_DBO_COMM_SMS": lambda: int(os.getenv("VLLM_DBO_COMM_SMS", "20")), @@ -1512,6 +1531,9 @@ def compute_hash() -> str: "VLLM_ENABLE_INDUCTOR_COORDINATE_DESCENT_TUNING", "VLLM_NVFP4_GEMM_BACKEND", "VLLM_USE_FBGEMM", + "VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODE", + "VLLM_DEEPEP_LOW_LATENCY_ALLOW_NVLINK", + "VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL", ] for key in environment_variables_to_hash: # if this goes out of sync with environment_variables,