[V1][Metrics] Add per-request prompt/generation_tokens histograms (#12516)

Signed-off-by: Mark McLoughlin <markmc@redhat.com>
Mark McLoughlin 2025-01-28 22:07:22 +00:00 committed by GitHub
parent f26d790718
commit c386c43ca3
GPG Key ID: B5690EEEBB952194
5 changed files with 103 additions and 15 deletions

View File

@@ -202,6 +202,12 @@ EXPECTED_METRICS_V1 = [
     "vllm:num_requests_waiting",
     "vllm:prompt_tokens_total",
     "vllm:generation_tokens_total",
+    "vllm:request_prompt_tokens_sum",
+    "vllm:request_prompt_tokens_bucket",
+    "vllm:request_prompt_tokens_count",
+    "vllm:request_generation_tokens_sum",
+    "vllm:request_generation_tokens_bucket",
+    "vllm:request_generation_tokens_count",
 ]
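These six names correspond to the two histograms added below: prometheus_client exports each Histogram as separate *_sum, *_bucket and *_count series. A minimal sketch of that naming, illustrative only (made-up label value, private registry so vLLM's global registry is untouched):

import prometheus_client

registry = prometheus_client.CollectorRegistry()
hist = prometheus_client.Histogram(
    name="vllm:request_prompt_tokens",
    documentation="Number of prefill tokens processed.",
    buckets=[1, 2, 5],                      # tiny bucket list, illustration only
    labelnames=["model_name"],
    registry=registry)
hist.labels("example-model").observe(3)

# The exposition output contains vllm:request_prompt_tokens_sum,
# vllm:request_prompt_tokens_bucket{le="..."} and vllm:request_prompt_tokens_count.
print(prometheus_client.generate_latest(registry).decode())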

View File

@@ -53,8 +53,7 @@ class AsyncLLM(EngineClient):
         self.log_stats = log_stats
         self.stat_loggers: List[StatLoggerBase] = [
             LoggingStatLogger(),
-            PrometheusStatLogger(labels=dict(
-                model_name=self.model_config.served_model_name)),
+            PrometheusStatLogger(vllm_config.model_config),
         ]

         # Tokenizer (+ ensure liveness if running in another process).

View File

@@ -8,7 +8,7 @@ from vllm.transformers_utils.tokenizer_group import BaseTokenizerGroup
 from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest
 from vllm.v1.engine.detokenizer import (DetokenizerOutput,
                                         IncrementalDetokenizer)
-from vllm.v1.metrics.stats import IterationStats
+from vllm.v1.metrics.stats import IterationStats, RequestStateStats


 @dataclass
@@ -37,6 +37,8 @@ class RequestState:
         self.is_prefilling = True
         self.queue = queue

+        self.stats = RequestStateStats()
+
     @classmethod
     def from_new_request(
             cls,
@@ -146,7 +148,8 @@
                 # 1) Compute stats for this iteration.
                 iteration_stats.update_from_output(engine_core_output,
                                                    req_state.is_prefilling,
-                                                   req_state.prompt_len)
+                                                   req_state.prompt_len,
+                                                   req_state.stats)
                 req_state.is_prefilling = False

                 # 2) Detokenize the token ids into text.
@@ -171,6 +174,10 @@
                     # detected stop string, abort needed in EngineCore.
                     reqs_to_abort.append(req_id)

+                    # Track per-request stats
+                    iteration_stats.update_from_finished_request(
+                        request_output, req_state.stats)
+
         return OutputProcessorOutput(
             request_outputs=request_outputs,
             reqs_to_abort=reqs_to_abort,
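In short, each request carries a small running accumulator that is updated on every engine step and snapshotted when the request finishes. A condensed, self-contained sketch of that flow, using simplified stand-ins rather than vLLM's actual classes:

from dataclasses import dataclass, field
from typing import List

@dataclass
class RequestStateStats:               # simplified stand-in for the new dataclass
    num_generation_tokens: int = 0

@dataclass
class FinishedRequestStats:            # simplified stand-in
    num_prompt_tokens: int = 0
    num_generation_tokens: int = 0

@dataclass
class IterationStats:                  # simplified stand-in
    finished_requests: List[FinishedRequestStats] = field(default_factory=list)

req_stats = RequestStateStats()        # lives for the whole request
iteration = IterationStats()           # rebuilt on every engine step

# Each step: count this request's newly generated tokens.
for new_token_ids in ([101, 102], [103], [104]):
    req_stats.num_generation_tokens += len(new_token_ids)

# On finish: snapshot into finished_requests; the Prometheus logger later
# turns each entry into one observation per histogram.
prompt_token_ids = [1, 2, 3, 4, 5, 6, 7]
iteration.finished_requests.append(
    FinishedRequestStats(len(prompt_token_ids),
                         req_stats.num_generation_tokens))

assert iteration.finished_requests[0] == FinishedRequestStats(7, 4)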

View File

@@ -1,10 +1,11 @@
 import time
 from abc import ABC, abstractmethod
-from typing import Dict, List
+from typing import List

 import numpy as np
 import prometheus_client

+from vllm.config import ModelConfig
 from vllm.logger import init_logger
 from vllm.v1.metrics.stats import IterationStats, SchedulerStats
@@ -78,14 +79,14 @@ class LoggingStatLogger(StatLoggerBase):

 class PrometheusStatLogger(StatLoggerBase):

-    def __init__(self, labels: Dict[str, str]):
-        self.labels = labels
-        labelnames = self.labels.keys()
-        labelvalues = self.labels.values()
+    def __init__(self, model_config: ModelConfig):
         self._unregister_vllm_metrics()

+        labelnames = ["model_name"]
+        labelvalues = [model_config.served_model_name]
+        max_model_len = model_config.max_model_len
+
         self.gauge_scheduler_running = prometheus_client.Gauge(
             name="vllm:num_requests_running",
             documentation="Number of requests in model execution batches.",
@@ -106,6 +107,20 @@
             documentation="Number of generation tokens processed.",
             labelnames=labelnames).labels(*labelvalues)

+        self.histogram_num_prompt_tokens_request = \
+            prometheus_client.Histogram(
+                name="vllm:request_prompt_tokens",
+                documentation="Number of prefill tokens processed.",
+                buckets=build_1_2_5_buckets(max_model_len),
+                labelnames=labelnames).labels(*labelvalues)
+
+        self.histogram_num_generation_tokens_request = \
+            prometheus_client.Histogram(
+                name="vllm:request_generation_tokens",
+                documentation="Number of generation tokens processed.",
+                buckets=build_1_2_5_buckets(max_model_len),
+                labelnames=labelnames).labels(*labelvalues)
+
     def log(self, scheduler_stats: SchedulerStats,
             iteration_stats: IterationStats):
         """Log to prometheus."""
@@ -116,9 +131,42 @@
         self.counter_generation_tokens.inc(
             iteration_stats.num_generation_tokens)

+        for finished_request in iteration_stats.finished_requests:
+            self.histogram_num_prompt_tokens_request.observe(
+                finished_request.num_prompt_tokens)
+            self.histogram_num_generation_tokens_request.observe(
+                finished_request.num_generation_tokens)
+
     @staticmethod
     def _unregister_vllm_metrics():
         # Unregister any existing vLLM collectors (for CI/CD)
         for collector in list(prometheus_client.REGISTRY._collector_to_names):
             if hasattr(collector, "_name") and "vllm" in collector._name:
                 prometheus_client.REGISTRY.unregister(collector)
+
+
+def build_buckets(mantissa_lst: List[int], max_value: int) -> List[int]:
+    """
+    Builds a list of buckets with increasing powers of 10 multiplied by
+    mantissa values until the value exceeds the specified maximum.
+    """
+    exponent = 0
+    buckets: List[int] = []
+    while True:
+        for m in mantissa_lst:
+            value = m * 10**exponent
+            if value <= max_value:
+                buckets.append(value)
+            else:
+                return buckets
+        exponent += 1
+
+
+def build_1_2_5_buckets(max_value: int) -> List[int]:
+    """
+    Example:
+    >>> build_1_2_5_buckets(100)
+    [1, 2, 5, 10, 20, 50, 100]
+    """
+    return build_buckets([1, 2, 5], max_value)
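For a realistic model length the helper still produces a short bucket list (prometheus_client appends the final +Inf bucket itself). For example, assuming a hypothetical 4096-token max_model_len:

>>> build_1_2_5_buckets(4096)
[1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000]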

View File

@@ -1,7 +1,8 @@
 from dataclasses import dataclass
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List

 if TYPE_CHECKING:
+    from vllm.outputs import RequestOutput
     from vllm.v1.engine import EngineCoreOutput
@@ -16,6 +17,21 @@
     # gpu_prefix_cache_hit_rate: float = 0.0


+@dataclass
+class RequestStateStats:
+    """Stats that need to be tracked across delta updates."""
+
+    num_generation_tokens: int = 0
+
+
+@dataclass
+class FinishedRequestStats:
+    """Stats associated with a finished request."""
+
+    num_prompt_tokens: int = 0
+    num_generation_tokens: int = 0
+
+
 class IterationStats:
     """Stats associated with a single set of EngineCoreOutputs."""
@@ -23,17 +39,29 @@
         self.log_stats = log_stats
         self.num_generation_tokens = 0
         self.num_prompt_tokens = 0
+        self.finished_requests: List[FinishedRequestStats] = []

     def update_from_output(self, output: "EngineCoreOutput",
-                           is_prefilling: bool, prompt_len: int):
+                           is_prefilling: bool, prompt_len: int,
+                           request_state_stats: RequestStateStats):
         if not self.log_stats:
             return

-        self.num_generation_tokens += len(output.new_token_ids)
+        num_new_generation_tokens = len(output.new_token_ids)
+
+        self.num_generation_tokens += num_new_generation_tokens
         if is_prefilling:
             # This relies on the invariant that EngineCore does
             # not stream outputs for partially completed prefills
             # (scheduler.update_from_output makes EngineCoreOutput
             # iff num_computed_tokens == num_tokens).
-            assert (len(output.new_token_ids) > 0)
+            assert (num_new_generation_tokens > 0)
             self.num_prompt_tokens += prompt_len

+        request_state_stats.num_generation_tokens += num_new_generation_tokens
+
+    def update_from_finished_request(self, request_output: "RequestOutput",
+                                     request_state_stats: RequestStateStats):
+        self.finished_requests.append(
+            FinishedRequestStats(len(request_output.prompt_token_ids),
+                                 request_state_stats.num_generation_tokens))
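Putting the pieces together, a small self-contained sketch (hypothetical label value and request sizes, isolated registry) of what the Prometheus logger does with finished_requests, and of the kind of derived statistic the new series make possible:

import prometheus_client

registry = prometheus_client.CollectorRegistry()
labelnames = ["model_name"]
labelvalues = ["example-model"]

histogram_prompt = prometheus_client.Histogram(
    name="vllm:request_prompt_tokens",
    documentation="Number of prefill tokens processed.",
    buckets=[1, 2, 5, 10, 20, 50, 100],   # build_1_2_5_buckets(100), for illustration
    labelnames=labelnames,
    registry=registry).labels(*labelvalues)
histogram_generation = prometheus_client.Histogram(
    name="vllm:request_generation_tokens",
    documentation="Number of generation tokens processed.",
    buckets=[1, 2, 5, 10, 20, 50, 100],
    labelnames=labelnames,
    registry=registry).labels(*labelvalues)

# Pretend two requests finished this iteration: (num_prompt_tokens, num_generation_tokens).
for num_prompt_tokens, num_generation_tokens in [(7, 4), (25, 60)]:
    histogram_prompt.observe(num_prompt_tokens)
    histogram_generation.observe(num_generation_tokens)

# Mean prompt length so far = _sum / _count, the same arithmetic a PromQL
# query would perform over the exported series.
prompt_sum = registry.get_sample_value(
    "vllm:request_prompt_tokens_sum", {"model_name": "example-model"})
prompt_count = registry.get_sample_value(
    "vllm:request_prompt_tokens_count", {"model_name": "example-model"})
print(prompt_sum / prompt_count)          # 16.0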