[V1] Simplify stats logging (#14082)

Signed-off-by: Nick Hill <nhill@redhat.com>
This commit is contained in:
Nick Hill 2025-03-03 10:34:14 -08:00 committed by GitHub
parent 2dfdfed8a0
commit 872db2be0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 37 deletions

View File

@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import asyncio import asyncio
import logging
import os import os
from collections.abc import AsyncGenerator, Mapping from collections.abc import AsyncGenerator, Mapping
from typing import Optional, Union from typing import Optional, Union
@@ -57,10 +58,9 @@ class AsyncLLM(EngineClient):
self.log_stats = log_stats self.log_stats = log_stats
self.stat_loggers: list[StatLoggerBase] = [] self.stat_loggers: list[StatLoggerBase] = []
if self.log_stats: if self.log_stats:
self.stat_loggers.extend([ if logger.isEnabledFor(logging.INFO):
LoggingStatLogger(), self.stat_loggers.append(LoggingStatLogger())
PrometheusStatLogger(vllm_config), self.stat_loggers.append(PrometheusStatLogger(vllm_config))
])
# Tokenizer (+ ensure liveness if running in another process). # Tokenizer (+ ensure liveness if running in another process).
self.tokenizer = init_tokenizer_from_configs( self.tokenizer = init_tokenizer_from_configs(
@@ -287,7 +287,7 @@ class AsyncLLM(EngineClient):
# 4) Logging. # 4) Logging.
# TODO(rob): make into a coroutine and launch it in # TODO(rob): make into a coroutine and launch it in
# background thread once Prometheus overhead is non-trivial. # background thread once Prometheus overhead is non-trivial.
self._log_stats( self._record_stats(
scheduler_stats=outputs.scheduler_stats, scheduler_stats=outputs.scheduler_stats,
iteration_stats=iteration_stats, iteration_stats=iteration_stats,
) )
@@ -306,7 +306,7 @@ class AsyncLLM(EngineClient):
if self.log_requests: if self.log_requests:
logger.info("Aborted request %s.", request_id) logger.info("Aborted request %s.", request_id)
def _log_stats( def _record_stats(
self, self,
scheduler_stats: Optional[SchedulerStats], scheduler_stats: Optional[SchedulerStats],
iteration_stats: Optional[IterationStats], iteration_stats: Optional[IterationStats],
@@ -316,9 +316,9 @@ class AsyncLLM(EngineClient):
assert scheduler_stats is not None assert scheduler_stats is not None
assert iteration_stats is not None assert iteration_stats is not None
for logger in self.stat_loggers: for stat_logger in self.stat_loggers:
logger.log(scheduler_stats=scheduler_stats, stat_logger.record(scheduler_stats=scheduler_stats,
iteration_stats=iteration_stats) iteration_stats=iteration_stats)
def encode( def encode(
self, self,
@@ -354,7 +354,8 @@ class AsyncLLM(EngineClient):
scheduler_outputs=None, scheduler_outputs=None,
model_output=None, model_output=None,
) -> None: ) -> None:
logger.debug("Called do_log_stats.") for stat_logger in self.stat_loggers:
stat_logger.log()
async def check_health(self) -> None: async def check_health(self) -> None:
logger.debug("Called check_health.") logger.debug("Called check_health.")

View File

@@ -316,19 +316,10 @@ class EngineCoreProc(EngineCore):
# Loop until process is sent a SIGINT or SIGTERM # Loop until process is sent a SIGINT or SIGTERM
while True: while True:
# 1) Poll the input queue until there is work to do. # 1) Poll the input queue until there is work to do.
if not self.scheduler.has_unfinished_requests(): while not self.scheduler.has_unfinished_requests():
while True: logger.debug("EngineCore busy loop waiting.")
try: req = self.input_queue.get()
req = self.input_queue.get(timeout=POLLING_TIMEOUT_S) self._handle_client_request(*req)
self._handle_client_request(*req)
break
except queue.Empty:
logger.debug("EngineCore busy loop waiting.")
# Break out the loop so we can log_stats in step().
if self.log_stats:
break
except BaseException:
raise
# 2) Handle any new client requests. # 2) Handle any new client requests.
while not self.input_queue.empty(): while not self.input_queue.empty():

View File

@@ -21,15 +21,19 @@ _LOCAL_LOGGING_INTERVAL_SEC = 5.0
class StatLoggerBase(ABC): class StatLoggerBase(ABC):
@abstractmethod @abstractmethod
def log(self, scheduler_stats: SchedulerStats, def record(self, scheduler_stats: SchedulerStats,
iteration_stats: IterationStats): iteration_stats: IterationStats):
... ...
def log(self): # noqa
pass
class LoggingStatLogger(StatLoggerBase): class LoggingStatLogger(StatLoggerBase):
def __init__(self): def __init__(self):
self._reset(time.monotonic()) self._reset(time.monotonic())
self.last_scheduler_stats = SchedulerStats()
def _reset(self, now): def _reset(self, now):
self.last_log_time = now self.last_log_time = now
@@ -41,11 +45,6 @@ class LoggingStatLogger(StatLoggerBase):
# Prefix cache metrics. TODO: Make the interval configurable. # Prefix cache metrics. TODO: Make the interval configurable.
self.prefix_caching_metrics = PrefixCachingMetrics() self.prefix_caching_metrics = PrefixCachingMetrics()
def _local_interval_elapsed(self, now: float) -> bool:
# Log every _LOCAL_LOGGING_INTERVAL_SEC.
elapsed_time = now - self.last_log_time
return elapsed_time > _LOCAL_LOGGING_INTERVAL_SEC
def _track_iteration_stats(self, iteration_stats: IterationStats): def _track_iteration_stats(self, iteration_stats: IterationStats):
# Save tracked stats for token counters. # Save tracked stats for token counters.
self.num_prompt_tokens.append(iteration_stats.num_prompt_tokens) self.num_prompt_tokens.append(iteration_stats.num_prompt_tokens)
@@ -56,24 +55,26 @@ class LoggingStatLogger(StatLoggerBase):
# Compute summary metrics for tracked stats # Compute summary metrics for tracked stats
return float(np.sum(tracked_stats) / (now - self.last_log_time)) return float(np.sum(tracked_stats) / (now - self.last_log_time))
def log(self, scheduler_stats: SchedulerStats, def record(self, scheduler_stats: SchedulerStats,
iteration_stats: IterationStats): iteration_stats: IterationStats):
"""Log Stats to standard output.""" """Log Stats to standard output."""
self._track_iteration_stats(iteration_stats) self._track_iteration_stats(iteration_stats)
self.prefix_caching_metrics.observe(scheduler_stats.prefix_cache_stats) self.prefix_caching_metrics.observe(scheduler_stats.prefix_cache_stats)
now = time.monotonic() self.last_scheduler_stats = scheduler_stats
if not self._local_interval_elapsed(now):
return
def log(self):
now = time.monotonic()
prompt_throughput = self._get_throughput(self.num_prompt_tokens, now) prompt_throughput = self._get_throughput(self.num_prompt_tokens, now)
generation_throughput = self._get_throughput( generation_throughput = self._get_throughput(
self.num_generation_tokens, now) self.num_generation_tokens, now)
self._reset(now) self._reset(now)
scheduler_stats = self.last_scheduler_stats
# Format and print output. # Format and print output.
logger.info( logger.info(
"Avg prompt throughput: %.1f tokens/s, " "Avg prompt throughput: %.1f tokens/s, "
@@ -274,8 +275,8 @@ class PrometheusStatLogger(StatLoggerBase):
labelnames=metrics_info.keys()).labels(**metrics_info) labelnames=metrics_info.keys()).labels(**metrics_info)
info_gauge.set(1) info_gauge.set(1)
def log(self, scheduler_stats: SchedulerStats, def record(self, scheduler_stats: SchedulerStats,
iteration_stats: IterationStats): iteration_stats: IterationStats):
"""Log to prometheus.""" """Log to prometheus."""
self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs) self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs) self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)