Signed-off-by: inkcherry <mingzhi.liu@amd.com>
This commit is contained in:
inkcherry 2025-11-20 03:39:42 +00:00
parent ecbad2a70b
commit f8e9adfea8

View File

@ -47,10 +47,10 @@ if TYPE_CHECKING:
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.request import Request
import logging
from dataclasses import field
from enum import Enum
from queue import Empty, Queue
import logging
logger = init_logger(__name__)
@ -964,7 +964,7 @@ class MoRIIOConnectorScheduler:
self.side_channel_host = envs.VLLM_NIXL_SIDE_CHANNEL_HOST
self.mode = get_moriio_mode()
self.handeshake_port = (
self.handshake_port = (
self.vllm_config.kv_transfer_config.kv_connector_extra_config[
"handshake_port"
]
@ -1174,7 +1174,7 @@ class MoRIIOConnectorScheduler:
for req_id, (req, block_ids) in self._reqs_need_save.items():
assert req.kv_transfer_params is not None
if req.num_prompt_tokens > len(block_ids):
if req.num_prompt_tokens > len(block_ids)*self.block_size:
# not last chunk prefill
self._reqs_need_pending_save[req_id] = (req, block_ids)
continue
@ -1249,7 +1249,7 @@ class MoRIIOConnectorScheduler:
remote_block_ids=computed_block_ids,
remote_engine_id=self.engine_id,
remote_host=self.side_channel_host,
remote_port=self.handeshake_port,
remote_port=self.handshake_port,
tp_size=self.vllm_config.parallel_config.tensor_parallel_size,
)
@ -1398,7 +1398,7 @@ class MoRIIOConnectorWorker:
+ get_port_offset(self.dp_rank, self.tp_rank)
)
logger.info(f"MoRIIO Worker init {self.tp_rank = },{self.dp_rank= }")
logger.info(f"MoRIIO side channel_port port: {self.side_channel_port}, han")
logger.info(f"MoRIIO side channel_port port: {self.side_channel_port}")
self.engine_id: EngineId = engine_id
self.world_size = get_tensor_model_parallel_world_size()
@ -1606,8 +1606,7 @@ class MoRIIOConnectorWorker:
# TODO(inkcherry): check here?
if self.metadata_socket not in socks:
continue
else:
pass
def __del__(self):
"""Cleanup background threads on destruction."""
@ -1636,11 +1635,11 @@ class MoRIIOConnectorWorker:
# Listen for new requests for metadata.
host = "*"
logger.info(
f"======> mori handeshake starting listening on baseport: {base_port}"
f"======> mori handshake starting listening on baseport: {base_port}"
)
path = make_zmq_path("tcp", host, base_port)
logger.info(f"======> mori handeshake sstarting listening on path: {path}")
logger.info(f"======> mori handshake starting listening on path: {path}")
with zmq_ctx(zmq.ROUTER, path) as sock:
ready_event.set()
@ -1688,7 +1687,7 @@ class MoRIIOConnectorWorker:
port_offset = get_port_offset(remote_dp_rank, self.tp_rank)
path = make_zmq_path("tcp", host, port + port_offset)
logger.info(
"handeshake Querying metadata on path: %s at remote rank %s",
"handshake Querying metadata on path: %s at remote rank %s",
path,
)