From 1823a00d67f8443f97330056beae57f4659624f7 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Mon, 8 Sep 2025 22:53:10 -0700 Subject: [PATCH] [Misc] Support bench serve long context (#24373) Signed-off-by: Ming Yang --- tests/benchmarks/test_serve_cli.py | 31 +++ vllm/benchmarks/lib/endpoint_request_func.py | 212 ++++++++++++------- 2 files changed, 163 insertions(+), 80 deletions(-) diff --git a/tests/benchmarks/test_serve_cli.py b/tests/benchmarks/test_serve_cli.py index bfcf274727e27..5471d6b8e4a5f 100644 --- a/tests/benchmarks/test_serve_cli.py +++ b/tests/benchmarks/test_serve_cli.py @@ -45,3 +45,34 @@ def test_bench_serve(server): print(result.stderr) assert result.returncode == 0, f"Benchmark failed: {result.stderr}" + +@pytest.mark.benchmark +def test_bench_serve_chat(server): + command = [ + "vllm", + "bench", + "serve", + "--model", + MODEL_NAME, + "--host", + server.host, + "--port", + str(server.port), + "--dataset-name", + "random", + "--random-input-len", + "32", + "--random-output-len", + "4", + "--num-prompts", + "5", + "--endpoint", + "/v1/chat/completions", + "--endpoint-type", + "openai-chat", + ] + result = subprocess.run(command, capture_output=True, text=True) + print(result.stdout) + print(result.stderr) + + assert result.returncode == 0, f"Benchmark failed: {result.stderr}" diff --git a/vllm/benchmarks/lib/endpoint_request_func.py b/vllm/benchmarks/lib/endpoint_request_func.py index 6bb2a497119e9..9d67580be26ad 100644 --- a/vllm/benchmarks/lib/endpoint_request_func.py +++ b/vllm/benchmarks/lib/endpoint_request_func.py @@ -17,6 +17,47 @@ from tqdm.asyncio import tqdm AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60) +class StreamedResponseHandler: + """Handles streaming HTTP responses by accumulating chunks until complete + messages are available.""" + + def __init__(self): + self.buffer = "" + + def add_chunk(self, chunk_bytes: bytes) -> list[str]: + """Add a chunk of bytes to the buffer and return any complete + messages.""" + 
chunk_str = chunk_bytes.decode("utf-8") + self.buffer += chunk_str + + messages = [] + + # Split by double newlines (SSE message separator) + while "\n\n" in self.buffer: + message, self.buffer = self.buffer.split("\n\n", 1) + message = message.strip() + if message: + messages.append(message) + + # if self.buffer is not empty, check if it is a complete message + # by removing data: prefix and check if it is a valid JSON + if self.buffer.startswith("data: "): + message_content = self.buffer.removeprefix("data: ").strip() + if message_content == "[DONE]": + messages.append(self.buffer.strip()) + self.buffer = "" + elif message_content: + try: + json.loads(message_content) + messages.append(self.buffer.strip()) + self.buffer = "" + except json.JSONDecodeError: + # Incomplete JSON, wait for more chunks. + pass + + return messages + + @dataclass class RequestFuncInput: """The input for the request function.""" @@ -102,46 +143,50 @@ async def async_request_openai_completions( headers=headers) as response: if response.status == 200: first_chunk_received = False - async for chunk_bytes in response.content: + handler = StreamedResponseHandler() + + async for chunk_bytes in response.content.iter_any(): chunk_bytes = chunk_bytes.strip() if not chunk_bytes: continue - chunk_bytes = chunk_bytes.decode("utf-8") - # NOTE: SSE comments (often used as pings) start with - # a colon. These are not JSON data payload and should - # be skipped. - if chunk_bytes.startswith(":"): - continue - chunk = chunk_bytes.removeprefix("data: ") + messages = handler.add_chunk(chunk_bytes) + for message in messages: + # NOTE: SSE comments (often used as pings) start with + # a colon. These are not JSON data payload and should + # be skipped. 
+ if message.startswith(":"): + continue - if chunk != "[DONE]": - data = json.loads(chunk) + chunk = message.removeprefix("data: ") - # NOTE: Some completion API might have a last - # usage summary response without a token so we - # want to check a token was generated - if choices := data.get("choices"): - # Note that text could be empty here - # e.g. for special tokens - text = choices[0].get("text") - timestamp = time.perf_counter() - # First token - if not first_chunk_received: - first_chunk_received = True - ttft = time.perf_counter() - st - output.ttft = ttft + if chunk != "[DONE]": + data = json.loads(chunk) - # Decoding phase - else: - output.itl.append(timestamp - - most_recent_timestamp) + # NOTE: Some completion API might have a last + # usage summary response without a token so we + # want to check a token was generated + if choices := data.get("choices"): + # Note that text could be empty here + # e.g. for special tokens + text = choices[0].get("text") + timestamp = time.perf_counter() + # First token + if not first_chunk_received: + first_chunk_received = True + ttft = time.perf_counter() - st + output.ttft = ttft - most_recent_timestamp = timestamp - generated_text += text or "" - elif usage := data.get("usage"): - output.output_tokens = usage.get( - "completion_tokens") + # Decoding phase + else: + output.itl.append(timestamp - + most_recent_timestamp) + + most_recent_timestamp = timestamp + generated_text += text or "" + elif usage := data.get("usage"): + output.output_tokens = usage.get( + "completion_tokens") if first_chunk_received: output.success = True else: @@ -227,41 +272,44 @@ async def async_request_openai_chat_completions( async with session.post(url=api_url, json=payload, headers=headers) as response: if response.status == 200: - async for chunk_bytes in response.content: + handler = StreamedResponseHandler() + async for chunk_bytes in response.content.iter_any(): chunk_bytes = chunk_bytes.strip() if not chunk_bytes: continue - 
chunk_bytes = chunk_bytes.decode("utf-8") - # NOTE: SSE comments (often used as pings) start with - # a colon. These are not JSON data payload and should - # be skipped. - if chunk_bytes.startswith(":"): - continue - chunk = chunk_bytes.removeprefix("data: ") + messages = handler.add_chunk(chunk_bytes) + for message in messages: + # NOTE: SSE comments (often used as pings) start with + # a colon. These are not JSON data payload and should + # be skipped. + if message.startswith(":"): + continue - if chunk != "[DONE]": - timestamp = time.perf_counter() - data = json.loads(chunk) + chunk = message.removeprefix("data: ") - if choices := data.get("choices"): - content = choices[0]["delta"].get("content") - # First token - if ttft == 0.0: - ttft = timestamp - st - output.ttft = ttft + if chunk != "[DONE]": + timestamp = time.perf_counter() + data = json.loads(chunk) - # Decoding phase - else: - output.itl.append(timestamp - - most_recent_timestamp) + if choices := data.get("choices"): + content = choices[0]["delta"].get("content") + # First token + if ttft == 0.0: + ttft = timestamp - st + output.ttft = ttft - generated_text += content or "" - elif usage := data.get("usage"): - output.output_tokens = usage.get( - "completion_tokens") + # Decoding phase + else: + output.itl.append(timestamp - + most_recent_timestamp) - most_recent_timestamp = timestamp + generated_text += content or "" + elif usage := data.get("usage"): + output.output_tokens = usage.get( + "completion_tokens") + + most_recent_timestamp = timestamp output.generated_text = generated_text output.success = True @@ -347,36 +395,40 @@ async def async_request_openai_audio( data=form, headers=headers) as response: if response.status == 200: - async for chunk_bytes in response.content: + handler = StreamedResponseHandler() + + async for chunk_bytes in response.content.iter_any(): chunk_bytes = chunk_bytes.strip() if not chunk_bytes: continue - chunk = chunk_bytes.decode("utf-8").removeprefix( - "data: ") - if 
chunk != "[DONE]":
-                            timestamp = time.perf_counter()
-                            data = json.loads(chunk)
+                        messages = handler.add_chunk(chunk_bytes)
+                        for message in messages:
+                            chunk = message.removeprefix(
+                                "data: ")
+                            if chunk != "[DONE]":
+                                timestamp = time.perf_counter()
+                                data = json.loads(chunk)
 
-                            if choices := data.get("choices"):
-                                content = choices[0]["delta"].get(
-                                    "content")
-                                # First token
-                                if ttft == 0.0:
-                                    ttft = timestamp - st
-                                    output.ttft = ttft
+                                if choices := data.get("choices"):
+                                    content = choices[0]["delta"].get(
+                                        "content")
+                                    # First token
+                                    if ttft == 0.0:
+                                        ttft = timestamp - st
+                                        output.ttft = ttft
 
-                                # Decoding phase
-                                else:
-                                    output.itl.append(
-                                        timestamp - most_recent_timestamp)
+                                    # Decoding phase
+                                    else:
+                                        output.itl.append(
+                                            timestamp - most_recent_timestamp)
 
-                                generated_text += content or ""
-                            elif usage := data.get("usage"):
-                                output.output_tokens = usage.get(
-                                    "completion_tokens")
+                                    generated_text += content or ""
+                                elif usage := data.get("usage"):
+                                    output.output_tokens = usage.get(
+                                        "completion_tokens")
 
-                            most_recent_timestamp = timestamp
+                                most_recent_timestamp = timestamp
 
                     output.generated_text = generated_text
                     output.success = True