From d4aa14434397b46a562f93d0371719e62d9bd62d Mon Sep 17 00:00:00 2001
From: Nick Hill
Date: Wed, 29 Oct 2025 13:16:52 -0700
Subject: [PATCH] [BugFix] Fix handling of resumed reqs in
 `SharedStorageConnector` (#27719)

Signed-off-by: Nick Hill
---
 .../v1/shared_storage_connector.py | 46 +++++++++----------
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py
index d0cd4b07c51d..fc277630603a 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py
@@ -336,36 +336,34 @@ class SharedStorageConnector(KVConnectorBase_V1):
 
         cached_reqs = scheduler_output.scheduled_cached_reqs
         for i, req_id in enumerate(cached_reqs.req_ids):
+            resumed_from_preemption = cached_reqs.resumed_from_preemption[i]
+            if not resumed_from_preemption or req_id not in self._requests_need_load:
+                continue
+
             num_computed_tokens = cached_reqs.num_computed_tokens[i]
             num_new_tokens = scheduler_output.num_scheduled_tokens[req_id]
             new_block_ids = cached_reqs.new_block_ids[i]
-            resumed_from_preemption = cached_reqs.resumed_from_preemption[i]
 
-            # NOTE(rob): here we rely on the resumed requests being
-            # the first N requests in the list scheduled_cache_reqs.
-            if not resumed_from_preemption:
-                break
-            if req_id in self._requests_need_load:
-                # NOTE(rob): cached_req_data does not have the full
-                # list of token ids (only new tokens). So we look it
-                # up in the actual request object.
-                request = self._requests_need_load[req_id]
-                total_tokens = num_computed_tokens + num_new_tokens
-                token_ids = request.all_token_ids[:total_tokens]
+            # NOTE(rob): cached_req_data does not have the full
+            # list of token ids (only new tokens). So we look it
+            # up in the actual request object.
+            request = self._requests_need_load[req_id]
+            total_tokens = num_computed_tokens + num_new_tokens
+            token_ids = request.all_token_ids[:total_tokens]
 
-                # NOTE(rob): For resumed req, new_block_ids is all
-                # of the block_ids for the request.
-                assert new_block_ids is not None
-                block_ids = new_block_ids[0]
+            # NOTE(rob): For resumed req, new_block_ids is all
+            # of the block_ids for the request.
+            assert new_block_ids is not None
+            block_ids = new_block_ids[0]
 
-                meta.add_request(
-                    token_ids=token_ids,
-                    block_ids=block_ids,
-                    block_size=self._block_size,
-                    is_store=False,
-                    mm_hashes=[f.identifier for f in request.mm_features],
-                )
-                total_need_load += 1
+            meta.add_request(
+                token_ids=token_ids,
+                block_ids=block_ids,
+                block_size=self._block_size,
+                is_store=False,
+                mm_hashes=[f.identifier for f in request.mm_features],
+            )
+            total_need_load += 1
 
         assert total_need_load == len(self._requests_need_load)
         self._requests_need_load.clear()