diff --git a/benchmarks/benchmark_one_concurrent.py b/benchmarks/benchmark_one_concurrent.py
new file mode 100644
index 0000000000000..0e4cadbbcd2a6
--- /dev/null
+++ b/benchmarks/benchmark_one_concurrent.py
@@ -0,0 +1,362 @@
+# SPDX-License-Identifier: Apache-2.0
+import argparse
+import asyncio
+import logging
+import random
+import time
+from dataclasses import dataclass
+from typing import Optional
+
+import aiohttp
+import numpy as np
+from tqdm import tqdm
+
+from backend_request_func import RequestFuncInput, RequestFuncOutput
+from benchmark_dataset import RandomDataset, SampleRequest
+
+try:
+    from vllm.transformers_utils.tokenizer import get_tokenizer
+except ImportError:
+    from backend_request_func import get_tokenizer
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class BenchmarkMetrics:
+    completed: int
+    total_input: int
+    total_output: int
+    mean_ttft_ms: float
+    median_ttft_ms: float
+    std_ttft_ms: float
+    percentiles_ttft_ms: list[tuple[float, float]]
+    mean_itl_ms: float
+    median_itl_ms: float
+    std_itl_ms: float
+    percentiles_itl_ms: list[tuple[float, float]]
+    mean_e2el_ms: float
+    median_e2el_ms: float
+    std_e2el_ms: float
+    percentiles_e2el_ms: list[tuple[float, float]]
+
+
+async def reset_cache(reset_url: str):
+    """Sends a POST request to reset the prefix cache."""
+    logger.debug("Resetting prefix cache at %s", reset_url)
+    try:
+        async with (
+            aiohttp.ClientSession() as session,
+            session.post(reset_url) as response,
+        ):
+            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
+            logger.debug("Prefix cache reset successful: %s", response.status)
+    except aiohttp.ClientConnectorError as e:
+        logger.error("Failed to connect to cache reset endpoint %s: %s", reset_url, e)
+    except aiohttp.ClientResponseError as e:
+        logger.error(
+            "Cache reset request failed with status %s: %s", e.status, e.message
+        )
+    except Exception as e:
+        logger.error("An unexpected error occurred during cache reset: %s", e)
+
+
+async def sequential_benchmark(
+    backend: str,
+    api_url: str,
+    model_id: str,
+    tokenizer,
+    input_requests: list[SampleRequest],
+    request_func,
+    selected_percentiles: list[float],
+    cache_reset_url: Optional[str] = None,
+):
+    """
+    Benchmark that processes requests sequentially, waiting for each to complete
+    before starting the next one. Resets the prefix cache between requests.
+ """ + outputs = [] + + pbar = tqdm(total=len(input_requests)) + + benchmark_start_time = time.perf_counter() + + # Process requests sequentially + for request in input_requests: + prompt, prompt_len, output_len = ( + request.prompt, + request.prompt_len, + request.expected_output_len, + ) + + logger.info("Sending request with len %s", request.prompt_len) + logger.debug('Request str: "%s"', request.prompt[:50]) + request_start_time = time.perf_counter() + + request_func_input = RequestFuncInput( + model=model_id, + prompt=prompt, + api_url=api_url, + prompt_len=prompt_len, + output_len=output_len, + ) + + output = await request_func(request_func_input=request_func_input) + + request_end_time = time.perf_counter() + # Add timing information + if output.success and not hasattr(output, "latency"): + output.latency = request_end_time - request_start_time + logger.info("Finished request with latency %.4f s", output.latency) + + outputs.append(output) + pbar.update(1) + + pbar.close() + + benchmark_duration = time.perf_counter() - benchmark_start_time + + # Calculate metrics + metrics = calculate_metrics( + input_requests=input_requests, + outputs=outputs, + dur_s=benchmark_duration, + tokenizer=tokenizer, + selected_percentiles=selected_percentiles, + ) + + print_results(metrics, benchmark_duration) + + result = { + "duration": benchmark_duration, + "completed": metrics.completed, + "total_input_tokens": metrics.total_input, + "total_output_tokens": metrics.total_output, + "input_lens": [request.prompt_len for request in input_requests], + "output_lens": [ + output.output_tokens if output.success else 0 for output in outputs + ], + "ttfts": [output.ttft for output in outputs if output.success], + "itls": [output.itl for output in outputs if output.success], + "generated_texts": [ + output.generated_text for output in outputs if output.success + ], + "errors": [output.error for output in outputs if not output.success], + } + + # Add summary statistics + for stat_name in ["ttft", "itl", "e2el"]: + for metric_name in ["mean", "median", "std"]: + result[f"{metric_name}_{stat_name}_ms"] = getattr( + metrics, f"{metric_name}_{stat_name}_ms" + ) + + for p, value in getattr(metrics, f"percentiles_{stat_name}_ms"): + p_word = str(int(p)) if int(p) == p else str(p) + result[f"p{p_word}_{stat_name}_ms"] = value + + return result + + +def calculate_metrics( + input_requests: list[SampleRequest], + outputs: list[RequestFuncOutput], + dur_s: float, + tokenizer, + selected_percentiles: list[float], +) -> BenchmarkMetrics: + """Calculate benchmark metrics from results.""" + total_input = 0 + completed = 0 + total_output = 0 + ttfts = [] + itls = [] + e2els = [] + + for i, output in enumerate(outputs): + if output.success: + output_len = output.output_tokens + + if not output_len: + # Use tokenizer to count output tokens if not provided + output_len = len( + tokenizer(output.generated_text, add_special_tokens=False).input_ids + ) + + total_output += output_len + total_input += input_requests[i].prompt_len + + if hasattr(output, "ttft") and output.ttft is not None: + ttfts.append(output.ttft) + + if hasattr(output, "itl") and output.itl: + # Ensure itl is a list of floats + if isinstance(output.itl, list): + itls.extend(output.itl) + else: + logger.warning( + "Expected list for ITL but got %s. 
Appending as is.", + type(output.itl), + ) + itls.append(output.itl) + + if hasattr(output, "latency") and output.latency is not None: + e2els.append(output.latency) + + completed += 1 + + return BenchmarkMetrics( + completed=completed, + total_input=total_input, + total_output=total_output, + mean_ttft_ms=np.mean(ttfts or [0]) * 1000, + median_ttft_ms=np.median(ttfts or [0]) * 1000, + std_ttft_ms=np.std(ttfts or [0]) * 1000, + percentiles_ttft_ms=[ + (p, np.percentile(ttfts or [0], p) * 1000) for p in selected_percentiles + ], + mean_itl_ms=np.mean(itls or [0]) * 1000, + median_itl_ms=np.median(itls or [0]) * 1000, + std_itl_ms=np.std(itls or [0]) * 1000, + percentiles_itl_ms=[ + (p, np.percentile(itls or [0], p) * 1000) for p in selected_percentiles + ], + mean_e2el_ms=np.mean(e2els or [0]) * 1000, + median_e2el_ms=np.median(e2els or [0]) * 1000, + std_e2el_ms=np.std(e2els or [0]) * 1000, + percentiles_e2el_ms=[ + (p, np.percentile(e2els or [0], p) * 1000) for p in selected_percentiles + ], + ) + + +def print_results(metrics: BenchmarkMetrics, benchmark_duration: float): + """Print benchmark results in a formatted way.""" + print("{s:{c}^{n}}".format(s=" Sequential Benchmark Result ", n=60, c="=")) + print("{:<40} {:<10}".format("Successful requests:", metrics.completed)) + print("{:<40} {:<10.2f}".format("Benchmark duration (s):", benchmark_duration)) + print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input)) + print("{:<40} {:<10}".format("Total generated tokens:", metrics.total_output)) + + def print_metric_stats(metric_name, header): + print("{s:{c}^{n}}".format(s=header, n=60, c="-")) + print( + "{:<40} {:<10.2f}".format( + f"Mean {metric_name} (ms):", + getattr(metrics, f"mean_{metric_name.lower()}_ms"), + ) + ) + print( + "{:<40} {:<10.2f}".format( + f"Median {metric_name} (ms):", + getattr(metrics, f"median_{metric_name.lower()}_ms"), + ) + ) + + for p, value in getattr(metrics, f"percentiles_{metric_name.lower()}_ms"): + p_word = str(int(p)) if int(p) == p else str(p) + print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):", value)) + + print_metric_stats("TTFT", "Time to First Token") + print_metric_stats("ITL", "Inter-token Latency") + print_metric_stats("E2EL", "End-to-end Latency") + print("=" * 60) + + +async def main_async(args): + # Import needed functions based on your setup + from backend_request_func import ASYNC_REQUEST_FUNCS + + backend = args.backend + model_id = args.model + tokenizer_id = args.tokenizer if args.tokenizer is not None else args.model + + # Set up API URL + if args.base_url is not None: + api_url = f"{args.base_url}{args.endpoint}" + else: + api_url = f"http://{args.host}:{args.port}{args.endpoint}" + + # Set up Cache Reset URL + cache_reset_url = f"http://{args.host}:{args.port}/reset_prefix_cache" + logger.info("Prefix cache reset configured at: %s", cache_reset_url) + + # Get tokenizer + tokenizer = get_tokenizer(tokenizer_id, trust_remote_code=args.trust_remote_code) + + # Get request function + if backend in ASYNC_REQUEST_FUNCS: + request_func = ASYNC_REQUEST_FUNCS[backend] + else: + raise ValueError(f"Unknown backend: {backend}") + + input_requests = RandomDataset().sample( + tokenizer=tokenizer, + num_requests=args.num_requests, + prefix_len=0, + input_len=args.input_len, + output_len=args.output_len, + range_ratio=0.0, + ) + + # Run benchmark + result = await sequential_benchmark( + backend=backend, + api_url=api_url, + model_id=model_id, + tokenizer=tokenizer, + input_requests=input_requests, + 
+        request_func=request_func,
+        selected_percentiles=[50, 90, 95, 99],
+        cache_reset_url=cache_reset_url,
+    )
+
+    return result
+
+
+def main(args):
+    print(args)
+    random.seed(args.seed)
+    np.random.seed(args.seed)
+
+    asyncio.run(main_async(args))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Sequential benchmark for LLM serving")
+    parser.add_argument(
+        "--backend", type=str, default="vllm", help="Backend to use for requests"
+    )
+    parser.add_argument(
+        "--base-url",
+        type=str,
+        default=None,
+        help="Server base URL (overrides --host and --port)",
+    )
+    parser.add_argument("--host", type=str, default="127.0.0.1")
+    parser.add_argument("--port", type=int, default=8000)
+    parser.add_argument(
+        "--endpoint", type=str, default="/v1/completions", help="API endpoint"
+    )
+    parser.add_argument("--model", type=str, required=True, help="Name of the model")
+    parser.add_argument(
+        "--tokenizer", type=str, help="Name of the tokenizer (defaults to model name)"
+    )
+    parser.add_argument(
+        "--num-requests", type=int, default=100, help="Number of requests to process"
+    )
+    parser.add_argument(
+        "--input-len", type=int, default=128, help="Input len for generated prompts"
+    )
+    parser.add_argument(
+        "--output-len", type=int, default=None, help="Override output len for requests"
+    )
+    parser.add_argument("--seed", type=int, default=42)
+    parser.add_argument(
+        "--trust-remote-code",
+        action="store_true",
+        help="Trust remote code from HuggingFace",
+    )
+
+    args = parser.parse_args()
+    main(args)
diff --git a/tools/pd_disagg/Justfile b/tools/pd_disagg/Justfile
new file mode 100644
index 0000000000000..27bf415521eec
--- /dev/null
+++ b/tools/pd_disagg/Justfile
@@ -0,0 +1,82 @@
+# Needed for the proxy server
+vllm-directory := "/home/rshaw/vllm/"
+
+PREFILL_GPU := "0"
+DECODE_GPU := "2"
+MODEL := "meta-llama/Llama-3.1-8B-Instruct"
+PROXY_PORT := "8192"
+PREFILL_PORT := "8100"
+DECODE_PORT := "8200"
+PREFILL_NIXL_SIDE_CHANNEL_PORT := "5557"
+DECODE_NIXL_SIDE_CHANNEL_PORT := "5558"
+
+prefill:
+    VLLM_NIXL_SIDE_CHANNEL_PORT={{PREFILL_NIXL_SIDE_CHANNEL_PORT}} \
+    CUDA_VISIBLE_DEVICES={{PREFILL_GPU}} \
+    vllm serve {{MODEL}} \
+        --port {{PREFILL_PORT}} \
+        --tensor-parallel-size 1 \
+        --enforce-eager \
+        --disable-log-requests \
+        --block-size 128 \
+        --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}'
+
+decode:
+    VLLM_NIXL_SIDE_CHANNEL_PORT={{DECODE_NIXL_SIDE_CHANNEL_PORT}} \
+    CUDA_VISIBLE_DEVICES={{DECODE_GPU}} \
+    vllm serve {{MODEL}} \
+        --port {{DECODE_PORT}} \
+        --tensor-parallel-size 1 \
+        --enforce-eager \
+        --disable-log-requests \
+        --block-size 128 \
+        --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}'
+
+proxy:
+    python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \
+        --port {{PROXY_PORT}} \
+        --prefiller-port {{PREFILL_PORT}} \
+        --decoder-port {{DECODE_PORT}}
+
+send_request:
+    curl -X POST http://localhost:{{PROXY_PORT}}/v1/completions \
+        -H "Content-Type: application/json" \
+        -d '{ \
+            "model": "{{MODEL}}", \
+            "prompt": "Red Hat is the best open source company by far across Linux, K8s, and AI, and vLLM has the greatest community in open source AI software infrastructure. I love vLLM because", \
I love vLLM because", \ + "max_tokens": 150, \ + "temperature": 0.7 \ + }' + +benchmark NUM_PROMPTS: + python {{vllm-directory}}/benchmarks/benchmark_serving.py \ + --port {{PROXY_PORT}} \ + --model {{MODEL}} \ + --dataset-name random \ + --random-input-len 30000 \ + --random-output-len 10 \ + --num-prompts {{NUM_PROMPTS}} \ + --seed $(date +%s) \ + +benchmark_one INPUT_LEN: + python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ + --port {{PROXY_PORT}} \ + --model {{MODEL}} \ + --input-len {{INPUT_LEN}} \ + --output-len 1 \ + --num-requests 10 \ + --seed $(date +%s) + +benchmark_one_no_pd INPUT_LEN: + python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ + --port {{DECODE_PORT}} \ + --model {{MODEL}} \ + --input-len {{INPUT_LEN}} \ + --output-len 1 \ + --num-requests 10 \ + --seed $(date +%s) + +eval: + lm_eval --model local-completions --tasks gsm8k \ + --model_args model={{MODEL}},base_url=http://127.0.0.1:{{PROXY_PORT}}/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ + --limit 1000 diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 56ae1acf8571f..29a1b66408ddc 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -1040,10 +1040,14 @@ class NixlConnectorWorker: remote_xfer_side_handle, remote_block_descs_ids, notif_msg=notif_id, + skip_desc_merge=True, ) # Begin async xfer. + start = time.perf_counter() self.nixl_wrapper.transfer(handle) + end = time.perf_counter() + logger.info("========== TRANSFER: %s ==========", end - start) # Use handle to check completion in future step(). # TODO (NickLucche) surface xfer elapsed time