Signed-off-by: inkcherry <mingzhi.liu@amd.com>
This commit is contained in:
inkcherry 2025-11-21 12:23:48 +00:00
parent 4776e2ddcf
commit b29f405aa5

View File

@ -77,6 +77,7 @@ class MoRIIOConstants:
DEFAULT_HANDSHAKE_PORT = "6301"
DEFAULT_NOTIFY_PORT = "61005"
VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT = 3600
try:
from mori.io import (
@ -237,7 +238,8 @@ class MoRIIOConfig:
# notify_port -> For synchronizing stages between prefill and decode
# handshake_port -> For initial handshake between mori engine
# TODO : merge notify_port and handshake_port to simplify port management, supports non-contiguous ports
# TODO : merge notify_port and handshake_port to simplify port management
# supports non-contiguous ports
kv_transfer_config = vllm_config.kv_transfer_config
extra_config = kv_transfer_config.kv_connector_extra_config
@ -1289,7 +1291,7 @@ class MoRIIOConnectorScheduler:
if delay_free_blocks:
# Prefill request on remote. It will be read from D upon completion
self._reqs_need_send[request.request_id] = (
time.perf_counter() + envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT
time.perf_counter() + MoRIIOConstants.VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT
)
# If we execute in P-D serial mode, no notification port is needed.
@ -1359,7 +1361,10 @@ class MoRIIOConnectorWorker:
self._writer = MoRIIOWriter(self)
role = "producer" if self.is_producer else "consumer"
engine_suffix = f"{self.moriio_config.local_ip}:{self.moriio_config.handshake_port}:tp{self.tp_rank}:dp{self.dp_rank}"
engine_suffix = (
f"{self.moriio_config.local_ip}:{self.moriio_config.handshake_port}:"
f"tp{self.tp_rank}:dp{self.dp_rank}"
)
self.moriio_engine = IOEngine(
f"{role}:{engine_suffix}",
IOEngineConfig(
@ -1501,6 +1506,11 @@ class MoRIIOConnectorWorker:
remote_ip: IP address of remote node
"""
# synchronization to prevent dirty reads between transfer and attention operations
# we can consider removing this synchronization after ibgda is enabled.
# when mori-io supports ibgda functionality
stream = torch.cuda.current_stream()
event = torch.cuda.Event()
event.record(stream)
@ -1931,8 +1941,6 @@ class MoRIIOConnectorWorker:
else:
done_recving = self._pop_done_transfers()
else:
if self.mode == MoRIIOMode.WRITE:
self.moriio_wrapper.async_wait_reqid()
done_sending, done_recving = (
set(),
self.moriio_wrapper.pop_finished_write_req_ids(),