# SPDX-License-Identifier: Apache-2.0

import os
import weakref
from collections import defaultdict
from collections.abc import Sequence
from multiprocessing import Process
from typing import (TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar,
                    Union, overload)

import torch

from vllm.logger import init_logger
from vllm.model_executor.models.utils import extract_layer_index
from vllm.usage.usage_lib import (UsageContext, is_usage_stats_enabled,
                                  usage_message)
from vllm.utils import get_mp_context, kill_process_tree

if TYPE_CHECKING:
    from vllm.attention.layer import Attention

logger = init_logger(__name__)

T = TypeVar("T")


class ConstantList(Generic[T], Sequence):
    """Read-only wrapper around a list.

    Read operations (indexing, slicing, iteration, ``len``, ``in`` and
    ``index``) are forwarded to the wrapped list. Every list-style mutator
    raises an ``Exception``. The wrapped list is not copied, so changes made
    through the original list reference remain visible through this view.
    """

    def __init__(self, x: list[T]) -> None:
        self._x = x

    def append(self, item):
        raise Exception("Cannot append to a constant list")

    def extend(self, item):
        raise Exception("Cannot extend a constant list")

    # `list.insert` takes (index, item) and `list.pop` takes an optional
    # index. Accept any arguments here so that list-style calls such as
    # `cl.insert(0, x)` or `cl.pop()` reach the intended error message
    # instead of failing earlier with an arity TypeError.
    def insert(self, *args, **kwargs):
        raise Exception("Cannot insert into a constant list")

    def pop(self, *args, **kwargs):
        raise Exception("Cannot pop from a constant list")

    def remove(self, item):
        raise Exception("Cannot remove from a constant list")

    def clear(self):
        raise Exception("Cannot clear a constant list")

    def index(self,
              item: T,
              start: int = 0,
              stop: Optional[int] = None) -> int:
        """Return the first index of `item` within [start, stop)."""
        return self._x.index(item, start,
                             stop if stop is not None else len(self._x))

    @overload
    def __getitem__(self, item: int) -> T:
        ...

    @overload
    def __getitem__(self, s: slice, /) -> list[T]:
        ...

    def __getitem__(self, item: Union[int, slice]) -> Union[T, list[T]]:
        return self._x[item]

    @overload
    def __setitem__(self, item: int, value: T):
        ...

    @overload
    def __setitem__(self, s: slice, value: list[T], /):
        ...

    def __setitem__(self, item: Union[int, slice], value: Union[T, list[T]]):
        raise Exception("Cannot set item in a constant list")

    def __delitem__(self, item):
        raise Exception("Cannot delete item from a constant list")

    def __iter__(self):
        return iter(self._x)

    def __contains__(self, item):
        return item in self._x

    def __len__(self):
        return len(self._x)

    def __repr__(self):
        return f"ConstantList({self._x})"


class BackgroundProcHandle:
    """
    Utility class to handle creation, monitoring (via `fileno`), and
    shutdown of a background process used by the AsyncLLM and LLMEngine.

    The process is started on construction. It is shut down either by an
    explicit `shutdown()` call or when this handle is garbage collected,
    whichever happens first.
    """

    def __init__(
        self,
        input_path: str,
        output_path: str,
        process_name: str,
        target_fn: Callable,
        process_kwargs: dict[Any, Any],
    ):
        """
        Args:
            input_path: Path passed to the child as `input_path`. It is
                removed from disk on shutdown after stripping an `ipc://`
                prefix.
            output_path: Path passed to the child as `output_path`. It is
                cleaned up the same way on shutdown.
            process_name: Name given to the child process.
            target_fn: Callable run in the child process.
            process_kwargs: Extra keyword arguments for `target_fn`. They
                must not already contain `input_path` or `output_path`.
                This dict is not modified.
        """
        context = get_mp_context()

        assert ("input_path" not in process_kwargs
                and "output_path" not in process_kwargs)
        # Build a new dict rather than writing into the caller's.
        process_kwargs = {
            **process_kwargs,
            "input_path": input_path,
            "output_path": output_path,
        }

        # Run busy loop in background process.
        self.proc: Process = context.Process(target=target_fn,
                                             kwargs=process_kwargs,
                                             name=process_name)
        # Registered before start() so cleanup is tied to this handle's
        # lifetime. `shutdown` here is the module-level function.
        self._finalizer = weakref.finalize(self, shutdown, self.proc,
                                           input_path, output_path)
        self.proc.start()

    def fileno(self):
        """Return the process sentinel.

        The sentinel becomes ready when the process exits, which lets this
        handle be waited on like a file descriptor.
        """
        return self.proc.sentinel

    def shutdown(self):
        """Shut down the process and remove socket files (runs at most once)."""
        self._finalizer()


# Note(rob): shutdown function cannot be a bound method,
# else the gc cannot collect the object.
def shutdown(proc: Process, input_path: str, output_path: str):
    """Terminate `proc` and delete the files backing the two socket paths.

    The process receives `terminate()` and is given up to 5 seconds to exit.
    If it is still alive after that, its whole process tree is killed.
    """
    # Shutdown the process.
    if proc.is_alive():
        proc.terminate()
        proc.join(5)

        if proc.is_alive() and (pid := proc.pid) is not None:
            kill_process_tree(pid)

    # Remove zmq ipc socket files.
    for ipc_socket in (output_path, input_path):
        # Strip only the leading scheme, not every occurrence of it.
        socket_file = ipc_socket.removeprefix("ipc://")
        # The `os` truthiness check presumably guards against module globals
        # already being torn down (set to None) at interpreter exit.
        if os and os.path.exists(socket_file):
            try:
                os.remove(socket_file)
            except FileNotFoundError:
                # Removed concurrently between the check and the removal.
                pass


def bind_kv_cache(
    kv_caches: dict[str, torch.Tensor],
    forward_context: dict[str, "Attention"],
    runner_kv_caches: list[torch.Tensor],
) -> None:
    """
    Bind the allocated KV cache to both ModelRunner and forward context so
    that the KV cache can be used in the forward pass.

    This function:
      1) Fills the ModelRunner's kv cache list (`runner_kv_caches`) with
         kv_caches, ordered by the layer index parsed from each layer name.
      2) Associates each attention layer in the `forward_context` with its
         corresponding KV cache in kv_caches.

    Args:
        kv_caches: The allocated kv_caches with layer names as keys.
        forward_context: The global forward context containing all Attention
            layers with layer names as keys.
        runner_kv_caches: The kv_cache declared by ModelRunner. Must be empty
            on entry; it is filled in place.

    Raises:
        NotImplementedError: If more than one layer name maps to the same
            layer index.
    """
    # Bind kv_caches to ModelRunner
    assert len(runner_kv_caches) == 0

    # Convert kv_caches dict to a list of tensors in the order of layer_index.
    index2name = defaultdict(list)
    for layer_name in kv_caches:
        index2name[extract_layer_index(layer_name)].append(layer_name)

    for layer_index in sorted(index2name):
        layer_names = index2name[layer_index]
        if len(layer_names) > 1:
            # One typical case is an encoder-decoder model, e.g., bart.
            # The cross attention and self attention in the same decoder
            # layer have different layer names but the same layer_index.
            raise NotImplementedError(
                f"Multiple attention layers share layer index {layer_index}: "
                f"{layer_names}. Binding more than one KV cache per layer "
                "index is not supported.")
        layer_name = layer_names[0]
        runner_kv_caches.append(kv_caches[layer_name])

    # Bind kv_caches to forward context
    for layer_name, kv_cache in kv_caches.items():
        # NOTE: Use list because of v0 PP virtual engine.
        forward_context[layer_name].kv_cache = [kv_cache]


def copy_slice(from_tensor: torch.Tensor, to_tensor: torch.Tensor,
               length: int) -> torch.Tensor:
    """
    Copy the first length elements of a tensor into another tensor in a
    non-blocking manner.

    Used to copy pinned CPU tensor data to pre-allocated GPU tensors.

    Returns the sliced target tensor.
    """
    return to_tensor[:length].copy_(from_tensor[:length], non_blocking=True)


def report_usage_stats(
        vllm_config,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT) -> None:
    """Report usage statistics if enabled.

    Args:
        vllm_config: Engine configuration. Its model, parallel, cache, LoRA
            and prompt-adapter settings are read and reported.
        usage_context: Context tag attached to the report.
    """

    if not is_usage_stats_enabled():
        return

    # Local import; presumably deferred to avoid import cost or cycles.
    from vllm.model_executor.model_loader import get_architecture_class_name

    usage_message.report_usage(
        get_architecture_class_name(vllm_config.model_config),
        usage_context,
        extra_kvs={
            # Common configuration
            "dtype":
            str(vllm_config.model_config.dtype),
            "tensor_parallel_size":
            vllm_config.parallel_config.tensor_parallel_size,
            "block_size":
            vllm_config.cache_config.block_size,
            "gpu_memory_utilization":
            vllm_config.cache_config.gpu_memory_utilization,

            # Quantization
            "quantization":
            vllm_config.model_config.quantization,
            "kv_cache_dtype":
            str(vllm_config.cache_config.cache_dtype),

            # Feature flags
            "enable_lora":
            bool(vllm_config.lora_config),
            "enable_prompt_adapter":
            bool(vllm_config.prompt_adapter_config),
            "enable_prefix_caching":
            vllm_config.cache_config.enable_prefix_caching,
            "enforce_eager":
            vllm_config.model_config.enforce_eager,
            "disable_custom_all_reduce":
            vllm_config.parallel_config.disable_custom_all_reduce,
        })