Signed-off-by: Robert Shaw <robshaw@redhat.com>
This commit is contained in:
Robert Shaw 2025-07-20 19:07:31 +00:00
parent 3f4ae353c2
commit 840d3812de
2 changed files with 204 additions and 323 deletions

View File

@ -4,7 +4,7 @@
import logging import logging
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Callable, Optional, Union from typing import Callable, Optional
import numpy as np import numpy as np
import prometheus_client import prometheus_client
@ -35,10 +35,8 @@ class StatLoggerBase(ABC):
... ...
@abstractmethod @abstractmethod
def record(self, def record(self, scheduler_stats: Optional[SchedulerStats],
scheduler_stats: Optional[SchedulerStats], iteration_stats: Optional[IterationStats]):
iteration_stats: Optional[IterationStats],
engine_idx: int = 0):
... ...
@abstractmethod @abstractmethod
@ -80,10 +78,8 @@ class LoggingStatLogger(StatLoggerBase):
# Compute summary metrics for tracked stats # Compute summary metrics for tracked stats
return float(np.sum(tracked_stats) / (now - self.last_log_time)) return float(np.sum(tracked_stats) / (now - self.last_log_time))
def record(self, def record(self, scheduler_stats: Optional[SchedulerStats],
scheduler_stats: Optional[SchedulerStats], iteration_stats: Optional[IterationStats]):
iteration_stats: Optional[IterationStats],
engine_idx: int = 0):
"""Log Stats to standard output.""" """Log Stats to standard output."""
if iteration_stats: if iteration_stats:
@ -150,290 +146,233 @@ class PrometheusStatLogger(StatLoggerBase):
_histogram_cls = prometheus_client.Histogram _histogram_cls = prometheus_client.Histogram
_spec_decoding_cls = SpecDecodingProm _spec_decoding_cls = SpecDecodingProm
def __init__(self, def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
vllm_config: VllmConfig,
engine_indexes: Optional[list[int]] = None):
if engine_indexes is None:
engine_indexes = [0]
self.engine_indexes = engine_indexes
unregister_vllm_metrics() unregister_vllm_metrics()
self.vllm_config = vllm_config self.vllm_config = vllm_config
self.engine_index = engine_index
# Use this flag to hide metrics that were deprecated in # Use this flag to hide metrics that were deprecated in
# a previous release and which will be removed future # a previous release and which will be removed future
self.show_hidden_metrics = \ self.show_hidden_metrics = \
vllm_config.observability_config.show_hidden_metrics vllm_config.observability_config.show_hidden_metrics
labelnames = ["model_name", "engine"] labelnames = ["model_name", "engine"]
model_name = vllm_config.model_config.served_model_name labelvalues = [
vllm_config.model_config.served_model_name,
str(engine_index)
]
max_model_len = vllm_config.model_config.max_model_len max_model_len = vllm_config.model_config.max_model_len
if (len(self.engine_indexes) > 1
and vllm_config.speculative_config is not None):
raise NotImplementedError("Prometheus metrics with Spec Decoding "
"with >1 EngineCore per AsyncLLM is not "
"supported yet.")
spec_decode_labelvalues = [
vllm_config.model_config.served_model_name,
str(self.engine_indexes[0])
]
self.spec_decoding_prom = self._spec_decoding_cls( self.spec_decoding_prom = self._spec_decoding_cls(
vllm_config.speculative_config, labelnames, vllm_config.speculative_config, labelnames, labelvalues)
spec_decode_labelvalues)
# #
# Scheduler state # Scheduler state
# #
gauge_scheduler_running = self._gauge_cls( self.gauge_scheduler_running = self._gauge_cls(
name="vllm:num_requests_running", name="vllm:num_requests_running",
documentation="Number of requests in model execution batches.", documentation="Number of requests in model execution batches.",
multiprocess_mode="mostrecent", multiprocess_mode="mostrecent",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.gauge_scheduler_running = make_per_engine(gauge_scheduler_running,
engine_indexes,
model_name)
gauge_scheduler_waiting = self._gauge_cls( self.gauge_scheduler_waiting = self._gauge_cls(
name="vllm:num_requests_waiting", name="vllm:num_requests_waiting",
documentation="Number of requests waiting to be processed.", documentation="Number of requests waiting to be processed.",
multiprocess_mode="mostrecent", multiprocess_mode="mostrecent",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.gauge_scheduler_waiting = make_per_engine(gauge_scheduler_waiting,
engine_indexes,
model_name)
# #
# GPU cache # GPU cache
# #
# Deprecated in 0.9 - Renamed as vllm:kv_cache_usage_perc # Deprecated in 0.9 - Renamed as vllm:kv_cache_usage_perc
# TODO: in 0.10, only enable if show_hidden_metrics=True # TODO: in 0.10, only enable if show_hidden_metrics=True
gauge_gpu_cache_usage = self._gauge_cls( self.gauge_gpu_cache_usage = self._gauge_cls(
name="vllm:gpu_cache_usage_perc", name="vllm:gpu_cache_usage_perc",
documentation=( documentation=(
"GPU KV-cache usage. 1 means 100 percent usage." "GPU KV-cache usage. 1 means 100 percent usage."
"DEPRECATED: Use vllm:kv_cache_usage_perc instead."), "DEPRECATED: Use vllm:kv_cache_usage_perc instead."),
multiprocess_mode="mostrecent", multiprocess_mode="mostrecent",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.gauge_gpu_cache_usage = make_per_engine(gauge_gpu_cache_usage,
engine_indexes,
model_name)
# Deprecated in 0.9 - Renamed as vllm:prefix_cache_queries # Deprecated in 0.9 - Renamed as vllm:prefix_cache_queries
# TODO: in 0.10, only enable if show_hidden_metrics=True # TODO: in 0.10, only enable if show_hidden_metrics=True
counter_gpu_prefix_cache_queries = self._counter_cls( self.counter_gpu_prefix_cache_queries = self._counter_cls(
name="vllm:gpu_prefix_cache_queries", name="vllm:gpu_prefix_cache_queries",
documentation=( documentation=
"GPU prefix cache queries, in terms of number of queried" ("GPU prefix cache queries, in terms of number of queried tokens."
"tokens. DEPRECATED: Use vllm:prefix_cache_queries instead."), "DEPRECATED: Use vllm:prefix_cache_queries instead."),
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_gpu_prefix_cache_queries = make_per_engine(
counter_gpu_prefix_cache_queries, engine_indexes, model_name)
# Deprecated in 0.9 - Renamed as vllm:prefix_cache_hits # Deprecated in 0.9 - Renamed as vllm:prefix_cache_hits
# TODO: in 0.10, only enable if show_hidden_metrics=True # TODO: in 0.10, only enable if show_hidden_metrics=True
counter_gpu_prefix_cache_hits = self._counter_cls( self.counter_gpu_prefix_cache_hits = self._counter_cls(
name="vllm:gpu_prefix_cache_hits", name="vllm:gpu_prefix_cache_hits",
documentation=( documentation=(
"GPU prefix cache hits, in terms of number of cached " "GPU prefix cache hits, in terms of number of cached tokens."
"tokens. DEPRECATED: Use vllm:prefix_cache_hits instead."), "DEPRECATED: Use vllm:prefix_cache_hits instead."),
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_gpu_prefix_cache_hits = make_per_engine(
counter_gpu_prefix_cache_hits, engine_indexes, model_name)
gauge_kv_cache_usage = self._gauge_cls( self.gauge_kv_cache_usage = self._gauge_cls(
name="vllm:kv_cache_usage_perc", name="vllm:kv_cache_usage_perc",
documentation="KV-cache usage. 1 means 100 percent usage.", documentation="KV-cache usage. 1 means 100 percent usage.",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.gauge_kv_cache_usage = make_per_engine(gauge_kv_cache_usage,
engine_indexes, model_name)
counter_prefix_cache_queries = self._counter_cls( self.counter_prefix_cache_queries = self._counter_cls(
name="vllm:prefix_cache_queries", name="vllm:prefix_cache_queries",
documentation=( documentation=(
"Prefix cache queries, in terms of number of queried tokens."), "Prefix cache queries, in terms of number of queried tokens."),
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_prefix_cache_queries = make_per_engine(
counter_prefix_cache_queries, engine_indexes, model_name)
counter_prefix_cache_hits = self._counter_cls( self.counter_prefix_cache_hits = self._counter_cls(
name="vllm:prefix_cache_hits", name="vllm:prefix_cache_hits",
documentation=( documentation=(
"Prefix cache hits, in terms of number of cached tokens."), "Prefix cache hits, in terms of number of cached tokens."),
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_prefix_cache_hits = make_per_engine(
counter_prefix_cache_hits, engine_indexes, model_name)
# #
# Counters # Counters
# #
counter_num_preempted_reqs = self._counter_cls( self.counter_num_preempted_reqs = self._counter_cls(
name="vllm:num_preemptions", name="vllm:num_preemptions",
documentation="Cumulative number of preemption from the engine.", documentation="Cumulative number of preemption from the engine.",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_num_preempted_reqs = make_per_engine(
counter_num_preempted_reqs, engine_indexes, model_name)
counter_prompt_tokens = self._counter_cls( self.counter_prompt_tokens = self._counter_cls(
name="vllm:prompt_tokens", name="vllm:prompt_tokens",
documentation="Number of prefill tokens processed.", documentation="Number of prefill tokens processed.",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_prompt_tokens = make_per_engine(counter_prompt_tokens,
engine_indexes,
model_name)
counter_generation_tokens = self._counter_cls( self.counter_generation_tokens = self._counter_cls(
name="vllm:generation_tokens", name="vllm:generation_tokens",
documentation="Number of generation tokens processed.", documentation="Number of generation tokens processed.",
labelnames=labelnames) labelnames=labelnames).labels(*labelvalues)
self.counter_generation_tokens = make_per_engine(
counter_generation_tokens, engine_indexes, model_name)
self.counter_request_success: dict[FinishReason, dict[ self.counter_request_success: dict[FinishReason,
int, prometheus_client.Counter]] = {} prometheus_client.Counter] = {}
counter_request_success_base = self._counter_cls( counter_request_success_base = self._counter_cls(
name="vllm:request_success", name="vllm:request_success",
documentation="Count of successfully processed requests.", documentation="Count of successfully processed requests.",
labelnames=labelnames + ["finished_reason"]) labelnames=labelnames + ["finished_reason"])
for reason in FinishReason: for reason in FinishReason:
self.counter_request_success[reason] = { self.counter_request_success[
idx: reason] = counter_request_success_base.labels(*(labelvalues +
counter_request_success_base.labels(model_name, str(idx), [str(reason)]))
str(reason))
for idx in engine_indexes
}
# #
# Histograms of counts # Histograms of counts
# #
histogram_num_prompt_tokens_request = self._histogram_cls( self.histogram_num_prompt_tokens_request = \
name="vllm:request_prompt_tokens", self._histogram_cls(
documentation="Number of prefill tokens processed.", name="vllm:request_prompt_tokens",
buckets=build_1_2_5_buckets(max_model_len), documentation="Number of prefill tokens processed.",
labelnames=labelnames) buckets=build_1_2_5_buckets(max_model_len),
self.histogram_num_prompt_tokens_request = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_num_prompt_tokens_request, engine_indexes, model_name)
histogram_num_generation_tokens_request = self._histogram_cls( self.histogram_num_generation_tokens_request = \
name="vllm:request_generation_tokens", self._histogram_cls(
documentation="Number of generation tokens processed.", name="vllm:request_generation_tokens",
buckets=build_1_2_5_buckets(max_model_len), documentation="Number of generation tokens processed.",
labelnames=labelnames) buckets=build_1_2_5_buckets(max_model_len),
self.histogram_num_generation_tokens_request = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_num_generation_tokens_request, engine_indexes,
model_name)
# TODO: This metric might be incorrect in case of using multiple # TODO: This metric might be incorrect in case of using multiple
# api_server counts which uses prometheus mp. # api_server counts which uses prometheus mp.
# See: https://github.com/vllm-project/vllm/pull/18053 # See: https://github.com/vllm-project/vllm/pull/18053
histogram_iteration_tokens = self._histogram_cls( self.histogram_iteration_tokens = \
name="vllm:iteration_tokens_total", self._histogram_cls(
documentation="Histogram of number of tokens per engine_step.", name="vllm:iteration_tokens_total",
buckets=[ documentation="Histogram of number of tokens per engine_step.",
1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384 buckets=[
], 1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,
labelnames=labelnames) 16384
self.histogram_iteration_tokens = make_per_engine( ],
histogram_iteration_tokens, engine_indexes, model_name) labelnames=labelnames).labels(*labelvalues)
histogram_max_num_generation_tokens_request = self._histogram_cls( self.histogram_max_num_generation_tokens_request = \
name="vllm:request_max_num_generation_tokens", self._histogram_cls(
documentation= name="vllm:request_max_num_generation_tokens",
"Histogram of maximum number of requested generation tokens.", documentation=
buckets=build_1_2_5_buckets(max_model_len), "Histogram of maximum number of requested generation tokens.",
labelnames=labelnames) buckets=build_1_2_5_buckets(max_model_len),
self.histogram_max_num_generation_tokens_request = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_max_num_generation_tokens_request, engine_indexes,
model_name)
histogram_n_request = self._histogram_cls( self.histogram_n_request = \
name="vllm:request_params_n", self._histogram_cls(
documentation="Histogram of the n request parameter.", name="vllm:request_params_n",
buckets=[1, 2, 5, 10, 20], documentation="Histogram of the n request parameter.",
labelnames=labelnames) buckets=[1, 2, 5, 10, 20],
self.histogram_n_request = make_per_engine(histogram_n_request, labelnames=labelnames).labels(*labelvalues)
engine_indexes, model_name)
histogram_max_tokens_request = self._histogram_cls( self.histogram_max_tokens_request = \
name="vllm:request_params_max_tokens", self._histogram_cls(
documentation="Histogram of the max_tokens request parameter.", name="vllm:request_params_max_tokens",
buckets=build_1_2_5_buckets(max_model_len), documentation="Histogram of the max_tokens request parameter.",
labelnames=labelnames) buckets=build_1_2_5_buckets(max_model_len),
self.histogram_max_tokens_request = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_max_tokens_request, engine_indexes, model_name)
# #
# Histogram of timing intervals # Histogram of timing intervals
# #
histogram_time_to_first_token = self._histogram_cls( self.histogram_time_to_first_token = \
name="vllm:time_to_first_token_seconds", self._histogram_cls(
documentation="Histogram of time to first token in seconds.", name="vllm:time_to_first_token_seconds",
buckets=[ documentation="Histogram of time to first token in seconds.",
0.001, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.25, 0.5, buckets=[
0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 20.0, 40.0, 80.0, 160.0, 640.0, 0.001, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.25, 0.5,
2560.0 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 20.0, 40.0, 80.0, 160.0,
], 640.0, 2560.0
labelnames=labelnames) ],
self.histogram_time_to_first_token = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_time_to_first_token, engine_indexes, model_name)
histogram_time_per_output_token = self._histogram_cls( self.histogram_time_per_output_token = \
name="vllm:time_per_output_token_seconds", self._histogram_cls(
documentation="Histogram of time per output token in seconds.", name="vllm:time_per_output_token_seconds",
buckets=[ documentation="Histogram of time per output token in seconds.",
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75, buckets=[
1.0, 2.5, 5.0, 7.5, 10.0, 20.0, 40.0, 80.0 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5,
], 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 20.0, 40.0, 80.0
labelnames=labelnames) ],
self.histogram_time_per_output_token = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_time_per_output_token, engine_indexes, model_name)
request_latency_buckets = [ request_latency_buckets = [
0.3, 0.5, 0.8, 1.0, 1.5, 2.0, 2.5, 5.0, 10.0, 15.0, 20.0, 30.0, 0.3, 0.5, 0.8, 1.0, 1.5, 2.0, 2.5, 5.0, 10.0, 15.0, 20.0, 30.0,
40.0, 50.0, 60.0, 120.0, 240.0, 480.0, 960.0, 1920.0, 7680.0 40.0, 50.0, 60.0, 120.0, 240.0, 480.0, 960.0, 1920.0, 7680.0
] ]
histogram_e2e_time_request = self._histogram_cls( self.histogram_e2e_time_request = \
name="vllm:e2e_request_latency_seconds", self._histogram_cls(
documentation="Histogram of e2e request latency in seconds.", name="vllm:e2e_request_latency_seconds",
buckets=request_latency_buckets, documentation="Histogram of e2e request latency in seconds.",
labelnames=labelnames) buckets=request_latency_buckets,
self.histogram_e2e_time_request = make_per_engine( labelnames=labelnames).labels(*labelvalues)
histogram_e2e_time_request, engine_indexes, model_name) self.histogram_queue_time_request = \
self._histogram_cls(
histogram_queue_time_request = self._histogram_cls( name="vllm:request_queue_time_seconds",
name="vllm:request_queue_time_seconds", documentation=
documentation= "Histogram of time spent in WAITING phase for request.",
"Histogram of time spent in WAITING phase for request.", buckets=request_latency_buckets,
buckets=request_latency_buckets, labelnames=labelnames).labels(*labelvalues)
labelnames=labelnames) self.histogram_inference_time_request = \
self.histogram_queue_time_request = make_per_engine( self._histogram_cls(
histogram_queue_time_request, engine_indexes, model_name) name="vllm:request_inference_time_seconds",
documentation=
histogram_inference_time_request = self._histogram_cls( "Histogram of time spent in RUNNING phase for request.",
name="vllm:request_inference_time_seconds", buckets=request_latency_buckets,
documentation= labelnames=labelnames).labels(*labelvalues)
"Histogram of time spent in RUNNING phase for request.", self.histogram_prefill_time_request = \
buckets=request_latency_buckets, self._histogram_cls(
labelnames=labelnames) name="vllm:request_prefill_time_seconds",
self.histogram_inference_time_request = make_per_engine( documentation=
histogram_inference_time_request, engine_indexes, model_name) "Histogram of time spent in PREFILL phase for request.",
buckets=request_latency_buckets,
histogram_prefill_time_request = self._histogram_cls( labelnames=labelnames).labels(*labelvalues)
name="vllm:request_prefill_time_seconds", self.histogram_decode_time_request = \
documentation= self._histogram_cls(
"Histogram of time spent in PREFILL phase for request.", name="vllm:request_decode_time_seconds",
buckets=request_latency_buckets, documentation=
labelnames=labelnames) "Histogram of time spent in DECODE phase for request.",
self.histogram_prefill_time_request = make_per_engine( buckets=request_latency_buckets,
histogram_prefill_time_request, engine_indexes, model_name) labelnames=labelnames).labels(*labelvalues)
histogram_decode_time_request = self._histogram_cls(
name="vllm:request_decode_time_seconds",
documentation=
"Histogram of time spent in DECODE phase for request.",
buckets=request_latency_buckets,
labelnames=labelnames)
self.histogram_decode_time_request = make_per_engine(
histogram_decode_time_request, engine_indexes, model_name)
# #
# LoRA metrics # LoRA metrics
@ -443,9 +382,6 @@ class PrometheusStatLogger(StatLoggerBase):
# api_server counts which uses prometheus mp. # api_server counts which uses prometheus mp.
self.gauge_lora_info: Optional[prometheus_client.Gauge] = None self.gauge_lora_info: Optional[prometheus_client.Gauge] = None
if vllm_config.lora_config is not None: if vllm_config.lora_config is not None:
if len(self.engine_indexes) > 1:
raise NotImplementedError(
"LoRA in DP mode is not supported yet.")
self.labelname_max_lora = "max_lora" self.labelname_max_lora = "max_lora"
self.labelname_waiting_lora_adapters = "waiting_lora_adapters" self.labelname_waiting_lora_adapters = "waiting_lora_adapters"
self.labelname_running_lora_adapters = "running_lora_adapters" self.labelname_running_lora_adapters = "running_lora_adapters"
@ -463,8 +399,9 @@ class PrometheusStatLogger(StatLoggerBase):
) )
def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo): def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo):
metrics_info = config_obj.metrics_info() metrics_info = config_obj.metrics_info()
metrics_info["engine"] = "" metrics_info["engine"] = self.engine_index
name, documentation = None, None name, documentation = None, None
if type == "cache_config": if type == "cache_config":
@ -480,36 +417,27 @@ class PrometheusStatLogger(StatLoggerBase):
documentation=documentation, documentation=documentation,
multiprocess_mode="mostrecent", multiprocess_mode="mostrecent",
labelnames=metrics_info.keys(), labelnames=metrics_info.keys(),
) ).labels(**metrics_info)
for engine_index in self.engine_indexes: info_gauge.set(1)
metrics_info = config_obj.metrics_info()
metrics_info["engine"] = str(engine_index)
info_gauge.labels(**metrics_info).set(1)
def record(self, def record(self, scheduler_stats: Optional[SchedulerStats],
scheduler_stats: Optional[SchedulerStats], iteration_stats: Optional[IterationStats]):
iteration_stats: Optional[IterationStats],
engine_idx: int = 0):
"""Log to prometheus.""" """Log to prometheus."""
if scheduler_stats is not None: if scheduler_stats is not None:
self.gauge_scheduler_running[engine_idx].set( self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
scheduler_stats.num_running_reqs) self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)
self.gauge_scheduler_waiting[engine_idx].set(
scheduler_stats.num_waiting_reqs)
self.gauge_gpu_cache_usage[engine_idx].set( self.gauge_gpu_cache_usage.set(scheduler_stats.kv_cache_usage)
scheduler_stats.kv_cache_usage) self.gauge_kv_cache_usage.set(scheduler_stats.kv_cache_usage)
self.gauge_kv_cache_usage[engine_idx].set(
scheduler_stats.kv_cache_usage)
self.counter_gpu_prefix_cache_queries[engine_idx].inc( self.counter_gpu_prefix_cache_queries.inc(
scheduler_stats.prefix_cache_stats.queries) scheduler_stats.prefix_cache_stats.queries)
self.counter_gpu_prefix_cache_hits[engine_idx].inc( self.counter_gpu_prefix_cache_hits.inc(
scheduler_stats.prefix_cache_stats.hits) scheduler_stats.prefix_cache_stats.hits)
self.counter_prefix_cache_queries[engine_idx].inc( self.counter_prefix_cache_queries.inc(
scheduler_stats.prefix_cache_stats.queries) scheduler_stats.prefix_cache_stats.queries)
self.counter_prefix_cache_hits[engine_idx].inc( self.counter_prefix_cache_hits.inc(
scheduler_stats.prefix_cache_stats.hits) scheduler_stats.prefix_cache_stats.hits)
if scheduler_stats.spec_decoding_stats is not None: if scheduler_stats.spec_decoding_stats is not None:
@ -519,45 +447,42 @@ class PrometheusStatLogger(StatLoggerBase):
if iteration_stats is None: if iteration_stats is None:
return return
self.counter_num_preempted_reqs[engine_idx].inc( self.counter_num_preempted_reqs.inc(iteration_stats.num_preempted_reqs)
iteration_stats.num_preempted_reqs) self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
self.counter_prompt_tokens[engine_idx].inc( self.counter_generation_tokens.inc(
iteration_stats.num_prompt_tokens)
self.counter_generation_tokens[engine_idx].inc(
iteration_stats.num_generation_tokens) iteration_stats.num_generation_tokens)
self.histogram_iteration_tokens[engine_idx].observe( self.histogram_iteration_tokens.observe(
iteration_stats.num_prompt_tokens + \ iteration_stats.num_prompt_tokens + \
iteration_stats.num_generation_tokens) iteration_stats.num_generation_tokens)
for max_gen_tokens in iteration_stats.max_num_generation_tokens_iter: for max_gen_tokens in iteration_stats.max_num_generation_tokens_iter:
self.histogram_max_num_generation_tokens_request[ self.histogram_max_num_generation_tokens_request.observe(
engine_idx].observe(max_gen_tokens) max_gen_tokens)
for n_param in iteration_stats.n_params_iter: for n_param in iteration_stats.n_params_iter:
self.histogram_n_request[engine_idx].observe(n_param) self.histogram_n_request.observe(n_param)
for ttft in iteration_stats.time_to_first_tokens_iter: for ttft in iteration_stats.time_to_first_tokens_iter:
self.histogram_time_to_first_token[engine_idx].observe(ttft) self.histogram_time_to_first_token.observe(ttft)
for tpot in iteration_stats.time_per_output_tokens_iter: for tpot in iteration_stats.time_per_output_tokens_iter:
self.histogram_time_per_output_token[engine_idx].observe(tpot) self.histogram_time_per_output_token.observe(tpot)
for finished_request in iteration_stats.finished_requests: for finished_request in iteration_stats.finished_requests:
self.counter_request_success[ self.counter_request_success[finished_request.finish_reason].inc()
finished_request.finish_reason][engine_idx].inc() self.histogram_e2e_time_request.observe(
self.histogram_e2e_time_request[engine_idx].observe(
finished_request.e2e_latency) finished_request.e2e_latency)
self.histogram_queue_time_request[engine_idx].observe( self.histogram_queue_time_request.observe(
finished_request.queued_time) finished_request.queued_time)
self.histogram_prefill_time_request[engine_idx].observe( self.histogram_prefill_time_request.observe(
finished_request.prefill_time) finished_request.prefill_time)
self.histogram_inference_time_request[engine_idx].observe( self.histogram_inference_time_request.observe(
finished_request.inference_time) finished_request.inference_time)
self.histogram_decode_time_request[engine_idx].observe( self.histogram_decode_time_request.observe(
finished_request.decode_time) finished_request.decode_time)
self.histogram_num_prompt_tokens_request[engine_idx].observe( self.histogram_num_prompt_tokens_request.observe(
finished_request.num_prompt_tokens) finished_request.num_prompt_tokens)
self.histogram_num_generation_tokens_request[engine_idx].observe( self.histogram_num_generation_tokens_request.observe(
finished_request.num_generation_tokens) finished_request.num_generation_tokens)
if finished_request.max_tokens_param: if finished_request.max_tokens_param:
self.histogram_max_tokens_request[engine_idx].observe( self.histogram_max_tokens_request.observe(
finished_request.max_tokens_param) finished_request.max_tokens_param)
if self.gauge_lora_info is not None: if self.gauge_lora_info is not None:
@ -577,18 +502,6 @@ class PrometheusStatLogger(StatLoggerBase):
self.log_metrics_info("cache_config", self.vllm_config.cache_config) self.log_metrics_info("cache_config", self.vllm_config.cache_config)
PromMetric = Union[
prometheus_client.Gauge,
prometheus_client.Counter,
prometheus_client.Histogram,
]
def make_per_engine(metric: PromMetric, engine_idxs: list[int],
model_name: str) -> dict[int, PromMetric]:
return {idx: metric.labels(model_name, str(idx)) for idx in engine_idxs}
def build_buckets(mantissa_lst: list[int], max_value: int) -> list[int]: def build_buckets(mantissa_lst: list[int], max_value: int) -> list[int]:
""" """
Builds a list of buckets with increasing powers of 10 multiplied by Builds a list of buckets with increasing powers of 10 multiplied by
@ -616,71 +529,29 @@ def build_1_2_5_buckets(max_value: int) -> list[int]:
return build_buckets([1, 2, 5], max_value) return build_buckets([1, 2, 5], max_value)
class StatLoggerManager: def setup_default_loggers(
""" vllm_config: VllmConfig,
StatLoggerManager: log_stats: bool,
Logging happens at the level of the EngineCore (per scheduler). engine_num: int,
* DP: >1 EngineCore per AsyncLLM - loggers for each EngineCore. custom_stat_loggers: Optional[list[StatLoggerFactory]] = None,
* With Local Logger, just make N copies for N EngineCores. ) -> list[list[StatLoggerBase]]:
* With Prometheus, we need a single logger with N "labels" """Setup logging and prometheus metrics."""
if not log_stats:
return []
This class abstracts away this implementation detail from factories: list[StatLoggerFactory]
the AsyncLLM, allowing the AsyncLLM to just call .record() if custom_stat_loggers is not None:
and .log() to a simple interface. factories = custom_stat_loggers
""" else:
factories = [PrometheusStatLogger]
if logger.isEnabledFor(logging.INFO):
factories.append(LoggingStatLogger)
def __init__( stat_loggers: list[list[StatLoggerBase]] = []
self, for i in range(engine_num):
vllm_config: VllmConfig, per_engine_stat_loggers: list[StatLoggerBase] = []
engine_idxs: Optional[list[int]] = None, for logger_factory in factories:
custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, per_engine_stat_loggers.append(logger_factory(vllm_config, i))
): stat_loggers.append(per_engine_stat_loggers)
self.engine_idxs = engine_idxs if engine_idxs else [0]
factories: list[StatLoggerFactory] return stat_loggers
if custom_stat_loggers is not None:
factories = custom_stat_loggers
else:
factories = []
if logger.isEnabledFor(logging.INFO):
factories.append(LoggingStatLogger)
# engine_idx: StatLogger
self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {}
for engine_idx in self.engine_idxs:
loggers: list[StatLoggerBase] = []
for logger_factory in factories:
loggers.append(logger_factory(vllm_config, engine_idx))
self.per_engine_logger_dict[engine_idx] = loggers
# For Prometheus, need to share the metrics between EngineCores.
# Each EngineCore's metrics are expressed as a unique label.
self.prometheus_logger = PrometheusStatLogger(vllm_config, engine_idxs)
def record(
self,
scheduler_stats: Optional[SchedulerStats],
iteration_stats: Optional[IterationStats],
engine_idx: Optional[int] = None,
):
if engine_idx is None:
engine_idx = 0
per_engine_loggers = self.per_engine_logger_dict[engine_idx]
for logger in per_engine_loggers:
logger.record(scheduler_stats, iteration_stats, engine_idx)
self.prometheus_logger.record(scheduler_stats, iteration_stats,
engine_idx)
def log(self):
for per_engine_loggers in self.per_engine_logger_dict.values():
for logger in per_engine_loggers:
logger.log()
def log_engine_initialized(self):
self.prometheus_logger.log_engine_initialized()
for per_engine_loggers in self.per_engine_logger_dict.values():
for logger in per_engine_loggers:
logger.log_engine_initialized()

View File

@ -3,6 +3,7 @@
import time import time
from typing import Optional, Union from typing import Optional, Union
from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import PrometheusStatLogger from vllm.v1.metrics.loggers import PrometheusStatLogger
from vllm.v1.spec_decode.metrics import SpecDecodingProm from vllm.v1.spec_decode.metrics import SpecDecodingProm
@ -50,7 +51,13 @@ class RayGaugeWrapper(RayPrometheusMetric):
def __init__(self, def __init__(self,
name: str, name: str,
documentation: Optional[str] = "", documentation: Optional[str] = "",
labelnames: Optional[list[str]] = None): labelnames: Optional[list[str]] = None,
multiprocess_mode: Optional[str] = ""):
# All Ray metrics are keyed by WorkerId, so multiprocess modes like
# "mostrecent", "all", "sum" do not apply. This logic can be manually
# implemented at the observability layer (Prometheus/Grafana).
del multiprocess_mode
labelnames_tuple = tuple(labelnames) if labelnames else None labelnames_tuple = tuple(labelnames) if labelnames else None
self.metric = ray_metrics.Gauge(name=name, self.metric = ray_metrics.Gauge(name=name,
description=documentation, description=documentation,
@ -121,6 +128,9 @@ class RayPrometheusStatLogger(PrometheusStatLogger):
_histogram_cls = RayHistogramWrapper _histogram_cls = RayHistogramWrapper
_spec_decoding_cls = RaySpecDecodingProm _spec_decoding_cls = RaySpecDecodingProm
def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
super().__init__(vllm_config, engine_index)
@staticmethod @staticmethod
def _unregister_vllm_metrics(): def _unregister_vllm_metrics():
# No-op on purpose # No-op on purpose