diff --git a/tests/v1/kv_connector/moriio_integration/toy_proxy_server.py b/tests/v1/kv_connector/moriio_integration/toy_proxy_server.py
index 27c450ee7b25c..67ef2b02c76a9 100644
--- a/tests/v1/kv_connector/moriio_integration/toy_proxy_server.py
+++ b/tests/v1/kv_connector/moriio_integration/toy_proxy_server.py
@@ -16,12 +16,11 @@ from quart import Quart, make_response, request
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
 
-prefill_instances = []
-decode_instances = []
+prefill_instances: list[dict] = []
+decode_instances: list[dict] = []
 
 request_nums = 0
 
 app = Quart(__name__)
-yield_chunk = set()
 
 IP_PORT_PATTERN = re.compile(r"//(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)")
@@ -200,7 +199,10 @@ async def handle_request():
     request_nums += 1
 
     def extract_ip_port_fast(url):
-        return IP_PORT_PATTERN.search(url).groups()
+        match = IP_PORT_PATTERN.search(url)
+        if not match:
+            raise ValueError(f"Invalid URL format: {url}")
+        return match.groups()
 
     req_data = await request.get_json()
     request_id = str(uuid.uuid4())
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py
index ce6bb4f824f6d..df92a71702968 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio_connector.py
@@ -4,7 +4,6 @@ import contextlib
 import logging
 import math
 import os
-import pickle
 import queue
 import threading
 import time
@@ -1673,7 +1672,7 @@ class MoRIIOConnectorWorker:
                 )  # send local mori io engine meta data
                 logger.debug("MoRIIO handshake listener sent metadata")
                 # now we send tensor meta data for each block
-                buf = pickle.dumps(layer_name_to_local_kv_cache_metadata)
+                buf = msgpack.dumps(layer_name_to_local_kv_cache_metadata)
                 sock.send_multipart((identity, b"", buf))
             elif msg == MoRIIOConstants.POP_DONE_RECV:
                 _, req_id = sock.recv_multipart()
@@ -1752,7 +1751,7 @@
                 assert 0, f"Unexpected frame! {received_frame = }"
             buf = received_frame[1]
             self.layer_name_to_remote_kv_cache_metadata[expected_engine_id] = (
-                pickle.loads(buf)
+                msgpack.loads(buf)
             )
 
             setup_agent_time = time.perf_counter()
diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index 59e1c8d531793..5a95721967ff0 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -328,6 +328,7 @@ class OpenAIServingChat(OpenAIServing):
                 lora_request=lora_request,
                 trace_headers=trace_headers,
                 priority=request.priority,
+                data_parallel_rank=data_parallel_rank,
             )
 
             generator = self.engine_client.generate(
diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py
index 4d9903c9c5745..102237dd65e05 100644
--- a/vllm/entrypoints/openai/serving_engine.py
+++ b/vllm/entrypoints/openai/serving_engine.py
@@ -1172,7 +1172,7 @@ class OpenAIServing:
         lora_request: LoRARequest | None,
         trace_headers: Mapping[str, str] | None,
         priority: int,
-        data_parallel_rank: int,
+        data_parallel_rank: int | None,
     ) -> tuple[EngineCoreRequest, dict[str, Any]]:
         """Use the Processor to process inputs for AsyncLLM."""
         tokenization_kwargs: dict[str, Any] = {}
@@ -1220,6 +1220,7 @@
             lora_request=lora_request,
             trace_headers=trace_headers,
             priority=priority,
+            data_parallel_rank=None,
         )
 
         generator = self.engine_client.generate(
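
A note on the pickle -> msgpack swap in moriio_connector.py: unlike pickle, msgpack cannot execute arbitrary code during deserialization, which is why it is preferred for payloads received over a ZMQ socket, but it only handles plain types (dict, list, str, bytes, int, float, bool, None) and round-trips tuples as lists. The sketch below illustrates that constraint; the actual layout of layer_name_to_local_kv_cache_metadata is defined elsewhere in the connector, so the sample dict here is hypothetical.

# Minimal sketch of the msgpack round-trip used for the KV cache metadata.
# The metadata layout below is hypothetical; only the dumps/loads behavior
# is the point.
import msgpack

layer_name_to_local_kv_cache_metadata = {
    "model.layers.0.self_attn.attn": [140501, 16, 2, 128],  # illustrative values
}

buf = msgpack.dumps(layer_name_to_local_kv_cache_metadata)  # alias of packb()
restored = msgpack.loads(buf)                               # alias of unpackb()
assert restored == layer_name_to_local_kv_cache_metadata

# Caveats relative to pickle:
#   - tuples come back as lists: msgpack.loads(msgpack.dumps((1, 2))) == [1, 2]
#   - non-plain objects (tensors, dataclasses, ...) raise TypeError instead of
#     being silently serialized, so the metadata must stay msgpack-compatible.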