"""A Neuron worker class."""
from typing import Dict, List, Optional, Tuple

import torch
import torch.distributed

from vllm.config import (CacheConfig, DeviceConfig, ModelConfig,
                         ParallelConfig, SchedulerConfig, LoRAConfig)
from vllm.model_executor import set_random_seed
from vllm.model_executor.parallel_utils.communication_op import (
    broadcast_tensor_dict)
from vllm.model_executor.parallel_utils.parallel_state import (
    ensure_model_parallel_initialized)
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.worker.cache_engine import CacheEngine
from vllm.worker.model_runner import ModelRunner


class Worker:
    """A worker class that executes the model on a group of Neuron cores."""

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        lora_config: Optional[LoRAConfig] = None,
        kv_cache_dtype: Optional[str] = "auto",
        is_driver_worker: bool = False,
    ) -> None:
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.lora_config = lora_config
        self.is_driver_worker = is_driver_worker
        if self.is_driver_worker:
            assert self.rank == 0, "The driver worker must have rank 0."

        self.model_runner = ModelRunner(model_config,
                                        parallel_config,
                                        scheduler_config,
                                        device_config,
                                        lora_config=self.lora_config,
                                        is_driver_worker=is_driver_worker)
        # Uninitialized cache engine. Will be initialized by
        # self.init_cache_engine().
        self.cache_config = None
        self.cache_engine = None
        self.cache_events = None
        self.gpu_cache = None

    def init_model(self) -> None:
        # Initialize the distributed environment, using the CPU-based gloo
        # backend for this Neuron worker.
        _init_distributed_environment(self.parallel_config,
                                      self.rank,
                                      self.distributed_init_method,
                                      distributed_backend="gloo")

        # Initialize the model.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        self.model_runner.load_model()

    @torch.inference_mode()
    def profile_num_available_blocks(
        self,
        block_size: int = 128,
        gpu_memory_utilization: float = 0.9,
        cpu_swap_space: int = 0,
        cache_dtype: str = "float16",
    ) -> Tuple[int, int]:
        """Simply returns max_num_seqs as num_gpu_blocks, 0 as
        num_cpu_blocks."""
        num_gpu_blocks = self.scheduler_config.max_num_seqs
        num_cpu_blocks = 0
        return num_gpu_blocks, num_cpu_blocks

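    # Note (an assumption, not stated in this file): unlike the GPU worker,
    # no device-memory profiling happens here. The KV cache on Neuron is
    # managed inside the compiled model, so the scheduler is simply given
    # one "block" per sequence (max_num_seqs blocks) and no CPU swap blocks;
    # block_size, gpu_memory_utilization, cpu_swap_space and cache_dtype are
    # accepted above only to keep the worker interface uniform.
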
    def init_cache_engine(self, cache_config: CacheConfig) -> None:
        self.cache_config = cache_config
        self.cache_engine = CacheEngine(self.cache_config, self.model_config,
                                        self.parallel_config)
        self.model_runner.set_block_size(self.cache_engine.block_size)

    def warm_up_model(self) -> None:
        # Warm-up is handled inside transformers-neuronx, so there is
        # nothing to do here.
        pass

    def cache_swap(
        self,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        # Issue cache operations.
        issued_cache_op = False
        if blocks_to_swap_in:
            self.cache_engine.swap_in(blocks_to_swap_in)
            issued_cache_op = True
        if blocks_to_swap_out:
            self.cache_engine.swap_out(blocks_to_swap_out)
            issued_cache_op = True
        if blocks_to_copy:
            self.cache_engine.copy(blocks_to_copy)
            issued_cache_op = True

        cache_events = self.cache_events if issued_cache_op else None

        # Wait for cache operations to finish.
        if cache_events is not None:
            raise NotImplementedError(
                "cache operations are not implemented for neuron backend.")

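    # Illustrative note on the block maps consumed by cache_swap() above
    # (semantics assumed from vLLM's GPU path, not defined in this file):
    #   blocks_to_swap_in  : {cpu_block_id: gpu_block_id},        e.g. {3: 7}
    #   blocks_to_swap_out : {gpu_block_id: cpu_block_id},        e.g. {7: 3}
    #   blocks_to_copy     : {src_block_id: [dst_block_id, ...]}, e.g. {7: [8]}
    # On the Neuron backend these maps are expected to stay empty; cache
    # events are never created (self.cache_events is always None), so the
    # NotImplementedError branch above acts only as a guard.
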
    @torch.inference_mode()
    def execute_model(
        self,
        seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None,
        blocks_to_swap_in: Optional[Dict[int, int]] = None,
        blocks_to_swap_out: Optional[Dict[int, int]] = None,
        blocks_to_copy: Optional[Dict[int, List[int]]] = None,
    ) -> Optional[SamplerOutput]:
        if self.is_driver_worker:
            # The driver (rank 0) validates its inputs and broadcasts the
            # scheduling metadata to all other workers.
            assert seq_group_metadata_list is not None
            num_seq_groups = len(seq_group_metadata_list)
            assert blocks_to_swap_in is not None
            assert blocks_to_swap_out is not None
            assert blocks_to_copy is not None
            data = {
                "num_seq_groups": num_seq_groups,
                "blocks_to_swap_in": blocks_to_swap_in,
                "blocks_to_swap_out": blocks_to_swap_out,
                "blocks_to_copy": blocks_to_copy,
            }
            broadcast_tensor_dict(data, src=0)
        else:
            # Non-driver workers receive the metadata broadcast by the driver.
            data = broadcast_tensor_dict(src=0)
            num_seq_groups = data["num_seq_groups"]
            blocks_to_swap_in = data["blocks_to_swap_in"]
            blocks_to_swap_out = data["blocks_to_swap_out"]
            blocks_to_copy = data["blocks_to_copy"]

        self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)

        # If there is no input, we don't need to execute the model.
        if num_seq_groups == 0:
            return {}

        output = self.model_runner.execute_model(seq_group_metadata_list,
                                                 self.gpu_cache)
        return output


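# Rough driver-side usage sketch (illustrative only; the config objects and
# the "tcp://..." address are assumptions, not values taken from this file):
#
#   worker = Worker(model_config, parallel_config, scheduler_config,
#                   device_config, local_rank=0, rank=0,
#                   distributed_init_method="tcp://127.0.0.1:8000",
#                   is_driver_worker=True)
#   worker.init_model()    # gloo process group + random seed
#   worker.load_model()    # delegates to ModelRunner.load_model()
#   num_gpu_blocks, _ = worker.profile_num_available_blocks()
#   worker.init_cache_engine(cache_config)
#   output = worker.execute_model(seq_group_metadata_list,
#                                 blocks_to_swap_in={},
#                                 blocks_to_swap_out={},
#                                 blocks_to_copy={})

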
def _init_distributed_environment(
    parallel_config: ParallelConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    distributed_backend: Optional[str] = None,
) -> None:
    """Initialize the distributed environment."""
    if torch.distributed.is_initialized():
        torch_world_size = torch.distributed.get_world_size()
        if torch_world_size != parallel_config.world_size:
            raise RuntimeError(
                "torch.distributed is already initialized but the torch world "
                "size does not match parallel_config.world_size "
                f"({torch_world_size} vs. {parallel_config.world_size}).")
    elif not distributed_init_method:
        raise ValueError(
            "distributed_init_method must be set if torch.distributed "
            "is not already initialized")
    else:
        distributed_backend = distributed_backend or "nccl"
        torch.distributed.init_process_group(
            backend=distributed_backend,
            world_size=parallel_config.world_size,
            rank=rank,
            init_method=distributed_init_method,
        )

    # A small all_reduce for warmup.
    torch.distributed.all_reduce(torch.zeros(1))
    ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
                                      parallel_config.pipeline_parallel_size)
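
# Illustrative sketch (not executed; the tcp:// address and a world size of 1
# are assumptions): a single Neuron worker at rank 0 reaches this helper via
# init_model() roughly as
#
#   _init_distributed_environment(parallel_config,
#                                 rank=0,
#                                 distributed_init_method="tcp://127.0.0.1:8000",
#                                 distributed_backend="gloo")
#
# after which ensure_model_parallel_initialized() has created the tensor- and
# pipeline-parallel groups matching parallel_config.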