From f8e9adfea85e6dd623aa94775a63064f1347b6d7 Mon Sep 17 00:00:00 2001 From: inkcherry Date: Thu, 20 Nov 2025 03:39:42 +0000 Subject: [PATCH] refine Signed-off-by: inkcherry --- .../kv_connector/v1/moriio_connector.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py index 2856733484cab..697ae3a786271 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py @@ -47,10 +47,10 @@ if TYPE_CHECKING: from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.request import Request -import logging from dataclasses import field from enum import Enum from queue import Empty, Queue +import logging logger = init_logger(__name__) @@ -964,7 +964,7 @@ class MoRIIOConnectorScheduler: self.side_channel_host = envs.VLLM_NIXL_SIDE_CHANNEL_HOST self.mode = get_moriio_mode() - self.handeshake_port = ( + self.handshake_port = ( self.vllm_config.kv_transfer_config.kv_connector_extra_config[ "handshake_port" ] @@ -1174,7 +1174,7 @@ class MoRIIOConnectorScheduler: for req_id, (req, block_ids) in self._reqs_need_save.items(): assert req.kv_transfer_params is not None - if req.num_prompt_tokens > len(block_ids): + if req.num_prompt_tokens > len(block_ids)*self.block_size: # not last chunk prefill self._reqs_need_pending_save[req_id] = (req, block_ids) continue @@ -1249,7 +1249,7 @@ class MoRIIOConnectorScheduler: remote_block_ids=computed_block_ids, remote_engine_id=self.engine_id, remote_host=self.side_channel_host, - remote_port=self.handeshake_port, + remote_port=self.handshake_port, tp_size=self.vllm_config.parallel_config.tensor_parallel_size, ) @@ -1398,7 +1398,7 @@ class MoRIIOConnectorWorker: + get_port_offset(self.dp_rank, self.tp_rank) ) logger.info(f"MoRIIO Worker init {self.tp_rank = },{self.dp_rank= }") - logger.info(f"MoRIIO side channel_port port: {self.side_channel_port}, han") + logger.info(f"MoRIIO side channel_port port: {self.side_channel_port}") self.engine_id: EngineId = engine_id self.world_size = get_tensor_model_parallel_world_size() @@ -1606,8 +1606,7 @@ class MoRIIOConnectorWorker: # TODO: inkcherry , check here? if self.metadata_socket not in socks: continue - else: - pass + def __del__(self): """Cleanup background threads on destruction.""" @@ -1636,11 +1635,11 @@ class MoRIIOConnectorWorker: # Listen for new requests for metadata. host = "*" logger.info( - f"======> mori handeshake starting listening on baseport: {base_port}" + f"======> mori handshake starting listening on baseport: {base_port}" ) path = make_zmq_path("tcp", host, base_port) - logger.info(f"======> mori handeshake sstarting listening on path: {path}") + logger.info(f"======> mori handshake starting listening on path: {path}") with zmq_ctx(zmq.ROUTER, path) as sock: ready_event.set() @@ -1688,7 +1687,7 @@ class MoRIIOConnectorWorker: port_offset = get_port_offset(remote_dp_rank, self.tp_rank) path = make_zmq_path("tcp", host, port + port_offset) logger.info( - "handeshake Querying metadata on path: %s at remote rank %s", + "handshake Querying metadata on path: %s at remote rank %s", path, )