diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
index fecf08c7e647c..72fcb5cd5bb78 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -991,14 +991,11 @@ class NixlConnectorWorker:
         # Enable different block lengths for different layers when MLA is used.
         self.block_len_per_layer = list[int]()
         self.slot_size_per_layer = list[int]()  # HD bytes in kv terms
-        self.device_id = self.tp_rank
 
         for layer_name, cache_or_caches in xfer_buffers.items():
             cache_list = cache_or_caches if split_k_and_v else [cache_or_caches]
 
             for cache in cache_list:
                 base_addr = cache.data_ptr()
-                if not self.use_host_buffer and current_platform.is_cuda_alike():
-                    self.device_id = cache.device.index
                 if base_addr in seen_base_addresses:
                     continue
@@ -1026,7 +1023,7 @@ class NixlConnectorWorker:
                     "All kv cache tensors must have the same size"
                 )
                 caches_data.append(
-                    (base_addr, curr_tensor_size_bytes, self.device_id, "")
+                    (base_addr, curr_tensor_size_bytes, self.tp_rank, "")
                 )
 
         logger.debug(
@@ -1073,7 +1070,7 @@ class NixlConnectorWorker:
                 block_offset = block_id * self.block_len_per_layer[i]
                 addr = base_addr + block_offset
                 # (addr, len, device id)
-                blocks_data.append((addr, kv_block_len, self.device_id))
+                blocks_data.append((addr, kv_block_len, self.tp_rank))
 
             if self._use_flashinfer:
                 # Separate and interleave K/V regions to maintain the same
@@ -1084,13 +1081,12 @@ class NixlConnectorWorker:
                     addr = base_addr + block_offset
                     # Register addresses for V cache (K registered first).
                     v_addr = addr + kv_block_len
-                    blocks_data.append((v_addr, kv_block_len, self.device_id))
+                    blocks_data.append((v_addr, kv_block_len, self.tp_rank))
         logger.debug(
-            "Created %s blocks for src engine %s and rank %s on device id %s",
+            "Created %s blocks for src engine %s and rank %s",
             len(blocks_data),
             self.engine_id,
             self.tp_rank,
-            self.device_id,
         )
 
         descs = self.nixl_wrapper.get_xfer_descs(blocks_data, self.nixl_memory_type)
diff --git a/vllm/v1/engine/utils.py b/vllm/v1/engine/utils.py
index 5b4655354ed7d..ca416dbc0df9e 100644
--- a/vllm/v1/engine/utils.py
+++ b/vllm/v1/engine/utils.py
@@ -134,12 +134,9 @@ class CoreEngineProcManager:
         data_parallel = vllm_config.parallel_config.data_parallel_size > 1
         try:
             for proc, local_dp_rank in zip(self.processes, local_dp_ranks):
-                # Adjust device control in DP for non-CUDA platforms
-                # For CUDA platforms, setting same device id for different DP
-                # processes affects NCCL init performance.
                 with (
                     set_device_control_env_var(vllm_config, local_dp_rank)
-                    if (data_parallel and not current_platform.is_cuda_alike())
+                    if (data_parallel)
                     else contextlib.nullcontext()
                 ):
                     proc.start()
diff --git a/vllm/v1/worker/dp_utils.py b/vllm/v1/worker/dp_utils.py
index 464fbf11a21ad..2b2a69f4af3ab 100644
--- a/vllm/v1/worker/dp_utils.py
+++ b/vllm/v1/worker/dp_utils.py
@@ -8,6 +8,7 @@ import torch.distributed as dist
 from vllm.config import ParallelConfig
 from vllm.distributed.parallel_state import get_dp_group
 from vllm.logger import init_logger
+from vllm.platforms import current_platform
 from vllm.v1.worker.ubatch_utils import (
     UBatchSlices,
     check_ubatch_thresholds,
@@ -19,8 +20,7 @@ logger = init_logger(__name__)
 
 
 def _get_device_and_group(parallel_config: ParallelConfig):
-    # Use the actual device assigned to the DP group, not just the device type
-    device = get_dp_group().device
+    device = current_platform.device_type
     group = get_dp_group().device_group
 
     # Transfering this tensor from GPU to CPU will introduce a GPU sync
diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py
index 2d34e883b1f6d..f43b6a6f04f66 100644
--- a/vllm/v1/worker/gpu_worker.py
+++ b/vllm/v1/worker/gpu_worker.py
@@ -172,27 +172,6 @@ class Worker(WorkerBase):
         if self.device_config.device.type == "cuda":
             # This env var set by Ray causes exceptions with graph building.
             os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
-            if (
-                self.parallel_config.data_parallel_size > 1
-                and self.parallel_config.data_parallel_size_local > 0
-                and self.parallel_config.data_parallel_backend != "ray"
-            ):
-                # Use local DP rank if available, otherwise use global DP rank.
-                dp_local_rank = self.parallel_config.data_parallel_rank_local
-                if dp_local_rank is None:
-                    dp_local_rank = self.parallel_config.data_parallel_rank
-
-                tp_pp_world_size = (
-                    self.parallel_config.pipeline_parallel_size
-                    * self.parallel_config.tensor_parallel_size
-                )
-
-                # DP_LOCAL_RANK * TP_PP_WORLD_SIZE + TP_LOCAL_RANK
-                self.local_rank += dp_local_rank * tp_pp_world_size
-                assert self.local_rank <= torch.cuda.device_count(), (
-                    f"DP adjusted local rank {self.local_rank} is out of bounds. "
-                )
-
             self.device = torch.device(f"cuda:{self.local_rank}")
             current_platform.set_device(self.device)
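For reviewers, a minimal sketch (not vLLM code) of the two rank-to-device mappings this change trades between. The removed gpu_worker.py arithmetic assumed every DP process sees all GPUs and therefore offset the local rank by DP_LOCAL_RANK * TP_PP_WORLD_SIZE (the formula is taken from the removed comment); the restored set_device_control_env_var path is assumed here to narrow device visibility per DP rank (e.g. via CUDA_VISIBLE_DEVICES) before each engine process starts, so a worker can use cuda:{local_rank} directly. The function names below are illustrative only.

```python
# Illustrative sketch, not part of the codebase.

def device_index_all_visible(dp_local_rank: int, tp_pp_world_size: int, local_rank: int) -> int:
    # Old path: every DP process sees all GPUs, so the worker must skip the
    # devices consumed by lower DP replicas:
    # DP_LOCAL_RANK * TP_PP_WORLD_SIZE + TP_LOCAL_RANK (from the removed comment).
    return dp_local_rank * tp_pp_world_size + local_rank

def device_index_with_env_control(local_rank: int) -> int:
    # Restored path: device visibility is narrowed per DP rank before the
    # process starts, so the plain local rank within the TP*PP group suffices.
    return local_rank

# Example: DP=2, TP=2, PP=1; second DP replica, TP rank 1.
assert device_index_all_visible(dp_local_rank=1, tp_pp_world_size=2, local_rank=1) == 3
assert device_index_with_env_control(local_rank=1) == 1  # visible set already offset
```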