From 6f5478298ddd8e6aa330f171c70811f667b8699b Mon Sep 17 00:00:00 2001 From: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> Date: Sun, 3 Aug 2025 19:23:32 -0700 Subject: [PATCH] Use `aiohttp` connection pool for benchmarking (#21981) Signed-off-by: Seiji Eicher --- vllm/benchmarks/lib/endpoint_request_func.py | 469 +++++++++---------- vllm/benchmarks/lib/ready_checker.py | 4 +- vllm/benchmarks/serve.py | 40 +- 3 files changed, 271 insertions(+), 242 deletions(-) diff --git a/vllm/benchmarks/lib/endpoint_request_func.py b/vllm/benchmarks/lib/endpoint_request_func.py index 60ae520db3862..2d64cc115f00f 100644 --- a/vllm/benchmarks/lib/endpoint_request_func.py +++ b/vllm/benchmarks/lib/endpoint_request_func.py @@ -50,6 +50,7 @@ class RequestFuncOutput: async def async_request_openai_completions( request_func_input: RequestFuncInput, + session: aiohttp.ClientSession, pbar: Optional[tqdm] = None, ) -> RequestFuncOutput: """The async request function for the OpenAI Completions API. @@ -66,96 +67,94 @@ async def async_request_openai_completions( ("completions", "profile") ), "OpenAI Completions API URL must end with 'completions' or 'profile'." - async with aiohttp.ClientSession(trust_env=True, - timeout=AIOHTTP_TIMEOUT) as session: - payload = { - "model": request_func_input.model_name \ - if request_func_input.model_name else request_func_input.model, - "prompt": request_func_input.prompt, - "temperature": 0.0, - "repetition_penalty": 1.0, - "max_tokens": request_func_input.output_len, - "logprobs": request_func_input.logprobs, - "stream": True, - "stream_options": { - "include_usage": True, - }, - } - if request_func_input.ignore_eos: - payload["ignore_eos"] = request_func_input.ignore_eos - if request_func_input.extra_body: - payload.update(request_func_input.extra_body) - headers = { - "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}" - } + payload = { + "model": request_func_input.model_name \ + if request_func_input.model_name else request_func_input.model, + "prompt": request_func_input.prompt, + "temperature": 0.0, + "repetition_penalty": 1.0, + "max_tokens": request_func_input.output_len, + "logprobs": request_func_input.logprobs, + "stream": True, + "stream_options": { + "include_usage": True, + }, + } + if request_func_input.ignore_eos: + payload["ignore_eos"] = request_func_input.ignore_eos + if request_func_input.extra_body: + payload.update(request_func_input.extra_body) + headers = { + "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}" + } - output = RequestFuncOutput() - output.prompt_len = request_func_input.prompt_len + output = RequestFuncOutput() + output.prompt_len = request_func_input.prompt_len - generated_text = "" - st = time.perf_counter() - most_recent_timestamp = st - try: - async with session.post(url=api_url, json=payload, - headers=headers) as response: - if response.status == 200: - first_chunk_received = False - async for chunk_bytes in response.content: - chunk_bytes = chunk_bytes.strip() - if not chunk_bytes: - continue - chunk_bytes = chunk_bytes.decode("utf-8") - # NOTE: SSE comments (often used as pings) start with - # a colon. These are not JSON data payload and should - # be skipped. - if chunk_bytes.startswith(":"): - continue + generated_text = "" + st = time.perf_counter() + most_recent_timestamp = st + try: + async with session.post(url=api_url, json=payload, + headers=headers) as response: + if response.status == 200: + first_chunk_received = False + async for chunk_bytes in response.content: + chunk_bytes = chunk_bytes.strip() + if not chunk_bytes: + continue + chunk_bytes = chunk_bytes.decode("utf-8") + # NOTE: SSE comments (often used as pings) start with + # a colon. These are not JSON data payload and should + # be skipped. + if chunk_bytes.startswith(":"): + continue - chunk = chunk_bytes.removeprefix("data: ") + chunk = chunk_bytes.removeprefix("data: ") - if chunk != "[DONE]": - data = json.loads(chunk) + if chunk != "[DONE]": + data = json.loads(chunk) - # NOTE: Some completion API might have a last - # usage summary response without a token so we - # want to check a token was generated - if choices := data.get("choices"): - # Note that text could be empty here - # e.g. for special tokens - text = choices[0].get("text") - timestamp = time.perf_counter() - # First token - if not first_chunk_received: - first_chunk_received = True - ttft = time.perf_counter() - st - output.ttft = ttft + # NOTE: Some completion API might have a last + # usage summary response without a token so we + # want to check a token was generated + if choices := data.get("choices"): + # Note that text could be empty here + # e.g. for special tokens + text = choices[0].get("text") + timestamp = time.perf_counter() + # First token + if not first_chunk_received: + first_chunk_received = True + ttft = time.perf_counter() - st + output.ttft = ttft - # Decoding phase - else: - output.itl.append(timestamp - - most_recent_timestamp) + # Decoding phase + else: + output.itl.append(timestamp - + most_recent_timestamp) - most_recent_timestamp = timestamp - generated_text += text or "" - elif usage := data.get("usage"): - output.output_tokens = usage.get( - "completion_tokens") - if first_chunk_received: - output.success = True - else: - output.success = False - output.error = ( - "Never received a valid chunk to calculate TTFT." - "This response will be marked as failed!") - output.generated_text = generated_text - output.latency = most_recent_timestamp - st + most_recent_timestamp = timestamp + generated_text += text or "" + elif usage := data.get("usage"): + output.output_tokens = usage.get( + "completion_tokens") + if first_chunk_received: + output.success = True else: - output.error = response.reason or "" output.success = False - except Exception: - output.success = False - exc_info = sys.exc_info() - output.error = "".join(traceback.format_exception(*exc_info)) + output.error = ( + "Never received a valid chunk to calculate TTFT." + "This response will be marked as failed!") + output.generated_text = generated_text + output.latency = most_recent_timestamp - st + else: + output.error = response.reason or "" + output.success = False + except Exception: + output.success = False + exc_info = sys.exc_info() + output.error = "".join(traceback.format_exception(*exc_info)) if pbar: pbar.update(1) @@ -164,45 +163,158 @@ async def async_request_openai_completions( async def async_request_openai_chat_completions( request_func_input: RequestFuncInput, + session: aiohttp.ClientSession, pbar: Optional[tqdm] = None, ) -> RequestFuncOutput: api_url = request_func_input.api_url assert api_url.endswith(("chat/completions", "profile")), ( "OpenAI Chat Completions API URL must end with 'chat/completions'.") - async with aiohttp.ClientSession(trust_env=True, - timeout=AIOHTTP_TIMEOUT) as session: - content = [{"type": "text", "text": request_func_input.prompt}] - if request_func_input.multi_modal_content: - content.append(request_func_input.multi_modal_content) - payload = { - "model": - request_func_input.model_name - if request_func_input.model_name else request_func_input.model, - "messages": [ - { - "role": "user", - "content": content - }, - ], - "temperature": - 0.0, - "max_completion_tokens": - request_func_input.output_len, - "stream": - True, - "stream_options": { - "include_usage": True, + content = [{"type": "text", "text": request_func_input.prompt}] + if request_func_input.multi_modal_content: + content.append(request_func_input.multi_modal_content) + payload = { + "model": + request_func_input.model_name + if request_func_input.model_name else request_func_input.model, + "messages": [ + { + "role": "user", + "content": content }, - } - if request_func_input.ignore_eos: - payload["ignore_eos"] = request_func_input.ignore_eos - if request_func_input.extra_body: - payload.update(request_func_input.extra_body) - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", - } + ], + "temperature": + 0.0, + "max_completion_tokens": + request_func_input.output_len, + "stream": + True, + "stream_options": { + "include_usage": True, + }, + } + if request_func_input.ignore_eos: + payload["ignore_eos"] = request_func_input.ignore_eos + if request_func_input.extra_body: + payload.update(request_func_input.extra_body) + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", + } + + output = RequestFuncOutput() + output.prompt_len = request_func_input.prompt_len + + generated_text = "" + ttft = 0.0 + st = time.perf_counter() + most_recent_timestamp = st + try: + async with session.post(url=api_url, json=payload, + headers=headers) as response: + if response.status == 200: + async for chunk_bytes in response.content: + chunk_bytes = chunk_bytes.strip() + if not chunk_bytes: + continue + chunk_bytes = chunk_bytes.decode("utf-8") + # NOTE: SSE comments (often used as pings) start with + # a colon. These are not JSON data payload and should + # be skipped. + if chunk_bytes.startswith(":"): + continue + + chunk = chunk_bytes.removeprefix("data: ") + + if chunk != "[DONE]": + timestamp = time.perf_counter() + data = json.loads(chunk) + + if choices := data.get("choices"): + content = choices[0]["delta"].get("content") + # First token + if ttft == 0.0: + ttft = timestamp - st + output.ttft = ttft + + # Decoding phase + else: + output.itl.append(timestamp - + most_recent_timestamp) + + generated_text += content or "" + elif usage := data.get("usage"): + output.output_tokens = usage.get( + "completion_tokens") + + most_recent_timestamp = timestamp + + output.generated_text = generated_text + output.success = True + output.latency = most_recent_timestamp - st + else: + output.error = response.reason or "" + output.success = False + except Exception: + output.success = False + exc_info = sys.exc_info() + output.error = "".join(traceback.format_exception(*exc_info)) + + if pbar: + pbar.update(1) + return output + + +async def async_request_openai_audio( + request_func_input: RequestFuncInput, + session: aiohttp.ClientSession, + pbar: Optional[tqdm] = None, +) -> RequestFuncOutput: + # Lazy import without PlaceholderModule to avoid vllm dep. + import soundfile + + api_url = request_func_input.api_url + assert api_url.endswith(("transcriptions", "translations")), ( + "OpenAI Chat Completions API URL must end with 'transcriptions' ") + "or `translations`." + + content = [{"type": "text", "text": request_func_input.prompt}] + payload = { + "model": + request_func_input.model_name + if request_func_input.model_name else request_func_input.model, + "temperature": + 0.0, + "max_completion_tokens": + request_func_input.output_len, + "stream": + True, + "language": + "en", + # Flattened due to multipart/form-data + "stream_include_usage": + True, + "stream_continuous_usage_stats": + True, + } + if request_func_input.extra_body: + payload.update(request_func_input.extra_body) + headers = { + "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", + } + + # Send audio file + def to_bytes(y, sr): + buffer = io.BytesIO() + soundfile.write(buffer, y, sr, format="WAV") + buffer.seek(0) + return buffer + + with to_bytes(*request_func_input.multi_modal_content["audio"]) as f: + form = aiohttp.FormData() + form.add_field("file", f, content_type="audio/wav") + for key, value in payload.items(): + form.add_field(key, str(value)) output = RequestFuncOutput() output.prompt_len = request_func_input.prompt_len @@ -212,28 +324,24 @@ async def async_request_openai_chat_completions( st = time.perf_counter() most_recent_timestamp = st try: - async with session.post(url=api_url, json=payload, + async with session.post(url=api_url, + data=form, headers=headers) as response: if response.status == 200: async for chunk_bytes in response.content: chunk_bytes = chunk_bytes.strip() if not chunk_bytes: continue - chunk_bytes = chunk_bytes.decode("utf-8") - # NOTE: SSE comments (often used as pings) start with - # a colon. These are not JSON data payload and should - # be skipped. - if chunk_bytes.startswith(":"): - continue - - chunk = chunk_bytes.removeprefix("data: ") + chunk = chunk_bytes.decode("utf-8").removeprefix( + "data: ") if chunk != "[DONE]": timestamp = time.perf_counter() data = json.loads(chunk) if choices := data.get("choices"): - content = choices[0]["delta"].get("content") + content = choices[0]["delta"].get( + "content") # First token if ttft == 0.0: ttft = timestamp - st @@ -241,8 +349,8 @@ async def async_request_openai_chat_completions( # Decoding phase else: - output.itl.append(timestamp - - most_recent_timestamp) + output.itl.append( + timestamp - most_recent_timestamp) generated_text += content or "" elif usage := data.get("usage"): @@ -267,117 +375,6 @@ async def async_request_openai_chat_completions( return output -async def async_request_openai_audio( - request_func_input: RequestFuncInput, - pbar: Optional[tqdm] = None, -) -> RequestFuncOutput: - # Lazy import without PlaceholderModule to avoid vllm dep. - import soundfile - - api_url = request_func_input.api_url - assert api_url.endswith(("transcriptions", "translations")), ( - "OpenAI Chat Completions API URL must end with 'transcriptions' ") - "or `translations`." - - async with aiohttp.ClientSession(trust_env=True, - timeout=AIOHTTP_TIMEOUT) as session: - content = [{"type": "text", "text": request_func_input.prompt}] - payload = { - "model": - request_func_input.model_name - if request_func_input.model_name else request_func_input.model, - "temperature": - 0.0, - "max_completion_tokens": - request_func_input.output_len, - "stream": - True, - "language": - "en", - # Flattened due to multipart/form-data - "stream_include_usage": - True, - "stream_continuous_usage_stats": - True, - } - if request_func_input.extra_body: - payload.update(request_func_input.extra_body) - headers = { - "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", - } - - # Send audio file - def to_bytes(y, sr): - buffer = io.BytesIO() - soundfile.write(buffer, y, sr, format="WAV") - buffer.seek(0) - return buffer - - with to_bytes(*request_func_input.multi_modal_content["audio"]) as f: - form = aiohttp.FormData() - form.add_field("file", f, content_type="audio/wav") - for key, value in payload.items(): - form.add_field(key, str(value)) - - output = RequestFuncOutput() - output.prompt_len = request_func_input.prompt_len - - generated_text = "" - ttft = 0.0 - st = time.perf_counter() - most_recent_timestamp = st - try: - async with session.post(url=api_url, - data=form, - headers=headers) as response: - if response.status == 200: - async for chunk_bytes in response.content: - chunk_bytes = chunk_bytes.strip() - if not chunk_bytes: - continue - - chunk = chunk_bytes.decode("utf-8").removeprefix( - "data: ") - if chunk != "[DONE]": - timestamp = time.perf_counter() - data = json.loads(chunk) - - if choices := data.get("choices"): - content = choices[0]["delta"].get( - "content") - # First token - if ttft == 0.0: - ttft = timestamp - st - output.ttft = ttft - - # Decoding phase - else: - output.itl.append( - timestamp - most_recent_timestamp) - - generated_text += content or "" - elif usage := data.get("usage"): - output.output_tokens = usage.get( - "completion_tokens") - - most_recent_timestamp = timestamp - - output.generated_text = generated_text - output.success = True - output.latency = most_recent_timestamp - st - else: - output.error = response.reason or "" - output.success = False - except Exception: - output.success = False - exc_info = sys.exc_info() - output.error = "".join(traceback.format_exception(*exc_info)) - - if pbar: - pbar.update(1) - return output - - # TODO: Add more request functions for different API protocols. ASYNC_REQUEST_FUNCS = { "vllm": async_request_openai_completions, diff --git a/vllm/benchmarks/lib/ready_checker.py b/vllm/benchmarks/lib/ready_checker.py index a663f85b629d2..7e836158386a9 100644 --- a/vllm/benchmarks/lib/ready_checker.py +++ b/vllm/benchmarks/lib/ready_checker.py @@ -14,6 +14,7 @@ from .endpoint_request_func import RequestFuncInput, RequestFuncOutput async def wait_for_endpoint( request_func, test_input: RequestFuncInput, + session: aiohttp.ClientSession, timeout_seconds: int = 600, retry_interval: int = 5, ) -> RequestFuncOutput: @@ -55,7 +56,8 @@ async def wait_for_endpoint( # ping the endpoint using request_func try: - output = await request_func(request_func_input=test_input) + output = await request_func( + request_func_input=test_input, session=session) if output.success: pbar.close() return output diff --git a/vllm/benchmarks/serve.py b/vllm/benchmarks/serve.py index 45798547ac719..ca8d218581e77 100644 --- a/vllm/benchmarks/serve.py +++ b/vllm/benchmarks/serve.py @@ -28,6 +28,7 @@ from dataclasses import dataclass from datetime import datetime from typing import Any, Literal, Optional +import aiohttp import numpy as np from tqdm.asyncio import tqdm from transformers import PreTrainedTokenizerBase @@ -338,6 +339,24 @@ async def benchmark( else: raise ValueError(f"Unknown endpoint_type: {endpoint_type}") + # Reuses connections across requests to reduce TLS handshake overhead. + connector = aiohttp.TCPConnector( + limit=max_concurrency or 0, + limit_per_host=max_concurrency or 0, + ttl_dns_cache=300, + use_dns_cache=True, + keepalive_timeout=60, + enable_cleanup_closed=True, + force_close=False, + ssl=("https://" in api_url), + ) + + session = aiohttp.ClientSession( + connector=connector, + trust_env=True, + timeout=aiohttp.ClientTimeout(total=6 * 60 * 60), + ) + print("Starting initial single prompt test run...") test_prompt, test_prompt_len, test_output_len, test_mm_content = ( input_requests[0].prompt, @@ -361,7 +380,11 @@ async def benchmark( ) test_output = await wait_for_endpoint( - request_func, test_input, timeout_seconds=ready_check_timeout_sec) + request_func, + test_input, + session, + timeout_seconds=ready_check_timeout_sec, + ) if not test_output.success: raise ValueError( "Initial test run failed - Please make sure benchmark arguments " @@ -386,7 +409,8 @@ async def benchmark( multi_modal_content=test_mm_content, ignore_eos=ignore_eos, extra_body=extra_body) - profile_output = await request_func(request_func_input=profile_input) + profile_output = await request_func( + request_func_input=profile_input, session=session) if profile_output.success: print("Profiler started") @@ -412,12 +436,14 @@ async def benchmark( semaphore = (asyncio.Semaphore(max_concurrency) if max_concurrency else None) - async def limited_request_func(request_func_input, pbar): + async def limited_request_func(request_func_input, session, pbar): if semaphore is None: return await request_func(request_func_input=request_func_input, + session=session, pbar=pbar) async with semaphore: - return await request_func(request_func_input=request_func_input, + return await request_func(request_func_input=request_func_input, + session=session, pbar=pbar) benchmark_start_time = time.perf_counter() @@ -469,6 +495,7 @@ async def benchmark( tasks.append( asyncio.create_task( limited_request_func(request_func_input=request_func_input, + session=session, pbar=pbar))) outputs: list[RequestFuncOutput] = await asyncio.gather(*tasks) @@ -580,9 +607,12 @@ async def benchmark( output_len=test_output_len, logprobs=logprobs, ) - profile_output = await request_func(request_func_input=profile_input) + profile_output = await request_func( + request_func_input=profile_input, session=session) if profile_output.success: print("Profiler stopped") + + await session.close() return result