diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index c06cda356f57..6d86ab7f7a4c 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -1025,6 +1025,11 @@ class NixlConnectorWorker: # Sorted dict, oldest requests are put first so we can exit early. if now < expires: break + count = self.consumer_notification_counts_by_req.pop(req_id, 0) + logger.warning( + "Releasing expired KV blocks for request %s which were " + "retrieved by %d decode worker(s) within %d seconds.", req_id, + count, envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT) del self._reqs_to_send[req_id] done_sending.add(req_id) @@ -1040,6 +1045,13 @@ class NixlConnectorWorker: for notifs in self.nixl_wrapper.get_new_notifs().values(): for notif in notifs: req_id, tp_ratio = notif.decode("utf-8").rsplit(":", 1) + if req_id not in self._reqs_to_send: + logger.error( + "Potentially invalid KV blocks for " + "unrecognized request %s were retrieved by " + "a decode worker. They may have expired.", req_id) + continue + self.consumer_notification_counts_by_req[req_id] += 1 # Wait all consumers (D) to be done reading before freeing. if self.consumer_notification_counts_by_req[req_id] == int(