From b4a01aaf95f54bba90cb0b072e9254ddb998af8f Mon Sep 17 00:00:00 2001
From: Yihua Cheng
Date: Tue, 9 Sep 2025 21:23:37 -0700
Subject: [PATCH] [KV Connector] More async support for
 `get_num_new_matched_tokens` (#23620)

Signed-off-by: ApostaC
---
 vllm/distributed/kv_transfer/kv_connector/v1/base.py | 9 ++++++---
 .../kv_transfer/kv_connector/v1/lmcache_connector.py | 2 +-
 .../kv_transfer/kv_connector/v1/multi_connector.py   | 6 +++++-
 .../kv_transfer/kv_connector/v1/nixl_connector.py    | 2 +-
 .../kv_connector/v1/shared_storage_connector.py      | 4 ++--
 vllm/v1/core/sched/scheduler.py                      | 8 ++++++++
 6 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py
index f3f493144d283..cd4561154b78b 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py
@@ -243,7 +243,7 @@ class KVConnectorBase_V1(ABC):
         self,
         request: "Request",
         num_computed_tokens: int,
-    ) -> tuple[int, bool]:
+    ) -> tuple[Optional[int], bool]:
         """
         Get number of new tokens that can be loaded from the
         external KV cache beyond the num_computed_tokens.
@@ -255,8 +255,11 @@ class KVConnectorBase_V1(ABC):
         Returns:
             A tuple with the following elements:
-                - The number of tokens that can be loaded from the
-                  external KV cache beyond what is already computed.
+                - An optional number of tokens that can be loaded from the
+                  external KV cache beyond what is already computed.
+                  If None, it means that the connector needs more time to
+                  determine the number of matched tokens, and the scheduler
+                  should query for this request again later.
                 - `True` if external KV cache tokens will be loaded
                   asynchronously (between scheduler steps). Must be
                   'False' if the first element is 0.
         """
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_connector.py
index e838ac2499c04..c99f538ee4185 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_connector.py
@@ -110,7 +110,7 @@ class LMCacheConnectorV1(KVConnectorBase_V1):
         self,
         request: "Request",
         num_computed_tokens: int,
-    ) -> tuple[int, bool]:
+    ) -> tuple[Optional[int], bool]:
         """
         Get number of new tokens that can be loaded from the
         external KV cache beyond the num_computed_tokens.
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py
index edbff4e4340f4..200b4bf874d89 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py
@@ -143,11 +143,15 @@ class MultiConnector(KVConnectorBase_V1):
         self,
         request: "Request",
         num_computed_tokens: int,
-    ) -> tuple[int, bool]:
+    ) -> tuple[Optional[int], bool]:
         to_return = (0, False)
         for i, c in enumerate(self._connectors):
             toks, load_async = c.get_num_new_matched_tokens(
                 request, num_computed_tokens)
+            # If there is a connector still looking up the matches,
+            # we return None to indicate that we are not done yet.
+            if toks is None:
+                return (None, False)
             # The first connector that has new matched tokens will be assigned
             # to this request.
             if to_return[0] == 0 and toks > 0:
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
index 20d1e31a7106b..17f5be76ce400 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -162,7 +162,7 @@ class NixlConnector(KVConnectorBase_V1):
 
     def get_num_new_matched_tokens(
             self, request: "Request",
-            num_computed_tokens: int) -> tuple[int, bool]:
+            num_computed_tokens: int) -> tuple[Optional[int], bool]:
         assert self.connector_scheduler is not None
         return self.connector_scheduler.get_num_new_matched_tokens(
             request, num_computed_tokens)
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py
index fd79387269d56..9f4da613f8b62 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py
@@ -3,7 +3,7 @@
 import hashlib
 import os
 from dataclasses import dataclass
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 import safetensors
 import torch
@@ -238,7 +238,7 @@ class SharedStorageConnector(KVConnectorBase_V1):
         self,
         request: "Request",
         num_computed_tokens: int,
-    ) -> tuple[int, bool]:
+    ) -> tuple[Optional[int], bool]:
         """
         Get number of new tokens that can be loaded from the
         external KV cache beyond the num_computed_tokens.
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 2d40e96632c95..ed7c16dc520fe 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -387,6 +387,14 @@ class Scheduler(SchedulerInterface):
                         self.connector.get_num_new_matched_tokens(
                             request, num_new_local_computed_tokens))
 
+                    if num_external_computed_tokens is None:
+                        # The request cannot be scheduled because
+                        # the KVConnector couldn't determine
+                        # the number of matched tokens.
+                        self.waiting.pop_request()
+                        skipped_waiting_requests.prepend_request(request)
+                        continue
+
                 # Total computed tokens (local + external).
                 num_computed_tokens = (num_new_local_computed_tokens +
                                        num_external_computed_tokens)
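
For illustration only, below is a minimal, self-contained sketch (not part of this patch and not the real KVConnectorBase_V1 interface) of how a connector might honor the new Optional[int] contract: start an asynchronous lookup on the first call, return (None, False) while the lookup is still in flight so the scheduler requeues the request, and report the matched-token count once the lookup completes. The class name, the _pending dict, and the _lookup_remote_cache helper are hypothetical, and a plain request id stands in for vLLM's Request object to keep the example runnable on its own.

    # Hypothetical sketch of the Optional[int] return contract; not vLLM code.
    from concurrent.futures import Future, ThreadPoolExecutor
    from typing import Optional


    class AsyncLookupConnectorSketch:

        def __init__(self) -> None:
            self._executor = ThreadPoolExecutor(max_workers=1)
            # request_id -> Future resolving to the number of matched tokens.
            self._pending: dict[str, Future] = {}

        def _lookup_remote_cache(self, request_id: str) -> int:
            # Placeholder for a (potentially slow) external KV-cache lookup.
            return 0

        def get_num_new_matched_tokens(
                self, request_id: str,
                num_computed_tokens: int) -> tuple[Optional[int], bool]:
            fut = self._pending.get(request_id)
            if fut is None:
                # First call for this request: start the lookup and ask the
                # scheduler to come back later by returning (None, False).
                self._pending[request_id] = self._executor.submit(
                    self._lookup_remote_cache, request_id)
                return (None, False)
            if not fut.done():
                # Lookup still in flight; the scheduler will retry later.
                return (None, False)
            # Lookup finished: report the match count. The second element is
            # True only when the tokens will be loaded asynchronously between
            # scheduler steps, and must be False when the count is 0.
            num_matched = fut.result()
            return (num_matched, num_matched > 0)

On the scheduler side, as the hunk in vllm/v1/core/sched/scheduler.py above shows, a None result pops the request from the waiting queue and prepends it to skipped_waiting_requests, so the request is retried on a later scheduling step rather than blocking the current one.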