[V1][Metrics] Hook up IterationStats for Prometheus metrics (#12478)

Signed-off-by: Mark McLoughlin <markmc@redhat.com>
Author: Mark McLoughlin
Date:   2025-01-28 16:38:38 +00:00 (committed by GitHub)
Commit: 3fd1fb63ef (parent 925d2f1908)
3 changed files with 66 additions and 12 deletions

--- a/tests/entrypoints/openai/test_metrics.py
+++ b/tests/entrypoints/openai/test_metrics.py

@@ -105,8 +105,6 @@ EXPECTED_VALUES = {
 @pytest.mark.asyncio
 async def test_metrics_counts(server: RemoteOpenAIServer,
                               client: openai.AsyncClient, use_v1: bool):
-    if use_v1:
-        pytest.skip("Skipping test on vllm V1")
     for _ in range(_NUM_REQUESTS):
         # sending a request triggers the metrics to be logged.
         await client.completions.create(
@@ -120,6 +118,9 @@ async def test_metrics_counts(server: RemoteOpenAIServer,

     # Loop over all expected metric_families
     for metric_family, suffix_values_list in EXPECTED_VALUES.items():
+        if use_v1 and metric_family not in EXPECTED_METRICS_V1:
+            continue
+
         found_metric = False

         # Check to see if the metric_family is found in the prom endpoint.
@@ -199,6 +200,8 @@ EXPECTED_METRICS = [
 EXPECTED_METRICS_V1 = [
     "vllm:num_requests_running",
     "vllm:num_requests_waiting",
+    "vllm:prompt_tokens_total",
+    "vllm:generation_tokens_total",
 ]

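As an aside, the presence check this test performs can be reproduced against a live server. A minimal sketch, assuming a vLLM OpenAI-compatible server already listening on localhost:8000 and the third-party requests package (both are assumptions for illustration, not part of this change):

import requests

EXPECTED_METRICS_V1 = [
    "vllm:num_requests_running",
    "vllm:num_requests_waiting",
    "vllm:prompt_tokens_total",
    "vllm:generation_tokens_total",
]

# Scrape the Prometheus exposition text served by the server.
response = requests.get("http://localhost:8000/metrics")
response.raise_for_status()

for metric in EXPECTED_METRICS_V1:
    # Counter samples are exposed under their full *_total names, so a
    # plain substring check on the exposition text is sufficient here.
    assert metric in response.text, f"{metric} not found in /metrics output"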

--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -305,7 +305,8 @@ class AsyncLLM(EngineClient):
             return

         for logger in self.stat_loggers:
-            logger.log(scheduler_stats=scheduler_stats)
+            logger.log(scheduler_stats=scheduler_stats,
+                       iteration_stats=iteration_stats)

     def encode(
         self,
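The shape of the two objects being fanned out here can be read off the fields this diff touches. A minimal sketch of the pattern, with the dataclass layouts assumed from those fields (the real definitions live in vllm.v1.metrics.stats):

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


@dataclass
class SchedulerStats:
    # Point-in-time snapshot of the scheduler; backs the Prometheus gauges.
    num_running_reqs: int = 0
    num_waiting_reqs: int = 0


@dataclass
class IterationStats:
    # Tokens processed by one engine step; backs the Prometheus counters.
    num_prompt_tokens: int = 0
    num_generation_tokens: int = 0


class StatLoggerBase(ABC):

    @abstractmethod
    def log(self, scheduler_stats: SchedulerStats,
            iteration_stats: IterationStats):
        ...


def log_stats(stat_loggers: List[StatLoggerBase],
              scheduler_stats: SchedulerStats,
              iteration_stats: IterationStats):
    # Fan the same per-step stats out to every configured logger,
    # as the loop in the hunk above does.
    for logger in stat_loggers:
        logger.log(scheduler_stats=scheduler_stats,
                   iteration_stats=iteration_stats)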

--- a/vllm/v1/metrics/loggers.py
+++ b/vllm/v1/metrics/loggers.py
@@ -1,11 +1,12 @@
 import time
 from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, List

+import numpy as np
 import prometheus_client

 from vllm.logger import init_logger
-from vllm.v1.metrics.stats import SchedulerStats
+from vllm.v1.metrics.stats import IterationStats, SchedulerStats

 logger = init_logger(__name__)

@@ -15,27 +16,61 @@ _LOCAL_LOGGING_INTERVAL_SEC = 5.0
 class StatLoggerBase(ABC):

     @abstractmethod
-    def log(self, scheduler_stats: SchedulerStats):
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         ...


 class LoggingStatLogger(StatLoggerBase):

     def __init__(self):
-        self.last_log_time = time.monotonic()
+        self._reset(time.monotonic())

-    def log(self, scheduler_stats: SchedulerStats):
+    def _reset(self, now):
+        self.last_log_time = now
+
+        # Tracked stats over current local logging interval.
+        self.num_prompt_tokens: List[int] = []
+        self.num_generation_tokens: List[int] = []
+
+    def _local_interval_elapsed(self, now: float) -> bool:
+        # Log every _LOCAL_LOGGING_INTERVAL_SEC.
+        elapsed_time = now - self.last_log_time
+        return elapsed_time > _LOCAL_LOGGING_INTERVAL_SEC
+
+    def _track_iteration_stats(self, iteration_stats: IterationStats):
+        # Save tracked stats for token counters.
+        self.num_prompt_tokens.append(iteration_stats.num_prompt_tokens)
+        self.num_generation_tokens.append(
+            iteration_stats.num_generation_tokens)
+
+    def _get_throughput(self, tracked_stats: List[int], now: float) -> float:
+        # Compute summary metrics for tracked stats
+        return float(np.sum(tracked_stats) / (now - self.last_log_time))
+
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         """Log Stats to standard output."""

-        # Log every _LOCAL_LOGGING_INTERVAL_SEC.
+        self._track_iteration_stats(iteration_stats)
+
         now = time.monotonic()
-        if now - self.last_log_time < _LOCAL_LOGGING_INTERVAL_SEC:
+        if not self._local_interval_elapsed(now):
             return
-        self.last_log_time = now
+
+        prompt_throughput = self._get_throughput(self.num_prompt_tokens, now)
+        generation_throughput = self._get_throughput(
+            self.num_generation_tokens, now)
+
+        self._reset(now)

         # Format and print output.
         logger.info(
+            "Avg prompt throughput: %.1f tokens/s, "
+            "Avg generation throughput: %.1f tokens/s, "
             "Running: %d reqs, Waiting: %d reqs ",
+            prompt_throughput,
+            generation_throughput,
             scheduler_stats.num_running_reqs,
             scheduler_stats.num_waiting_reqs,
         )
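The throughput figures logged here are plain interval averages: the per-iteration token counts collected since the last log line, summed and divided by the elapsed wall-clock time. A small worked example with made-up numbers:

import numpy as np

# Per-iteration generation token counts collected over one logging interval.
num_generation_tokens = [120, 80, 100]
elapsed = 5.0  # seconds since last_log_time (_LOCAL_LOGGING_INTERVAL_SEC)

# Mirrors _get_throughput: sum the tracked counts, divide by elapsed time.
throughput = float(np.sum(num_generation_tokens) / elapsed)
print(f"Avg generation throughput: {throughput:.1f} tokens/s")  # -> 60.0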
@@ -61,11 +96,26 @@ class PrometheusStatLogger(StatLoggerBase):
             documentation="Number of requests waiting to be processed.",
             labelnames=labelnames).labels(*labelvalues)

-    def log(self, scheduler_stats: SchedulerStats):
+        self.counter_prompt_tokens = prometheus_client.Counter(
+            name="vllm:prompt_tokens_total",
+            documentation="Number of prefill tokens processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+        self.counter_generation_tokens = prometheus_client.Counter(
+            name="vllm:generation_tokens_total",
+            documentation="Number of generation tokens processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         """Log to prometheus."""
         self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
         self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)

+        self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
+        self.counter_generation_tokens.inc(
+            iteration_stats.num_generation_tokens)
+
     @staticmethod
     def _unregister_vllm_metrics():
         # Unregister any existing vLLM collectors (for CI/CD)
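For reference, the Counter-with-labels pattern used above in isolation. A minimal sketch against the default prometheus_client registry; the model_name label and its value are illustrative, not taken from this diff:

import prometheus_client

# Create the counter once, then bind the label values, as __init__ does above.
counter_prompt_tokens = prometheus_client.Counter(
    name="vllm:prompt_tokens_total",
    documentation="Number of prefill tokens processed.",
    labelnames=["model_name"]).labels("example-model")

# Each call to log() adds that iteration's token count.
counter_prompt_tokens.inc(128)
counter_prompt_tokens.inc(64)

# Render the exposition text that a /metrics endpoint would serve.
print(prometheus_client.generate_latest().decode())

Note that registering the same metric name twice in one process raises a ValueError, which is presumably what the _unregister_vllm_metrics() hook above guards against in repeated test runs.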