From b29f405aa5184353acb2e331e201ba0cdf949db7 Mon Sep 17 00:00:00 2001 From: inkcherry Date: Fri, 21 Nov 2025 12:23:48 +0000 Subject: [PATCH] update Signed-off-by: inkcherry --- .../kv_connector/v1/moriio_connector.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py index 7ac303331146f..461a0c96345c6 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py @@ -77,6 +77,7 @@ class MoRIIOConstants: DEFAULT_HANDSHAKE_PORT = "6301" DEFAULT_NOTIFY_PORT = "61005" + VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT = 3600 try: from mori.io import ( @@ -237,7 +238,8 @@ class MoRIIOConfig: # notify_port -> For synchronizing stages between prefill and decode # handshake_port -> For initial handshake between mori engine - # TODO : merge notify_port and handshake_port to simplify port management, supports non-contiguous ports + # TODO: merge notify_port and handshake_port to simplify port management + # and support non-contiguous ports kv_transfer_config = vllm_config.kv_transfer_config extra_config = kv_transfer_config.kv_connector_extra_config @@ -1289,7 +1291,7 @@ class MoRIIOConnectorScheduler: if delay_free_blocks: # Prefill request on remote. It will be read from D upon completion self._reqs_need_send[request.request_id] = ( - time.perf_counter() + envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT + time.perf_counter() + MoRIIOConstants.VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT ) # If we execute in P-D serial mode, no notification port is needed. 
@@ -1359,7 +1361,10 @@ class MoRIIOConnectorWorker: self._writer = MoRIIOWriter(self) role = "producer" if self.is_producer else "consumer" - engine_suffix = f"{self.moriio_config.local_ip}:{self.moriio_config.handshake_port}:tp{self.tp_rank}:dp{self.dp_rank}" + engine_suffix = ( + f"{self.moriio_config.local_ip}:{self.moriio_config.handshake_port}:" + f"tp{self.tp_rank}:dp{self.dp_rank}" + ) self.moriio_engine = IOEngine( f"{role}:{engine_suffix}", IOEngineConfig( @@ -1501,6 +1506,11 @@ class MoRIIOConnectorWorker: remote_ip: IP address of remote node """ + + # Synchronize to prevent dirty reads between transfer and attention operations. + # Consider removing this synchronization once mori-io supports ibgda + # functionality and ibgda is enabled. + stream = torch.cuda.current_stream() event = torch.cuda.Event() event.record(stream) @@ -1931,8 +1941,6 @@ class MoRIIOConnectorWorker: else: done_recving = self._pop_done_transfers() else: - if self.mode == MoRIIOMode.WRITE: - self.moriio_wrapper.async_wait_reqid() done_sending, done_recving = ( set(), self.moriio_wrapper.pop_finished_write_req_ids(),