[V1][Bugfix]: vllm v1 version metric num_gpu_blocks is None (#15755)

Signed-off-by: rongfu.leng <rongfu.leng@daocloud.io>
This commit is contained in:
rongfu.leng 2025-04-30 18:20:39 +08:00 committed by GitHub
parent 1534d389af
commit d803786731
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 11 deletions

View File

@ -120,7 +120,8 @@ class AsyncLLM(EngineClient):
executor_class=executor_class,
log_stats=self.log_stats,
)
for stat_logger in self.stat_loggers[0]:
stat_logger.log_engine_initialized()
self.output_handler: Optional[asyncio.Task] = None
try:
# Start output handler eagerly if we are in the asyncio eventloop.

View File

@ -1,4 +1,5 @@
# SPDX-License-Identifier: Apache-2.0
import json
import os
import queue
import signal
@ -116,6 +117,7 @@ class EngineCore:
logger.info("Batch queue is enabled with size %d",
self.batch_queue_size)
self.batch_queue = queue.Queue(self.batch_queue_size)
self.vllm_config = vllm_config
def _initialize_kv_caches(
self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
@ -507,7 +509,12 @@ class EngineCoreProc(EngineCore):
bind=False) as socket:
# Send ready message to front-end once input socket is connected.
socket.send(b'READY')
message_dict = {
'type': 'READY',
'num_gpu_blocks': self.vllm_config.cache_config.num_gpu_blocks,
}
message = json.dumps(message_dict).encode('utf-8')
socket.send(message)
while True:
# (RequestType, RequestData)

View File

@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
import asyncio
import contextlib
import json
import queue
import uuid
import weakref
@ -362,6 +363,7 @@ class MPClient(EngineCoreClient):
executor_class: type[Executor],
log_stats: bool,
):
self.vllm_config = vllm_config
# Serialization setup.
self.encoder = MsgpackEncoder()
self.decoder = MsgpackDecoder(EngineCoreOutputs)
@ -430,14 +432,19 @@ class MPClient(EngineCoreClient):
raise RuntimeError("Engine core initialization failed. "
"See root cause above.")
eng_id_bytes, msg = sync_input_socket.recv_multipart()
eng_id_bytes, data = sync_input_socket.recv_multipart()
eng_id = int.from_bytes(eng_id_bytes, byteorder="little")
if eng_id not in identities:
raise RuntimeError(f"Unexpected or duplicate engine: {eng_id}")
if msg != b'READY':
raise RuntimeError(f"Engine {eng_id} failed: {msg.decode()}")
message_dict = json.loads(data.decode('utf-8'))
if message_dict['type'] != 'READY':
raise RuntimeError(f"Engine {eng_id} failed: {data.decode()}")
logger.info("Core engine process %d ready.", eng_id)
identities.discard(eng_id)
# Set up the KV cache config with the initialization state received
# from the engine core process.
self.vllm_config.cache_config.num_gpu_blocks = message_dict[
'num_gpu_blocks']
def _init_core_engines(
self,

View File

@ -39,6 +39,10 @@ class StatLoggerBase(ABC):
iteration_stats: Optional[IterationStats]):
...
@abstractmethod
def log_engine_initialized(self):
    """Log engine-initialization info once the engine core is ready.

    Called after engine startup completes; implementations report
    initialization-time state (e.g. the cache config / num_gpu_blocks).
    """
    ...
def log(self): # noqa
    # Intentional no-op by default; concrete loggers may provide
    # periodic logging behavior. NOTE(review): confirm against subclasses.
    pass
@ -47,6 +51,7 @@ class LoggingStatLogger(StatLoggerBase):
def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
self.engine_index = engine_index
self.vllm_config = vllm_config
self._reset(time.monotonic())
self.last_scheduler_stats = SchedulerStats()
# Prefix cache metrics. This cannot be reset.
@ -127,12 +132,19 @@ class LoggingStatLogger(StatLoggerBase):
if scheduler_stats.spec_decoding_stats is not None:
self.spec_decoding_logging.log(log_fn=log_fn)
def log_engine_initialized(self):
    """Log the cache-config state captured at engine initialization.

    Reads num_gpu_blocks from this logger's vllm_config; the value is
    populated by the front-end once the engine core reports READY.
    """
    num_gpu_blocks = self.vllm_config.cache_config.num_gpu_blocks
    # Lazy %-style args keep formatting off the hot path.
    logger.info(
        "vllm cache_config_info with initialization "
        "after num_gpu_blocks is: %d", num_gpu_blocks)
class PrometheusStatLogger(StatLoggerBase):
def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
self._unregister_vllm_metrics()
self.vllm_config = vllm_config
self.engine_index = engine_index
# Use this flag to hide metrics that were deprecated in
# a previous release and which will be removed in a future release.
self.show_hidden_metrics = \
@ -342,13 +354,9 @@ class PrometheusStatLogger(StatLoggerBase):
self.labelname_running_lora_adapters,
])
#
# Cache config info metric
#
self.log_metrics_info("cache_config", vllm_config.cache_config)
def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo):
metrics_info = config_obj.metrics_info()
metrics_info["engine"] = self.engine_index
name, documentation = None, None
if type == "cache_config":
@ -442,6 +450,9 @@ class PrometheusStatLogger(StatLoggerBase):
if hasattr(collector, "_name") and "vllm" in collector._name:
prometheus_client.REGISTRY.unregister(collector)
def log_engine_initialized(self):
    """Publish the cache-config info metric once the engine is initialized.

    Delegates to log_metrics_info so the cache_config metric reflects the
    post-initialization state (num_gpu_blocks is set by then), rather than
    being emitted during __init__ when it may still be None.
    """
    self.log_metrics_info("cache_config", self.vllm_config.cache_config)
def build_buckets(mantissa_lst: list[int], max_value: int) -> list[int]:
"""