From 54e405bd925040b167637b20ae144df46c8b6b0f Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Sun, 20 Jul 2025 16:45:46 +0000 Subject: [PATCH 1/4] updated Signed-off-by: Robert Shaw --- vllm/v1/engine/async_llm.py | 46 +++++-------------- vllm/v1/metrics/loggers.py | 88 ++++++++++++++++++++++++++++--------- 2 files changed, 79 insertions(+), 55 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 8fcee8f92a573..7c3186e1a6993 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -36,10 +36,9 @@ from vllm.v1.engine.output_processor import (OutputProcessor, from vllm.v1.engine.parallel_sampling import ParentRequest from vllm.v1.engine.processor import Processor from vllm.v1.executor.abstract import Executor -from vllm.v1.metrics.loggers import (PrometheusStatLogger, StatLoggerBase, - StatLoggerFactory, setup_default_loggers) +from vllm.v1.metrics.loggers import StatLoggerFactory, setup_default_loggers from vllm.v1.metrics.prometheus import shutdown_prometheus -from vllm.v1.metrics.stats import IterationStats, SchedulerStats +from vllm.v1.metrics.stats import IterationStats logger = init_logger(__name__) @@ -103,7 +102,7 @@ class AsyncLLM(EngineClient): engine_idxs = [ idx for idx in range(start_idx, start_idx + local_engines) ] - self.stat_loggers = setup_default_loggers( + self.logger_manager = setup_default_loggers( vllm_config=vllm_config, log_stats=self.log_stats, engine_idxs=engine_idxs, @@ -136,12 +135,8 @@ class AsyncLLM(EngineClient): client_addresses=client_addresses, client_index=client_index, ) - if self.stat_loggers: - # loggers, prom_logger - loggers, _ = self.stat_loggers - for per_engine_loggers in loggers.values(): - for logger in per_engine_loggers: - logger.log_engine_initialized() + if self.logger_manager: + self.logger_manager.log_engine_initialized() self.output_handler: Optional[asyncio.Task] = None try: # Start output handler eagerly if we are in the asyncio eventloop. @@ -380,7 +375,7 @@ class AsyncLLM(EngineClient): engine_core = self.engine_core output_processor = self.output_processor log_stats = self.log_stats - stat_loggers = self.stat_loggers if log_stats else None + logger_manager = self.logger_manager async def output_handler(): try: @@ -420,13 +415,14 @@ class AsyncLLM(EngineClient): # 4) Logging. # TODO(rob): make into a coroutine and launch it in # background thread once Prometheus overhead is non-trivial. - if stat_loggers: - AsyncLLM._record_stats( - stat_loggers, - outputs.engine_index, + # NOTE: we do not use self.log + if logger_manager: + logger_manager.record( scheduler_stats=outputs.scheduler_stats, iteration_stats=iteration_stats, + engine_idx=outputs.engine_index, ) + except Exception as e: logger.exception("AsyncLLM output_handler failed.") output_processor.propagate_error(e) @@ -442,26 +438,6 @@ class AsyncLLM(EngineClient): if self.log_requests: logger.info("Aborted request %s.", request_id) - @staticmethod - def _record_stats( - stat_loggers: tuple[dict[int, list[StatLoggerBase]], - PrometheusStatLogger], - engine_idx: int, - scheduler_stats: Optional[SchedulerStats], - iteration_stats: Optional[IterationStats], - ): - """static so that it can be used from the output_handler task - without a circular ref to AsyncLLM.""" - - per_engine_loggers, prom_logger = stat_loggers - for stat_logger in per_engine_loggers[engine_idx]: - stat_logger.record(engine_idx=engine_idx, - scheduler_stats=scheduler_stats, - iteration_stats=iteration_stats) - prom_logger.record(engine_idx=engine_idx, - scheduler_stats=scheduler_stats, - iteration_stats=iteration_stats) - async def encode( self, prompt: PromptType, diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 7a89d05fcbd3e..0c44127055a0e 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -605,27 +605,75 @@ def setup_default_loggers( log_stats: bool, engine_idxs: list[int], custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, -) -> Optional[tuple[dict[int, list[StatLoggerBase]], PrometheusStatLogger]]: +) -> Optional["StatLoggerManager"]: """Setup logging and prometheus metrics.""" - if not log_stats: - return None + return (None if not log_stats else StatLoggerManager( + vllm_config, engine_idxs, custom_stat_loggers)) - factories: list[StatLoggerFactory] - if custom_stat_loggers is not None: - factories = custom_stat_loggers - else: - factories = [] - if logger.isEnabledFor(logging.INFO): - factories.append(LoggingStatLogger) - # engine_idx: Logger - stat_loggers: dict[int, list[StatLoggerBase]] = {} - for engine_idx in engine_idxs: - per_engine_stat_loggers: list[StatLoggerBase] = [] - for logger_factory in factories: - per_engine_stat_loggers.append( - logger_factory(vllm_config, engine_idx)) - stat_loggers[engine_idx] = per_engine_stat_loggers +class StatLoggerManager: + """ + StatLoggerManager: + Logging happens at the level of the EngineCore (per scheduler). + * DP: >1 EngineCore per AsyncLLM - loggers for each EngineCore. + * With Local Logger, just make N copies for N EngineCores. + * With Prometheus, we need a single logger with N "labels" - prom_stat_logger = PrometheusStatLogger(vllm_config, engine_idxs) - return stat_loggers, prom_stat_logger + This class abstracts away this implementation detail from + the AsyncLLM, allowing the AsyncLLM to just call .record() + and .log() to a simple interface. + """ + + def __init__( + self, + vllm_config: VllmConfig, + engine_idxs: Optional[list[int]] = None, + custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, + ): + self.engine_idxs = set([0]) if not engine_idxs else set(engine_idxs) + + factories: list[StatLoggerFactory] + if custom_stat_loggers is not None: + factories = custom_stat_loggers + else: + factories = [] + if logger.isEnabledFor(logging.INFO): + factories.append(LoggingStatLogger) + + # engine_idx: StatLogger + self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {} + for engine_idx in self.engine_idxs: + loggers: list[StatLoggerBase] = [] + for logger_factory in factories: + loggers.append(logger_factory(vllm_config, engine_idx)) + self.per_engine_logger_dict[engine_idx] = loggers + + # For Prometheus, need to share the metrics between EngineCores. + # Each EngineCore's metrics are expressed as a unique label. + self.prometheus_logger = PrometheusStatLogger(vllm_config, engine_idxs) + + def record( + self, + scheduler_stats: Optional[SchedulerStats], + iteration_stats: Optional[IterationStats], + engine_idx: Optional[int] = None, + ): + if engine_idx is None: + engine_idx = 0 + + per_engine_loggers = self.per_engine_logger_dict[engine_idx] + for logger in per_engine_loggers: + logger.record(scheduler_stats, iteration_stats, engine_idx) + + self.prometheus_logger.record(scheduler_stats, iteration_stats, + engine_idx) + + def log(self): + for per_engine_loggers in self.per_engine_logger_dict.values(): + for logger in per_engine_loggers: + logger.log() + + def log_engine_initialized(self): + for per_engine_loggers in self.per_engine_logger_dict.values(): + for logger in per_engine_loggers: + logger.log_engine_initialized() From 02ecfa80a499b930d4518e9ff405178575074b5c Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Sun, 20 Jul 2025 16:46:35 +0000 Subject: [PATCH 2/4] updated Signed-off-by: Robert Shaw --- vllm/v1/metrics/loggers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 0c44127055a0e..6de65280364d8 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -630,7 +630,7 @@ class StatLoggerManager: engine_idxs: Optional[list[int]] = None, custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, ): - self.engine_idxs = set([0]) if not engine_idxs else set(engine_idxs) + self.engine_idxs = engine_idxs if engine_idxs else [0] factories: list[StatLoggerFactory] if custom_stat_loggers is not None: From 1358836fa0b84000bf57f33da896086d051254a4 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Sun, 20 Jul 2025 16:48:31 +0000 Subject: [PATCH 3/4] updated Signed-off-by: Robert Shaw --- vllm/v1/engine/async_llm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 7c3186e1a6993..d6ffbba323423 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -422,7 +422,6 @@ class AsyncLLM(EngineClient): iteration_stats=iteration_stats, engine_idx=outputs.engine_index, ) - except Exception as e: logger.exception("AsyncLLM output_handler failed.") output_processor.propagate_error(e) From 4eae5cbeeabbb02a078ec46ab62f524eab89aa63 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Sun, 20 Jul 2025 16:50:36 +0000 Subject: [PATCH 4/4] updated Signed-off-by: Robert Shaw --- vllm/v1/engine/async_llm.py | 7 +++---- vllm/v1/metrics/loggers.py | 11 ----------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index d6ffbba323423..b8a40f77df314 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -36,7 +36,7 @@ from vllm.v1.engine.output_processor import (OutputProcessor, from vllm.v1.engine.parallel_sampling import ParentRequest from vllm.v1.engine.processor import Processor from vllm.v1.executor.abstract import Executor -from vllm.v1.metrics.loggers import StatLoggerFactory, setup_default_loggers +from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager from vllm.v1.metrics.prometheus import shutdown_prometheus from vllm.v1.metrics.stats import IterationStats @@ -102,12 +102,11 @@ class AsyncLLM(EngineClient): engine_idxs = [ idx for idx in range(start_idx, start_idx + local_engines) ] - self.logger_manager = setup_default_loggers( + self.logger_manager = StatLoggerManager( vllm_config=vllm_config, - log_stats=self.log_stats, engine_idxs=engine_idxs, custom_stat_loggers=stat_loggers, - ) + ) if self.log_stats else None # Tokenizer (+ ensure liveness if running in another process). self.tokenizer = init_tokenizer_from_configs( diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 6de65280364d8..b6b8b6d0a4d86 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -600,17 +600,6 @@ def build_1_2_5_buckets(max_value: int) -> list[int]: return build_buckets([1, 2, 5], max_value) -def setup_default_loggers( - vllm_config: VllmConfig, - log_stats: bool, - engine_idxs: list[int], - custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, -) -> Optional["StatLoggerManager"]: - """Setup logging and prometheus metrics.""" - return (None if not log_stats else StatLoggerManager( - vllm_config, engine_idxs, custom_stat_loggers)) - - class StatLoggerManager: """ StatLoggerManager: