From 78c44fd722fe3ce010b0ba9c5e7349fe4094b39d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Lucchesi?= Date: Fri, 5 Dec 2025 18:17:36 +0100 Subject: [PATCH] [NIXL] Small cleanup of unused variables (#29618) Signed-off-by: NickLucche Co-authored-by: Cyrus Leung --- .../kv_connector/unit/test_nixl_connector.py | 2 +- .../kv_connector/v1/nixl_connector.py | 20 +++++++------------ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/tests/v1/kv_connector/unit/test_nixl_connector.py b/tests/v1/kv_connector/unit/test_nixl_connector.py index ae4125d54190c..65db16f48c2c9 100644 --- a/tests/v1/kv_connector/unit/test_nixl_connector.py +++ b/tests/v1/kv_connector/unit/test_nixl_connector.py @@ -1307,7 +1307,7 @@ def test_shutdown_cleans_up_resources(dist_init): patch.object(nixl_wrapper, "remove_remote_agent") as mock_rem_agent, patch.object(nixl_wrapper, "deregister_memory") as mock_dereg, ): - worker._recving_transfers = {"req1": [(123, time.perf_counter())]} + worker._recving_transfers = {"req1": [123]} worker.src_xfer_side_handle = 456 worker.dst_xfer_side_handles = {"engine1": 789} worker._remote_agents = {"engine1": {0: "agent1"}} diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 49330abcec324..649e54adaba49 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -55,7 +55,7 @@ if TYPE_CHECKING: from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.request import Request -Transfer = tuple[int, float] # (xfer_handle, start_time) +TransferHandle = int EngineId = str ReqId = str @@ -874,7 +874,7 @@ class NixlConnectorWorker: # In progress transfers. # [req_id -> list[handle]] self._recving_metadata: dict[ReqId, ReqMeta] = {} - self._recving_transfers = defaultdict[ReqId, list[Transfer]](list) + self._recving_transfers = defaultdict[ReqId, list[TransferHandle]](list) # Track the expiration time of requests that are waiting to be sent. self._reqs_to_send: dict[ReqId, float] = {} # Set of requests that have been part of a batch, regardless of status. @@ -1201,14 +1201,11 @@ class NixlConnectorWorker: # Enable different block lengths for different layers when MLA is used. self.block_len_per_layer = list[int]() self.slot_size_per_layer = list[int]() # HD bytes in kv terms - self.device_id = self.tp_rank for layer_name, cache_or_caches in xfer_buffers.items(): cache_list = cache_or_caches if split_k_and_v else [cache_or_caches] for cache in cache_list: base_addr = cache.data_ptr() - if not self.use_host_buffer and current_platform.is_cuda_alike(): - self.device_id = cache.device.index if base_addr in seen_base_addresses: continue @@ -1251,8 +1248,7 @@ class NixlConnectorWorker: "All kv cache tensors must have the same size" ) # Need to make sure the device ID is non-negative for NIXL, - # Torch uses -1 to indicate CPU tensors while NIXL uses explicit - # memory type. + # Torch uses -1 to indicate CPU tensors. self.device_id = max(cache.get_device(), 0) caches_data.append( (base_addr, curr_tensor_size_bytes, self.device_id, "") @@ -1842,9 +1838,7 @@ class NixlConnectorWorker: self._reqs_to_send.pop(req_id, None) return notified_req_ids - def _pop_done_transfers( - self, transfers: dict[str, list[tuple[int, float]]] - ) -> set[str]: + def _pop_done_transfers(self, transfers: dict[str, list[int]]) -> set[str]: """ Pop completed xfers by checking for DONE state. Args: @@ -1855,7 +1849,7 @@ class NixlConnectorWorker: done_req_ids: set[str] = set() for req_id, handles in list(transfers.items()): in_progress = False - for handle, xfer_start_time in handles: + for handle in handles: try: xfer_state = self.nixl_wrapper.check_xfer_state(handle) if xfer_state == "DONE": @@ -2120,7 +2114,7 @@ class NixlConnectorWorker: self.nixl_wrapper.transfer(handle) # Use handle to check completion in future step(). - self._recving_transfers[request_id].append((handle, time.perf_counter())) + self._recving_transfers[request_id].append(handle) except Exception: logger.exception( "NIXL transfer setup/initiation failed for request %s. " @@ -2251,7 +2245,7 @@ class NixlConnectorWorker: """Shutdown the connector worker.""" self._handshake_initiation_executor.shutdown(wait=False) for handles in self._recving_transfers.values(): - for handle, _ in handles: + for handle in handles: self.nixl_wrapper.release_xfer_handle(handle) self._recving_transfers.clear() if self.src_xfer_side_handle: