remove handle_proxy_request

Signed-off-by: inkcherry <mingzhi.liu@amd.com>
This commit is contained in:
inkcherry 2025-11-21 12:02:18 +00:00
parent 38d51f6dd8
commit 72ccb5d77c

View File

@ -1361,22 +1361,12 @@ class MoRIIOConnectorWorker:
f":tp {self.tp_rank}:dp {self.dp_rank}"
)
if not self.is_producer:
self.poller = zmq.Poller()
self.metadata_socket = self.zmq_context.socket(zmq.ROUTER)
self.metadata_socket.bind(f"tcp://{self.metadata_address}")
self.poller.register(self.metadata_socket, zmq.POLLIN)
self.moriio_engine = IOEngine(
"consumer:" + engine_suffix,
IOEngineConfig(
self.moriio_config.local_ip, self.moriio_config.local_kv_port
),
)
self._handle_request_thread = threading.Thread(
target=self.handle_proxy_request, daemon=True
)
self._handle_request_thread.start()
else:
self.moriio_engine = IOEngine(
"producer:" + engine_suffix,
@ -1384,7 +1374,6 @@ class MoRIIOConnectorWorker:
self.moriio_config.local_ip, self.moriio_config.local_kv_port
),
)
logger.debug(
"build MORI IOEngine %s:%s",
self.moriio_config.local_ip,
@ -1609,23 +1598,23 @@ class MoRIIOConnectorWorker:
"Max retries (%s) exceeded. Stopping ping loop.",
MoRIIOConstants.MAX_PING_RETRIES,
)
should_break = True
should_break = True
time.sleep(MoRIIOConstants.PING_INTERVAL)
index += 1
if should_break:
break
def handle_proxy_request(self):
if self.is_producer:
raise NotImplementedError(
"prefill instance doesn't need to send kv cache in pull mode"
)
while True:
socks = dict(self.poller.poll())
logger.debug("handle_proxy_request: socks = %s", socks)
# def handle_proxy_request(self):
# if self.is_producer:
# raise NotImplementedError(
# "prefill instance doesn't need to send kv cache in pull mode"
# )
# while True:
# socks = dict(self.poller.poll())
# logger.debug("handle_proxy_request: socks = %s", socks)
if self.metadata_socket not in socks:
continue
# if self.metadata_socket not in socks:
# continue
def close(self):
if hasattr(self, "_handshake_initiation_executor"):