[V1][Metrics] Add initial Prometheus logger (#12416)

Signed-off-by: Mark McLoughlin <markmc@redhat.com>
Mark McLoughlin 2025-01-27 17:26:28 +00:00 committed by GitHub
parent 103bd17ac5
commit 01ba927040
3 changed files with 78 additions and 10 deletions

View File

@@ -16,6 +16,24 @@ from ...utils import RemoteOpenAIServer

 MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

+
+@pytest.fixture(scope="module", params=[True, False])
+def use_v1(request):
+    # Module-scoped variant of run_with_both_engines.
+    #
+    # Use this fixture to run a test with both v0 and v1, and
+    # also to conditionalize the test logic, e.g.:
+    #
+    #   def test_metrics_exist(use_v1, server, client):
+    #       ...
+    #       expected = EXPECTED_METRICS_V1 if use_v1 else EXPECTED_METRICS
+    #       for metric in expected:
+    #           assert metric in response.text
+    #
+    # @skip_v1 wouldn't work here because this is a module-level
+    # fixture - per-function decorators would have no effect.
+    yield request.param
+
+
 @pytest.fixture(scope="module")
 def default_server_args():
     return [
@@ -36,10 +54,12 @@ def default_server_args():
                     "--enable-chunked-prefill",
                     "--disable-frontend-multiprocessing",
                 ])
-def server(default_server_args, request):
+def server(use_v1, default_server_args, request):
     if request.param:
         default_server_args.append(request.param)
-    with RemoteOpenAIServer(MODEL_NAME, default_server_args) as remote_server:
+    env_dict = dict(VLLM_USE_V1='1' if use_v1 else '0')
+    with RemoteOpenAIServer(MODEL_NAME, default_server_args,
+                            env_dict=env_dict) as remote_server:
         yield remote_server
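As context for the fixtures above, here is a minimal sketch of a test that combines the module-scoped use_v1 fixture with the server and client fixtures; the test name, request parameters, and assumed client fixture are illustrative and not part of this change.

# Hypothetical test sketch (not part of this commit): runs once per engine
# because the module-scoped use_v1 fixture parametrizes the server fixture.
import openai
import pytest
import requests


@pytest.mark.asyncio
async def test_metrics_endpoint_smoke(use_v1, server,
                                      client: openai.AsyncClient):
    # One request so the engine records at least one iteration of stats.
    await client.completions.create(model=MODEL_NAME,
                                    prompt="Hello, my name is",
                                    max_tokens=5,
                                    temperature=0.0)
    response = requests.get(server.url_for("metrics"))
    assert response.status_code == 200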
@@ -84,7 +104,9 @@ EXPECTED_VALUES = {

 @pytest.mark.asyncio
 async def test_metrics_counts(server: RemoteOpenAIServer,
-                              client: openai.AsyncClient):
+                              client: openai.AsyncClient, use_v1: bool):
+    if use_v1:
+        pytest.skip("Skipping test on vllm V1")
     for _ in range(_NUM_REQUESTS):
         # sending a request triggers the metrics to be logged.
         await client.completions.create(
@@ -174,10 +196,15 @@ EXPECTED_METRICS = [
     "swap_space_bytes",
 ]

+EXPECTED_METRICS_V1 = [
+    "vllm:num_requests_running",
+    "vllm:num_requests_waiting",
+]
+

 @pytest.mark.asyncio
 async def test_metrics_exist(server: RemoteOpenAIServer,
-                             client: openai.AsyncClient):
+                             client: openai.AsyncClient, use_v1: bool):
     # sending a request triggers the metrics to be logged.
     await client.completions.create(model=MODEL_NAME,
                                     prompt="Hello, my name is",
@@ -187,11 +214,13 @@ async def test_metrics_exist(server: RemoteOpenAIServer,
     response = requests.get(server.url_for("metrics"))
     assert response.status_code == HTTPStatus.OK

-    for metric in EXPECTED_METRICS:
+    for metric in (EXPECTED_METRICS_V1 if use_v1 else EXPECTED_METRICS):
         assert metric in response.text


-def test_metrics_exist_run_batch():
+def test_metrics_exist_run_batch(use_v1: bool):
+    if use_v1:
+        pytest.skip("Skipping test on vllm V1")
     input_batch = """{"custom_id": "request-0", "method": "POST", "url": "/v1/embeddings", "body": {"model": "intfloat/e5-mistral-7b-instruct", "input": "You are a helpful assistant."}}"""  # noqa: E501
     base_url = "0.0.0.0"
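For reference, a rough sketch of what the two new V1 gauges look like when scraped from a running server; the port and the label value are assumptions, not taken from this commit.

# Hypothetical scrape sketch: print the two V1 gauges from /metrics.
import requests

resp = requests.get("http://localhost:8000/metrics")  # assumed default port
for line in resp.text.splitlines():
    if line.startswith(("vllm:num_requests_running",
                        "vllm:num_requests_waiting")):
        print(line)
# Expected shape (values illustrative):
#   vllm:num_requests_running{model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0"} 0.0
#   vllm:num_requests_waiting{model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0"} 0.0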

View File

@@ -24,7 +24,8 @@ from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.output_processor import OutputProcessor
 from vllm.v1.engine.processor import Processor
 from vllm.v1.executor.abstract import Executor
-from vllm.v1.metrics.loggers import LoggingStatLogger, StatLoggerBase
+from vllm.v1.metrics.loggers import (LoggingStatLogger, PrometheusStatLogger,
+                                     StatLoggerBase)
 from vllm.v1.metrics.stats import IterationStats, SchedulerStats

 logger = init_logger(__name__)
@@ -46,13 +47,15 @@ class AsyncLLM(EngineClient):

         assert start_engine_loop

+        self.model_config = vllm_config.model_config
+
         self.log_requests = log_requests
         self.log_stats = log_stats
         self.stat_loggers: List[StatLoggerBase] = [
             LoggingStatLogger(),
-            # TODO(rob): PrometheusStatLogger(),
+            PrometheusStatLogger(labels=dict(
+                model_name=self.model_config.served_model_name)),
         ]

-        self.model_config = vllm_config.model_config
-
         # Tokenizer (+ ensure liveness if running in another process).
         self.tokenizer = init_tokenizer_from_configs(
@@ -272,7 +275,7 @@ class AsyncLLM(EngineClient):

                 # 4) Logging.
                 # TODO(rob): make into a coroutine and launch it in
-                # background thread once we add Prometheus.
+                # background thread once Prometheus overhead is non-trivial.
                 assert iteration_stats is not None
                 self._log_stats(
                     scheduler_stats=outputs.scheduler_stats,
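A simplified sketch (assumed, not the verbatim _log_stats body) of how the call above presumably fans scheduler stats out to the loggers registered in __init__:

# Simplified fan-out sketch; the real method may take additional arguments
# such as iteration_stats.
from typing import List

from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import SchedulerStats


def log_to_all(stat_loggers: List[StatLoggerBase], log_stats: bool,
               scheduler_stats: SchedulerStats) -> None:
    if not log_stats:
        return
    for stat_logger in stat_loggers:
        # LoggingStatLogger writes to the vLLM log;
        # PrometheusStatLogger updates its gauges.
        stat_logger.log(scheduler_stats)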

View File

@@ -1,5 +1,8 @@
 import time
 from abc import ABC, abstractmethod
+from typing import Dict
+
+import prometheus_client

 from vllm.logger import init_logger
 from vllm.v1.metrics.stats import SchedulerStats
@@ -36,3 +39,36 @@ class LoggingStatLogger(StatLoggerBase):
             scheduler_stats.num_running_reqs,
             scheduler_stats.num_waiting_reqs,
         )
+
+
+class PrometheusStatLogger(StatLoggerBase):
+
+    def __init__(self, labels: Dict[str, str]):
+        self.labels = labels
+
+        labelnames = self.labels.keys()
+        labelvalues = self.labels.values()
+
+        self._unregister_vllm_metrics()
+
+        self.gauge_scheduler_running = prometheus_client.Gauge(
+            name="vllm:num_requests_running",
+            documentation="Number of requests in model execution batches.",
+            labelnames=labelnames).labels(*labelvalues)
+
+        self.gauge_scheduler_waiting = prometheus_client.Gauge(
+            name="vllm:num_requests_waiting",
+            documentation="Number of requests waiting to be processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+    def log(self, scheduler_stats: SchedulerStats):
+        """Log to prometheus."""
+        self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
+        self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)
+
+    @staticmethod
+    def _unregister_vllm_metrics():
+        # Unregister any existing vLLM collectors (for CI/CD reruns).
+        for collector in list(prometheus_client.REGISTRY._collector_to_names):
+            if hasattr(collector, "_name") and "vllm" in collector._name:
+                prometheus_client.REGISTRY.unregister(collector)
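Finally, a standalone usage sketch of the new logger, assuming SchedulerStats is a simple dataclass exposing the two fields read in log() above and that the default prometheus_client registry is in use; the label value and stat counts are made up.

# Standalone usage sketch (not part of this commit).
import prometheus_client

from vllm.v1.metrics.loggers import PrometheusStatLogger
from vllm.v1.metrics.stats import SchedulerStats

stat_logger = PrometheusStatLogger(labels={"model_name": "my-model"})
stat_logger.log(SchedulerStats(num_running_reqs=2, num_waiting_reqs=5))

# Render the default registry in the Prometheus text exposition format.
print(prometheus_client.generate_latest().decode())
# Among other output (values reflect the stats passed above):
#   vllm:num_requests_running{model_name="my-model"} 2.0
#   vllm:num_requests_waiting{model_name="my-model"} 5.0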