From 72ccb5d77c69c0f515c470fe8667b92516872c2e Mon Sep 17 00:00:00 2001 From: inkcherry Date: Fri, 21 Nov 2025 12:02:18 +0000 Subject: [PATCH] remove handle_proxy_request Signed-off-by: inkcherry --- .../kv_connector/v1/moriio_connector.py | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py index 931462f3c3dde..11552e5b460a3 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py @@ -1361,22 +1361,12 @@ class MoRIIOConnectorWorker: f":tp {self.tp_rank}:dp {self.dp_rank}" ) if not self.is_producer: - self.poller = zmq.Poller() - self.metadata_socket = self.zmq_context.socket(zmq.ROUTER) - self.metadata_socket.bind(f"tcp://{self.metadata_address}") - self.poller.register(self.metadata_socket, zmq.POLLIN) - self.moriio_engine = IOEngine( "consumer:" + engine_suffix, IOEngineConfig( self.moriio_config.local_ip, self.moriio_config.local_kv_port ), ) - - self._handle_request_thread = threading.Thread( - target=self.handle_proxy_request, daemon=True - ) - self._handle_request_thread.start() else: self.moriio_engine = IOEngine( "producer:" + engine_suffix, @@ -1384,7 +1374,6 @@ class MoRIIOConnectorWorker: self.moriio_config.local_ip, self.moriio_config.local_kv_port ), ) - logger.debug( "build MORI IOEngine %s:%s", self.moriio_config.local_ip, @@ -1609,23 +1598,23 @@ class MoRIIOConnectorWorker: "Max retries (%s) exceeded. Stopping ping loop.", MoRIIOConstants.MAX_PING_RETRIES, ) - should_break = True + should_break = True time.sleep(MoRIIOConstants.PING_INTERVAL) index += 1 if should_break: break - def handle_proxy_request(self): - if self.is_producer: - raise NotImplementedError( - "prefill instance doesn't need to send kv cache in pull mode" - ) - while True: - socks = dict(self.poller.poll()) - logger.debug("handle_proxy_request: socks = %s", socks) + # def handle_proxy_request(self): + # if self.is_producer: + # raise NotImplementedError( + # "prefill instance doesn't need to send kv cache in pull mode" + # ) + # while True: + # socks = dict(self.poller.poll()) + # logger.debug("handle_proxy_request: socks = %s", socks) - if self.metadata_socket not in socks: - continue + # if self.metadata_socket not in socks: + # continue def close(self): if hasattr(self, "_handshake_initiation_executor"):