diff --git a/tests/entrypoints/openai/test_orca_metrics.py b/tests/entrypoints/openai/test_orca_metrics.py
new file mode 100644
index 0000000000000..d32cfde07c21e
--- /dev/null
+++ b/tests/entrypoints/openai/test_orca_metrics.py
@@ -0,0 +1,128 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+
+import openai
+import pytest
+import pytest_asyncio
+
+from ...utils import RemoteOpenAIServer
+
+# any model with a chat template should work here
+MODEL_NAME = "HuggingFaceH4/zephyr-7b-beta"
+
+
+@pytest.fixture(scope="module")
+def monkeypatch_module():
+    from _pytest.monkeypatch import MonkeyPatch
+
+    mpatch = MonkeyPatch()
+    yield mpatch
+    mpatch.undo()
+
+
+@pytest.fixture(scope="module", params=[True])
+def server(request, monkeypatch_module):
+    use_v1 = request.param
+    monkeypatch_module.setenv("VLLM_USE_V1", "1" if use_v1 else "0")
+
+    args = [
+        "--dtype",
+        "bfloat16",
+        "--max-model-len",
+        "8192",
+        "--enforce-eager",
+    ]
+
+    with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
+        yield remote_server
+
+
+@pytest_asyncio.fixture
+async def client(server):
+    async with server.get_async_client() as async_client:
+        yield async_client
+
+
+@pytest.mark.asyncio
+async def test_chat_completion_with_orca_header(server: RemoteOpenAIServer):
+    messages = [
+        {"role": "system", "content": "you are a helpful assistant"},
+        {"role": "user", "content": "what is 1+1?"},
+    ]
+
+    client = openai.OpenAI(
+        api_key="EMPTY",
+        base_url=f"http://localhost:{server.port}/v1",
+        default_headers={"endpoint-load-metrics-format": "TEXT"},
+    )
+
+    # 1. Use raw client to get response headers.
+    raw_client = client.with_raw_response
+
+    # 2. Make the API call using the raw_client
+    response_with_raw = raw_client.chat.completions.create(
+        model=MODEL_NAME,
+        messages=messages,
+        extra_headers={"endpoint-load-metrics-format": "TEXT"},
+    )
+
+    # 3. Access the raw httpx.Response object
+    raw_http_response = response_with_raw.http_response
+
+    # 4. Get the headers from the httpx.Response object
+    response_headers = raw_http_response.headers
+
+    assert "endpoint-load-metrics" in response_headers
+
+
+@pytest.mark.asyncio
+async def test_completion_with_orca_header(client: openai.AsyncOpenAI):
+    # 1. Use raw client to get response headers.
+    raw_client = client.with_raw_response
+
+    # 2. Make the API call using the raw_client
+    completion = await raw_client.completions.create(
+        model=MODEL_NAME,
+        prompt="Hello, my name is",
+        max_tokens=5,
+        extra_headers={"endpoint-load-metrics-format": "JSON"},
+    )
+
+    # 3. Access the raw httpx.Response object
+    raw_http_response = completion.http_response
+
+    # 4. Get the headers from the httpx.Response object
+    response_headers = raw_http_response.headers
+
+    assert "endpoint-load-metrics" in response_headers
+
+
+@pytest.mark.asyncio
+async def test_single_completion(client: openai.AsyncOpenAI):
+    completion = await client.completions.create(
+        model=MODEL_NAME,
+        prompt="Hello, my name is",
+        max_tokens=5,
+        extra_headers={"endpoint-load-metrics-format": "JSON"},
+        temperature=0.0,
+    )
+
+    assert completion.id is not None
+    assert completion.choices is not None and len(completion.choices) == 1
+
+    choice = completion.choices[0]
+    assert len(choice.text) >= 5
+    assert choice.finish_reason == "length"
+    assert completion.usage == openai.types.CompletionUsage(
+        completion_tokens=5, prompt_tokens=6, total_tokens=11
+    )
+
+    # test using token IDs
+    completion = await client.completions.create(
+        model=MODEL_NAME,
+        prompt=[0, 0, 0, 0, 0],
+        max_tokens=5,
+        temperature=0.0,
+    )
+    assert len(completion.choices[0].text) >= 1
+    assert completion.choices[0].prompt_logprobs is None
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 22b5584749ae7..c37aba2776aeb 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -51,6 +51,7 @@ from vllm.entrypoints.anthropic.serving_messages import AnthropicServingMessages
 from vllm.entrypoints.launcher import serve_http
 from vllm.entrypoints.logger import RequestLogger
 from vllm.entrypoints.openai.cli_args import make_arg_parser, validate_parsed_serve_args
+from vllm.entrypoints.openai.orca_metrics import metrics_header
 from vllm.entrypoints.openai.protocol import (
     ChatCompletionRequest,
     ChatCompletionResponse,
@@ -128,6 +129,8 @@ prometheus_multiproc_dir: tempfile.TemporaryDirectory
 # Cannot use __name__ (https://github.com/vllm-project/vllm/pull/4765)
 logger = init_logger("vllm.entrypoints.openai.api_server")

+ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL = "endpoint-load-metrics-format"
+
 _running_tasks: set[asyncio.Task] = set()


@@ -672,6 +675,9 @@ async def create_messages(request: AnthropicMessagesRequest, raw_request: Request):
 @with_cancellation
 @load_aware_call
 async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request):
+    metrics_header_format = raw_request.headers.get(
+        ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, ""
+    )
     handler = chat(raw_request)
     if handler is None:
         return base(raw_request).create_error_response(
@@ -689,7 +695,10 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request):
         )

     elif isinstance(generator, ChatCompletionResponse):
-        return JSONResponse(content=generator.model_dump())
+        return JSONResponse(
+            content=generator.model_dump(),
+            headers=metrics_header(metrics_header_format),
+        )

     return StreamingResponse(content=generator, media_type="text/event-stream")

@@ -707,6 +716,9 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request):
 @with_cancellation
 @load_aware_call
 async def create_completion(request: CompletionRequest, raw_request: Request):
+    metrics_header_format = raw_request.headers.get(
+        ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, ""
+    )
     handler = completion(raw_request)
     if handler is None:
         return base(raw_request).create_error_response(
@@ -729,7 +741,10 @@ async def create_completion(request: CompletionRequest, raw_request: Request):
             content=generator.model_dump(), status_code=generator.error.code
         )
     elif isinstance(generator, CompletionResponse):
-        return JSONResponse(content=generator.model_dump())
+        return JSONResponse(
+            content=generator.model_dump(),
+            headers=metrics_header(metrics_header_format),
+        )

     return StreamingResponse(content=generator, media_type="text/event-stream")

diff --git a/vllm/entrypoints/openai/orca_metrics.py b/vllm/entrypoints/openai/orca_metrics.py
new file mode 100644
index 0000000000000..3808262bf31f2
--- /dev/null
+++ b/vllm/entrypoints/openai/orca_metrics.py
@@ -0,0 +1,120 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""
+Utility functions that create ORCA endpoint load report response headers.
+"""
+
+import json
+from collections.abc import Mapping
+
+from vllm.logger import init_logger
+from vllm.v1.metrics.reader import Gauge, get_metrics_snapshot
+
+logger = init_logger(__name__)
+
+
+def create_orca_header(
+    metrics_format: str, named_metrics: list[tuple[str, float]]
+) -> Mapping[str, str] | None:
+    """
+    Creates an ORCA header named 'endpoint-load-metrics' in the specified
+    format, carrying the given named_metrics as ORCA custom (named) metrics.
+    ORCA headers format description: https://docs.google.com/document/d/1C1ybMmDKJIVlrbOLbywhu9iRYo4rilR-cT50OTtOFTs/edit?tab=t.0
+    ORCA proto: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto
+
+    Parameters:
+    - metrics_format (str): The format of the header ('TEXT', 'JSON').
+    - named_metrics (list[tuple[str, float]]): List of tuples with metric
+      names and their corresponding float values.
+
+    Returns:
+    - Mapping[str, str] | None: A dictionary whose key is
+      'endpoint-load-metrics' and whose value is the ORCA header string,
+      i.e. the format prefix followed by the encoded named_metrics.
+    """
+
+    if metrics_format.lower() not in ["text", "json"]:
+        logger.warning(
+            "`%s` format is not supported in the ORCA response header",
+            metrics_format,
+        )
+        return None
+
+    header = {}
+    orca_report = {
+        "named_metrics": {
+            metric_name: value
+            for metric_name, value in named_metrics
+            if isinstance(metric_name, str) and isinstance(value, float)
+        }
+    }
+    # output example:
+    # endpoint-load-metrics: TEXT named_metrics.kv_cache_utilization=0.4
+    if metrics_format.lower() == "text":
+        native_http_header = ", ".join(
+            [
+                f"named_metrics.{metric_name}={value}"
+                for metric_name, value in named_metrics
+                if isinstance(metric_name, str) and isinstance(value, float)
+            ]
+        )
+        header["endpoint-load-metrics"] = f"TEXT {native_http_header}"
+
+    # output example:
+    # endpoint-load-metrics: JSON {"named_metrics": {"custom-metric-util": 0.4}}
+    elif metrics_format.lower() == "json":
+        header["endpoint-load-metrics"] = f"JSON {json.dumps(orca_report)}"
+
+    logger.info("Created ORCA header %s", header)
+
+    return header
+
+
+def get_named_metrics_from_prometheus() -> list[tuple[str, float]]:
+    """
+    Collects current metrics from Prometheus and returns some of them
+    in the form of the `named_metrics` list for `create_orca_header()`.
+
+    Parameters:
+    - None
+
+    Returns:
+    - list[tuple[str, float]]: List of tuples of metric names and their values.
+    """
+    named_metrics: list[tuple[str, float]] = []
+    # Map from prometheus metric names to ORCA named metrics.
+    prometheus_to_orca_metrics = {
+        "vllm:kv_cache_usage_perc": "kv_cache_usage_perc",
+        "vllm:num_requests_waiting": "num_requests_waiting",
+    }
+    metrics = get_metrics_snapshot()
+    for metric in metrics:
+        orca_name = prometheus_to_orca_metrics.get(metric.name)
+        # If this metric is mapped into ORCA, then add it to the report.
+        # Note: Only Gauge metrics are currently supported.
+        if orca_name is not None and isinstance(metric, Gauge):
+            named_metrics.append((str(orca_name), float(metric.value)))
+    return named_metrics
+
+
+def metrics_header(metrics_format: str) -> Mapping[str, str] | None:
+    """
+    Creates an ORCA header named 'endpoint-load-metrics' in the specified
+    format. Metrics are collected from Prometheus using
+    `get_named_metrics_from_prometheus()`.
+
+    ORCA headers format description: https://docs.google.com/document/d/1C1ybMmDKJIVlrbOLbywhu9iRYo4rilR-cT50OTtOFTs/edit?tab=t.0
+    ORCA proto: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto
+
+    Parameters:
+    - metrics_format (str): The format of the header ('TEXT', 'JSON').
+
+    Returns:
+    - Mapping[str, str] | None: A dictionary whose key is
+      'endpoint-load-metrics' and whose value is the ORCA header string,
+      i.e. the format prefix followed by the encoded named_metrics.
+    """
+    if not metrics_format:
+        return None
+    # Get named metrics from prometheus.
+    named_metrics = get_named_metrics_from_prometheus()
+    return create_orca_header(metrics_format, named_metrics)
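
Not part of the diff: a minimal client-side sketch of how the new header can be read without the OpenAI SDK, for reviewers who want to poke at it manually. It assumes a vLLM server is already running locally on port 8000 with the test model loaded (the URL and model name are illustrative), and uses httpx directly.

# Sketch only; assumes `vllm serve HuggingFaceH4/zephyr-7b-beta --port 8000` is running.
import json

import httpx

BASE_URL = "http://localhost:8000/v1"  # assumed local server

response = httpx.post(
    f"{BASE_URL}/completions",
    json={
        "model": "HuggingFaceH4/zephyr-7b-beta",
        "prompt": "Hello, my name is",
        "max_tokens": 5,
    },
    # Ask the server to attach the ORCA load report in JSON form.
    headers={"endpoint-load-metrics-format": "JSON"},
    timeout=60.0,
)
response.raise_for_status()

# Expected header value shape (numbers vary), e.g.:
#   JSON {"named_metrics": {"kv_cache_usage_perc": 0.01, "num_requests_waiting": 0.0}}
raw_value = response.headers["endpoint-load-metrics"]
prefix, _, payload = raw_value.partition(" ")
assert prefix == "JSON"
report = json.loads(payload)
print(report["named_metrics"])

If the request asks for TEXT instead, create_orca_header joins the metrics with ", ", so the same header would look like: endpoint-load-metrics: TEXT named_metrics.kv_cache_usage_perc=0.01, named_metrics.num_requests_waiting=0.0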