Signed-off-by: inkcherry <mingzhi.liu@amd.com>
This commit is contained in:
inkcherry 2025-11-20 04:50:58 +00:00
parent 70ea1b2460
commit 64694c3e76

View File

@ -1690,8 +1690,9 @@ class MoRIIOConnectorWorker:
) )
self.moriio_wrapper.remote_engine_ip = host self.moriio_wrapper.remote_engine_ip = host
remote_agent_name=self.moriio_wrapper.register_remote_engine(
remote_agent_name = EngineDesc.unpack(metadata.agent_metadata).key metadata.agent_metadata
)
logger.info( logger.info(
f"MoRIIO handshake: registered remote agent " f"MoRIIO handshake: registered remote agent "
@ -1751,8 +1752,7 @@ class MoRIIOConnectorWorker:
# In dp(prefill)<->dp(decode) communication, we require an all-to-all handshake. # In dp(prefill)<->dp(decode) communication, we require an all-to-all handshake.
for cur_dp_rank in range(remote_dp_size): for cur_dp_rank in range(remote_dp_size):
dp_engine_id = f"{remote_engine_id}_dp{cur_dp_rank}" dp_engine_id = self.get_engine_name_with_dp(remote_engine_id, cur_dp_rank)
future = self._handshake_initiation_executor.submit( future = self._handshake_initiation_executor.submit(
self._moriio_handshake, host, port, tp_size, dp_engine_id, cur_dp_rank self._moriio_handshake, host, port, tp_size, dp_engine_id, cur_dp_rank
) )
@ -1990,8 +1990,6 @@ class MoRIIOConnectorWorker:
self._write_blocks_for_req(req_id, meta, layer_name, kv_layer) self._write_blocks_for_req(req_id, meta, layer_name, kv_layer)
while True: while True:
if remote_engine_id is None:
break
if ( if (
self._ready_requests.empty() self._ready_requests.empty()
and remote_engine_id not in self.write_ready_flags and remote_engine_id not in self.write_ready_flags
@ -2021,7 +2019,7 @@ class MoRIIOConnectorWorker:
if self.mode == MoRIIOMode.WRITE: if self.mode == MoRIIOMode.WRITE:
return return
wait_handshage_readd_req = False wait_handshake_readd_req = False
remote_engine_id = None remote_engine_id = None
for req_id, meta in metadata.reqs_to_recv.items(): for req_id, meta in metadata.reqs_to_recv.items():
@ -2037,7 +2035,7 @@ class MoRIIOConnectorWorker:
self._background_moriio_handshake( self._background_moriio_handshake(
req_id, remote_engine_id, meta req_id, remote_engine_id, meta
) )
wait_handshage_readd_req = True wait_handshake_readd_req = True
continue continue
@ -2049,7 +2047,7 @@ class MoRIIOConnectorWorker:
if ( if (
self._ready_requests.empty() self._ready_requests.empty()
and not self.load_ready_flag and not self.load_ready_flag
and wait_handshage_readd_req and wait_handshake_readd_req
): ):
continue continue
elif not self._ready_requests.empty() and self.load_ready_flag: elif not self._ready_requests.empty() and self.load_ready_flag: