diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 954f74c3fdaef..4c9d4cb467ae9 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -1,6 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import asyncio
+import logging
 import os
 from collections.abc import AsyncGenerator, Mapping
 from typing import Optional, Union
@@ -57,10 +58,9 @@ class AsyncLLM(EngineClient):
         self.log_stats = log_stats
         self.stat_loggers: list[StatLoggerBase] = []
         if self.log_stats:
-            self.stat_loggers.extend([
-                LoggingStatLogger(),
-                PrometheusStatLogger(vllm_config),
-            ])
+            if logger.isEnabledFor(logging.INFO):
+                self.stat_loggers.append(LoggingStatLogger())
+            self.stat_loggers.append(PrometheusStatLogger(vllm_config))
 
         # Tokenizer (+ ensure liveness if running in another process).
         self.tokenizer = init_tokenizer_from_configs(
@@ -287,7 +287,7 @@ class AsyncLLM(EngineClient):
                 # 4) Logging.
                 # TODO(rob): make into a coroutine and launch it in
                 # background thread once Prometheus overhead is non-trivial.
-                self._log_stats(
+                self._record_stats(
                     scheduler_stats=outputs.scheduler_stats,
                     iteration_stats=iteration_stats,
                 )
@@ -306,7 +306,7 @@ class AsyncLLM(EngineClient):
         if self.log_requests:
             logger.info("Aborted request %s.", request_id)
 
-    def _log_stats(
+    def _record_stats(
         self,
         scheduler_stats: Optional[SchedulerStats],
         iteration_stats: Optional[IterationStats],
@@ -316,9 +316,9 @@
         assert scheduler_stats is not None
         assert iteration_stats is not None
 
-        for logger in self.stat_loggers:
-            logger.log(scheduler_stats=scheduler_stats,
-                       iteration_stats=iteration_stats)
+        for stat_logger in self.stat_loggers:
+            stat_logger.record(scheduler_stats=scheduler_stats,
+                               iteration_stats=iteration_stats)
 
     def encode(
         self,
@@ -354,7 +354,8 @@ class AsyncLLM(EngineClient):
         scheduler_outputs=None,
         model_output=None,
     ) -> None:
-        logger.debug("Called do_log_stats.")
+        for stat_logger in self.stat_loggers:
+            stat_logger.log()
 
     async def check_health(self) -> None:
         logger.debug("Called check_health.")
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index b9bf8fac40f60..b78b903b805fb 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -316,19 +316,10 @@ class EngineCoreProc(EngineCore):
         # Loop until process is sent a SIGINT or SIGTERM
         while True:
             # 1) Poll the input queue until there is work to do.
-            if not self.scheduler.has_unfinished_requests():
-                while True:
-                    try:
-                        req = self.input_queue.get(timeout=POLLING_TIMEOUT_S)
-                        self._handle_client_request(*req)
-                        break
-                    except queue.Empty:
-                        logger.debug("EngineCore busy loop waiting.")
-                        # Break out the loop so we can log_stats in step().
-                        if self.log_stats:
-                            break
-                    except BaseException:
-                        raise
+            while not self.scheduler.has_unfinished_requests():
+                logger.debug("EngineCore busy loop waiting.")
+                req = self.input_queue.get()
+                self._handle_client_request(*req)
 
             # 2) Handle any new client requests.
             while not self.input_queue.empty():
diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py
index 5a2a1c30a9d58..7f6de79104841 100644
--- a/vllm/v1/metrics/loggers.py
+++ b/vllm/v1/metrics/loggers.py
@@ -21,15 +21,19 @@ _LOCAL_LOGGING_INTERVAL_SEC = 5.0
 class StatLoggerBase(ABC):
 
     @abstractmethod
-    def log(self, scheduler_stats: SchedulerStats,
-            iteration_stats: IterationStats):
+    def record(self, scheduler_stats: SchedulerStats,
+               iteration_stats: IterationStats):
         ...
 
+    def log(self):  # noqa
+        pass
+
 
 class LoggingStatLogger(StatLoggerBase):
 
     def __init__(self):
         self._reset(time.monotonic())
+        self.last_scheduler_stats = SchedulerStats()
 
     def _reset(self, now):
         self.last_log_time = now
@@ -41,11 +45,6 @@ class LoggingStatLogger(StatLoggerBase):
         # Prefix cache metrics. TODO: Make the interval configurable.
         self.prefix_caching_metrics = PrefixCachingMetrics()
 
-    def _local_interval_elapsed(self, now: float) -> bool:
-        # Log every _LOCAL_LOGGING_INTERVAL_SEC.
-        elapsed_time = now - self.last_log_time
-        return elapsed_time > _LOCAL_LOGGING_INTERVAL_SEC
-
     def _track_iteration_stats(self, iteration_stats: IterationStats):
         # Save tracked stats for token counters.
         self.num_prompt_tokens.append(iteration_stats.num_prompt_tokens)
@@ -56,24 +55,26 @@ class LoggingStatLogger(StatLoggerBase):
         # Compute summary metrics for tracked stats
         return float(np.sum(tracked_stats) / (now - self.last_log_time))
 
-    def log(self, scheduler_stats: SchedulerStats,
-            iteration_stats: IterationStats):
+    def record(self, scheduler_stats: SchedulerStats,
+               iteration_stats: IterationStats):
         """Log Stats to standard output."""
 
         self._track_iteration_stats(iteration_stats)
 
         self.prefix_caching_metrics.observe(scheduler_stats.prefix_cache_stats)
 
-        now = time.monotonic()
-        if not self._local_interval_elapsed(now):
-            return
+        self.last_scheduler_stats = scheduler_stats
 
+    def log(self):
+        now = time.monotonic()
         prompt_throughput = self._get_throughput(self.num_prompt_tokens, now)
         generation_throughput = self._get_throughput(
             self.num_generation_tokens, now)
 
         self._reset(now)
 
+        scheduler_stats = self.last_scheduler_stats
+
         # Format and print output.
         logger.info(
             "Avg prompt throughput: %.1f tokens/s, "
@@ -274,8 +275,8 @@ class PrometheusStatLogger(StatLoggerBase):
                            labelnames=metrics_info.keys()).labels(**metrics_info)
         info_gauge.set(1)
 
-    def log(self, scheduler_stats: SchedulerStats,
-            iteration_stats: IterationStats):
+    def record(self, scheduler_stats: SchedulerStats,
+               iteration_stats: IterationStats):
         """Log to prometheus."""
         self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
         self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)
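
For context, the diff splits the old single `log()` entry point in two: `record()`, which `AsyncLLM` calls on every engine step to cheaply accumulate stats, and `log()`, which the periodic `do_log_stats()` path calls to actually emit output. Below is a minimal, self-contained sketch of that contract; the simplified `SchedulerStats`/`IterationStats` dataclasses and the `CountingLogger` subclass are illustrative stand-ins, not the real definitions in `vllm.v1.metrics.stats`.

```python
# Sketch of the record()/log() split, assuming simplified stand-in
# stats classes (the real ones live in vllm.v1.metrics.stats).
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class SchedulerStats:  # stand-in for vllm.v1.metrics.stats.SchedulerStats
    num_running_reqs: int = 0
    num_waiting_reqs: int = 0


@dataclass
class IterationStats:  # stand-in for vllm.v1.metrics.stats.IterationStats
    num_prompt_tokens: int = 0
    num_generation_tokens: int = 0


class StatLoggerBase(ABC):

    @abstractmethod
    def record(self, scheduler_stats: SchedulerStats,
               iteration_stats: IterationStats):
        """Called on every engine step: accumulate stats cheaply."""
        ...

    def log(self):  # noqa
        """Called periodically (from do_log_stats): emit accumulated stats."""
        pass


class CountingLogger(StatLoggerBase):
    """Hypothetical subclass: record() accumulates, log() flushes."""

    def __init__(self):
        self.num_generation_tokens = 0
        self.last_scheduler_stats = SchedulerStats()

    def record(self, scheduler_stats: SchedulerStats,
               iteration_stats: IterationStats):
        self.num_generation_tokens += iteration_stats.num_generation_tokens
        self.last_scheduler_stats = scheduler_stats

    def log(self):
        print(f"Running: {self.last_scheduler_stats.num_running_reqs} reqs, "
              f"generated since last log: {self.num_generation_tokens} tokens")
        self.num_generation_tokens = 0


# The hot path calls record() each step; a periodic task calls log().
stat_logger = CountingLogger()
stat_logger.record(SchedulerStats(num_running_reqs=2),
                   IterationStats(num_generation_tokens=16))
stat_logger.log()
```

With this split, any slow sink work (formatting and flushing to stdout) moves off the engine's per-step path into the periodic `log()` call.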