mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-05-29 01:27:06 +08:00
[NIXL] Small cleanup of unused variables (#29618)
Signed-off-by: NickLucche <nlucches@redhat.com> Co-authored-by: Cyrus Leung <tlleungac@connect.ust.hk>
This commit is contained in:
parent
e7296b08da
commit
78c44fd722
@ -1307,7 +1307,7 @@ def test_shutdown_cleans_up_resources(dist_init):
|
|||||||
patch.object(nixl_wrapper, "remove_remote_agent") as mock_rem_agent,
|
patch.object(nixl_wrapper, "remove_remote_agent") as mock_rem_agent,
|
||||||
patch.object(nixl_wrapper, "deregister_memory") as mock_dereg,
|
patch.object(nixl_wrapper, "deregister_memory") as mock_dereg,
|
||||||
):
|
):
|
||||||
worker._recving_transfers = {"req1": [(123, time.perf_counter())]}
|
worker._recving_transfers = {"req1": [123]}
|
||||||
worker.src_xfer_side_handle = 456
|
worker.src_xfer_side_handle = 456
|
||||||
worker.dst_xfer_side_handles = {"engine1": 789}
|
worker.dst_xfer_side_handles = {"engine1": 789}
|
||||||
worker._remote_agents = {"engine1": {0: "agent1"}}
|
worker._remote_agents = {"engine1": {0: "agent1"}}
|
||||||
|
|||||||
@ -55,7 +55,7 @@ if TYPE_CHECKING:
|
|||||||
from vllm.v1.kv_cache_interface import KVCacheConfig
|
from vllm.v1.kv_cache_interface import KVCacheConfig
|
||||||
from vllm.v1.request import Request
|
from vllm.v1.request import Request
|
||||||
|
|
||||||
Transfer = tuple[int, float] # (xfer_handle, start_time)
|
TransferHandle = int
|
||||||
EngineId = str
|
EngineId = str
|
||||||
ReqId = str
|
ReqId = str
|
||||||
|
|
||||||
@ -874,7 +874,7 @@ class NixlConnectorWorker:
|
|||||||
# In progress transfers.
|
# In progress transfers.
|
||||||
# [req_id -> list[handle]]
|
# [req_id -> list[handle]]
|
||||||
self._recving_metadata: dict[ReqId, ReqMeta] = {}
|
self._recving_metadata: dict[ReqId, ReqMeta] = {}
|
||||||
self._recving_transfers = defaultdict[ReqId, list[Transfer]](list)
|
self._recving_transfers = defaultdict[ReqId, list[TransferHandle]](list)
|
||||||
# Track the expiration time of requests that are waiting to be sent.
|
# Track the expiration time of requests that are waiting to be sent.
|
||||||
self._reqs_to_send: dict[ReqId, float] = {}
|
self._reqs_to_send: dict[ReqId, float] = {}
|
||||||
# Set of requests that have been part of a batch, regardless of status.
|
# Set of requests that have been part of a batch, regardless of status.
|
||||||
@ -1201,14 +1201,11 @@ class NixlConnectorWorker:
|
|||||||
# Enable different block lengths for different layers when MLA is used.
|
# Enable different block lengths for different layers when MLA is used.
|
||||||
self.block_len_per_layer = list[int]()
|
self.block_len_per_layer = list[int]()
|
||||||
self.slot_size_per_layer = list[int]() # HD bytes in kv terms
|
self.slot_size_per_layer = list[int]() # HD bytes in kv terms
|
||||||
self.device_id = self.tp_rank
|
|
||||||
for layer_name, cache_or_caches in xfer_buffers.items():
|
for layer_name, cache_or_caches in xfer_buffers.items():
|
||||||
cache_list = cache_or_caches if split_k_and_v else [cache_or_caches]
|
cache_list = cache_or_caches if split_k_and_v else [cache_or_caches]
|
||||||
|
|
||||||
for cache in cache_list:
|
for cache in cache_list:
|
||||||
base_addr = cache.data_ptr()
|
base_addr = cache.data_ptr()
|
||||||
if not self.use_host_buffer and current_platform.is_cuda_alike():
|
|
||||||
self.device_id = cache.device.index
|
|
||||||
if base_addr in seen_base_addresses:
|
if base_addr in seen_base_addresses:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -1251,8 +1248,7 @@ class NixlConnectorWorker:
|
|||||||
"All kv cache tensors must have the same size"
|
"All kv cache tensors must have the same size"
|
||||||
)
|
)
|
||||||
# Need to make sure the device ID is non-negative for NIXL,
|
# Need to make sure the device ID is non-negative for NIXL,
|
||||||
# Torch uses -1 to indicate CPU tensors while NIXL uses explicit
|
# Torch uses -1 to indicate CPU tensors.
|
||||||
# memory type.
|
|
||||||
self.device_id = max(cache.get_device(), 0)
|
self.device_id = max(cache.get_device(), 0)
|
||||||
caches_data.append(
|
caches_data.append(
|
||||||
(base_addr, curr_tensor_size_bytes, self.device_id, "")
|
(base_addr, curr_tensor_size_bytes, self.device_id, "")
|
||||||
@ -1842,9 +1838,7 @@ class NixlConnectorWorker:
|
|||||||
self._reqs_to_send.pop(req_id, None)
|
self._reqs_to_send.pop(req_id, None)
|
||||||
return notified_req_ids
|
return notified_req_ids
|
||||||
|
|
||||||
def _pop_done_transfers(
|
def _pop_done_transfers(self, transfers: dict[str, list[int]]) -> set[str]:
|
||||||
self, transfers: dict[str, list[tuple[int, float]]]
|
|
||||||
) -> set[str]:
|
|
||||||
"""
|
"""
|
||||||
Pop completed xfers by checking for DONE state.
|
Pop completed xfers by checking for DONE state.
|
||||||
Args:
|
Args:
|
||||||
@ -1855,7 +1849,7 @@ class NixlConnectorWorker:
|
|||||||
done_req_ids: set[str] = set()
|
done_req_ids: set[str] = set()
|
||||||
for req_id, handles in list(transfers.items()):
|
for req_id, handles in list(transfers.items()):
|
||||||
in_progress = False
|
in_progress = False
|
||||||
for handle, xfer_start_time in handles:
|
for handle in handles:
|
||||||
try:
|
try:
|
||||||
xfer_state = self.nixl_wrapper.check_xfer_state(handle)
|
xfer_state = self.nixl_wrapper.check_xfer_state(handle)
|
||||||
if xfer_state == "DONE":
|
if xfer_state == "DONE":
|
||||||
@ -2120,7 +2114,7 @@ class NixlConnectorWorker:
|
|||||||
self.nixl_wrapper.transfer(handle)
|
self.nixl_wrapper.transfer(handle)
|
||||||
|
|
||||||
# Use handle to check completion in future step().
|
# Use handle to check completion in future step().
|
||||||
self._recving_transfers[request_id].append((handle, time.perf_counter()))
|
self._recving_transfers[request_id].append(handle)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"NIXL transfer setup/initiation failed for request %s. "
|
"NIXL transfer setup/initiation failed for request %s. "
|
||||||
@ -2251,7 +2245,7 @@ class NixlConnectorWorker:
|
|||||||
"""Shutdown the connector worker."""
|
"""Shutdown the connector worker."""
|
||||||
self._handshake_initiation_executor.shutdown(wait=False)
|
self._handshake_initiation_executor.shutdown(wait=False)
|
||||||
for handles in self._recving_transfers.values():
|
for handles in self._recving_transfers.values():
|
||||||
for handle, _ in handles:
|
for handle in handles:
|
||||||
self.nixl_wrapper.release_xfer_handle(handle)
|
self.nixl_wrapper.release_xfer_handle(handle)
|
||||||
self._recving_transfers.clear()
|
self._recving_transfers.clear()
|
||||||
if self.src_xfer_side_handle:
|
if self.src_xfer_side_handle:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user