mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-04-16 06:27:04 +08:00
358 lines
14 KiB
Python
358 lines
14 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
|
|
from contextlib import contextmanager
|
|
from typing import Any, Literal
|
|
|
|
import torch
|
|
|
|
from vllm.config import VllmConfig
|
|
from vllm.envs import SLAB_OPTIMIZATION
|
|
from vllm.logger import init_logger
|
|
from vllm.lora.lora_model import LoRAModel
|
|
from vllm.lora.model_manager import (
|
|
LoRAModelManager,
|
|
LRUCacheLoRAModelManager,
|
|
create_lora_manager,
|
|
)
|
|
from vllm.lora.peft_helper import PEFTHelper
|
|
from vllm.lora.request import LoRARequest
|
|
from vllm.lora.utils import get_adapter_absolute_path
|
|
|
|
logger = init_logger(__name__)
|
|
|
|
|
|
class WorkerLoRAManager:
|
|
"""WorkerLoRAManager that manages LoRA models on the worker side.
|
|
|
|
Every request, the requested LoRAs will be loaded (unless they are already
|
|
loaded), and every other LoRA will be unloaded."""
|
|
|
|
_manager_cls: type[LoRAModelManager] = LoRAModelManager
|
|
|
|
def __init__(
|
|
self,
|
|
vllm_config: VllmConfig,
|
|
device: torch.device,
|
|
embedding_modules: dict[str, str],
|
|
embedding_padding_modules: list[str],
|
|
lora_model_cls: type[LoRAModel] = LoRAModel,
|
|
):
|
|
self._lora_model_cls = lora_model_cls
|
|
self.embedding_modules = embedding_modules
|
|
self.embedding_padding_modules = embedding_padding_modules
|
|
self._cached_dummy_lora: None | Literal[False] | LoRAModel = False
|
|
self.max_num_seqs = vllm_config.scheduler_config.max_num_seqs
|
|
self.max_num_batched_tokens = (
|
|
vllm_config.scheduler_config.max_num_batched_tokens
|
|
)
|
|
self.vocab_size = vllm_config.model_config.get_vocab_size()
|
|
self.lora_config = vllm_config.lora_config
|
|
|
|
# Use get_text_config() in case of multimodal models
|
|
text_config = vllm_config.model_config.hf_config.get_text_config()
|
|
|
|
self.max_position_embeddings = text_config.max_position_embeddings
|
|
self.device = device
|
|
# Lazily initialized by create_lora_manager.
|
|
self._adapter_manager: LoRAModelManager
|
|
|
|
@contextmanager
|
|
def dummy_lora_cache(self):
|
|
"""Use this context manager to reuse the dummy lora model
|
|
to avoid creating it repeatedly."""
|
|
self._cached_dummy_lora = None
|
|
yield
|
|
self._cached_dummy_lora = False
|
|
|
|
@property
|
|
def is_enabled(self) -> bool:
|
|
return True
|
|
|
|
def create_lora_manager(
|
|
self,
|
|
model: torch.nn.Module,
|
|
) -> Any:
|
|
lora_manager = create_lora_manager(
|
|
model,
|
|
max_num_seqs=self.max_num_seqs,
|
|
max_num_batched_tokens=self.max_num_batched_tokens,
|
|
vocab_size=self.vocab_size,
|
|
lora_config=self.lora_config,
|
|
device=self.device,
|
|
lora_manager_cls=self._manager_cls,
|
|
)
|
|
self._adapter_manager = lora_manager
|
|
return lora_manager.model
|
|
|
|
def _load_adapter(self, lora_request: LoRARequest) -> LoRAModel | None:
|
|
if SLAB_OPTIMIZATION:
|
|
lora_path = get_adapter_absolute_path(lora_request.lora_path)
|
|
# Check for dummy/fake warmup paths
|
|
if (
|
|
"/not/a/real/path" in lora_path
|
|
or "warmup" in lora_request.lora_name.lower()
|
|
or not lora_path
|
|
or lora_path == "/not/a/real/path"
|
|
):
|
|
logger.warning(
|
|
"[SLAB_OPTIMIZATION] Skipping dummy warmup LoRA: %s "
|
|
"(path: %s) - not needed with slab optimization",
|
|
lora_request.lora_name,
|
|
lora_path,
|
|
)
|
|
return None
|
|
|
|
# Check if adapter_config.json exists for real LoRAs
|
|
import os
|
|
|
|
lora_config_path = os.path.join(lora_path, "adapter_config.json")
|
|
if not os.path.exists(lora_config_path):
|
|
logger.warning(
|
|
"[SLAB_OPTIMIZATION] Skipping LoRA %s - "
|
|
"adapter_config.json not found at %s, likely dummy warmup",
|
|
lora_request.lora_name,
|
|
lora_config_path,
|
|
)
|
|
return None
|
|
|
|
try:
|
|
supported_lora_modules = self._adapter_manager.supported_lora_modules
|
|
packed_modules_mapping = self._adapter_manager.packed_modules_mapping
|
|
expected_lora_lst: list[str] = []
|
|
for module in supported_lora_modules:
|
|
if module in packed_modules_mapping:
|
|
expected_lora_lst.extend(packed_modules_mapping[module])
|
|
else:
|
|
expected_lora_lst.append(module)
|
|
if module == "experts":
|
|
expected_lora_lst.append(module)
|
|
expected_lora_modules = set(expected_lora_lst)
|
|
lora_path = get_adapter_absolute_path(lora_request.lora_path)
|
|
|
|
peft_helper = PEFTHelper.from_local_dir(
|
|
lora_path,
|
|
self.max_position_embeddings,
|
|
lora_request.tensorizer_config_dict,
|
|
)
|
|
|
|
# Validates the LoRA configuration against requirements before
|
|
# loading weights, throwing an exception if validation fails.
|
|
peft_helper.validate_legal(self.lora_config)
|
|
|
|
# For some models like Qwen2VL, we need to use hf_to_vllm_mapper
|
|
# to ensure correct loading of lora weights.
|
|
model = self._adapter_manager.model
|
|
hf_to_vllm_mapper = getattr(model, "hf_to_vllm_mapper", None)
|
|
# Get target modules, lora_config, AND packed_modules info
|
|
target_modules_dict: dict | None = None
|
|
target_lora_config: Any = None
|
|
packed_modules_dict: dict | None = None
|
|
packed_modules_map: dict | None = None
|
|
|
|
if SLAB_OPTIMIZATION and hasattr(self, "_adapter_manager"):
|
|
target_modules_dict = getattr(self._adapter_manager, "modules", None)
|
|
target_lora_config = getattr(self._adapter_manager, "lora_config", None)
|
|
packed_modules_dict = getattr(
|
|
self._adapter_manager, "packed_modules", None
|
|
)
|
|
packed_modules_map = getattr(
|
|
self._adapter_manager, "packed_modules_mapping", None
|
|
)
|
|
|
|
if target_modules_dict and target_lora_config:
|
|
logger.debug(
|
|
"[SLAB_OPTIMIZATION] Passing %d target modules and "
|
|
"lora_config (fully_sharded_loras=%s) to LoRA creation",
|
|
len(target_modules_dict),
|
|
target_lora_config.fully_sharded_loras,
|
|
)
|
|
logger.debug(
|
|
"[SLAB_OPTIMIZATION] Passing %d packed_modules for "
|
|
"pre-slab packing",
|
|
len(packed_modules_dict) if packed_modules_dict else 0,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
"[SLAB_OPTIMIZATION] Missing target info - "
|
|
"modules: %s, lora_config: %s",
|
|
target_modules_dict is not None,
|
|
target_lora_config is not None,
|
|
)
|
|
|
|
lora = self._lora_model_cls.from_local_checkpoint(
|
|
lora_path,
|
|
expected_lora_modules,
|
|
peft_helper=peft_helper,
|
|
lora_model_id=lora_request.lora_int_id,
|
|
device="cpu",
|
|
dtype=self.lora_config.lora_dtype,
|
|
model_vocab_size=self.vocab_size,
|
|
embedding_modules=self.embedding_modules,
|
|
embedding_padding_modules=self.embedding_padding_modules,
|
|
tensorizer_config_dict=lora_request.tensorizer_config_dict,
|
|
weights_mapper=hf_to_vllm_mapper,
|
|
target_modules_dict=target_modules_dict,
|
|
target_lora_config=target_lora_config,
|
|
slab_path=lora_request.slab_path,
|
|
packed_modules=packed_modules_dict,
|
|
packed_modules_mapping=packed_modules_map,
|
|
)
|
|
|
|
except FileNotFoundError as e:
|
|
# FileNotFoundError should be raised if both
|
|
# - No adapter found to download from huggingface (or in
|
|
# offline mode)
|
|
# - No local adapter files found at `lora_request.lora_path`
|
|
# For NotFoundError
|
|
raise ValueError(
|
|
f"Loading lora {lora_request.lora_name} failed: No adapter "
|
|
f"found for {lora_request.lora_path}"
|
|
) from e
|
|
except Exception as e:
|
|
# For BadRequestError
|
|
raise e
|
|
|
|
return lora
|
|
|
|
def add_dummy_lora(self, lora_request: LoRARequest, rank: int) -> bool:
|
|
if lora_request.lora_int_id in self.list_adapters():
|
|
return False
|
|
if isinstance(self._cached_dummy_lora, LoRAModel):
|
|
dummy_lora = self._cached_dummy_lora.clone(lora_request.lora_int_id)
|
|
else:
|
|
dummy_lora = self._adapter_manager.create_dummy_lora(
|
|
lora_request.lora_int_id, rank, self.embedding_modules
|
|
)
|
|
if self._cached_dummy_lora is None:
|
|
self._cached_dummy_lora = dummy_lora
|
|
return self._adapter_manager.add_adapter(dummy_lora)
|
|
|
|
def pin_adapter(self, adapter_id: int) -> bool:
|
|
return self._adapter_manager.pin_adapter(adapter_id)
|
|
|
|
def set_active_adapters(self, requests: set[Any], mapping: Any | None) -> None:
|
|
self._apply_adapters(requests)
|
|
if mapping is not None:
|
|
self._adapter_manager.set_adapter_mapping(mapping)
|
|
|
|
def _apply_adapters(self, adapter_requests: set[Any]) -> None:
|
|
existing_adapters = self.list_adapters()
|
|
models_map = {
|
|
adapter_request.adapter_id: adapter_request
|
|
for adapter_request in adapter_requests
|
|
if adapter_request
|
|
}
|
|
if len(models_map) > self._adapter_manager.adapter_slots:
|
|
raise RuntimeError(
|
|
f"Number of requested models ({len(models_map)}) is greater "
|
|
"than the number of GPU model slots "
|
|
f"({self._adapter_manager.adapter_slots})."
|
|
)
|
|
requested_ids = set(models_map)
|
|
for adapter_id in existing_adapters - requested_ids:
|
|
self.remove_adapter(adapter_id)
|
|
for adapter_id in requested_ids - existing_adapters:
|
|
self.add_adapter(models_map[adapter_id])
|
|
|
|
def add_adapter(self, adapter_request: Any) -> bool:
|
|
if adapter_request.adapter_id in self.list_adapters():
|
|
return False
|
|
loaded_adapter = self._load_adapter(adapter_request)
|
|
if loaded_adapter is None:
|
|
# Dummy warmup LoRA was skipped under SLAB_OPTIMIZATION
|
|
logger.debug(
|
|
"[SLAB_OPTIMIZATION] Skipped dummy LoRA: %s",
|
|
adapter_request.lora_name,
|
|
)
|
|
return False
|
|
loaded = self._adapter_manager.add_adapter(loaded_adapter)
|
|
self._adapter_manager.activate_adapter(loaded_adapter.id)
|
|
return loaded
|
|
|
|
def remove_adapter(self, adapter_id: int) -> bool:
|
|
return self._adapter_manager.remove_adapter(adapter_id)
|
|
|
|
def remove_all_adapters(self):
|
|
self._adapter_manager.remove_all_adapters()
|
|
|
|
def list_adapters(self) -> set[int]:
|
|
return set(self._adapter_manager.list_adapters())
|
|
|
|
|
|
class LRUCacheWorkerLoRAManager(WorkerLoRAManager):
|
|
"""WorkerLoRAManager that manages LoRA models on the worker side.
|
|
|
|
Uses an LRU Cache. Every request, the requested LoRAs will be loaded
|
|
(unless they are already loaded) and least recently used LoRAs will
|
|
be unloaded if the cache is above capacity."""
|
|
|
|
_manager_cls: type[LRUCacheLoRAModelManager] = LRUCacheLoRAModelManager
|
|
|
|
def create_lora_manager(
|
|
self,
|
|
model: torch.nn.Module,
|
|
) -> Any:
|
|
lora_manager = create_lora_manager(
|
|
model,
|
|
lora_manager_cls=self._manager_cls,
|
|
max_num_seqs=self.max_num_seqs,
|
|
vocab_size=self.vocab_size,
|
|
lora_config=self.lora_config,
|
|
device=self.device,
|
|
max_num_batched_tokens=self.max_num_batched_tokens,
|
|
)
|
|
self._adapter_manager = lora_manager
|
|
return lora_manager.model
|
|
|
|
def _apply_adapters(self, lora_requests: set[LoRARequest]) -> None:
|
|
loras_map = {
|
|
lora_request.lora_int_id: lora_request
|
|
for lora_request in lora_requests
|
|
if lora_request
|
|
}
|
|
if len(loras_map) > self._adapter_manager.lora_slots:
|
|
raise RuntimeError(
|
|
f"Number of requested LoRAs ({len(loras_map)}) is greater "
|
|
"than the number of GPU LoRA slots "
|
|
f"({self._adapter_manager.lora_slots})."
|
|
)
|
|
for lora in loras_map.values():
|
|
self.add_adapter(lora)
|
|
|
|
def add_adapter(self, lora_request: LoRARequest) -> bool:
|
|
# Note that this method is not thread-safe. It may be invoked multiple
|
|
# times for the same adapter when using multiple API servers.
|
|
# This is ok because it's currently only called from
|
|
# the single-threaded core engine loop.
|
|
|
|
if lora_request.lora_int_id not in self.list_adapters():
|
|
# Load the new adapter first to ensure it is actually valid, before
|
|
# evicting any existing adapters.
|
|
# This may cause the # of loaded lora adapters to very temporarily
|
|
# exceed `--max-cpu-loras`.
|
|
lora = self._load_adapter(lora_request)
|
|
if lora is None:
|
|
# Dummy warmup LoRA was skipped under SLAB_OPTIMIZATION
|
|
logger.debug(
|
|
"[SLAB_OPTIMIZATION] Skipped dummy LoRA: %s",
|
|
lora_request.lora_name,
|
|
)
|
|
return False
|
|
|
|
# Loading succeeded, now check if we will exceed cache capacity and
|
|
# evict if the oldest adapter if so
|
|
if len(self._adapter_manager) + 1 > self._adapter_manager.capacity:
|
|
assert isinstance(self._adapter_manager, LRUCacheLoRAModelManager)
|
|
self._adapter_manager.remove_oldest_adapter()
|
|
# Then add the new adapter to the cache
|
|
loaded = self._adapter_manager.add_adapter(lora)
|
|
else:
|
|
# If the lora is already loaded, just touch it to
|
|
# update its position in the caches
|
|
loaded = (
|
|
self._adapter_manager.get_adapter(lora_request.lora_int_id) is not None
|
|
)
|
|
self._adapter_manager.activate_adapter(lora_request.lora_int_id)
|
|
return loaded
|