From 720b10fdc6891b2540fce172b63eb07c1ba48958 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-neuralmagic@users.noreply.github.com> Date: Thu, 26 Dec 2024 18:03:43 -0500 Subject: [PATCH 1/9] [1/N] API Server (Remove Proxy) (#11529) --- vllm/entrypoints/openai/api_server.py | 18 ++++++++++++------ vllm/entrypoints/openai/cli_args.py | 6 +++++- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 3e50613a73dd3..16086689a10d1 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -585,12 +585,18 @@ def build_app(args: Namespace) -> FastAPI: status_code=401) return await call_next(request) - @app.middleware("http") - async def add_request_id(request: Request, call_next): - request_id = request.headers.get("X-Request-Id") or uuid.uuid4().hex - response = await call_next(request) - response.headers["X-Request-Id"] = request_id - return response + if args.enable_request_id_headers: + logger.warning( + "CAUTION: Enabling X-Request-Id headers in the API Server. " + "This can harm performance at high QPS.") + + @app.middleware("http") + async def add_request_id(request: Request, call_next): + request_id = request.headers.get( + "X-Request-Id") or uuid.uuid4().hex + response = await call_next(request) + response.headers["X-Request-Id"] = request_id + return response for middleware in args.middleware: module_path, object_name = middleware.rsplit(".", 1) diff --git a/vllm/entrypoints/openai/cli_args.py b/vllm/entrypoints/openai/cli_args.py index 24c206a1261f2..908f8c3532c9e 100644 --- a/vllm/entrypoints/openai/cli_args.py +++ b/vllm/entrypoints/openai/cli_args.py @@ -196,7 +196,11 @@ def make_arg_parser(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: action="store_true", help="If specified, will run the OpenAI frontend server in the same " "process as the model serving engine.") - + parser.add_argument( + "--enable-request-id-headers", + action="store_true", + help="If specified, API server will add X-Request-Id header to " + "responses. 
Caution: this hurts performance at high QPS.") parser.add_argument( "--enable-auto-tool-choice", action="store_true", From 2072924d1480460d3b3578a4548c2bffe33fe1c3 Mon Sep 17 00:00:00 2001 From: Michael Goin Date: Thu, 26 Dec 2024 18:33:30 -0500 Subject: [PATCH 2/9] [Model] [Quantization] Support deepseek_v3 w8a8 fp8 block-wise quantization (#11523) Signed-off-by: mgoin Signed-off-by: simon-mo Signed-off-by: simon-mo Co-authored-by: simon-mo Co-authored-by: simon-mo Co-authored-by: HandH1998 <1335248067@qq.com> --- tests/kernels/test_block_fp8.py | 265 +++++++++++++ vllm/config.py | 14 +- .../layers/fused_moe/fused_moe.py | 131 +++++-- vllm/model_executor/layers/fused_moe/layer.py | 7 +- vllm/model_executor/layers/linear.py | 23 +- .../model_executor/layers/quantization/fp8.py | 199 ++++++++-- .../layers/quantization/utils/fp8_utils.py | 353 ++++++++++++++++++ vllm/model_executor/parameter.py | 9 + 8 files changed, 931 insertions(+), 70 deletions(-) create mode 100644 tests/kernels/test_block_fp8.py create mode 100644 vllm/model_executor/layers/quantization/utils/fp8_utils.py diff --git a/tests/kernels/test_block_fp8.py b/tests/kernels/test_block_fp8.py new file mode 100644 index 0000000000000..a16cc4582a180 --- /dev/null +++ b/tests/kernels/test_block_fp8.py @@ -0,0 +1,265 @@ +# Adapted from https://github.com/sgl-project/sglang/pull/2575 +import itertools + +import pytest +import torch + +from vllm.model_executor.layers.activation import SiluAndMul +from vllm.model_executor.layers.fused_moe import fused_moe +from vllm.model_executor.layers.quantization.utils.fp8_utils import ( + per_token_group_quant_fp8, w8a8_block_fp8_matmul) +from vllm.platforms import current_platform + +if current_platform.get_device_capability() < (9, 0): + pytest.skip("FP8 Triton requires CUDA 9.0 or higher", + allow_module_level=True) + +# Test configurations +DTYPES = [torch.bfloat16] # [torch.half, torch.bfloat16, torch.float32] +NUM_TOKENS = [7, 83, 2048] +D = [512, 4096, 5120, 13824] +GROUP_SIZE = [64, 128, 256, 512] +M = [1, 7, 83, 512, 2048] +N = [128, 512, 1024, 4096, 7748, 13824] +K = [256, 4096, 5120, 3884, 13824] +# Deepseek-V3's intermediate size 18432, so N is 18432*2/8=4608 at TP8 +# and its hidden size is 7168. 
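+# Each [128, 128] weight block below carries one fp32 scale, so e.g. an
+# expert's w1 slice of shape (2 * 4608, 7168) has a (72, 56) scale grid.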
+M_moe = [1, 7, 83, 512, 2048] +N_moe = [4608] # [128, 4608, 13824] +K_moe = [7168] # [256, 7168, 13824] +BLOCK_SIZE = [[128, 128]] +E = [256] # [8, 24, 128, 256] +TOP_KS = [1] # [1, 2, 6] +OUT_DTYPES = [torch.bfloat16] # [torch.float32, torch.half, torch.bfloat16] +SEEDS = [0] + + +def native_per_token_group_quant_fp8(x, + group_size, + eps=1e-10, + dtype=torch.float8_e4m3fn): + """Function to perform per-token-group quantization on an input tensor + `x` using native torch.""" + assert x.shape[-1] % group_size == 0, ("the last dimension of `x` cannot " + "be divisible by `group_size`") + assert x.is_contiguous(), "`x` is not contiguous" + + finfo = torch.finfo(dtype) + fp8_min = finfo.min + fp8_max = finfo.max + + x_ = x.reshape(x.numel() // group_size, group_size) + amax = x_.abs().max(dim=-1, + keepdim=True)[0].clamp(min=eps).to(torch.float32) + x_s = amax / fp8_max + x_q = (x_ / x_s).clamp(min=fp8_min, max=fp8_max).to(dtype) + x_q = x_q.reshape(x.shape) + x_s = x_s.reshape(x.shape[:-1] + (x.shape[-1] // group_size, )) + + return x_q, x_s + + +def native_w8a8_block_fp8_matmul(A, + B, + As, + Bs, + block_size, + output_dtype=torch.float16): + """Matrix multiplication with block-wise quantization using native torch.""" + A = A.to(torch.float32) + B = B.to(torch.float32) + assert A.shape[-1] == B.shape[-1] + assert B.ndim == 2 and B.is_contiguous() and Bs.ndim == 2 + assert len(block_size) == 2 + block_n, block_k = block_size[0], block_size[1] + assert (A.shape[-1] + block_k - 1) // block_k == As.shape[-1] + assert A.shape[:-1] == As.shape[:-1] + + M = A.numel() // A.shape[-1] + N, K = B.shape + origin_C_shape = A.shape[:-1] + (N, ) + A = A.reshape(M, A.shape[-1]) + As = As.reshape(M, As.shape[-1]) + n_tiles = (N + block_n - 1) // block_n + k_tiles = (K + block_k - 1) // block_k + assert n_tiles == Bs.shape[0] + assert k_tiles == Bs.shape[1] + + C_shape = (M, N) + C = torch.zeros(C_shape, dtype=torch.float32, device=A.device) + + A_tiles = [ + A[:, i * block_k:min((i + 1) * block_k, K)] for i in range(k_tiles) + ] + B_tiles = [[ + B[j * block_n:min((j + 1) * block_n, N), + i * block_k:min((i + 1) * block_k, K), ] for i in range(k_tiles) + ] for j in range(n_tiles)] + C_tiles = [ + C[:, j * block_n:min((j + 1) * block_n, N)] for j in range(n_tiles) + ] + As_tiles = [As[:, i:i + 1] for i in range(k_tiles)] + + for i in range(k_tiles): + for j in range(n_tiles): + a = A_tiles[i] + b = B_tiles[j][i] + c = C_tiles[j] + s = As_tiles[i] * Bs[j][i] + c[:, :] += torch.matmul(a, b.t()) * s + + C = C.reshape(origin_C_shape).to(output_dtype) + return C + + +def torch_w8a8_block_fp8_moe(a, w1, w2, w1_s, w2_s, score, topk, block_shape): + """Fused moe with block-wise quantization using native torch.""" + B, D = a.shape + a = a.view(B, -1, D).repeat(1, topk, 1).reshape(-1, D) + out = torch.zeros(B * topk, w2.shape[1], dtype=a.dtype, device=a.device) + score = torch.softmax(score, dim=-1, dtype=torch.float32) + topk_weight, topk_ids = torch.topk(score, topk) + topk_weight = topk_weight.view(-1) + topk_ids = topk_ids.view(-1) + + _, block_k = block_shape[0], block_shape[1] + a_q, a_s = native_per_token_group_quant_fp8(a, block_k) + a_q = a_q.to(torch.float32) + for i in range(w1.shape[0]): + mask = topk_ids == i + if mask.sum(): + inter_out = native_w8a8_block_fp8_matmul(a_q[mask], + w1[i], + a_s[mask], + w1_s[i], + block_shape, + output_dtype=a.dtype) + act_out = SiluAndMul().forward_native(inter_out) + act_out_q, act_out_s = native_per_token_group_quant_fp8( + act_out, block_k) + act_out = 
act_out.to(torch.float32) + out[mask] = native_w8a8_block_fp8_matmul(act_out_q, + w2[i], + act_out_s, + w2_s[i], + block_shape, + output_dtype=a.dtype) + return (out.view(B, -1, w2.shape[1]) * + topk_weight.view(B, -1, 1).to(out.dtype)).sum(dim=1) + + +# Skip all tests if CUDA is not available +pytest.importorskip("torch.cuda") + + +@pytest.fixture(autouse=True) +def setup_cuda(): + torch.set_default_device("cuda") + + +@pytest.mark.parametrize("num_tokens,d,dtype,group_size,seed", + itertools.product(NUM_TOKENS, D, DTYPES, GROUP_SIZE, + SEEDS)) +@torch.inference_mode() +def test_per_token_group_quant_fp8(num_tokens, d, dtype, group_size, seed): + torch.manual_seed(seed) + x = torch.rand(num_tokens, d, dtype=dtype) + + ref_out, ref_scale = native_per_token_group_quant_fp8(x, group_size) + out, scale = per_token_group_quant_fp8(x, group_size) + + assert torch.allclose(out.to(torch.float32), + ref_out.to(torch.float32), + rtol=0.15) + assert torch.allclose(scale, ref_scale) + + +@pytest.mark.parametrize("M,N,K,block_size,out_dtype,seed", + itertools.product(M, N, K, BLOCK_SIZE, OUT_DTYPES, + SEEDS)) +@torch.inference_mode() +def test_w8a8_block_fp8_matmul(M, N, K, block_size, out_dtype, seed): + torch.manual_seed(seed) + factor_for_scale = 1e-2 + fp8_info = torch.finfo(torch.float8_e4m3fn) + fp8_max, fp8_min = fp8_info.max, fp8_info.min + + A_fp32 = (torch.rand(M, K, dtype=torch.float32) - 0.5) * 2 * fp8_max + A_fp8 = A_fp32.clamp(min=fp8_min, max=fp8_max).to(torch.float8_e4m3fn) + + B_fp32 = (torch.rand(N, K, dtype=torch.float32) - 0.5) * 2 * fp8_max + B_fp8 = B_fp32.clamp(min=fp8_min, max=fp8_max).to(torch.float8_e4m3fn) + + block_n, block_k = block_size[0], block_size[1] + n_tiles = (N + block_n - 1) // block_n + k_tiles = (K + block_k - 1) // block_k + + As = torch.rand(M, k_tiles, dtype=torch.float32) * factor_for_scale + Bs = torch.rand(n_tiles, k_tiles, dtype=torch.float32) * factor_for_scale + + ref_out = native_w8a8_block_fp8_matmul(A_fp8, B_fp8, As, Bs, block_size, + out_dtype) + out = w8a8_block_fp8_matmul(A_fp8, B_fp8, As, Bs, block_size, out_dtype) + + rel_diff = (torch.mean( + torch.abs(out.to(torch.float32) - ref_out.to(torch.float32))) / + torch.mean(torch.abs(ref_out.to(torch.float32)))) + assert rel_diff < 0.001 + + +@pytest.mark.parametrize("M,N,K,E,topk,block_size,dtype,seed", + itertools.product(M_moe, N_moe, K_moe, E, TOP_KS, + BLOCK_SIZE, DTYPES, SEEDS)) +@torch.inference_mode() +def test_w8a8_block_fp8_fused_moe(M, N, K, E, topk, block_size, dtype, seed): + torch.manual_seed(seed) + factor_for_scale = 1e-2 + fp8_info = torch.finfo(torch.float8_e4m3fn) + fp8_max, fp8_min = fp8_info.max, fp8_info.min + + a = torch.randn((M, K), dtype=dtype) / 10 + + w1_bf16 = (torch.rand( + (E, 2 * N, K), dtype=torch.bfloat16) - 0.5) * 2 * fp8_max + w1 = w1_bf16.clamp(min=fp8_min, max=fp8_max).to(torch.float8_e4m3fn) + del w1_bf16 + + w2_bf16 = (torch.rand((E, K, N), dtype=torch.bfloat16) - 0.5) * 2 * fp8_max + w2 = w2_bf16.clamp(min=fp8_min, max=fp8_max).to(torch.float8_e4m3fn) + del w2_bf16 + + block_n, block_k = block_size[0], block_size[1] + n_tiles_w1 = (2 * N + block_n - 1) // block_n + n_tiles_w2 = (K + block_n - 1) // block_n + k_tiles_w1 = (K + block_k - 1) // block_k + k_tiles_w2 = (N + block_k - 1) // block_k + + w1_s = torch.rand( + (E, n_tiles_w1, k_tiles_w1), dtype=torch.float32) * factor_for_scale + w2_s = torch.rand( + (E, n_tiles_w2, k_tiles_w2), dtype=torch.float32) * factor_for_scale + + score = torch.randn((M, E), dtype=dtype) + + out = fused_moe( + a, + w1, + w2, + 
score, + topk, + renormalize=False, + use_fp8_w8a8=True, + w1_scale=w1_s, + w2_scale=w2_s, + block_shape=block_size, + ) + ref_out = torch_w8a8_block_fp8_moe(a, w1, w2, w1_s, w2_s, score, topk, + block_size) + + print(f"{out.sum()=}") + print(f"{ref_out.sum()=}") + + rel_diff = (torch.mean( + torch.abs(out.to(torch.float32) - ref_out.to(torch.float32))) / + torch.mean(torch.abs(ref_out.to(torch.float32)))) + assert rel_diff < 0.03 diff --git a/vllm/config.py b/vllm/config.py index de8ba029ddc23..58649236b4225 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -161,7 +161,7 @@ class ModelConfig: override default pooling config for the pooling model. logits_processor_pattern: Optional regex pattern specifying valid logits processor qualified names that can be passed with the - `logits_processors` extra completion argument. Defaults to None, + `logits_processors` extra completion argument. Defaults to None, which allows no processors. generation_config: Configuration parameter file for generation. """ @@ -364,7 +364,7 @@ class ModelConfig: def maybe_pull_model_tokenizer_for_s3(self, model: str, tokenizer: str) -> None: """ - Pull the model config or tokenizer to a temporary + Pull the model config or tokenizer to a temporary directory in case of S3. Args: @@ -866,14 +866,14 @@ class ModelConfig: def get_diff_sampling_param(self) -> Dict[str, Any]: """ - This method returns a dictionary containing the parameters - that differ from the default sampling parameters, but only - if `generation_config` is set. If `generation_config` is not + This method returns a dictionary containing the parameters + that differ from the default sampling parameters, but only + if `generation_config` is set. If `generation_config` is not set, an empty dictionary is returned. Returns: - Dict[str, Any]: A dictionary with the differing sampling - parameters if `generation_config` is set, otherwise an + Dict[str, Any]: A dictionary with the differing sampling + parameters if `generation_config` is set, otherwise an empty dictionary. 
""" if self.generation_config is None: diff --git a/vllm/model_executor/layers/fused_moe/fused_moe.py b/vllm/model_executor/layers/fused_moe/fused_moe.py index e6f9f01ef0f74..92e9ba3c9cebd 100644 --- a/vllm/model_executor/layers/fused_moe/fused_moe.py +++ b/vllm/model_executor/layers/fused_moe/fused_moe.py @@ -2,7 +2,7 @@ import functools import json import os -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple import torch import triton @@ -11,6 +11,8 @@ import triton.language as tl import vllm.envs as envs from vllm import _custom_ops as ops from vllm.logger import init_logger +from vllm.model_executor.layers.quantization.utils.fp8_utils import ( + per_token_group_quant_fp8) from vllm.platforms import current_platform from vllm.utils import direct_register_custom_op @@ -45,8 +47,14 @@ def fused_moe_kernel( stride_bn, stride_cm, stride_cn, + stride_asm, + stride_ask, stride_bse, + stride_bsk, stride_bsn, + # Block size for block-wise quantization + group_n: tl.constexpr, + group_k: tl.constexpr, # Meta-parameters BLOCK_SIZE_M: tl.constexpr, BLOCK_SIZE_N: tl.constexpr, @@ -125,8 +133,14 @@ def fused_moe_kernel( b_scale = tl.load(b_scale_ptrs) if use_fp8_w8a8: - a_scale = tl.load(a_scale_ptr) - b_scale = tl.load(b_scale_ptr + off_experts) + if group_k > 0 and group_n > 0: + a_scale_ptrs = a_scale_ptr + (offs_token // top_k) * stride_asm + offs_bsn = offs_bn // group_n + b_scale_ptrs = (b_scale_ptr + off_experts * stride_bse + + offs_bsn * stride_bsn) + else: + a_scale = tl.load(a_scale_ptr) + b_scale = tl.load(b_scale_ptr + off_experts) # ----------------------------------------------------------- # Iterate to compute a block of the C matrix. @@ -149,7 +163,18 @@ def fused_moe_kernel( if use_int8_w8a16: accumulator = tl.dot(a, b.to(compute_type), acc=accumulator) elif use_fp8_w8a8: - accumulator = tl.dot(a, b, acc=accumulator) + if group_k > 0 and group_n > 0: + k_start = k * BLOCK_SIZE_K + offs_ks = k_start // group_k + a_scale = tl.load(a_scale_ptrs + offs_ks * stride_ask, + mask=token_mask, + other=0.0) + b_scale = tl.load(b_scale_ptrs + offs_ks * stride_bsk) + + accumulator += tl.dot(a, b) * a_scale[:, + None] * b_scale[None, :] + else: + accumulator = tl.dot(a, b, acc=accumulator) else: accumulator += tl.dot(a, b) # Advance the ptrs to the next K block. 
@@ -164,7 +189,10 @@ def fused_moe_kernel( if use_int8_w8a16: accumulator = (accumulator * b_scale).to(compute_type) elif use_fp8_w8a8: - accumulator = (accumulator * a_scale * b_scale).to(compute_type) + if group_k > 0 and group_n > 0: + accumulator = accumulator.to(compute_type) + else: + accumulator = (accumulator * a_scale * b_scale).to(compute_type) else: accumulator = accumulator.to(compute_type) # ----------------------------------------------------------- @@ -233,22 +261,37 @@ def moe_align_block_size( return sorted_ids, expert_ids, num_tokens_post_pad -def invoke_fused_moe_kernel(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, +def invoke_fused_moe_kernel(A: torch.Tensor, + B: torch.Tensor, + C: torch.Tensor, A_scale: Optional[torch.Tensor], B_scale: Optional[torch.Tensor], - topk_weights: torch.Tensor, topk_ids: torch.Tensor, + topk_weights: torch.Tensor, + topk_ids: torch.Tensor, sorted_token_ids: torch.Tensor, expert_ids: torch.Tensor, num_tokens_post_padded: torch.Tensor, - mul_routed_weight: bool, top_k: int, - config: Dict[str, Any], compute_type: tl.dtype, - use_fp8_w8a8: bool, use_int8_w8a16: bool) -> None: + mul_routed_weight: bool, + top_k: int, + config: Dict[str, Any], + compute_type: tl.dtype, + use_fp8_w8a8: bool, + use_int8_w8a16: bool, + block_shape: Optional[List[int]] = None) -> None: assert topk_weights.stride(1) == 1 assert sorted_token_ids.stride(0) == 1 if use_fp8_w8a8: - A, A_scale = ops.scaled_fp8_quant(A, A_scale) assert B_scale is not None + if block_shape is None: + A, A_scale = ops.scaled_fp8_quant(A, A_scale) + else: + assert len(block_shape) == 2 + block_n, block_k = block_shape[0], block_shape[1] + A, A_scale = per_token_group_quant_fp8(A, block_k) + assert triton.cdiv(A.shape[-1], block_k) == A_scale.shape[-1] + assert triton.cdiv(B.shape[-2], block_n) == B_scale.shape[-2] + assert triton.cdiv(B.shape[-1], block_k) == B_scale.shape[-1] elif use_int8_w8a16: assert B_scale is not None else: @@ -279,8 +322,13 @@ def invoke_fused_moe_kernel(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, B.stride(1), C.stride(1), C.stride(2), - B_scale.stride(0) if B_scale is not None and use_int8_w8a16 else 0, - B_scale.stride(1) if B_scale is not None and use_int8_w8a16 else 0, + A_scale.stride(0) if A_scale is not None and A_scale.ndim == 2 else 0, + A_scale.stride(1) if A_scale is not None and A_scale.ndim == 2 else 0, + B_scale.stride(0) if B_scale is not None and B_scale.ndim >= 2 else 0, + B_scale.stride(2) if B_scale is not None and B_scale.ndim == 3 else 0, + B_scale.stride(1) if B_scale is not None and B_scale.ndim >= 2 else 0, + 0 if block_shape is None else block_shape[0], + 0 if block_shape is None else block_shape[1], MUL_ROUTED_WEIGHT=mul_routed_weight, top_k=top_k, compute_type=compute_type, @@ -362,6 +410,7 @@ def try_get_optimal_moe_config( dtype: Optional[str], M: int, is_marlin: bool = False, + block_shape: Optional[List[int]] = None, ): from vllm.model_executor.layers.fused_moe import get_config override_config = get_config() @@ -380,6 +429,12 @@ def try_get_optimal_moe_config( # Else use the default config config = get_default_config(M, E, N, w1_shape[2], top_k, dtype, is_marlin) + # NOTE: For block-wise quant, + # BLOCK_K must be divisible by block_shape[1] + # BLOCK_N and BLOCK_M has no requirements + if block_shape is not None: + config["BLOCK_SIZE_N"] = block_shape[0] + config["BLOCK_SIZE_K"] = block_shape[1] return config @@ -479,10 +534,11 @@ def inplace_fused_experts(hidden_states: torch.Tensor, w1_scale: Optional[torch.Tensor] = None, 
w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, - a2_scale: Optional[torch.Tensor] = None) -> None: + a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None) -> None: fused_experts_impl(hidden_states, w1, w2, topk_weights, topk_ids, True, use_fp8_w8a8, use_int8_w8a16, w1_scale, w2_scale, - a1_scale, a2_scale) + a1_scale, a2_scale, block_shape) def inplace_fused_experts_fake( @@ -496,7 +552,8 @@ def inplace_fused_experts_fake( w1_scale: Optional[torch.Tensor] = None, w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, - a2_scale: Optional[torch.Tensor] = None) -> None: + a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None) -> None: pass @@ -519,10 +576,11 @@ def outplace_fused_experts( w1_scale: Optional[torch.Tensor] = None, w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, - a2_scale: Optional[torch.Tensor] = None) -> torch.Tensor: + a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None) -> torch.Tensor: return fused_experts_impl(hidden_states, w1, w2, topk_weights, topk_ids, False, use_fp8_w8a8, use_int8_w8a16, w1_scale, - w2_scale, a1_scale, a2_scale) + w2_scale, a1_scale, a2_scale, block_shape) def outplace_fused_experts_fake( @@ -536,7 +594,8 @@ def outplace_fused_experts_fake( w1_scale: Optional[torch.Tensor] = None, w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, - a2_scale: Optional[torch.Tensor] = None) -> torch.Tensor: + a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None) -> torch.Tensor: return torch.empty_like(hidden_states) @@ -559,18 +618,22 @@ def fused_experts(hidden_states: torch.Tensor, w1_scale: Optional[torch.Tensor] = None, w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, - a2_scale: Optional[torch.Tensor] = None): + a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None): if inplace: torch.ops.vllm.inplace_fused_experts(hidden_states, w1, w2, topk_weights, topk_ids, use_fp8_w8a8, use_int8_w8a16, w1_scale, w2_scale, a1_scale, - a2_scale) + a2_scale, block_shape) return hidden_states else: - return torch.ops.vllm.outplace_fused_experts( - hidden_states, w1, w2, topk_weights, topk_ids, use_fp8_w8a8, - use_int8_w8a16, w1_scale, w2_scale, a1_scale, a2_scale) + return torch.ops.vllm.outplace_fused_experts(hidden_states, w1, w2, + topk_weights, topk_ids, + use_fp8_w8a8, + use_int8_w8a16, w1_scale, + w2_scale, a1_scale, + a2_scale, block_shape) def fused_experts_impl(hidden_states: torch.Tensor, @@ -584,7 +647,8 @@ def fused_experts_impl(hidden_states: torch.Tensor, w1_scale: Optional[torch.Tensor] = None, w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, - a2_scale: Optional[torch.Tensor] = None): + a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None): # Check constraints. 
assert hidden_states.shape[1] == w1.shape[2], "Hidden size mismatch" assert topk_weights.shape == topk_ids.shape, "topk shape mismatch" @@ -611,6 +675,7 @@ def fused_experts_impl(hidden_states: torch.Tensor, w2.shape, topk_ids.shape[1], config_dtype, + block_shape=block_shape, ) config = get_config_func(M) @@ -674,7 +739,8 @@ def fused_experts_impl(hidden_states: torch.Tensor, config, compute_type=compute_type, use_fp8_w8a8=use_fp8_w8a8, - use_int8_w8a16=use_int8_w8a16) + use_int8_w8a16=use_int8_w8a16, + block_shape=block_shape) ops.silu_and_mul(intermediate_cache2, intermediate_cache1.view(-1, N)) @@ -693,7 +759,8 @@ def fused_experts_impl(hidden_states: torch.Tensor, config, compute_type=compute_type, use_fp8_w8a8=use_fp8_w8a8, - use_int8_w8a16=use_int8_w8a16) + use_int8_w8a16=use_int8_w8a16, + block_shape=block_shape) ops.moe_sum(intermediate_cache3.view(*intermediate_cache3.shape), out_hidden_states[begin_chunk_idx:end_chunk_idx]) @@ -718,6 +785,7 @@ def fused_moe( w2_scale: Optional[torch.Tensor] = None, a1_scale: Optional[torch.Tensor] = None, a2_scale: Optional[torch.Tensor] = None, + block_shape: Optional[List[int]] = None, ) -> torch.Tensor: """ This function computes a Mixture of Experts (MoE) layer using two sets of @@ -745,6 +813,12 @@ def fused_moe( w1. - w2_scale (Optional[torch.Tensor]): Optional scale to be used for w2. + - a1_scale (Optional[torch.Tensor]): Optional scale to be used for + a1. + - a2_scale (Optional[torch.Tensor]): Optional scale to be used for + a2. + - block_shape: (Optional[List[int]]): Optional block size for block-wise + quantization. Returns: - torch.Tensor: The output tensor after applying the MoE layer. @@ -775,4 +849,5 @@ def fused_moe( w1_scale=w1_scale, w2_scale=w2_scale, a1_scale=a1_scale, - a2_scale=a2_scale) + a2_scale=a2_scale, + block_shape=block_shape) diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py index 8c6f7c6e06515..55c0a202920ff 100644 --- a/vllm/model_executor/layers/fused_moe/layer.py +++ b/vllm/model_executor/layers/fused_moe/layer.py @@ -29,6 +29,7 @@ class FusedMoeWeightScaleSupported(Enum): TENSOR = "tensor" CHANNEL = "channel" GROUP = "group" + BLOCK = "block" class FusedMoEMethodBase(QuantizeMethodBase): @@ -199,6 +200,7 @@ class FusedMoE(torch.nn.Module): get_tensor_model_parallel_world_size()) self.top_k = top_k self.num_experts = num_experts + assert intermediate_size % self.tp_size == 0 self.intermediate_size_per_partition = intermediate_size // self.tp_size self.reduce_results = reduce_results self.renormalize = renormalize @@ -398,7 +400,10 @@ class FusedMoE(torch.nn.Module): loaded_weight=loaded_weight, expert_data=expert_data, tp_rank=tp_rank) - elif quant_method == FusedMoeWeightScaleSupported.GROUP.value: + elif quant_method in [ + FusedMoeWeightScaleSupported.GROUP.value, + FusedMoeWeightScaleSupported.BLOCK.value, + ]: self._load_model_weight_or_group_weight_scale( shard_id=shard_id, shard_dim=shard_dim, diff --git a/vllm/model_executor/layers/linear.py b/vllm/model_executor/layers/linear.py index 46ef11e7d02c6..33b221b994b2b 100644 --- a/vllm/model_executor/layers/linear.py +++ b/vllm/model_executor/layers/linear.py @@ -14,11 +14,14 @@ from vllm.distributed import (divide, get_tensor_model_parallel_rank, from vllm.logger import init_logger from vllm.model_executor.layers.quantization.base_config import ( QuantizationConfig, QuantizeMethodBase) +# yapf: disable from vllm.model_executor.parameter import (BasevLLMParameter, + BlockQuantScaleParameter, 
PackedColumnParameter, PackedvLLMParameter, PerTensorScaleParameter, RowvLLMParameter) +# yapf: enable from vllm.model_executor.utils import set_weight_attrs logger = init_logger(__name__) @@ -623,8 +626,24 @@ class MergedColumnParallelLinear(ColumnParallelLinear): assert loaded_shard_id < len(self.output_sizes) tp_size = get_tensor_model_parallel_world_size() - shard_offset = sum(self.output_sizes[:loaded_shard_id]) // tp_size - shard_size = self.output_sizes[loaded_shard_id] // tp_size + + if isinstance(param, BlockQuantScaleParameter): + from vllm.model_executor.layers.quantization.fp8 import ( + Fp8LinearMethod, Fp8MoEMethod) + assert self.quant_method is not None + assert isinstance(self.quant_method, + (Fp8LinearMethod, Fp8MoEMethod)) + weight_block_size = self.quant_method.quant_config.weight_block_size + assert weight_block_size is not None + block_n, _ = weight_block_size[0], weight_block_size[1] + shard_offset = ( + (sum(self.output_sizes[:loaded_shard_id]) + block_n - 1) // + block_n) // tp_size + shard_size = ((self.output_sizes[loaded_shard_id] + block_n - 1) // + block_n // tp_size) + else: + shard_offset = sum(self.output_sizes[:loaded_shard_id]) // tp_size + shard_size = self.output_sizes[loaded_shard_id] // tp_size param.load_merged_column_weight(loaded_weight=loaded_weight, shard_id=loaded_shard_id, diff --git a/vllm/model_executor/layers/quantization/fp8.py b/vllm/model_executor/layers/quantization/fp8.py index 978e727bc7cb3..5dfd86727a02a 100644 --- a/vllm/model_executor/layers/quantization/fp8.py +++ b/vllm/model_executor/layers/quantization/fp8.py @@ -6,6 +6,7 @@ from torch.nn.parameter import Parameter import vllm.envs as envs from vllm import _custom_ops as ops +from vllm.distributed import get_tensor_model_parallel_world_size from vllm.logger import init_logger from vllm.model_executor.layers.fused_moe import (FusedMoE, FusedMoEMethodBase, FusedMoeWeightScaleSupported) @@ -14,6 +15,8 @@ from vllm.model_executor.layers.linear import (LinearBase, LinearMethodBase, from vllm.model_executor.layers.quantization.base_config import ( QuantizationConfig, QuantizeMethodBase) from vllm.model_executor.layers.quantization.kv_cache import BaseKVCacheMethod +from vllm.model_executor.layers.quantization.utils.fp8_utils import ( + apply_w8a8_block_fp8_linear) from vllm.model_executor.layers.quantization.utils.marlin_utils_fp8 import ( apply_fp8_marlin_linear, prepare_fp8_layer_for_marlin) from vllm.model_executor.layers.quantization.utils.quant_utils import ( @@ -22,7 +25,8 @@ from vllm.model_executor.layers.quantization.utils.w8a8_utils import ( all_close_1d, apply_fp8_linear, convert_to_channelwise, cutlass_fp8_supported, normalize_e4m3fn_to_e4m3fnuz, per_tensor_dequantize, requantize_with_max_scale) -from vllm.model_executor.parameter import (ModelWeightParameter, +from vllm.model_executor.parameter import (BlockQuantScaleParameter, + ModelWeightParameter, PerTensorScaleParameter) from vllm.model_executor.utils import set_weight_attrs from vllm.platforms import current_platform @@ -41,6 +45,7 @@ class Fp8Config(QuantizationConfig): is_checkpoint_fp8_serialized: bool = False, activation_scheme: str = "dynamic", ignored_layers: Optional[List[str]] = None, + weight_block_size: Optional[List[int]] = None, ) -> None: self.is_checkpoint_fp8_serialized = is_checkpoint_fp8_serialized if is_checkpoint_fp8_serialized: @@ -51,6 +56,20 @@ class Fp8Config(QuantizationConfig): f"Unsupported activation scheme {activation_scheme}") self.activation_scheme = activation_scheme self.ignored_layers 
= ignored_layers or [] + if weight_block_size is not None: + if not is_checkpoint_fp8_serialized: + raise ValueError( + "The block-wise quantization only supports fp8-serialized " + "checkpoint for now.") + if len(weight_block_size) != 2: + raise ValueError( + "The quantization block size of weight must have 2 " + f"dimensions, but got {len(weight_block_size)} dimensions") + if activation_scheme != "dynamic": + raise ValueError("The block-wise quantization only supports " + "dynamic activation scheme for now, but got " + f"{activation_scheme} activation scheme.") + self.weight_block_size = weight_block_size @classmethod def get_name(cls) -> str: @@ -74,9 +93,12 @@ class Fp8Config(QuantizationConfig): is_checkpoint_fp8_serialized = ("fp8" in quant_method) activation_scheme = cls.get_from_keys(config, ["activation_scheme"]) ignored_layers = cls.get_from_keys_or(config, ["ignored_layers"], None) + weight_block_size = cls.get_from_keys_or(config, ["weight_block_size"], + None) return cls(is_checkpoint_fp8_serialized=is_checkpoint_fp8_serialized, activation_scheme=activation_scheme, - ignored_layers=ignored_layers) + ignored_layers=ignored_layers, + weight_block_size=weight_block_size) def get_quant_method(self, layer: torch.nn.Module, prefix: str) -> Optional["QuantizeMethodBase"]: @@ -123,6 +145,11 @@ class Fp8LinearMethod(LinearMethodBase): if current_platform.is_rocm(): self.use_marlin = False + self.block_quant = self.quant_config.weight_block_size is not None + if self.block_quant: + # Marlin doesn't support block-wise fp8 + self.use_marlin = False + def create_weights( self, layer: torch.nn.Module, @@ -133,10 +160,34 @@ class Fp8LinearMethod(LinearMethodBase): params_dtype: torch.dtype, **extra_weight_attrs, ): - del input_size, output_size output_size_per_partition = sum(output_partition_sizes) weight_loader = extra_weight_attrs.get("weight_loader") + if self.block_quant: + tp_size = get_tensor_model_parallel_world_size() + assert self.quant_config.weight_block_size is not None + block_n, block_k = ( + self.quant_config.weight_block_size[0], + self.quant_config.weight_block_size[1], + ) + # Required by row parallel + if (tp_size > 1 + and input_size // input_size_per_partition == tp_size + and input_size_per_partition % block_k != 0): + raise ValueError( + f"Weight input_size_per_partition = " + f"{input_size_per_partition} is not divisible by " + f"weight quantization block_k = {block_k}.") + # Required by column parallel or enabling merged weights + if (tp_size > 1 and output_size // output_size_per_partition + == tp_size) or len(output_partition_sizes) > 1: + for output_partition_size in output_partition_sizes: + if output_partition_size % block_n != 0: + raise ValueError( + f"Weight output_partition_size = " + f"{output_partition_size} is not divisible by " + f"weight quantization block_n = {block_n}.") + layer.logical_widths = output_partition_sizes layer.input_size_per_partition = input_size_per_partition @@ -161,12 +212,29 @@ class Fp8LinearMethod(LinearMethodBase): # Otherwise, wait until process_weights_after_loading. 
if self.quant_config.is_checkpoint_fp8_serialized: # WEIGHT SCALE - scale = PerTensorScaleParameter(data=torch.empty( - len(output_partition_sizes), dtype=torch.float32), - weight_loader=weight_loader) - - scale[:] = torch.finfo(torch.float32).min - layer.register_parameter("weight_scale", scale) + if not self.block_quant: + scale = PerTensorScaleParameter( + data=torch.empty(len(output_partition_sizes), + dtype=torch.float32), + weight_loader=weight_loader, + ) + scale[:] = torch.finfo(torch.float32).min + layer.register_parameter("weight_scale", scale) + else: + assert self.quant_config.activation_scheme == "dynamic" + scale = BlockQuantScaleParameter( + data=torch.empty( + (output_size_per_partition + block_n - 1) // block_n, + (input_size_per_partition + block_k - 1) // block_k, + dtype=torch.float32, + ), + input_dim=1, + output_dim=0, + weight_loader=weight_loader, + ) + scale[:] = torch.finfo(torch.float32).min + # The weight_scale_inv name is intentional for deepseekv3 + layer.register_parameter("weight_scale_inv", scale) # INPUT ACTIVATION SCALE if self.quant_config.activation_scheme == "static": @@ -180,6 +248,9 @@ class Fp8LinearMethod(LinearMethodBase): layer.register_parameter("input_scale", None) def process_weights_after_loading(self, layer: Module) -> None: + # Block quant doesn't need to process weights after loading + if self.block_quant: + return layer.weight = torch.nn.Parameter(layer.weight.data, requires_grad=False) # If checkpoint not serialized fp8, quantize the weights. @@ -266,6 +337,17 @@ class Fp8LinearMethod(LinearMethodBase): size_k=layer.input_size_per_partition, bias=bias) + if self.block_quant: + assert self.quant_config.weight_block_size is not None + return apply_w8a8_block_fp8_linear( + input=x, + weight=layer.weight, + block_size=self.quant_config.weight_block_size, + weight_scale=layer.weight_scale_inv, + input_scale=layer.input_scale, + bias=bias, + ) + return apply_fp8_linear( input=x, weight=layer.weight, @@ -291,6 +373,7 @@ class Fp8MoEMethod(FusedMoEMethodBase): def __init__(self, quant_config: Fp8Config): self.quant_config = quant_config + self.block_quant = self.quant_config.weight_block_size is not None def create_weights(self, layer: Module, num_experts: int, hidden_size: int, intermediate_size: int, params_dtype: torch.dtype, @@ -298,6 +381,27 @@ class Fp8MoEMethod(FusedMoEMethodBase): if self.quant_config.is_checkpoint_fp8_serialized: params_dtype = torch.float8_e4m3fn + if self.block_quant: + assert self.quant_config.weight_block_size is not None + tp_size = get_tensor_model_parallel_world_size() + block_n, block_k = ( + self.quant_config.weight_block_size[0], + self.quant_config.weight_block_size[1], + ) + # NOTE: To ensure proper alignment of the block-wise quantization + # scales, the output_size of the weights for both the gate and up + # layers must be divisible by block_n. 
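+            # (e.g. with block_n = 128, the boundary between the gate and up
+            # slices of w13 must fall on a 128-row boundary so that each slice
+            # maps to whole scale tiles)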
+ # Required by column parallel or enabling merged weights + if intermediate_size % block_n != 0: + raise ValueError( + f"The output_size of gate's and up's weight = " + f"{intermediate_size} is not divisible by " + f"weight quantization block_n = {block_n}.") + if (tp_size > 1 and intermediate_size % block_k != 0): + # Required by row parallel + raise ValueError(f"The input_size of down's weight = " + f"{intermediate_size} is not divisible by " + f"weight quantization block_k = {block_k}.") # WEIGHTS w13_weight = torch.nn.Parameter(torch.empty(num_experts, @@ -317,21 +421,45 @@ class Fp8MoEMethod(FusedMoEMethodBase): set_weight_attrs(w2_weight, extra_weight_attrs) # WEIGHT_SCALES - # Allocate 2 scales for w1 and w3 respectively. - # They will be combined to a single scale after weight loading. - w13_weight_scale = torch.nn.Parameter(torch.ones(num_experts, - 2, - dtype=torch.float32), - requires_grad=False) - layer.register_parameter("w13_weight_scale", w13_weight_scale) + if not self.block_quant: + # Allocate 2 scales for w1 and w3 respectively. + # They will be combined to a single scale after weight loading. + w13_weight_scale = torch.nn.Parameter(torch.ones( + num_experts, 2, dtype=torch.float32), + requires_grad=False) + w2_weight_scale = torch.nn.Parameter(torch.ones( + num_experts, dtype=torch.float32), + requires_grad=False) + layer.register_parameter("w13_weight_scale", w13_weight_scale) + layer.register_parameter("w2_weight_scale", w2_weight_scale) + else: + w13_weight_scale = torch.nn.Parameter( + torch.ones( + num_experts, + 2 * ((intermediate_size + block_n - 1) // block_n), + (hidden_size + block_k - 1) // block_k, + dtype=torch.float32, + ), + requires_grad=False, + ) + w2_weight_scale = torch.nn.Parameter( + torch.ones( + num_experts, + (hidden_size + block_n - 1) // block_n, + (intermediate_size + block_k - 1) // block_k, + dtype=torch.float32, + ), + requires_grad=False, + ) + layer.register_parameter("w13_weight_scale_inv", w13_weight_scale) + layer.register_parameter("w2_weight_scale_inv", w2_weight_scale) + assert self.quant_config.activation_scheme == "dynamic" - w2_weight_scale = torch.nn.Parameter(torch.ones(num_experts, - dtype=torch.float32), - requires_grad=False) - layer.register_parameter("w2_weight_scale", w2_weight_scale) # Add the quantization method used (per tensor/grouped/channel) # to ensure the weight scales are loaded in properly extra_weight_attrs.update( + {"quant_method": FusedMoeWeightScaleSupported.BLOCK. + value} if self.block_quant else {"quant_method": FusedMoeWeightScaleSupported.TENSOR.value}) # If loading fp8 checkpoint, pass the weight loaders. # If loading an fp16 checkpoint, do not (we will quantize in @@ -364,7 +492,9 @@ class Fp8MoEMethod(FusedMoEMethodBase): layer.w2_input_scale = None def process_weights_after_loading(self, layer: Module) -> None: - + # Block quant doesn't need to process weights after loading + if self.block_quant: + return # If checkpoint is fp16, quantize in place. 
if not self.quant_config.is_checkpoint_fp8_serialized: # If rocm, use float8_e4m3fnuz as dtype @@ -489,17 +619,22 @@ class Fp8MoEMethod(FusedMoEMethodBase): num_expert_group=num_expert_group, custom_routing_function=custom_routing_function) - return fused_experts(x, - layer.w13_weight, - layer.w2_weight, - topk_weights=topk_weights, - topk_ids=topk_ids, - inplace=True, - use_fp8_w8a8=True, - w1_scale=layer.w13_weight_scale, - w2_scale=layer.w2_weight_scale, - a1_scale=layer.w13_input_scale, - a2_scale=layer.w2_input_scale) + return fused_experts( + x, + layer.w13_weight, + layer.w2_weight, + topk_weights=topk_weights, + topk_ids=topk_ids, + inplace=True, + use_fp8_w8a8=True, + w1_scale=(layer.w13_weight_scale_inv + if self.block_quant else layer.w13_weight_scale), + w2_scale=(layer.w2_weight_scale_inv + if self.block_quant else layer.w2_weight_scale), + a1_scale=layer.w13_input_scale, + a2_scale=layer.w2_input_scale, + block_shape=self.quant_config.weight_block_size, + ) class Fp8KVCacheMethod(BaseKVCacheMethod): diff --git a/vllm/model_executor/layers/quantization/utils/fp8_utils.py b/vllm/model_executor/layers/quantization/utils/fp8_utils.py new file mode 100644 index 0000000000000..f3c3e130e4161 --- /dev/null +++ b/vllm/model_executor/layers/quantization/utils/fp8_utils.py @@ -0,0 +1,353 @@ +# Adapted from https://github.com/sgl-project/sglang/pull/2575 +from typing import List, Optional, Tuple + +import torch +import triton +import triton.language as tl + + +def apply_w8a8_block_fp8_linear( + input: torch.Tensor, + weight: torch.Tensor, + block_size: List[int], + weight_scale: torch.Tensor, + input_scale: Optional[torch.Tensor] = None, + bias: Optional[torch.Tensor] = None, +) -> torch.Tensor: + assert input_scale is None + # View input as 2D matrix for fp8 methods + input_2d = input.view(-1, input.shape[-1]) + output_shape = [*input.shape[:-1], weight.shape[0]] + + q_input, x_scale = per_token_group_quant_fp8(input_2d, block_size[1]) + output = w8a8_block_fp8_matmul(q_input, + weight, + x_scale, + weight_scale, + block_size, + output_dtype=input.dtype) + + if bias is not None: + output = output + bias + return output.to(dtype=input.dtype).view(*output_shape) + + +def input_to_float8( + x: torch.Tensor, + dtype: torch.dtype = torch.float8_e4m3fn +) -> Tuple[torch.Tensor, torch.Tensor]: + """This function quantizes input values to float8 values " + "with tensor-wise quantization.""" + finfo = torch.finfo(dtype) + min_val, max_val = x.aminmax() + amax = torch.maximum(min_val.abs(), max_val.abs()).clamp(min=1e-12) + scale = finfo.max / amax + x_scl_sat = (x * scale).clamp(min=finfo.min, max=finfo.max) + return x_scl_sat.to(dtype).contiguous(), scale.float().reciprocal() + + +def block_quant_to_tensor_quant( + x_q_block: torch.Tensor, + x_s: torch.Tensor, + block_size: List[int], +) -> Tuple[torch.Tensor, torch.Tensor]: + """This function converts block-wise quantization to tensor-wise + quantization. The inputs are block-wise quantization tensor `x_q_block`, + block-wise quantization scale and the block size. + The outputs are tensor-wise quantization tensor and tensor-wise + quantization scale. Note only float8 is supported for now. 
+ """ + block_n, block_k = block_size[0], block_size[1] + n, k = x_q_block.shape + n_tiles = (n + block_n - 1) // block_n + k_tiles = (k + block_k - 1) // block_k + assert n_tiles == x_s.shape[0] + assert k_tiles == x_s.shape[1] + + x_dq_block = x_q_block.to(torch.float32) + + x_dq_block_tiles = [[ + x_dq_block[j * block_n:min((j + 1) * block_n, n), + i * block_k:min((i + 1) * block_k, k), ] + for i in range(k_tiles) + ] for j in range(n_tiles)] + + for i in range(k_tiles): + for j in range(n_tiles): + x_dq_block_tiles[j][i][:, :] = x_dq_block_tiles[j][i] * x_s[j][i] + + x_q_tensor, scale = input_to_float8(x_dq_block, dtype=x_q_block.dtype) + return x_q_tensor, scale + + +@triton.jit +def _per_token_group_quant_fp8( + # Pointers to inputs and output + y_ptr, + y_q_ptr, + y_s_ptr, + # Stride of input + y_stride, + # Columns of input + N, + # Avoid to divide zero + eps, + # Information for float8 + fp8_min, + fp8_max, + # Meta-parameters + BLOCK: tl.constexpr, +): + """A Triton-accelerated function to perform per-token-group + quantization on a tensor. + This function converts the tensor values into float8 values. + """ + # Map the program id to the row of X and Y it should compute. + g_id = tl.program_id(0) + y_ptr += g_id * y_stride + y_q_ptr += g_id * y_stride + y_s_ptr += g_id + + cols = tl.arange(0, BLOCK) # N <= BLOCK + mask = cols < N + + y = tl.load(y_ptr + cols, mask=mask, other=0.0).to(tl.float32) + # Quant + _absmax = tl.maximum(tl.max(tl.abs(y)), eps) + y_s = _absmax / fp8_max + y_q = tl.clamp(y / y_s, fp8_min, fp8_max).to(y_q_ptr.dtype.element_ty) + + tl.store(y_q_ptr + cols, y_q, mask=mask) + tl.store(y_s_ptr, y_s) + + +def per_token_group_quant_fp8( + x: torch.Tensor, + group_size: int, + eps: float = 1e-10, + dtype: torch.dtype = torch.float8_e4m3fn, +) -> Tuple[torch.Tensor, torch.Tensor]: + """Function to perform per-token-group quantization on an input tensor `x`. + It converts the tensor values into signed float8 values and returns the + quantized tensor along with the scaling factor used for quantization. + Args: + x: The input tenosr with ndim >= 2. + group_size: The group size used for quantization. + eps: The minimum to avoid dividing zero. + dtype: The dype of output tensor. Note that only `torch.float8_e4m3fn` + is supported for now. + Returns: + Tuple[torch.Tensor, torch.Tensor]: The quantized tensor and the + scaling factor for quantization. 
+ """ + assert (x.shape[-1] % group_size == 0), ( + f"the last dimension of `x` {x.shape[-1]} must be divisible " + f"by `group_size` {group_size}") + assert x.is_contiguous(), "`x` must be contiguous" + + finfo = torch.finfo(dtype) + fp8_min = finfo.min + fp8_max = finfo.max + + x_q = torch.empty_like(x, device=x.device, dtype=dtype) + M = x.numel() // group_size + N = group_size + x_s = torch.empty( + x.shape[:-1] + (x.shape[-1] // group_size, ), + device=x.device, + dtype=torch.float32, + ) + + BLOCK = triton.next_power_of_2(N) + # heuristics for number of warps + num_warps = min(max(BLOCK // 256, 1), 8) + num_stages = 1 + _per_token_group_quant_fp8[(M, )]( + x, + x_q, + x_s, + group_size, + N, + eps, + fp8_min=fp8_min, + fp8_max=fp8_max, + BLOCK=BLOCK, + num_warps=num_warps, + num_stages=num_stages, + ) + + return x_q, x_s + + +@triton.jit +def _w8a8_block_fp8_matmul( + # Pointers to inputs and output + A, + B, + C, + As, + Bs, + # Shape for matmul + M, + N, + K, + # Block size for block-wise quantization + group_n, + group_k, + # Stride for inputs and output + stride_am, + stride_ak, + stride_bk, + stride_bn, + stride_cm, + stride_cn, + stride_As_m, + stride_As_k, + stride_Bs_k, + stride_Bs_n, + # Meta-parameters + BLOCK_SIZE_M: tl.constexpr, + BLOCK_SIZE_N: tl.constexpr, + BLOCK_SIZE_K: tl.constexpr, + GROUP_SIZE_M: tl.constexpr, +): + """Triton-accelerated function used to perform linear operations (dot + product) on input tensors `A` and `B` with block-wise quantization, and + store the result in output tensor `C`. + """ + + pid = tl.program_id(axis=0) + num_pid_m = tl.cdiv(M, BLOCK_SIZE_M) + num_pid_n = tl.cdiv(N, BLOCK_SIZE_N) + num_pid_in_group = GROUP_SIZE_M * num_pid_n + group_id = pid // num_pid_in_group + first_pid_m = group_id * GROUP_SIZE_M + group_size_m = min(num_pid_m - first_pid_m, GROUP_SIZE_M) + pid_m = first_pid_m + (pid % group_size_m) + pid_n = (pid % num_pid_in_group) // group_size_m + + offs_am = (pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)) % M + offs_bn = (pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)) % N + offs_k = tl.arange(0, BLOCK_SIZE_K) + a_ptrs = A + (offs_am[:, None] * stride_am + offs_k[None, :] * stride_ak) + b_ptrs = B + (offs_k[:, None] * stride_bk + offs_bn[None, :] * stride_bn) + + As_ptrs = As + offs_am * stride_As_m + offs_bsn = offs_bn // group_n + Bs_ptrs = Bs + offs_bsn * stride_Bs_n + + accumulator = tl.zeros((BLOCK_SIZE_M, BLOCK_SIZE_N), dtype=tl.float32) + for k in range(0, tl.cdiv(K, BLOCK_SIZE_K)): + a = tl.load(a_ptrs, + mask=offs_k[None, :] < K - k * BLOCK_SIZE_K, + other=0.0) + b = tl.load(b_ptrs, + mask=offs_k[:, None] < K - k * BLOCK_SIZE_K, + other=0.0) + + k_start = k * BLOCK_SIZE_K + offs_ks = k_start // group_k + a_s = tl.load(As_ptrs + offs_ks * stride_As_k) + b_s = tl.load(Bs_ptrs + offs_ks * stride_Bs_k) + + accumulator += tl.dot(a, b) * a_s[:, None] * b_s[None, :] + a_ptrs += BLOCK_SIZE_K * stride_ak + b_ptrs += BLOCK_SIZE_K * stride_bk + + if C.dtype.element_ty == tl.bfloat16: + c = accumulator.to(tl.bfloat16) + elif C.dtype.element_ty == tl.float16: + c = accumulator.to(tl.float16) + else: + c = accumulator.to(tl.float32) + + offs_cm = pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M) + offs_cn = pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N) + c_ptrs = C + stride_cm * offs_cm[:, None] + stride_cn * offs_cn[None, :] + c_mask = (offs_cm[:, None] < M) & (offs_cn[None, :] < N) + tl.store(c_ptrs, c, mask=c_mask) + + +def w8a8_block_fp8_matmul( + A: torch.Tensor, + B: torch.Tensor, + As: torch.Tensor, + Bs: 
torch.Tensor, + block_size: List[int], + output_dtype: torch.dtype = torch.float16, +) -> torch.Tensor: + """This function performs matrix multiplication with block-wise + quantization. + It takes two input tensors `A` and `B` with scales `As` and `Bs`. + The output is returned in the specified `output_dtype`. + Args: + A: The input tensor, e.g., activation. + B: The input tensor, e.g., weight. + As: The per-token-group quantization scale for `A`. + Bs: The per-block quantization scale for `B`. + block_size: The block size for per-block quantization. It should + be 2-dim, e.g., [128, 128]. + output_dytpe: The dtype of the returned tensor. + Returns: + torch.Tensor: The result of matmul. + """ + assert len(block_size) == 2 + block_n, block_k = block_size[0], block_size[1] + + assert A.shape[-1] == B.shape[-1] + assert A.shape[:-1] == As.shape[:-1] and A.is_contiguous() + assert triton.cdiv(A.shape[-1], block_k) == As.shape[-1] + M = A.numel() // A.shape[-1] + + assert B.ndim == 2 and B.is_contiguous() and Bs.ndim == 2 + N, K = B.shape + assert triton.cdiv(N, block_n) == Bs.shape[0] + assert triton.cdiv(K, block_k) == Bs.shape[1] + + C_shape = A.shape[:-1] + (N, ) + C = A.new_empty(C_shape, dtype=output_dtype) + + # TODO: + # BLOCK_SIZE_M, BLOCK_SIZE_K, BLOCK_SIZE_N can be optimized. + # BLOCK_SIZE_K must be divisible by block_k + # BLOCK_SIZE_N and BLOCK_SIZE_M has no requirements + BLOCK_SIZE_M = 128 + if M < BLOCK_SIZE_M: + BLOCK_SIZE_M = triton.next_power_of_2(M) + BLOCK_SIZE_M = max(BLOCK_SIZE_M, 16) + BLOCK_SIZE_K = block_k + assert block_k % BLOCK_SIZE_K == 0 + BLOCK_SIZE_N = block_n + + def grid(META): + return (triton.cdiv(M, META["BLOCK_SIZE_M"]) * + triton.cdiv(N, META["BLOCK_SIZE_N"]), ) + + _w8a8_block_fp8_matmul[grid]( + A, + B, + C, + As, + Bs, + M, + N, + K, + block_n, + block_k, + A.stride(-2), + A.stride(-1), + B.stride(1), + B.stride(0), + C.stride(-2), + C.stride(-1), + As.stride(-2), + As.stride(-1), + Bs.stride(1), + Bs.stride(0), + BLOCK_SIZE_M=BLOCK_SIZE_M, + BLOCK_SIZE_N=BLOCK_SIZE_N, + BLOCK_SIZE_K=BLOCK_SIZE_K, + GROUP_SIZE_M=8, + ) + + return C diff --git a/vllm/model_executor/parameter.py b/vllm/model_executor/parameter.py index 7a6d7c90f34d5..02d22a5ca62c0 100644 --- a/vllm/model_executor/parameter.py +++ b/vllm/model_executor/parameter.py @@ -328,6 +328,15 @@ class PackedvLLMParameter(ModelWeightParameter): marlin_tile_size=self.marlin_tile_size) +class BlockQuantScaleParameter(_ColumnvLLMParameter, RowvLLMParameter): + """ + Parameter class for weight scales loaded for weights with + block-wise quantization. Uses both column and row parallelism. 
+ """ + + pass + + def permute_param_layout_(param: BasevLLMParameter, input_dim: int, output_dim: int, **kwargs) -> BasevLLMParameter: """ From 55fb97f7bd61273fe8464a72866a72eaa88b5759 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-neuralmagic@users.noreply.github.com> Date: Thu, 26 Dec 2024 18:43:05 -0500 Subject: [PATCH 3/9] [2/N] API Server: Avoid ulimit footgun (#11530) --- vllm/entrypoints/api_server.py | 4 +++- vllm/entrypoints/openai/api_server.py | 6 +++++- vllm/utils.py | 18 ++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/vllm/entrypoints/api_server.py b/vllm/entrypoints/api_server.py index 95da1c6e7b9bf..daefbff7e5178 100644 --- a/vllm/entrypoints/api_server.py +++ b/vllm/entrypoints/api_server.py @@ -21,7 +21,7 @@ from vllm.entrypoints.utils import with_cancellation from vllm.logger import init_logger from vllm.sampling_params import SamplingParams from vllm.usage.usage_lib import UsageContext -from vllm.utils import FlexibleArgumentParser, random_uuid +from vllm.utils import FlexibleArgumentParser, random_uuid, set_ulimit from vllm.version import __version__ as VLLM_VERSION logger = init_logger("vllm.entrypoints.api_server") @@ -119,6 +119,8 @@ async def run_server(args: Namespace, logger.info("vLLM API server version %s", VLLM_VERSION) logger.info("args: %s", args) + set_ulimit() + app = await init_app(args, llm_engine) assert engine is not None diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 16086689a10d1..2e45b474237f9 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -68,7 +68,7 @@ from vllm.entrypoints.utils import with_cancellation from vllm.logger import init_logger from vllm.usage.usage_lib import UsageContext from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path, - is_valid_ipv6_address) + is_valid_ipv6_address, set_ulimit) from vllm.version import __version__ as VLLM_VERSION TIMEOUT_KEEP_ALIVE = 5 # seconds @@ -727,6 +727,10 @@ async def run_server(args, **uvicorn_kwargs) -> None: sock_addr = (args.host or "", args.port) sock = create_server_socket(sock_addr) + # workaround to avoid footguns where uvicorn drops requests with too + # many concurrent requests active + set_ulimit() + def signal_handler(*_) -> None: # Interrupt server on sigterm while initializing raise KeyboardInterrupt("terminated") diff --git a/vllm/utils.py b/vllm/utils.py index 7d290dcb7dad0..3d198887021dc 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -12,6 +12,7 @@ import inspect import ipaddress import os import re +import resource import signal import socket import subprocess @@ -1818,3 +1819,20 @@ def memory_profiling( result.non_torch_increase_in_bytes = current_cuda_memory_bytes - baseline_memory_in_bytes - weights_memory_in_bytes - diff.torch_memory_in_bytes # noqa result.profile_time = diff.timestamp result.non_kv_cache_memory_in_bytes = result.non_torch_increase_in_bytes + result.torch_peak_increase_in_bytes + result.weights_memory_in_bytes # noqa + + +# Adapted from: https://github.com/sgl-project/sglang/blob/f46f394f4d4dbe4aae85403dec006199b34d2840/python/sglang/srt/utils.py#L630 # noqa: E501Curre +def set_ulimit(target_soft_limit=65535): + resource_type = resource.RLIMIT_NOFILE + current_soft, current_hard = resource.getrlimit(resource_type) + + if current_soft < target_soft_limit: + try: + resource.setrlimit(resource_type, + (target_soft_limit, current_hard)) + except ValueError as e: + logger.warning( + "Found ulimit 
of %s and failed to automatically increase" + "with error %s. This can cause fd limit errors like" + "`OSError: [Errno 24] Too many open files`. Consider " + "increasing with ulimit -n", current_soft, e) From f49777ba62b4926d0f8c100ab06edb03c5c10098 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 26 Dec 2024 16:09:44 -0800 Subject: [PATCH 4/9] Deepseek v3 (#11502) Signed-off-by: mgoin Co-authored-by: mgoin Co-authored-by: robertgshaw2-neuralmagic --- csrc/moe/moe_align_sum_kernels.cu | 158 ++++- vllm/config.py | 11 +- .../layers/fused_moe/fused_moe.py | 17 +- vllm/model_executor/layers/fused_moe/layer.py | 102 ++- .../model_executor/layers/quantization/fp8.py | 7 +- vllm/model_executor/models/deepseek_v3.py | 650 ++++++++++++++++++ vllm/model_executor/models/registry.py | 1 + 7 files changed, 886 insertions(+), 60 deletions(-) create mode 100644 vllm/model_executor/models/deepseek_v3.py diff --git a/csrc/moe/moe_align_sum_kernels.cu b/csrc/moe/moe_align_sum_kernels.cu index fff7ce34c838a..24341d63fb1f8 100644 --- a/csrc/moe/moe_align_sum_kernels.cu +++ b/csrc/moe/moe_align_sum_kernels.cu @@ -113,6 +113,92 @@ __global__ void moe_align_block_size_kernel(scalar_t* __restrict__ topk_ids, } } +// TODO(simon): this is temporarily adapted from +// https://github.com/sgl-project/sglang/commit/31548116a8dc8c6df7e146e0587335a59fc5b9d7 +// we did this to unblock Deepseek V3 but there should be a better +// implementation to manage shared memory. +template +__global__ void moe_align_block_size_global_mem_kernel( + scalar_t* __restrict__ topk_ids, int32_t* sorted_token_ids, + int32_t* expert_ids, int32_t* total_tokens_post_pad, int32_t num_experts, + int32_t block_size, size_t numel, int32_t* tokens_cnts, int32_t* cumsum) { + const size_t tokens_per_thread = CEILDIV(numel, blockDim.x); + const size_t start_idx = threadIdx.x * tokens_per_thread; + + for (int i = 0; i < num_experts; ++i) { + tokens_cnts[index(num_experts, threadIdx.x + 1, i)] = 0; + } + + /** + * In the first step we compute token_cnts[thread_index + 1][expert_index], + * which counts how many tokens in the token shard of thread_index are + * assigned to expert expert_index. + */ + for (int i = start_idx; i < numel && i < start_idx + tokens_per_thread; ++i) { + ++tokens_cnts[index(num_experts, threadIdx.x + 1, topk_ids[i])]; + } + + __syncthreads(); + + // For each expert we accumulate the token counts from the different threads. + if (threadIdx.x < num_experts) { + tokens_cnts[index(num_experts, 0, threadIdx.x)] = 0; + for (int i = 1; i <= blockDim.x; ++i) { + tokens_cnts[index(num_experts, i, threadIdx.x)] += + tokens_cnts[index(num_experts, i - 1, threadIdx.x)]; + } + } + + __syncthreads(); + + // We accumulate the token counts of all experts in thread 0. + if (threadIdx.x == 0) { + cumsum[0] = 0; + for (int i = 1; i <= num_experts; ++i) { + cumsum[i] = cumsum[i - 1] + + CEILDIV(tokens_cnts[index(num_experts, blockDim.x, i - 1)], + block_size) * + block_size; + } + *total_tokens_post_pad = cumsum[num_experts]; + } + + __syncthreads(); + + /** + * For each expert, each thread processes the tokens of the corresponding + * blocks and stores the corresponding expert_id for each block. + */ + if (threadIdx.x < num_experts) { + for (int i = cumsum[threadIdx.x]; i < cumsum[threadIdx.x + 1]; + i += block_size) { + expert_ids[i / block_size] = threadIdx.x; + } + } + + /** + * Each thread processes a token shard, calculating the index of each token + * after sorting by expert number. 
Given the example topk_ids = + * [0,1,2,1,2,3,0,3,4] and block_size = 4, then the output would be [0, 6, *, + * *, 1, 3, *, *, 2, 4, *, *, 5, 7, *, *, 8, *, *, *], where * represents a + * padding value(preset in python). + */ + for (int i = start_idx; i < numel && i < start_idx + tokens_per_thread; ++i) { + int32_t expert_id = topk_ids[i]; + /** The cumsum[expert_id] stores the starting index of the tokens that the + * expert with expert_id needs to process, and + * tokens_cnts[threadIdx.x][expert_id] stores the indices of the tokens + * processed by the expert with expert_id within the current thread's token + * shard. + */ + int32_t rank_post_pad = + tokens_cnts[index(num_experts, threadIdx.x, expert_id)] + + cumsum[expert_id]; + sorted_token_ids[rank_post_pad] = i; + ++tokens_cnts[index(num_experts, threadIdx.x, expert_id)]; + } +} + template __global__ void moe_sum_kernel( scalar_t* __restrict__ out, // [..., d] @@ -137,25 +223,61 @@ void moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts, torch::Tensor experts_ids, torch::Tensor num_tokens_post_pad) { const cudaStream_t stream = at::cuda::getCurrentCUDAStream(); - VLLM_DISPATCH_INTEGRAL_TYPES( - topk_ids.scalar_type(), "moe_align_block_size_kernel", [&] { - // calc needed amount of shared mem for `tokens_cnts` and `cumsum` - // tensors - const int32_t num_thread = max((int32_t)num_experts, WARP_SIZE); - const int32_t shared_mem = - ((num_thread + 1) * num_experts + (num_experts + 1)) * - sizeof(int32_t); - // set dynamic shared mem - auto kernel = vllm::moe::moe_align_block_size_kernel; - AT_CUDA_CHECK(VLLM_DevFuncAttribute_SET_MaxDynamicSharedMemorySize( - (void*)kernel, shared_mem)); - kernel<<<1, num_thread, shared_mem, stream>>>( - topk_ids.data_ptr(), sorted_token_ids.data_ptr(), - experts_ids.data_ptr(), - num_tokens_post_pad.data_ptr(), num_experts, block_size, - topk_ids.numel()); - }); + // If we have very large number of experts, we can no longer use shared + // memory. + // TODO(simon): the right solution should be calculating the exact right + // amount of shared memory and use that. The num_experts >= 256 is just a + // temporary solution to unblock Deepseek V3. 
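As a cross-check of the sorting and padding semantics spelled out in the kernel comment above, the following NumPy sketch (illustrative only; moe_align_block_size_ref and its signature are not part of this patch) reproduces the topk_ids = [0,1,2,1,2,3,0,3,4], block_size = 4 example:

import numpy as np

def moe_align_block_size_ref(topk_ids, num_experts, block_size, pad_value=-1):
    # Count how many tokens each expert receives.
    topk_ids = np.asarray(topk_ids)
    counts = np.bincount(topk_ids, minlength=num_experts)
    # Round each expert's token count up to a multiple of block_size.
    padded = ((counts + block_size - 1) // block_size) * block_size
    cumsum = np.concatenate(([0], np.cumsum(padded)))
    total = int(cumsum[-1])
    # Slots not filled by a real token keep the padding value.
    sorted_token_ids = np.full(total, pad_value, dtype=np.int64)
    # One expert id per block of block_size slots.
    expert_ids = np.repeat(np.arange(num_experts), padded // block_size)
    fill = cumsum[:-1].copy()
    for token_idx, expert in enumerate(topk_ids):
        sorted_token_ids[fill[expert]] = token_idx
        fill[expert] += 1
    return sorted_token_ids, expert_ids, total

ids, experts, total = moe_align_block_size_ref(
    [0, 1, 2, 1, 2, 3, 0, 3, 4], num_experts=5, block_size=4)
# ids == [0, 6, -1, -1, 1, 3, -1, -1, 2, 4, -1, -1, 5, 7, -1, -1, 8, -1, -1, -1]
# experts == [0, 1, 2, 3, 4], total == 20

The -1 entries play the role of the '*' padding slots in the comment above; both the shared-memory and the global-memory kernels are expected to produce the same mapping.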
+ if (num_experts >= 256) { + VLLM_DISPATCH_INTEGRAL_TYPES( + topk_ids.scalar_type(), "moe_align_block_size_global_mem_kernel", [&] { + // calc needed amount of shared mem for `tokens_cnts` and `cumsum` + // tensors + const int32_t num_thread = max((int32_t)num_experts, WARP_SIZE); + + const int32_t mem_tokens_cnts = + ((num_experts + 1) * num_experts) * sizeof(int32_t); + const int32_t mem_cumsum = (num_experts + 1) * sizeof(int32_t); + // allocate global memory + int32_t* tokens_cnts; + int32_t* cumsum; + cudaMalloc(&tokens_cnts, mem_tokens_cnts); + cudaMalloc(&cumsum, mem_cumsum); + + auto kernel = + vllm::moe::moe_align_block_size_global_mem_kernel; + kernel<<<1, num_thread, 0, stream>>>( + topk_ids.data_ptr(), + sorted_token_ids.data_ptr(), + experts_ids.data_ptr(), + num_tokens_post_pad.data_ptr(), num_experts, block_size, + topk_ids.numel(), tokens_cnts, cumsum); + cudaFree(tokens_cnts); + cudaFree(cumsum); + }); + } else { + VLLM_DISPATCH_INTEGRAL_TYPES( + topk_ids.scalar_type(), "moe_align_block_size_kernel", [&] { + // calc needed amount of shared mem for `tokens_cnts` and `cumsum` + // tensors + const int32_t num_thread = max((int32_t)num_experts, WARP_SIZE); + const int32_t shared_mem = + ((num_thread + 1) * num_experts + (num_experts + 1)) * + sizeof(int32_t); + + // set dynamic shared mem + auto kernel = vllm::moe::moe_align_block_size_kernel; + AT_CUDA_CHECK(VLLM_DevFuncAttribute_SET_MaxDynamicSharedMemorySize( + (void*)kernel, shared_mem)); + kernel<<<1, num_thread, shared_mem, stream>>>( + topk_ids.data_ptr(), + sorted_token_ids.data_ptr(), + experts_ids.data_ptr(), + num_tokens_post_pad.data_ptr(), num_experts, block_size, + topk_ids.numel()); + }); + } } void moe_sum(torch::Tensor& input, // [num_tokens, topk, hidden_size] diff --git a/vllm/config.py b/vllm/config.py index 58649236b4225..ac767bbe14be4 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -596,6 +596,12 @@ class ModelConfig: self.max_seq_len_to_capture = min(self.max_seq_len_to_capture, self.max_model_len) + if (self.hf_config.model_type == 'deepseek_v3' + and not self.enforce_eager): + logger.warning("CUDA graph is not supported for Deepseek V3 yet, " + "fallback to the eager mode.") + self.enforce_eager = True + def _verify_bnb_config(self) -> None: """ The current version of bitsandbytes (0.44.0) with 8-bit models does not @@ -712,8 +718,9 @@ class ModelConfig: def get_head_size(self) -> int: # TODO remove hard code - if hasattr(self.hf_text_config, "model_type" - ) and self.hf_text_config.model_type == 'deepseek_v2': + if hasattr(self.hf_text_config, + "model_type") and (self.hf_text_config.model_type + in ('deepseek_v2', 'deepseek_v3')): # FlashAttention supports only head_size 32, 64, 128, 256, # we need to pad head_size 192 to 256 return 256 diff --git a/vllm/model_executor/layers/fused_moe/fused_moe.py b/vllm/model_executor/layers/fused_moe/fused_moe.py index 92e9ba3c9cebd..4101facbe7874 100644 --- a/vllm/model_executor/layers/fused_moe/fused_moe.py +++ b/vllm/model_executor/layers/fused_moe/fused_moe.py @@ -476,18 +476,29 @@ def fused_topk( return topk_weights, topk_ids -# This is used by the Deepseek-V2 model +# This is used by the Deepseek-V2 and Deepseek-V3 model def grouped_topk(hidden_states: torch.Tensor, gating_output: torch.Tensor, topk: int, renormalize: bool, num_expert_group: int = 0, - topk_group: int = 0): + topk_group: int = 0, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None): assert hidden_states.shape[0] == gating_output.shape[0], ( "Number 
of tokens mismatch") - scores = torch.softmax(gating_output, dim=-1) + if scoring_func == "softmax": + scores = torch.softmax(gating_output, dim=-1) + elif scoring_func == "sigmoid": + scores = gating_output.sigmoid() + else: + raise ValueError(f"Unsupported scoring function: {scoring_func}") + + if e_score_correction_bias is not None: + scores.add_(e_score_correction_bias.unsqueeze(0)) + num_token = scores.shape[0] group_scores = scores.view(num_token, num_expert_group, -1).max(dim=-1).values # [n, n_group] diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py index 55c0a202920ff..01ffac4550f28 100644 --- a/vllm/model_executor/layers/fused_moe/layer.py +++ b/vllm/model_executor/layers/fused_moe/layer.py @@ -73,16 +73,18 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp): set_weight_attrs(w2_weight, extra_weight_attrs) def apply( - self, - layer: torch.nn.Module, - x: torch.Tensor, - router_logits: torch.Tensor, - top_k: int, - renormalize: bool, - use_grouped_topk: bool, - topk_group: Optional[int] = None, - num_expert_group: Optional[int] = None, - custom_routing_function: Optional[Callable] = None + self, + layer: torch.nn.Module, + x: torch.Tensor, + router_logits: torch.Tensor, + top_k: int, + renormalize: bool, + use_grouped_topk: bool, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None ) -> torch.Tensor: return self.forward(x=x, layer=layer, @@ -92,19 +94,23 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp): use_grouped_topk=use_grouped_topk, topk_group=topk_group, num_expert_group=num_expert_group, - custom_routing_function=custom_routing_function) + custom_routing_function=custom_routing_function, + scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias) def forward_cuda( - self, - layer: torch.nn.Module, - x: torch.Tensor, - use_grouped_topk: bool, - top_k: int, - router_logits: torch.Tensor, - renormalize: bool, - topk_group: Optional[int] = None, - num_expert_group: Optional[int] = None, - custom_routing_function: Optional[Callable] = None + self, + layer: torch.nn.Module, + x: torch.Tensor, + use_grouped_topk: bool, + top_k: int, + router_logits: torch.Tensor, + renormalize: bool, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None ) -> torch.Tensor: topk_weights, topk_ids = FusedMoE.select_experts( hidden_states=x, @@ -114,7 +120,9 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp): renormalize=renormalize, topk_group=topk_group, num_expert_group=num_expert_group, - custom_routing_function=custom_routing_function) + custom_routing_function=custom_routing_function, + scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias) return fused_experts(hidden_states=x, w1=layer.w13_weight, @@ -128,21 +136,29 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp): "The CPU backend currently does not support MoE.") def forward_tpu( - self, - layer: torch.nn.Module, - x: torch.Tensor, - use_grouped_topk: bool, - top_k: int, - router_logits: torch.Tensor, - renormalize: bool, - topk_group: Optional[int] = None, - num_expert_group: Optional[int] = None, - custom_routing_function: Optional[Callable] = None + 
self, + layer: torch.nn.Module, + x: torch.Tensor, + use_grouped_topk: bool, + top_k: int, + router_logits: torch.Tensor, + renormalize: bool, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None ) -> torch.Tensor: assert not use_grouped_topk assert num_expert_group is None assert topk_group is None assert custom_routing_function is None + if scoring_func != "softmax": + raise NotImplementedError( + "Only softmax scoring function is supported for TPU.") + if e_score_correction_bias is not None: + raise NotImplementedError( + "Expert score correction bias is not supported for TPU.") return fused_moe_pallas(hidden_states=x, w1=layer.w13_weight, w2=layer.w2_weight, @@ -156,7 +172,7 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp): class FusedMoE(torch.nn.Module): """FusedMoE layer for MoE models. - This layer contains both MergedColumnParallel weights (gate_up_proj / + This layer contains both MergedColumnParallel weights (gate_up_proj / w13) and RowParallelLinear weights (down_proj/ w2). Note: Mixtral uses w1, w2, and w3 for gate, up, and down_proj. We @@ -190,6 +206,8 @@ class FusedMoE(torch.nn.Module): tp_size: Optional[int] = None, prefix: str = "", custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None, ): super().__init__() @@ -210,6 +228,12 @@ class FusedMoE(torch.nn.Module): self.num_expert_group = num_expert_group self.topk_group = topk_group self.custom_routing_function = custom_routing_function + self.scoring_func = scoring_func + self.e_score_correction_bias = e_score_correction_bias + + if self.scoring_func != "softmax" and not self.use_grouped_topk: + raise ValueError("Only softmax scoring function is supported for " + "non-grouped topk.") if quant_config is None: self.quant_method: Optional[QuantizeMethodBase] = ( @@ -446,7 +470,9 @@ class FusedMoE(torch.nn.Module): renormalize: bool, topk_group: Optional[int] = None, num_expert_group: Optional[int] = None, - custom_routing_function: Optional[Callable] = None): + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None): from vllm.model_executor.layers.fused_moe.fused_moe import ( fused_topk, grouped_topk) @@ -460,7 +486,9 @@ class FusedMoE(torch.nn.Module): topk=top_k, renormalize=renormalize, num_expert_group=num_expert_group, - topk_group=topk_group) + topk_group=topk_group, + scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias) elif custom_routing_function is None: topk_weights, topk_ids = fused_topk(hidden_states=hidden_states, gating_output=router_logits, @@ -489,7 +517,9 @@ class FusedMoE(torch.nn.Module): use_grouped_topk=self.use_grouped_topk, topk_group=self.topk_group, num_expert_group=self.num_expert_group, - custom_routing_function=self.custom_routing_function) + custom_routing_function=self.custom_routing_function, + scoring_func=self.scoring_func, + e_score_correction_bias=self.e_score_correction_bias) if self.reduce_results and self.tp_size > 1: final_hidden_states = tensor_model_parallel_all_reduce( diff --git a/vllm/model_executor/layers/quantization/fp8.py b/vllm/model_executor/layers/quantization/fp8.py index 5dfd86727a02a..4362468c1db69 100644 --- a/vllm/model_executor/layers/quantization/fp8.py +++ 
b/vllm/model_executor/layers/quantization/fp8.py @@ -605,6 +605,8 @@ class Fp8MoEMethod(FusedMoEMethodBase): topk_group: Optional[int] = None, num_expert_group: Optional[int] = None, custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[torch.Tensor] = None, ) -> torch.Tensor: from vllm.model_executor.layers.fused_moe import fused_experts @@ -617,7 +619,10 @@ class Fp8MoEMethod(FusedMoEMethodBase): renormalize=renormalize, topk_group=topk_group, num_expert_group=num_expert_group, - custom_routing_function=custom_routing_function) + custom_routing_function=custom_routing_function, + scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias, + ) return fused_experts( x, diff --git a/vllm/model_executor/models/deepseek_v3.py b/vllm/model_executor/models/deepseek_v3.py new file mode 100644 index 0000000000000..333dc019b4d99 --- /dev/null +++ b/vllm/model_executor/models/deepseek_v3.py @@ -0,0 +1,650 @@ +# Adapted from +# https://github.com/huggingface/transformers/blob/v4.28.0/src/transformers/models/llama/modeling_llama.py +# Copyright 2023 The vLLM team. +# Copyright 2023 DeepSeek-AI and the HuggingFace Inc. team. All rights reserved. +# +# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX +# and OPT implementations in this library. It has been modified from its +# original forms to accommodate minor architectural differences compared +# to GPT-NeoX and OPT used by the Meta AI team that trained the model. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
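Before the new model file, it is worth sketching the routing path that the scoring_func / e_score_correction_bias plumbing above enables. The snippet below is a rough, self-contained approximation of sigmoid-scored grouped top-k with a per-expert correction bias; the function name, the -inf masking, and the shapes are illustrative assumptions rather than the exact vLLM implementation:

import torch

def grouped_topk_sketch(gating_output: torch.Tensor,
                        topk: int,
                        num_expert_group: int,
                        topk_group: int,
                        e_score_correction_bias: torch.Tensor = None,
                        renormalize: bool = True):
    num_token, num_expert = gating_output.shape
    # DeepSeek-V3 scores experts with a sigmoid instead of a softmax.
    scores = gating_output.sigmoid()
    if e_score_correction_bias is not None:
        # Per-expert bias, broadcast over the token dimension.
        scores = scores + e_score_correction_bias.unsqueeze(0)
    # Rank expert groups by their best member, keep the top `topk_group`.
    group_scores = scores.view(num_token, num_expert_group,
                               -1).max(dim=-1).values
    group_idx = torch.topk(group_scores, k=topk_group, dim=-1).indices
    group_mask = torch.zeros_like(group_scores).scatter_(1, group_idx, 1.0)
    score_mask = group_mask.unsqueeze(-1).expand(
        num_token, num_expert_group,
        num_expert // num_expert_group).reshape(num_token, -1)
    # Experts outside the selected groups can never be picked.
    masked_scores = scores.masked_fill(score_mask == 0, float("-inf"))
    topk_weights, topk_ids = torch.topk(masked_scores, k=topk, dim=-1)
    if renormalize:
        topk_weights = topk_weights / topk_weights.sum(dim=-1, keepdim=True)
    return topk_weights, topk_ids

DeepSeek-V3 checkpoints with topk_method == "noaux_tc" supply the bias as a learned per-expert tensor, which is why DeepseekV3MoE below registers gate.e_score_correction_bias and passes it through FusedMoE into select_experts.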
+"""Inference-only DeepseekV3 model.""" +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union + +import torch +from torch import nn +from transformers import PretrainedConfig + +from vllm.attention import Attention, AttentionMetadata +from vllm.config import CacheConfig, VllmConfig +from vllm.distributed import (get_pp_group, + get_tensor_model_parallel_world_size, + tensor_model_parallel_all_reduce) +from vllm.model_executor.layers.activation import SiluAndMul +from vllm.model_executor.layers.fused_moe import FusedMoE +from vllm.model_executor.layers.layernorm import RMSNorm +from vllm.model_executor.layers.linear import (ColumnParallelLinear, + MergedColumnParallelLinear, + ReplicatedLinear, + RowParallelLinear) +from vllm.model_executor.layers.logits_processor import LogitsProcessor +from vllm.model_executor.layers.quantization import QuantizationConfig +from vllm.model_executor.layers.rotary_embedding import get_rope +from vllm.model_executor.layers.sampler import SamplerOutput, get_sampler +from vllm.model_executor.layers.vocab_parallel_embedding import ( + ParallelLMHead, VocabParallelEmbedding) +from vllm.model_executor.model_loader.weight_utils import default_weight_loader +from vllm.model_executor.sampling_metadata import SamplingMetadata +from vllm.sequence import IntermediateTensors + +from .interfaces import SupportsPP +from .utils import (PPMissingLayer, is_pp_missing_parameter, + make_empty_intermediate_tensors_factory, make_layers, + maybe_prefix) + + +class DeepseekV3MLP(nn.Module): + + def __init__( + self, + hidden_size: int, + intermediate_size: int, + hidden_act: str, + quant_config: Optional[QuantizationConfig] = None, + reduce_results: bool = True, + prefix: str = "", + ) -> None: + super().__init__() + self.gate_up_proj = MergedColumnParallelLinear( + hidden_size, [intermediate_size] * 2, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.gate_up_proj") + self.down_proj = RowParallelLinear(intermediate_size, + hidden_size, + bias=False, + quant_config=quant_config, + reduce_results=reduce_results, + prefix=f"{prefix}.down_proj") + if hidden_act != "silu": + raise ValueError(f"Unsupported activation: {hidden_act}. " + "Only silu is supported for now.") + self.act_fn = SiluAndMul() + + def forward(self, x): + gate_up, _ = self.gate_up_proj(x) + x = self.act_fn(gate_up) + x, _ = self.down_proj(x) + return x + + +class DeepseekV3MoE(nn.Module): + + def __init__( + self, + config: PretrainedConfig, + quant_config: Optional[QuantizationConfig] = None, + prefix: str = "", + ): + super().__init__() + self.tp_size = get_tensor_model_parallel_world_size() + self.routed_scaling_factor = config.routed_scaling_factor + self.n_shared_experts = config.n_shared_experts + self.routed_scaling_factor = config.routed_scaling_factor + if self.tp_size > config.n_routed_experts: + raise ValueError( + f"Tensor parallel size {self.tp_size} is greater than " + f"the number of experts {config.n_routed_experts}.") + + if config.hidden_act != "silu": + raise ValueError(f"Unsupported activation: {config.hidden_act}. 
" + "Only silu is supported for now.") + + self.gate = ReplicatedLinear(config.hidden_size, + config.n_routed_experts, + bias=False, + quant_config=None, + prefix=f"{prefix}.gate") + if config.topk_method == "noaux_tc": + self.gate.e_score_correction_bias = nn.Parameter( + torch.empty(config.n_routed_experts)) + else: + self.gate.e_score_correction_bias = None + + self.experts = FusedMoE( + num_experts=config.n_routed_experts, + top_k=config.num_experts_per_tok, + hidden_size=config.hidden_size, + intermediate_size=config.moe_intermediate_size, + reduce_results=False, + renormalize=config.norm_topk_prob, + quant_config=quant_config, + use_grouped_topk=True, + num_expert_group=config.n_group, + topk_group=config.topk_group, + prefix=f"{prefix}.experts", + scoring_func=config.scoring_func, + e_score_correction_bias=self.gate.e_score_correction_bias) + + if config.n_shared_experts is not None: + intermediate_size = (config.moe_intermediate_size * + config.n_shared_experts) + self.shared_experts = DeepseekV3MLP( + hidden_size=config.hidden_size, + intermediate_size=intermediate_size, + hidden_act=config.hidden_act, + quant_config=quant_config, + reduce_results=False, + ) + + def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: + num_tokens, hidden_dim = hidden_states.shape + hidden_states = hidden_states.view(-1, hidden_dim) + if self.n_shared_experts is not None: + shared_output = self.shared_experts(hidden_states) + # router_logits: (num_tokens, n_experts) + router_logits, _ = self.gate(hidden_states) + final_hidden_states = self.experts( + hidden_states=hidden_states, + router_logits=router_logits) * self.routed_scaling_factor + if shared_output is not None: + final_hidden_states = final_hidden_states + shared_output + if self.tp_size > 1: + final_hidden_states = tensor_model_parallel_all_reduce( + final_hidden_states) + + return final_hidden_states.view(num_tokens, hidden_dim) + + +def yarn_get_mscale(scale: float = 1, mscale: float = 1) -> float: + import math + if scale <= 1: + return 1.0 + return 0.1 * mscale * math.log(scale) + 1.0 + + +class DeepseekV3Attention(nn.Module): + + def __init__( + self, + config: PretrainedConfig, + hidden_size: int, + num_heads: int, + qk_nope_head_dim: int, + qk_rope_head_dim: int, + v_head_dim: int, + q_lora_rank: int, + kv_lora_rank: int, + rope_theta: float = 10000, + rope_scaling: Optional[Dict[str, Any]] = None, + max_position_embeddings: int = 8192, + cache_config: Optional[CacheConfig] = None, + quant_config: Optional[QuantizationConfig] = None, + prefix: str = "", + ) -> None: + super().__init__() + self.hidden_size = hidden_size + self.qk_nope_head_dim = qk_nope_head_dim + self.qk_rope_head_dim = qk_rope_head_dim + self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim + self.v_head_dim = v_head_dim + self.q_lora_rank = q_lora_rank + self.kv_lora_rank = kv_lora_rank + self.num_heads = num_heads + tp_size = get_tensor_model_parallel_world_size() + assert num_heads % tp_size == 0 + self.num_local_heads = num_heads // tp_size + self.scaling = self.qk_head_dim**-0.5 + self.rope_theta = rope_theta + self.max_position_embeddings = max_position_embeddings + + if self.q_lora_rank is not None: + self.q_a_proj = ReplicatedLinear(self.hidden_size, + self.q_lora_rank, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.q_a_proj") + self.q_a_layernorm = RMSNorm(self.q_lora_rank, + eps=config.rms_norm_eps) + self.q_b_proj = ColumnParallelLinear(q_lora_rank, + self.num_heads * + self.qk_head_dim, + bias=False, + quant_config=quant_config, 
+ prefix=f"{prefix}.q_b_proj") + else: + self.q_proj = ColumnParallelLinear(self.hidden_size, + self.num_heads * + self.qk_head_dim, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.q_proj") + + self.kv_a_proj_with_mqa = ReplicatedLinear( + self.hidden_size, + self.kv_lora_rank + self.qk_rope_head_dim, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.kv_a_proj_with_mqa") + self.kv_a_layernorm = RMSNorm(self.kv_lora_rank, + eps=config.rms_norm_eps) + self.kv_b_proj = ColumnParallelLinear( + self.kv_lora_rank, + self.num_heads * (self.qk_nope_head_dim + self.v_head_dim), + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.kv_b_proj") + # O projection. + self.o_proj = RowParallelLinear(self.num_heads * self.v_head_dim, + self.hidden_size, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.o_proj") + rope_scaling["rope_type"] = 'deepseek_yarn' + self.rotary_emb = get_rope(qk_rope_head_dim, + rotary_dim=qk_rope_head_dim, + max_position=max_position_embeddings, + base=rope_theta, + rope_scaling=rope_scaling, + is_neox_style=False) + + if rope_scaling: + mscale_all_dim = rope_scaling.get("mscale_all_dim", False) + scaling_factor = rope_scaling["factor"] + mscale = yarn_get_mscale(scaling_factor, float(mscale_all_dim)) + self.scaling = self.scaling * mscale * mscale + + # self.attn = Attention(self.num_heads, + # self.qk_head_dim, + # self.scaling, + # num_kv_heads=self.num_heads) + + # TODO, support head_size 192 + self.attn = Attention(self.num_local_heads, + 256, + self.scaling, + num_kv_heads=self.num_local_heads, + cache_config=cache_config, + quant_config=quant_config, + prefix=f"{prefix}.attn") + + def forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + kv_cache: torch.Tensor, + attn_metadata: AttentionMetadata, + ) -> torch.Tensor: + if self.q_lora_rank is not None: + q = self.q_a_proj(hidden_states)[0] + q = self.q_a_layernorm(q) + q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, + self.qk_head_dim) + else: + q = self.q_proj(hidden_states)[0].view(-1, self.num_local_heads, + self.qk_head_dim) + q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], + dim=-1) + latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0] + kv_a, _ = latent_cache.split( + [self.kv_lora_rank, self.qk_rope_head_dim], dim=-1) + latent_cache = latent_cache.unsqueeze(1) + kv_a = self.kv_a_layernorm(kv_a.contiguous()) + kv = self.kv_b_proj(kv_a)[0] + kv = kv.view(-1, self.num_local_heads, + self.qk_nope_head_dim + self.v_head_dim) + k_nope, v = kv.split([self.qk_nope_head_dim, self.v_head_dim], dim=-1) + k_pe = latent_cache[:, :, self.kv_lora_rank:] + q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe) + q[..., self.qk_nope_head_dim:] = q_pe + k = torch.empty_like(q) + k[..., :self.qk_nope_head_dim] = k_nope + k[..., self.qk_nope_head_dim:] = k_pe + q = torch.nn.functional.pad(q, [0, 256 - self.qk_head_dim], + value=0).view(-1, + self.num_local_heads * 256) + k = torch.nn.functional.pad(k, [0, 256 - self.qk_head_dim], + value=0).view(-1, + self.num_local_heads * 256) + v = torch.nn.functional.pad(v, [0, 256 - self.v_head_dim], + value=0).view(-1, + self.num_local_heads * 256) + attn_output = self.attn(q, k, v, kv_cache, attn_metadata) + attn_output = attn_output.view( + -1, self.num_local_heads, 256)[..., :self.v_head_dim].reshape( + -1, self.num_local_heads * self.v_head_dim) + output, _ = self.o_proj(attn_output) + return output + + +class DeepseekV3DecoderLayer(nn.Module): + + def __init__( + self, + config: 
PretrainedConfig, + prefix: str, + cache_config: Optional[CacheConfig] = None, + quant_config: Optional[QuantizationConfig] = None, + ) -> None: + super().__init__() + self.hidden_size = config.hidden_size + rope_theta = getattr(config, "rope_theta", 10000) + rope_scaling = getattr(config, "rope_scaling", None) + max_position_embeddings = getattr(config, "max_position_embeddings", + 8192) + # DecoderLayers are created with `make_layers` which passes the prefix + # with the layer's index. + layer_idx = int(prefix.split(sep='.')[-1]) + self.self_attn = DeepseekV3Attention( + config=config, + hidden_size=self.hidden_size, + num_heads=config.num_attention_heads, + qk_nope_head_dim=config.qk_nope_head_dim, + qk_rope_head_dim=config.qk_rope_head_dim, + v_head_dim=config.v_head_dim, + q_lora_rank=config.q_lora_rank + if hasattr(config, "q_lora_rank") else None, + kv_lora_rank=config.kv_lora_rank, + rope_theta=rope_theta, + rope_scaling=rope_scaling, + max_position_embeddings=max_position_embeddings, + cache_config=cache_config, + quant_config=quant_config, + prefix=f"{prefix}.self_attn", + ) + if (config.n_routed_experts is not None + and layer_idx >= config.first_k_dense_replace + and layer_idx % config.moe_layer_freq == 0): + self.mlp = DeepseekV3MoE( + config=config, + quant_config=quant_config, + prefix=f"{prefix}.mlp", + ) + else: + self.mlp = DeepseekV3MLP( + hidden_size=config.hidden_size, + intermediate_size=config.intermediate_size, + hidden_act=config.hidden_act, + quant_config=quant_config, + prefix=f"{prefix}.mlp", + ) + self.input_layernorm = RMSNorm(config.hidden_size, + eps=config.rms_norm_eps) + self.post_attention_layernorm = RMSNorm(config.hidden_size, + eps=config.rms_norm_eps) + + def forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + kv_cache: torch.Tensor, + attn_metadata: AttentionMetadata, + residual: Optional[torch.Tensor], + ) -> torch.Tensor: + # Self Attention + if residual is None: + residual = hidden_states + hidden_states = self.input_layernorm(hidden_states) + else: + hidden_states, residual = self.input_layernorm( + hidden_states, residual) + hidden_states = self.self_attn( + positions=positions, + hidden_states=hidden_states, + kv_cache=kv_cache, + attn_metadata=attn_metadata, + ) + + # Fully Connected + hidden_states, residual = self.post_attention_layernorm( + hidden_states, residual) + hidden_states = self.mlp(hidden_states) + return hidden_states, residual + + +# TODO(simon): check whether we support torch compile for Deepseek V3 +# @support_torch_compile +class DeepseekV3Model(nn.Module): + + fall_back_to_pt_during_load = False + + def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): + super().__init__() + + config = vllm_config.model_config.hf_config + cache_config = vllm_config.cache_config + quant_config = vllm_config.quant_config + + self.padding_idx = config.pad_token_id + self.vocab_size = config.vocab_size + + if get_pp_group().is_first_rank: + self.embed_tokens = VocabParallelEmbedding( + config.vocab_size, + config.hidden_size, + ) + else: + self.embed_tokens = PPMissingLayer() + + self.start_layer, self.end_layer, self.layers = make_layers( + config.num_hidden_layers, + lambda prefix: DeepseekV3DecoderLayer( + config, + prefix, + cache_config=cache_config, + quant_config=quant_config, + ), + prefix=f"{prefix}.layers") + + if get_pp_group().is_last_rank: + self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) + else: + self.norm = PPMissingLayer() + self.make_empty_intermediate_tensors = ( + 
make_empty_intermediate_tensors_factory( + ["hidden_states", "residual"], config.hidden_size)) + + def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor: + return self.embed_tokens(input_ids) + + def forward( + self, + input_ids: torch.Tensor, + positions: torch.Tensor, + kv_caches: List[torch.Tensor], + attn_metadata: AttentionMetadata, + intermediate_tensors: Optional[IntermediateTensors], + inputs_embeds: Optional[torch.Tensor] = None, + ) -> Union[torch.Tensor, IntermediateTensors]: + if get_pp_group().is_first_rank: + if inputs_embeds is not None: + hidden_states = inputs_embeds + else: + hidden_states = self.get_input_embeddings(input_ids) + residual = None + else: + assert intermediate_tensors is not None + hidden_states = intermediate_tensors["hidden_states"] + residual = intermediate_tensors["residual"] + + for i in range(self.start_layer, self.end_layer): + layer = self.layers[i] + hidden_states, residual = layer(positions, hidden_states, + kv_caches[i - self.start_layer], + attn_metadata, residual) + + if not get_pp_group().is_last_rank: + return IntermediateTensors({ + "hidden_states": hidden_states, + "residual": residual + }) + + hidden_states, _ = self.norm(hidden_states, residual) + return hidden_states + + +class DeepseekV3ForCausalLM(nn.Module, SupportsPP): + + def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): + super().__init__() + config = vllm_config.model_config.hf_config + quant_config = vllm_config.quant_config + self.config = config + self.quant_config = quant_config + self.model = DeepseekV3Model(vllm_config=vllm_config, + prefix=maybe_prefix(prefix, "model")) + self.lm_head = ParallelLMHead(config.vocab_size, + config.hidden_size, + quant_config=quant_config) + self.logits_processor = LogitsProcessor(config.vocab_size) + self.sampler = get_sampler() + self.make_empty_intermediate_tensors = ( + self.model.make_empty_intermediate_tensors) + + def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor: + return self.model.get_input_embeddings(input_ids) + + def forward( + self, + input_ids: torch.Tensor, + positions: torch.Tensor, + kv_caches: List[torch.Tensor], + attn_metadata: AttentionMetadata, + intermediate_tensors: Optional[IntermediateTensors] = None, + inputs_embeds: Optional[torch.Tensor] = None, + ) -> Union[torch.Tensor, IntermediateTensors]: + hidden_states = self.model(input_ids, positions, kv_caches, + attn_metadata, intermediate_tensors, + inputs_embeds) + return hidden_states + + def compute_logits( + self, + hidden_states: torch.Tensor, + sampling_metadata: SamplingMetadata, + ) -> Optional[torch.Tensor]: + logits = self.logits_processor(self.lm_head, hidden_states, + sampling_metadata) + return logits + + def sample( + self, + logits: Optional[torch.Tensor], + sampling_metadata: SamplingMetadata, + ) -> Optional[SamplerOutput]: + next_tokens = self.sampler(logits, sampling_metadata) + return next_tokens + + def make_empty_intermediate_tensors( + self, batch_size: int, dtype: torch.dtype, + device: torch.device) -> IntermediateTensors: + return IntermediateTensors({ + "hidden_states": + torch.zeros((batch_size, self.config.hidden_size), + dtype=dtype, + device=device), + "residual": + torch.zeros((batch_size, self.config.hidden_size), + dtype=dtype, + device=device), + }) + + def load_weights(self, weights: Iterable[Tuple[str, + torch.Tensor]]) -> Set[str]: + stacked_params_mapping = [ + # (param_name, shard_name, shard_id) + ("gate_up_proj", "gate_proj", 0), + ("gate_up_proj", "up_proj", 1), + ] + + # 
Params for weights, fp8 weight scales, fp8 activation scales + # (param_name, weight_name, expert_id, shard_id) + expert_params_mapping = FusedMoE.make_expert_params_mapping( + ckpt_gate_proj_name="gate_proj", + ckpt_down_proj_name="down_proj", + ckpt_up_proj_name="up_proj", + num_experts=self.config.n_routed_experts) + + params_dict = dict(self.named_parameters()) + loaded_params: Set[str] = set() + for name, loaded_weight in weights: + if "rotary_emb.inv_freq" in name: + continue + + # TODO(simon): support nextn predict layers + if self.config.num_nextn_predict_layers > 0: + assert self.config.num_nextn_predict_layers == 1 + layer_idx = self.config.num_hidden_layers + if name.startswith(f"model.layers.{layer_idx}"): + continue + + for (param_name, weight_name, shard_id) in stacked_params_mapping: + # Skip non-stacked layers and experts (experts handled below). + if weight_name not in name: + continue + # We have mlp.experts[0].gate_proj in the checkpoint. + # Since we handle the experts below in expert_params_mapping, + # we need to skip here BEFORE we update the name, otherwise + # name will be updated to mlp.experts[0].gate_up_proj, which + # will then be updated below in expert_params_mapping + # for mlp.experts[0].gate_gate_up_proj, which breaks load. + if (("mlp.experts." in name) and name not in params_dict): + continue + name = name.replace(weight_name, param_name) + # Skip loading extra bias for GPTQ models. + if name.endswith(".bias") and name not in params_dict: + continue + + if is_pp_missing_parameter(name, self): + continue + + param = params_dict[name] + weight_loader = param.weight_loader + weight_loader(param, loaded_weight, shard_id) + break + else: + for mapping in expert_params_mapping: + param_name, weight_name, expert_id, shard_id = mapping + if weight_name not in name: + continue + name = name.replace(weight_name, param_name) + + if is_pp_missing_parameter(name, self): + continue + + param = params_dict[name] + weight_loader = param.weight_loader + weight_loader(param, + loaded_weight, + name, + shard_id=shard_id, + expert_id=expert_id) + break + else: + # Skip loading extra bias for GPTQ models. 
+ if name.endswith(".bias") and name not in params_dict: + continue + + if is_pp_missing_parameter(name, self): + continue + + if name not in params_dict: + for key in params_dict: + print(key) + param = params_dict[name] + weight_loader = getattr(param, "weight_loader", + default_weight_loader) + weight_loader(param, loaded_weight) + loaded_params.add(name) + return loaded_params diff --git a/vllm/model_executor/models/registry.py b/vllm/model_executor/models/registry.py index b32a3421d5841..feb33bb373c3e 100644 --- a/vllm/model_executor/models/registry.py +++ b/vllm/model_executor/models/registry.py @@ -45,6 +45,7 @@ _TEXT_GENERATION_MODELS = { "DeciLMForCausalLM": ("decilm", "DeciLMForCausalLM"), "DeepseekForCausalLM": ("deepseek", "DeepseekForCausalLM"), "DeepseekV2ForCausalLM": ("deepseek_v2", "DeepseekV2ForCausalLM"), + "DeepseekV3ForCausalLM": ("deepseek_v3", "DeepseekV3ForCausalLM"), "ExaoneForCausalLM": ("exaone", "ExaoneForCausalLM"), "FalconForCausalLM": ("falcon", "FalconForCausalLM"), "GemmaForCausalLM": ("gemma", "GemmaForCausalLM"), From 82d24f7aacf79bbccb6413333dff6303fbbb44b9 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 26 Dec 2024 16:21:56 -0800 Subject: [PATCH 5/9] [Docs] Document Deepseek V3 support (#11535) Signed-off-by: simon-mo --- README.md | 2 +- docs/source/models/supported_models.md | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 93b71ddaccc61..f83c9d759b359 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ vLLM is flexible and easy to use with: vLLM seamlessly supports most popular open-source models on HuggingFace, including: - Transformer-like LLMs (e.g., Llama) -- Mixture-of-Expert LLMs (e.g., Mixtral) +- Mixture-of-Expert LLMs (e.g., Mixtral, Deepseek-V2 and V3) - Embedding Models (e.g. E5-Mistral) - Multi-modal LLMs (e.g., LLaVA) diff --git a/docs/source/models/supported_models.md b/docs/source/models/supported_models.md index 85fba83147708..95add0d71bbab 100644 --- a/docs/source/models/supported_models.md +++ b/docs/source/models/supported_models.md @@ -137,6 +137,11 @@ See [this page](#generative-models) for more information on how to use generativ - :code:`deepseek-ai/DeepSeek-V2`, :code:`deepseek-ai/DeepSeek-V2-Chat` etc. - - ✅︎ + * - :code:`DeepseekV3ForCausalLM` + - DeepSeek-V3 + - :code:`deepseek-ai/DeepSeek-V3-Base`, :code:`deepseek-ai/DeepSeek-V3` etc. + - + - ✅︎ * - :code:`ExaoneForCausalLM` - EXAONE-3 - :code:`LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct`, etc. @@ -676,7 +681,7 @@ See [this page](#generative-models) for more information on how to use generativ - PaliGemma, PaliGemma 2 - T + I\ :sup:`E` - :code:`google/paligemma-3b-pt-224`, :code:`google/paligemma-3b-mix-224`, :code:`google/paligemma2-3b-ft-docci-448`, etc. 
- - + - - ✅︎ - * - :code:`Phi3VForCausalLM` From 0c0c2015c526f1fe6f86fdd8d6bd99a935d2d275 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-neuralmagic@users.noreply.github.com> Date: Thu, 26 Dec 2024 19:26:18 -0500 Subject: [PATCH 6/9] Update openai_compatible_server.md (#11536) Co-authored-by: Simon Mo --- docs/source/serving/openai_compatible_server.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/source/serving/openai_compatible_server.md b/docs/source/serving/openai_compatible_server.md index 23c66f72162d2..caf5e8cafd9aa 100644 --- a/docs/source/serving/openai_compatible_server.md +++ b/docs/source/serving/openai_compatible_server.md @@ -112,7 +112,13 @@ completion = client.chat.completions.create( ## Extra HTTP Headers -Only `X-Request-Id` HTTP request header is supported for now. +Only `X-Request-Id` HTTP request header is supported for now. It can be enabled +with `--enable-request-id-headers`. + +> Note that enablement of the headers can impact performance significantly at high QPS +> rates. We recommend implementing HTTP headers at the router level (e.g. via Istio), +> rather than within the vLLM layer for this reason. +> See https://github.com/vllm-project/vllm/pull/11529 for more details. ```python completion = client.chat.completions.create( From 371d04d39bf056e4cc56100c83d4812b7cb230e4 Mon Sep 17 00:00:00 2001 From: Woosuk Kwon Date: Fri, 27 Dec 2024 09:32:38 +0900 Subject: [PATCH 7/9] [V1] Use FlashInfer Sampling Kernel for Top-P & Top-K Sampling (#11394) Signed-off-by: Woosuk Kwon --- tests/v1/sample/test_sampler.py | 54 +++--- vllm/envs.py | 5 +- vllm/v1/sample/ops/__init__.py | 0 vllm/v1/sample/ops/penalties.py | 57 ++++++ vllm/v1/sample/ops/topk_topp_sampler.py | 201 ++++++++++++++++++++ vllm/v1/sample/sampler.py | 234 ++++++++---------------- 6 files changed, 358 insertions(+), 193 deletions(-) create mode 100644 vllm/v1/sample/ops/__init__.py create mode 100644 vllm/v1/sample/ops/penalties.py create mode 100644 vllm/v1/sample/ops/topk_topp_sampler.py diff --git a/tests/v1/sample/test_sampler.py b/tests/v1/sample/test_sampler.py index d8d055805cbea..5ebf72927cfd6 100644 --- a/tests/v1/sample/test_sampler.py +++ b/tests/v1/sample/test_sampler.py @@ -68,7 +68,7 @@ def _create_default_sampling_metadata( no_top_p=True, no_top_k=True, generators={}, - max_num_logprobs=VOCAB_SIZE, + max_num_logprobs=0, prompt_token_ids=_create_prompt_tokens_tensor(prompt_token_ids, vocab_size, device), output_token_ids=output_token_ids, @@ -169,20 +169,14 @@ def test_sampler_min_tokens_penalty(device: str, batch_size: int): sampling_metadata.min_tokens = min_tokens sampling_metadata.stop_token_ids = stop_token_ids sampler = Sampler() - sampler_output = sampler(fake_logits, sampling_metadata) + logits = sampler.apply_penalties(fake_logits, sampling_metadata) + logits = logits.cpu() for batch_idx in range(batch_size): - for vocab in range(VOCAB_SIZE): - # Verify that the logprobs for stop token ids is set - # to -inf. 
- logprob_index = torch.where( - sampler_output.logprob_token_ids[batch_idx] == - vocab)[0].item() - if vocab in stop_token_ids[batch_idx]: - assert sampler_output.logprobs[batch_idx][ - logprob_index] == -float("inf") + for token_id in range(VOCAB_SIZE): + if token_id in stop_token_ids[batch_idx]: + assert logits[batch_idx][token_id] == -float("inf") else: - assert sampler_output.logprobs[batch_idx][ - logprob_index] != -float("inf") + assert logits[batch_idx][token_id] != -float("inf") @pytest.mark.parametrize("device", CUDA_DEVICES) @@ -205,18 +199,14 @@ def test_sampler_presence_penalty(device: str, batch_size: int, batch_size, presence_penalty, torch.device(device)) sampling_metadata.no_penalties = False sampler = Sampler() - sampler_output = sampler(fake_logits, sampling_metadata) + logits = sampler.apply_penalties(fake_logits, sampling_metadata) + logits = logits.cpu() for batch_idx in range(batch_size): - # The logprobs in the SamplerOutput are arranged in descending order. - # Since all tokens initially have the same logprobs, the non-penalized - # tokens will appear at the beginning, while the penalized tokens - # will appear at the end of the list. - penalized_token_id = sampler_output.logprob_token_ids[batch_idx][ - VOCAB_SIZE - 1] - penalized_log_prod = sampler_output.logprobs[batch_idx][VOCAB_SIZE - 1] - non_penalized_token_id = sampler_output.logprob_token_ids[batch_idx][0] - non_penalized_log_prod = sampler_output.logprobs[batch_idx][0] - assert non_penalized_log_prod > penalized_log_prod + # Since all tokens initially have the same logits, the non-penalized + # token ID will be the one with the highest logit value, while the + # penalized token ID will be the one with the lowest logit value. + non_penalized_token_id = logits[batch_idx].argmax().item() + penalized_token_id = logits[batch_idx].argmin().item() if presence_penalty > 0: # If `presence_penalty` is set to a value greater than 0, it # indicates a preference for new tokens over those already @@ -256,11 +246,11 @@ def test_sampler_frequency_penalty(device: str, batch_size: int, sampling_metadata.output_token_ids = output_token_ids sampling_metadata.no_penalties = False sampler = Sampler() - sampler_output = sampler(fake_logits, sampling_metadata) + logits = sampler.apply_penalties(fake_logits, sampling_metadata) + logits = logits.cpu() for batch_idx in range(batch_size): - logprobs_token_ids = sampler_output.logprob_token_ids[batch_idx] - non_penalized_token_id = logprobs_token_ids[0] - penalized_token_id = logprobs_token_ids[VOCAB_SIZE - 1] + non_penalized_token_id = logits[batch_idx].argmax().item() + penalized_token_id = logits[batch_idx].argmin().item() distinct_sorted_token_ids_in_output = \ sorted_token_ids_in_output[batch_idx] most_frequent_token_id = distinct_sorted_token_ids_in_output[ @@ -305,11 +295,11 @@ def test_sampler_repetition_penalty(device: str, batch_size: int, batch_size, repetition_penalty, torch.device(device)) sampling_metadata.no_penalties = False sampler = Sampler() - sampler_output = sampler(fake_logits, sampling_metadata) + logits = sampler.apply_penalties(fake_logits, sampling_metadata) + logits = logits.cpu() for batch_idx in range(batch_size): - logprobs_token_ids = sampler_output.logprob_token_ids[batch_idx] - non_penalized_token_id = logprobs_token_ids[0] - penalized_token_id = logprobs_token_ids[VOCAB_SIZE - 1] + non_penalized_token_id = logits[batch_idx].argmax().item() + penalized_token_id = logits[batch_idx].argmin().item() prompt_tokens = sampling_metadata.prompt_token_ids[ 
batch_idx][:].tolist() output_tokens = sampling_metadata.output_token_ids[batch_idx] diff --git a/vllm/envs.py b/vllm/envs.py index 18870c1c6b51a..c4a568c680db0 100644 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -30,7 +30,7 @@ if TYPE_CHECKING: VLLM_LOGGING_CONFIG_PATH: Optional[str] = None VLLM_TRACE_FUNCTION: int = 0 VLLM_ATTENTION_BACKEND: Optional[str] = None - VLLM_USE_FLASHINFER_SAMPLER: bool = False + VLLM_USE_FLASHINFER_SAMPLER: Optional[bool] = None VLLM_USE_FLASHINFER_REJECTION_SAMPLER: bool = False VLLM_FLASHINFER_FORCE_TENSOR_CORES: bool = False VLLM_PP_LAYER_PARTITION: Optional[str] = None @@ -277,7 +277,8 @@ environment_variables: Dict[str, Callable[[], Any]] = { # If set, vllm will use flashinfer sampler "VLLM_USE_FLASHINFER_SAMPLER": - lambda: bool(int(os.getenv("VLLM_USE_FLASHINFER_SAMPLER", "0"))), + lambda: bool(int(os.environ["VLLM_USE_FLASHINFER_SAMPLER"])) + if "VLLM_USE_FLASHINFER_SAMPLER" in os.environ else None, # If set, vllm will force flashinfer to use tensor cores; # otherwise will use heuristic based on model architecture. diff --git a/vllm/v1/sample/ops/__init__.py b/vllm/v1/sample/ops/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/vllm/v1/sample/ops/penalties.py b/vllm/v1/sample/ops/penalties.py new file mode 100644 index 0000000000000..91ebaf9269f32 --- /dev/null +++ b/vllm/v1/sample/ops/penalties.py @@ -0,0 +1,57 @@ +from typing import List, Set, Tuple + +import torch + +from vllm.model_executor.layers.utils import ( + apply_penalties as _apply_penalties) +from vllm.utils import is_pin_memory_available, make_tensor_with_pad + + +def apply_min_token_penalties(logits: torch.Tensor, + output_token_ids: List[List[int]], + stop_token_ids: List[Set[int]], + min_tokens: List[int]) -> None: + """ + Applies minimum token penalty by setting the logits of the stop tokens + to -inf. + """ + min_tokens_logits_to_penalize: List[Tuple[int, int]] = [] + for index, min_token in enumerate(min_tokens): + if (len(output_token_ids[index]) < min_token): + for stop_token_id in stop_token_ids[index]: + min_tokens_logits_to_penalize.append((index, stop_token_id)) + if min_tokens_logits_to_penalize: + logits[tuple(zip(*min_tokens_logits_to_penalize))] = -float("inf") + + +def apply_penalties(logits: torch.Tensor, prompt_token_ids: torch.Tensor, + presence_penalties: torch.Tensor, + frequency_penalties: torch.Tensor, + repetition_penalties: torch.Tensor, + output_token_ids: List[List[int]]) -> torch.Tensor: + """ + Applies presence, frequency and repetition penalties to the logits. + """ + _, vocab_size = logits.shape + output_tokens_t = _convert_to_tensors(output_token_ids, vocab_size, + logits.device) + return _apply_penalties(logits, prompt_token_ids, output_tokens_t, + presence_penalties, frequency_penalties, + repetition_penalties) + + +def _convert_to_tensors(output_token_ids: List[List[int]], vocab_size: int, + device: torch.device) -> torch.Tensor: + """ + Convert the different list data structures to tensors. + """ + output_tokens_tensor = make_tensor_with_pad( + output_token_ids, + # Use the value of vocab_size as a pad since we don't have a + # token_id of this value. 
+ pad=vocab_size, + device="cpu", + dtype=torch.int64, + pin_memory=is_pin_memory_available(), + ) + return output_tokens_tensor.to(device, non_blocking=True) diff --git a/vllm/v1/sample/ops/topk_topp_sampler.py b/vllm/v1/sample/ops/topk_topp_sampler.py new file mode 100644 index 0000000000000..c088c3c129ca5 --- /dev/null +++ b/vllm/v1/sample/ops/topk_topp_sampler.py @@ -0,0 +1,201 @@ +from typing import Dict + +import torch +import torch.nn as nn + +from vllm import envs +from vllm.logger import init_logger +from vllm.platforms import current_platform + +logger = init_logger(__name__) + +try: + import flashinfer.sampling + is_flashinfer_available = True +except ImportError: + is_flashinfer_available = False + + +class TopKTopPSampler(nn.Module): + + def __init__(self): + super().__init__() + if current_platform.is_cuda: + if is_flashinfer_available: + if envs.VLLM_USE_FLASHINFER_SAMPLER is not False: + # NOTE(woosuk): The V0 sampler doesn't use FlashInfer for + # sampling unless VLLM_USE_FLASHINFER_SAMPLER=1 (i.e., by + # default it is unused). For backward compatibility, we set + # `VLLM_USE_FLASHINFER_SAMPLER` as None by default and + # interpret it differently in V0 and V1 samplers: In V0, + # None means False, while in V1, None means True. This is + # why we use the condition + # `envs.VLLM_USE_FLASHINFER_SAMPLER is not False` here. + logger.info("Using FlashInfer for top-p & top-k sampling.") + self.forward = self.forward_cuda + else: + logger.warning( + "FlashInfer is available, but it is not enabled. " + "Falling back to the PyTorch-native implementation of " + "top-p & top-k sampling. For the best performance, " + "please set VLLM_USE_FLASHINFER_SAMPLER=1.") + self.forward = self.forward_native + else: + logger.warning( + "FlashInfer is not available. Falling back to the PyTorch-" + "native implementation of top-p & top-k sampling. For the " + "best performance, please install FalshInfer.") + self.forward = self.forward_native + else: + self.forward = self.forward_native + + def forward_native( + self, + logits: torch.Tensor, + generators: Dict[int, torch.Generator], + no_top_k: bool, + k: torch.Tensor, + no_top_p: bool, + p: torch.Tensor, + ) -> torch.Tensor: + """PyTorch-native implementation of top-k and top-p sampling.""" + logits = apply_top_k_top_p(logits, no_top_k, k, no_top_p, p) + probs = logits.softmax(dim=-1, dtype=torch.float32) + return random_sample(probs, generators) + + def forward_cuda( + self, + logits: torch.Tensor, + generators: Dict[int, torch.Generator], + no_top_k: bool, + k: torch.Tensor, + no_top_p: bool, + p: torch.Tensor, + ) -> torch.Tensor: + """More optimized implementation for top-k and top-p sampling.""" + probs = logits.softmax(dim=-1, dtype=torch.float32) + if no_top_k and no_top_p: + # We prefer `random_sample` over `flashinfer_sample` when sorting is + # not needed. This is because `random_sample` does not require + # CPU-GPU synchronization while `flashinfer_sample` does. + return random_sample(probs, generators) + return flashinfer_sample(probs, no_top_k, k, no_top_p, p, generators) + + +def apply_top_k_top_p( + logits: torch.Tensor, + no_top_k: bool, + k: torch.Tensor, + no_top_p: bool, + p: torch.Tensor, +) -> torch.Tensor: + """Apply top-k and top-p masks to the logits. + + This function sorts the logits tensor, which can be slow for large batches. + """ + if no_top_k and no_top_p: + return logits + logits_sort, logits_idx = logits.sort(dim=-1, descending=False) + + if not no_top_k: + # Apply top-k. 
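+        # logits_sort is in ascending order, so for each row the value at
+        # index (vocab_size - k) is the smallest logit that survives top-k;
+        # everything strictly below that value is masked to -inf.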
+ top_k_mask = logits_sort.size(1) - k.to(torch.long) + # Get all the top_k values. + top_k_mask = logits_sort.gather(1, top_k_mask.unsqueeze(dim=1)) + top_k_mask = logits_sort < top_k_mask + logits_sort.masked_fill_(top_k_mask, -float("inf")) + + if not no_top_p: + # Apply top-p. + probs_sort = logits_sort.softmax(dim=-1) + probs_sum = probs_sort.cumsum(dim=-1) + top_p_mask = probs_sum <= 1 - p.unsqueeze(dim=1) + # at least one + top_p_mask[:, -1] = False + logits_sort.masked_fill_(top_p_mask, -float("inf")) + + # Re-sort the probabilities. + logits = logits_sort.scatter(dim=-1, index=logits_idx, src=logits_sort) + return logits + + +def random_sample( + probs: torch.Tensor, + generators: Dict[int, torch.Generator], +) -> torch.Tensor: + """Randomly sample from the probabilities. + + We use this function instead of torch.multinomial because torch.multinomial + causes CPU-GPU synchronization. + """ + q = torch.empty_like(probs) + # NOTE(woosuk): To batch-process the requests without their own seeds, + # which is the common case, we first assume that every request does + # not have its own seed. Then, we overwrite the values for the requests + # that have their own seeds. + if len(generators) != probs.shape[0]: + q.exponential_() + if generators: + # TODO(woosuk): This can be slow because we handle each request + # one by one. Optimize this. + for i, generator in generators.items(): + q[i].exponential_(generator=generator) + return probs.div_(q).argmax(dim=-1).view(-1) + + +def flashinfer_sample( + probs: torch.Tensor, + no_top_k: bool, + k: torch.Tensor, + no_top_p: bool, + p: torch.Tensor, + generators: Dict[int, torch.Generator], +) -> torch.Tensor: + """Sample from the probabilities using FlashInfer. + + Statistically, this function is equivalent to the `random_sample` function. + However, this function is faster because it avoids sorting the logits tensor + via rejection sampling. + + NOTE: The outputs of this function do not necessarily match the outputs of + the `random_sample` function. It only guarantees that the outputs are + statistically equivalent. + + NOTE: This function includes CPU-GPU synchronization, while `random_sample` + does not. Call this function at the end of the forward pass to minimize + the synchronization overhead. + """ + assert not (no_top_k and no_top_p) + max_top_k_round = 32 + batch_size = probs.shape[0] + uniform_samples = torch.empty((max_top_k_round, batch_size), + device=probs.device) + if len(generators) != batch_size: + uniform_samples.uniform_() + if generators: + for i, generator in generators.items(): + uniform_samples[:, i].uniform_(generator=generator) + + if no_top_k: + # Top-p only. + next_token_ids, success = flashinfer.sampling.top_p_sampling_from_probs( + probs, uniform_samples, p, deterministic=True) + elif no_top_p: + # Top-k only. + next_token_ids, success = flashinfer.sampling.top_k_sampling_from_probs( + probs, uniform_samples, k, deterministic=True) + else: + # Both top-k and top-p. + next_token_ids, success = ( + flashinfer.sampling.top_k_top_p_sampling_from_probs( + probs, uniform_samples, k, p, deterministic=True)) + + # NOTE: CPU-GPU synchronization happens here. 
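+    # If rejection sampling failed for any request, renormalize the
+    # probabilities under the same top-k / top-p constraints and fall back
+    # to exact sampling from the renormalized distribution.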
+ if not success.all(): + if not no_top_k: + probs = flashinfer.sampling.top_k_renorm_prob(probs, k) + if not no_top_p: + probs = flashinfer.sampling.top_p_renorm_prob(probs, p) + next_token_ids = flashinfer.sampling.sampling_from_probs( + probs, uniform_samples[0], deterministic=True) + return next_token_ids.view(-1) diff --git a/vllm/v1/sample/sampler.py b/vllm/v1/sample/sampler.py index 82470fb2610f8..1e38453a0ff28 100644 --- a/vllm/v1/sample/sampler.py +++ b/vllm/v1/sample/sampler.py @@ -1,53 +1,55 @@ """A layer that samples the next tokens from the model's outputs.""" -from typing import Dict, List, Set, Tuple +from typing import Tuple import torch import torch.nn as nn -from vllm.model_executor.layers.utils import apply_penalties -from vllm.utils import is_pin_memory_available, make_tensor_with_pad from vllm.v1.outputs import SamplerOutput from vllm.v1.sample.metadata import SamplingMetadata +from vllm.v1.sample.ops.penalties import (apply_min_token_penalties, + apply_penalties) +from vllm.v1.sample.ops.topk_topp_sampler import TopKTopPSampler _SAMPLING_EPS = 1e-5 class Sampler(nn.Module): + def __init__(self): + super().__init__() + self.topk_topp_sampler = TopKTopPSampler() + def forward( self, logits: torch.Tensor, sampling_metadata: SamplingMetadata, ) -> SamplerOutput: - _apply_min_token_penalties(logits, sampling_metadata.output_token_ids, - sampling_metadata.stop_token_ids, - sampling_metadata.min_tokens) - if not sampling_metadata.no_penalties: - assert sampling_metadata.prompt_token_ids is not None - _apply_penalties(logits, sampling_metadata.prompt_token_ids, - sampling_metadata.presence_penalties, - sampling_metadata.frequency_penalties, - sampling_metadata.repetition_penalties, - sampling_metadata.output_token_ids) - logits = self.apply_temperature(logits, sampling_metadata.temperature) - logits = self.apply_top_k_top_p(logits, sampling_metadata) - probs = self.get_probs(logits) - sampled = self.sample(probs, sampling_metadata) - # Use int32 to reduce the tensor size. - sampled = sampled.to(torch.int32) - - if sampling_metadata.max_num_logprobs > 0: - logprobs = self.get_logprobs(logits) - # FIXME: Mask the sampled token_id, get topk logprobs, - # and concatenate the topk with the sampled token_id. - topk_logprobs, topk_indices = torch.topk( - logprobs, sampling_metadata.max_num_logprobs, dim=-1) - # Use int32 to reduce the tensor size. - topk_indices = topk_indices.to(torch.int32) + needs_logprobs = sampling_metadata.max_num_logprobs > 0 + if needs_logprobs: + # NOTE(woosuk): Use the original logits (before any penalties or + # temperature scaling) for the top-k logprobs. + # This is different from the V0 sampler, which uses the logits that + # is used for sampling (after penalties and temperature scaling). + # NOTE: We compute logprobs first because the below ops may + # modify the logits tensor in-place (and we don't want to clone + # the logits tensor for memory efficiency). + topk_logprobs, topk_indices = self.get_topk_logprobs( + logits, sampling_metadata) else: topk_logprobs = None topk_indices = None + # Use float32 for the logits. + logits = logits.to(torch.float32) + # Apply penalties (e.g., min_tokens, freq_penalties). + logits = self.apply_penalties(logits, sampling_metadata) + # Apply temperature. + logits = self.apply_temperature(logits, sampling_metadata.temperature) + # Sample the next token. + sampled = self.sample(logits, sampling_metadata) + # Use int32 to reduce the tensor size. 
+ sampled = sampled.to(torch.int32) + # NOTE: CPU-GPU synchronization happens here. sampler_output = SamplerOutput( sampled_token_ids=sampled.tolist(), @@ -63,71 +65,37 @@ class Sampler(nn.Module): logits: torch.Tensor, temp: torch.Tensor, ) -> torch.Tensor: - # Use float32 to apply temperature scaling. - logits = logits.to(torch.float32) # Avoid division by zero. temp = torch.where(temp < _SAMPLING_EPS, 1.0, temp) # Use in-place division to avoid creating a new tensor. logits.div_(temp.unsqueeze(dim=1)) return logits - def apply_top_k_top_p( - self, - logits: torch.Tensor, - sampling_metadata: SamplingMetadata, - ) -> torch.Tensor: - return _apply_top_k_top_p( - logits, - sampling_metadata.no_top_k, - sampling_metadata.top_k, - sampling_metadata.no_top_p, - sampling_metadata.top_p, - ) - - def get_probs(self, logits: torch.Tensor) -> torch.Tensor: - return torch.softmax(logits, dim=-1, dtype=torch.float32) - - def get_logprobs(self, logits: torch.Tensor) -> torch.Tensor: - return torch.log_softmax(logits, dim=-1, dtype=torch.float32) - - def greedy_sample(self, probs: torch.Tensor) -> torch.Tensor: - return probs.argmax(dim=-1).view(-1) - - def random_sample( - self, - probs: torch.Tensor, - generators: Dict[int, torch.Generator], - ) -> torch.Tensor: - q = torch.empty_like(probs) - # NOTE(woosuk): To batch-process the requests without their own seeds, - # which is the common case, we first assume that every request does - # not have its own seed. Then, we overwrite the values for the requests - # that have their own seeds. - if len(generators) != probs.shape[0]: - # This might still be done here unnecessarily if there are greedies - q.exponential_() - if generators: - # TODO(woosuk): This can be slow because we handle each request - # one by one. Optimize this. - for i, generator in generators.items(): - q[i].exponential_(generator=generator) - return probs.div_(q).argmax(dim=-1).view(-1) + def greedy_sample(self, logits: torch.Tensor) -> torch.Tensor: + return logits.argmax(dim=-1).view(-1) def sample( self, - probs: torch.Tensor, + logits: torch.Tensor, sampling_metadata: SamplingMetadata, ) -> torch.Tensor: assert not (sampling_metadata.all_greedy and sampling_metadata.all_random) if sampling_metadata.all_greedy: - return self.greedy_sample(probs) - if sampling_metadata.all_random: - return self.random_sample(probs, sampling_metadata.generators) + return self.greedy_sample(logits) - greedy_sampled = self.greedy_sample(probs) - random_sampled = self.random_sample(probs, - sampling_metadata.generators) + random_sampled = self.topk_topp_sampler( + logits, + sampling_metadata.generators, + sampling_metadata.no_top_k, + sampling_metadata.top_k, + sampling_metadata.no_top_p, + sampling_metadata.top_p, + ) + if sampling_metadata.all_random: + return random_sampled + + greedy_sampled = self.greedy_sample(logits) sampled = torch.where( sampling_metadata.temperature < _SAMPLING_EPS, greedy_sampled, @@ -135,86 +103,34 @@ class Sampler(nn.Module): ) return sampled + def get_topk_logprobs( + self, + logits: torch.Tensor, + sampling_metadata: SamplingMetadata, + ) -> Tuple[torch.Tensor, torch.Tensor]: + logprobs = logits.log_softmax(dim=-1, dtype=torch.float32) + # FIXME: Mask the sampled token_id, get topk logprobs, + # and concatenate the topk with the sampled token_id. + topk_logprobs, topk_indices = torch.topk( + logprobs, sampling_metadata.max_num_logprobs, dim=-1) + # Use int32 to reduce the tensor size. 
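# Sketch (not part of this diff): how the mixed greedy/random batch in the new
# sample() above is resolved per request, keyed on a near-zero temperature.
import torch

_SAMPLING_EPS = 1e-5
temperature = torch.tensor([0.0, 0.8, 0.0])     # greedy, random, greedy
greedy_sampled = torch.tensor([5, 1, 9])
random_sampled = torch.tensor([2, 7, 3])
sampled = torch.where(temperature < _SAMPLING_EPS, greedy_sampled,
                      random_sampled)
print(sampled)  # tensor([5, 7, 9])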
+ topk_indices = topk_indices.to(torch.int32) + return topk_logprobs, topk_indices -# TODO(woosuk): Optimize this with a custom kernel. -def _apply_top_k_top_p( - logits: torch.Tensor, - no_top_k: bool, - k: torch.Tensor, - no_top_p: bool, - p: torch.Tensor, -) -> torch.Tensor: - if no_top_k and no_top_p: + def apply_penalties( + self, + logits: torch.Tensor, + sampling_metadata: SamplingMetadata, + ) -> torch.Tensor: + apply_min_token_penalties(logits, sampling_metadata.output_token_ids, + sampling_metadata.stop_token_ids, + sampling_metadata.min_tokens) + if not sampling_metadata.no_penalties: + assert sampling_metadata.prompt_token_ids is not None + logits = apply_penalties(logits, + sampling_metadata.prompt_token_ids, + sampling_metadata.presence_penalties, + sampling_metadata.frequency_penalties, + sampling_metadata.repetition_penalties, + sampling_metadata.output_token_ids) return logits - logits_sort, logits_idx = logits.sort(dim=-1, descending=False) - - if not no_top_k: - # Apply top-k. - top_k_mask = logits_sort.size(1) - k.to(torch.long) - # Get all the top_k values. - top_k_mask = logits_sort.gather(1, top_k_mask.unsqueeze(dim=1)) - top_k_mask = logits_sort < top_k_mask - logits_sort.masked_fill_(top_k_mask, -float("inf")) - - if not no_top_p: - # Apply top-p. - probs_sort = logits_sort.softmax(dim=-1) - probs_sum = probs_sort.cumsum(dim=-1) - top_p_mask = probs_sum <= 1 - p.unsqueeze(dim=1) - # at least one - top_p_mask[:, -1] = False - logits_sort.masked_fill_(top_p_mask, -float("inf")) - - # Re-sort the probabilities. - logits = logits_sort.scatter(dim=-1, index=logits_idx, src=logits_sort) - return logits - - -def _apply_min_token_penalties(logits: torch.Tensor, - output_token_ids: List[List[int]], - stop_token_ids: List[Set[int]], - min_tokens: List[int]): - """ - Applies minimum token penalty by setting the logits of the stop tokens - to -inf. - """ - min_tokens_logits_to_penalize: List[Tuple[int, int]] = [] - for index, min_token in enumerate(min_tokens): - if (len(output_token_ids[index]) < min_token): - for stop_token_id in stop_token_ids[index]: - min_tokens_logits_to_penalize.append((index, stop_token_id)) - if min_tokens_logits_to_penalize: - logits[tuple(zip(*min_tokens_logits_to_penalize))] = -float("inf") - - -def _apply_penalties(logits: torch.Tensor, prompt_token_ids: torch.Tensor, - presence_penalties: torch.Tensor, - frequency_penalties: torch.Tensor, - repetition_penalties: torch.Tensor, - output_token_ids: List[List[int]]): - """ - Applies presence, frequency and repetition penalties to the logits. - """ - _, vocab_size = logits.shape - output_tokens_t = _convert_to_tensors(output_token_ids, vocab_size, - logits.device) - return apply_penalties(logits, prompt_token_ids, output_tokens_t, - presence_penalties, frequency_penalties, - repetition_penalties) - - -def _convert_to_tensors(output_token_ids: List[List[int]], vocab_size: int, - device: torch.device) -> torch.Tensor: - """ - Convert the different list data structures to tensors. - """ - output_tokens_tensor = make_tensor_with_pad( - output_token_ids, - # Use the value of vocab_size as a pad since we don't have a - # token_id of this value. 
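# Sketch (not part of this diff) of the min-token penalty applied above: while
# a request has produced fewer than `min_tokens` tokens, its stop-token logits
# are forced to -inf so EOS cannot be sampled yet.
import torch

logits = torch.zeros(1, 10)
output_token_ids = [[3, 4]]   # two tokens generated so far
stop_token_ids = [{0}]        # EOS id 0
min_tokens = [4]              # must generate at least four tokens
if len(output_token_ids[0]) < min_tokens[0]:
    logits[0, list(stop_token_ids[0])] = -float("inf")
print(logits[0, 0])  # tensor(-inf): EOS stays masked until min_tokens is hit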
- pad=vocab_size, - device="cpu", - dtype=torch.int64, - pin_memory=is_pin_memory_available(), - ) - return output_tokens_tensor.to(device, non_blocking=True) From 81b979f2a8f7ec91c262dac7dcbf30ed577ebafd Mon Sep 17 00:00:00 2001 From: Woosuk Kwon Date: Fri, 27 Dec 2024 09:47:10 +0900 Subject: [PATCH 8/9] [V1] Fix yapf (#11538) Signed-off-by: Woosuk Kwon --- vllm/v1/sample/ops/penalties.py | 24 +++++++++++++----------- vllm/v1/sample/sampler.py | 16 ++++++++-------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/vllm/v1/sample/ops/penalties.py b/vllm/v1/sample/ops/penalties.py index 91ebaf9269f32..2796d049457d0 100644 --- a/vllm/v1/sample/ops/penalties.py +++ b/vllm/v1/sample/ops/penalties.py @@ -2,8 +2,7 @@ from typing import List, Set, Tuple import torch -from vllm.model_executor.layers.utils import ( - apply_penalties as _apply_penalties) +from vllm.model_executor.layers.utils import apply_penalties from vllm.utils import is_pin_memory_available, make_tensor_with_pad @@ -17,27 +16,30 @@ def apply_min_token_penalties(logits: torch.Tensor, """ min_tokens_logits_to_penalize: List[Tuple[int, int]] = [] for index, min_token in enumerate(min_tokens): - if (len(output_token_ids[index]) < min_token): + if len(output_token_ids[index]) < min_token: for stop_token_id in stop_token_ids[index]: min_tokens_logits_to_penalize.append((index, stop_token_id)) if min_tokens_logits_to_penalize: logits[tuple(zip(*min_tokens_logits_to_penalize))] = -float("inf") -def apply_penalties(logits: torch.Tensor, prompt_token_ids: torch.Tensor, - presence_penalties: torch.Tensor, - frequency_penalties: torch.Tensor, - repetition_penalties: torch.Tensor, - output_token_ids: List[List[int]]) -> torch.Tensor: +def apply_all_penalties( + logits: torch.Tensor, + prompt_token_ids: torch.Tensor, + presence_penalties: torch.Tensor, + frequency_penalties: torch.Tensor, + repetition_penalties: torch.Tensor, + output_token_ids: List[List[int]], +) -> torch.Tensor: """ Applies presence, frequency and repetition penalties to the logits. 
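# Sketch (not part of this diff) of why vocab_size is a safe pad value in
# _convert_to_tensors above: padded slots get an id outside the real
# vocabulary, so they never add to per-token occurrence counts.
import torch

vocab_size = 6
padded = torch.tensor([[1, 2, 2],                     # request 0: [1, 2, 2]
                       [3, vocab_size, vocab_size]])  # request 1: [3] + pad
# Count occurrences into vocab_size + 1 bins, then drop the pad bin.
counts = torch.zeros(2, vocab_size + 1, dtype=torch.long)
counts.scatter_add_(1, padded, torch.ones_like(padded))
counts = counts[:, :vocab_size]
print(counts)  # row 0: token 2 seen twice; row 1: only token 3 seen once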
""" _, vocab_size = logits.shape output_tokens_t = _convert_to_tensors(output_token_ids, vocab_size, logits.device) - return _apply_penalties(logits, prompt_token_ids, output_tokens_t, - presence_penalties, frequency_penalties, - repetition_penalties) + return apply_penalties(logits, prompt_token_ids, output_tokens_t, + presence_penalties, frequency_penalties, + repetition_penalties) def _convert_to_tensors(output_token_ids: List[List[int]], vocab_size: int, diff --git a/vllm/v1/sample/sampler.py b/vllm/v1/sample/sampler.py index 1e38453a0ff28..7cd42ca211a22 100644 --- a/vllm/v1/sample/sampler.py +++ b/vllm/v1/sample/sampler.py @@ -6,8 +6,8 @@ import torch.nn as nn from vllm.v1.outputs import SamplerOutput from vllm.v1.sample.metadata import SamplingMetadata -from vllm.v1.sample.ops.penalties import (apply_min_token_penalties, - apply_penalties) +from vllm.v1.sample.ops.penalties import (apply_all_penalties, + apply_min_token_penalties) from vllm.v1.sample.ops.topk_topp_sampler import TopKTopPSampler _SAMPLING_EPS = 1e-5 @@ -127,10 +127,10 @@ class Sampler(nn.Module): sampling_metadata.min_tokens) if not sampling_metadata.no_penalties: assert sampling_metadata.prompt_token_ids is not None - logits = apply_penalties(logits, - sampling_metadata.prompt_token_ids, - sampling_metadata.presence_penalties, - sampling_metadata.frequency_penalties, - sampling_metadata.repetition_penalties, - sampling_metadata.output_token_ids) + logits = apply_all_penalties( + logits, sampling_metadata.prompt_token_ids, + sampling_metadata.presence_penalties, + sampling_metadata.frequency_penalties, + sampling_metadata.repetition_penalties, + sampling_metadata.output_token_ids) return logits From 46d4359450cd194ab2a4f2fdc370ff4b33a188e2 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-neuralmagic@users.noreply.github.com> Date: Thu, 26 Dec 2024 21:49:16 -0500 Subject: [PATCH 9/9] [CI] Fix broken CI (#11543) --- tests/models/registry.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/models/registry.py b/tests/models/registry.py index 819ef957a07f3..f5a37420a2909 100644 --- a/tests/models/registry.py +++ b/tests/models/registry.py @@ -61,6 +61,8 @@ _TEXT_GENERATION_EXAMPLE_MODELS = { "DeepseekForCausalLM": _HfExamplesInfo("deepseek-ai/deepseek-llm-7b-chat"), "DeepseekV2ForCausalLM": _HfExamplesInfo("deepseek-ai/DeepSeek-V2-Lite-Chat", # noqa: E501 trust_remote_code=True), + "DeepseekV3ForCausalLM": _HfExamplesInfo("deepseek-ai/DeepSeek-V3", # noqa: E501 + trust_remote_code=True), "ExaoneForCausalLM": _HfExamplesInfo("LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct"), # noqa: E501 "FalconForCausalLM": _HfExamplesInfo("tiiuae/falcon-7b"), "GemmaForCausalLM": _HfExamplesInfo("google/gemma-2b"),