Merge branch 'main' into woosuk/model-runner-v2

This commit is contained in:
Woosuk Kwon 2025-09-20 11:18:35 +00:00
commit 913b8e9569
69 changed files with 1466 additions and 459 deletions

View File

@ -0,0 +1,38 @@
#pragma once

#include <cuda_runtime_api.h>
#include <algorithm>

// Maximum number of thread blocks per SM that __launch_bounds__ may request.
// Can be overridden at compile time (e.g. -DVLLM_LAUNCH_BLOCKS_CAP=2).
#ifndef VLLM_LAUNCH_BLOCKS_CAP
  #define VLLM_LAUNCH_BLOCKS_CAP 4
#endif

// Compile-time estimate of the maximum resident threads per SM, used for
// __launch_bounds__. Architectures below SM 3.0 support at most 1536
// threads per SM; later architectures support 2048.
#ifndef VLLM_MAX_THREADS_PER_SM
  #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 300
    #define VLLM_MAX_THREADS_PER_SM 1536
  #else
    #define VLLM_MAX_THREADS_PER_SM 2048
  #endif
#endif

// Blocks per SM implied by a block size of VAL threads.
// NOTE(review): VAL must be a positive compile-time constant; VAL == 0 would
// be a division by zero.
#define VLLM_BLOCKS_DIV(VAL) (VLLM_MAX_THREADS_PER_SM / (VAL))

// Clamp a blocks-per-SM value into [1, VLLM_LAUNCH_BLOCKS_CAP].
#define VLLM_CLAMP_BLOCKS_PER_SM(VAL) \
  (((VAL) <= 0) \
       ? 1 \
       : (((VAL) < VLLM_LAUNCH_BLOCKS_CAP) ? (VAL) : VLLM_LAUNCH_BLOCKS_CAP))

// Compute the blocks-per-SM argument for __launch_bounds__ from a
// compile-time block size (in threads).
#define VLLM_BLOCKS_PER_SM(BLOCK_THREADS) \
  VLLM_CLAMP_BLOCKS_PER_SM(VLLM_BLOCKS_DIV(BLOCK_THREADS))

// Runtime helper: blocks/SM for the current device given a block size of
// `block_threads` threads, clamped to [1, VLLM_LAUNCH_BLOCKS_CAP].
// Falls back to the compile-time VLLM_MAX_THREADS_PER_SM estimate when the
// device query fails (the value written by a failed cudaDeviceGetAttribute
// call is unspecified, so it must not be used unconditionally).
static inline int vllm_runtime_blocks_per_sm(int block_threads) {
  int max_threads_per_sm = VLLM_MAX_THREADS_PER_SM;
  int device = -1;
  if (cudaGetDevice(&device) == cudaSuccess) {
    int queried = 0;
    if (cudaDeviceGetAttribute(&queried,
                               cudaDevAttrMaxThreadsPerMultiProcessor,
                               device) == cudaSuccess &&
        queried > 0) {
      max_threads_per_sm = queried;
    }
  }
  int blocks = (block_threads > 0) ? (max_threads_per_sm / block_threads) : 1;
  return VLLM_CLAMP_BLOCKS_PER_SM(blocks);
}

View File

@ -26,6 +26,7 @@
#include "dispatch_utils.h"
#include "cuda_utils.h"
#include "launch_bounds_utils.h"
#include "nvfp4_utils.cuh"
namespace vllm {
@ -63,7 +64,7 @@ __inline__ __device__ PackedVec<Type> compute_silu_mul(PackedVec<Type>& vec,
// Use UE4M3 by default.
template <class Type, bool UE8M0_SF = false>
__global__ void __launch_bounds__(1024, 4)
__global__ void __launch_bounds__(1024, VLLM_BLOCKS_PER_SM(1024))
silu_mul_cvt_fp16_to_fp4(int32_t numRows, int32_t numCols, Type const* in,
float const* SFScale, uint32_t* out,
uint32_t* SFout) {
@ -131,7 +132,8 @@ void silu_and_mul_nvfp4_quant_sm1xxa(torch::Tensor& output, // [..., d]
const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
auto stream = at::cuda::getCurrentCUDAStream(input.get_device());
dim3 block(std::min(int(n / ELTS_PER_THREAD), 1024));
int const numBlocksPerSM = 2048 / block.x;
int const numBlocksPerSM =
vllm_runtime_blocks_per_sm(static_cast<int>(block.x));
dim3 grid(std::min(int(m), multiProcessorCount * numBlocksPerSM));
VLLM_DISPATCH_HALF_TYPES(

View File

@ -26,12 +26,13 @@
#include "dispatch_utils.h"
#include "nvfp4_utils.cuh"
#include "launch_bounds_utils.h"
namespace vllm {
// Use UE4M3 by default.
template <class Type, bool UE8M0_SF = false, bool SMALL_NUM_EXPERTS = false>
__global__ void __launch_bounds__(512, 4)
__global__ void __launch_bounds__(512, VLLM_BLOCKS_PER_SM(512))
cvt_fp16_to_fp4(int32_t numRows, int32_t numCols, Type const* in,
float const* SFScale, uint32_t* out, uint32_t* SFout,
uint32_t* input_offset_by_experts,
@ -129,7 +130,7 @@ __global__ void __launch_bounds__(512, 4)
// Kernel for LARGE_M_TOPK = true (large m_topk optimized version)
template <class Type, bool UE8M0_SF = false, bool SMALL_NUM_EXPERTS = false>
__global__ void __launch_bounds__(1024, 4)
__global__ void __launch_bounds__(1024, VLLM_BLOCKS_PER_SM(1024))
cvt_fp16_to_fp4(int32_t numRows, int32_t numCols, Type const* in,
float const* SFScale, uint32_t* out, uint32_t* SFout,
uint32_t* input_offset_by_experts,
@ -233,8 +234,9 @@ void quant_impl(void* output, void* output_scale, void* input,
int const workSizePerRow = k / ELTS_PER_THREAD;
int const totalWorkSize = m_topk * workSizePerRow;
dim3 block(std::min(workSizePerRow, 512));
// Get number of blocks per SM (assume we can fully utilize the SM).
int const numBlocksPerSM = 2048 / block.x;
// Get number of blocks per SM
int const numBlocksPerSM =
vllm_runtime_blocks_per_sm(static_cast<int>(block.x));
dim3 grid(std::min(static_cast<int>((totalWorkSize + block.x - 1) / block.x),
multiProcessorCount * numBlocksPerSM));
while (grid.x <= multiProcessorCount && block.x > 64) {

View File

@ -26,13 +26,14 @@
#include "dispatch_utils.h"
#include "cuda_utils.h"
#include "launch_bounds_utils.h"
#include "nvfp4_utils.cuh"
namespace vllm {
// Use UE4M3 by default.
template <class Type, bool UE8M0_SF = false>
__global__ void __launch_bounds__(512, 4)
__global__ void __launch_bounds__(512, VLLM_BLOCKS_PER_SM(512))
cvt_fp16_to_fp4(int32_t numRows, int32_t numCols, Type const* in,
float const* SFScale, uint32_t* out, uint32_t* SFout) {
using PackedVec = PackedVec<Type>;
@ -75,8 +76,9 @@ void invokeFP4Quantization(int m, int n, T const* input, float const* SFScale,
// Grid, Block size.
// Each thread converts 8 values.
dim3 block(std::min(int(n / ELTS_PER_THREAD), 512));
// Get number of blocks per SM (assume we can fully utilize the SM).
int const numBlocksPerSM = 2048 / block.x;
// Get number of blocks per SM
int const numBlocksPerSM =
vllm_runtime_blocks_per_sm(static_cast<int>(block.x));
dim3 grid(std::min(int(m), multiProcessorCount * numBlocksPerSM));
// Launch the cvt kernel.

View File

@ -36,22 +36,23 @@ th:not(:first-child) {
}
</style>
| Feature | [CP][chunked-prefill] | [APC](automatic_prefix_caching.md) | [LoRA](lora.md) | [SD](spec_decode.md) | CUDA graph | [pooling](../models/pooling_models.md) | <abbr title="Encoder-Decoder Models">enc-dec</abbr> | <abbr title="Logprobs">logP</abbr> | <abbr title="Prompt Logprobs">prmpt logP</abbr> | <abbr title="Async Output Processing">async output</abbr> | multi-step | <abbr title="Multimodal Inputs">mm</abbr> | best-of | beam-search |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| [CP][chunked-prefill] | ✅ | | | | | | | | | | | | | |
| [APC](automatic_prefix_caching.md) | ✅ | ✅ | | | | | | | | | | | | |
| [LoRA](lora.md) | ✅ | ✅ | ✅ | | | | | | | | | | | |
| [SD](spec_decode.md) | ✅ | ✅ | ❌ | ✅ | | | | | | | | | | |
| CUDA graph | ✅ | ✅ | ✅ | ✅ | ✅ | | | | | | | | | |
| [pooling](../models/pooling_models.md) | 🟠\* | 🟠\* | ✅ | ❌ | ✅ | ✅ | | | | | | | | |
| <abbr title="Encoder-Decoder Models">enc-dec</abbr> | ❌ | [](gh-issue:7366) | ❌ | [](gh-issue:7366) | ✅ | ✅ | ✅ | | | | | | | |
| <abbr title="Logprobs">logP</abbr> | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | | | | | | |
| <abbr title="Prompt Logprobs">prmpt logP</abbr> | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | | | | | |
| <abbr title="Async Output Processing">async output</abbr> | ✅ | ✅ | ✅ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | | | | |
| multi-step | ❌ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ | | | |
| [mm](multimodal_inputs.md) | ✅ | ✅ | [🟠](gh-pr:4194)<sup>^</sup> | ❔ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❔ | ✅ | | |
| best-of | ✅ | ✅ | ✅ | [](gh-issue:6137) | ✅ | ❌ | ✅ | ✅ | ✅ | ❔ | [](gh-issue:7968) | ✅ | ✅ | |
| beam-search | ✅ | ✅ | ✅ | [](gh-issue:6137) | ✅ | ❌ | ✅ | ✅ | ✅ | ❔ | [](gh-issue:7968) | ❔ | ✅ | ✅ |
| Feature | [CP][chunked-prefill] | [APC](automatic_prefix_caching.md) | [LoRA](lora.md) | [SD](spec_decode.md) | CUDA graph | [pooling](../models/pooling_models.md) | <abbr title="Encoder-Decoder Models">enc-dec</abbr> | <abbr title="Logprobs">logP</abbr> | <abbr title="Prompt Logprobs">prmpt logP</abbr> | <abbr title="Async Output Processing">async output</abbr> | multi-step | <abbr title="Multimodal Inputs">mm</abbr> | best-of | beam-search | [prompt-embeds](prompt_embeds.md) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| [CP][chunked-prefill] | ✅ | | | | | | | | | | | | | | |
| [APC](automatic_prefix_caching.md) | ✅ | ✅ | | | | | | | | | | | | | |
| [LoRA](lora.md) | ✅ | ✅ | ✅ | | | | | | | | | | | | |
| [SD](spec_decode.md) | ✅ | ✅ | ❌ | ✅ | | | | | | | | | | | |
| CUDA graph | ✅ | ✅ | ✅ | ✅ | ✅ | | | | | | | | | | |
| [pooling](../models/pooling_models.md) | 🟠\* | 🟠\* | ✅ | ❌ | ✅ | ✅ | | | | | | | | | |
| <abbr title="Encoder-Decoder Models">enc-dec</abbr> | ❌ | [](gh-issue:7366) | ❌ | [](gh-issue:7366) | ✅ | ✅ | ✅ | | | | | | | | |
| <abbr title="Logprobs">logP</abbr> | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | | | | | | | |
| <abbr title="Prompt Logprobs">prmpt logP</abbr> | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | | | | | | |
| <abbr title="Async Output Processing">async output</abbr> | ✅ | ✅ | ✅ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | | | | | |
| multi-step | ❌ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ | | | | |
| [mm](multimodal_inputs.md) | ✅ | ✅ | [🟠](gh-pr:4194)<sup>^</sup> | ❔ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❔ | ✅ | | | |
| best-of | ✅ | ✅ | ✅ | [](gh-issue:6137) | ✅ | ❌ | ✅ | ✅ | ✅ | ❔ | [](gh-issue:7968) | ✅ | ✅ | | |
| beam-search | ✅ | ✅ | ✅ | [](gh-issue:6137) | ✅ | ❌ | ✅ | ✅ | ✅ | ❔ | [](gh-issue:7968) | ❔ | ✅ | ✅ | |
| [prompt-embeds](prompt_embeds.md) | ✅ | [](gh-issue:25096) | ? | ❌ | ✅ | ❌ | ❌ | ✅ | ❌ | ? | ? | ❌ | ? | ? | ✅ |
\* Chunked prefill and prefix caching are only applicable to last-token pooling.
<sup>^</sup> LoRA is only applicable to the language backbone of multimodal models.
@ -76,3 +77,4 @@ th:not(:first-child) {
| multi-step | ✅ | ✅ | ✅ | ✅ | ✅ | [](gh-issue:8477) | ✅ | ❌ |
| best-of | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ |
| beam-search | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ |
| [prompt-embeds](prompt_embeds.md) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ? | [](gh-issue:25097) |

View File

@ -6,9 +6,6 @@ This page teaches you how to pass prompt embedding inputs to vLLM.
The traditional flow of text data for a Large Language Model goes from text to token ids (via a tokenizer) then from token ids to prompt embeddings. For a traditional decoder-only model (such as meta-llama/Llama-3.1-8B-Instruct), this step of converting token ids to prompt embeddings happens via a look-up from a learned embedding matrix, but the model is not limited to processing only the embeddings corresponding to its token vocabulary.
!!! note
Prompt embeddings are currently only supported in the v0 engine.
## Offline Inference
To input multi-modal data, follow this schema in [vllm.inputs.EmbedsPrompt][]:

View File

@ -15,6 +15,7 @@ from vllm.config import (CompilationConfig, CompilationLevel, CUDAGraphMode,
VllmConfig, set_current_vllm_config)
from vllm.envs import VLLM_USE_V1
from vllm.forward_context import BatchDescriptor, set_forward_context
from vllm.utils import is_torch_equal_or_newer
# This import automatically registers `torch.ops.silly.attention`
from ..silly_attention import get_global_counter, reset_global_counter
@ -50,16 +51,21 @@ class SillyModel(nn.Module):
return x
@pytest.mark.parametrize("use_inductor", [True, False])
@torch.inference_mode()
def test_simple_piecewise_compile(use_inductor):
assert VLLM_USE_V1
def _run_simple_model(
splitting_ops,
use_inductor_graph_partition,
use_inductor,
expected_num_piecewise_graphs_seen,
expected_num_piecewise_capturable_graphs_seen,
expected_num_backend_compilations,
expected_num_cudagraph_captured,
):
vllm_config = VllmConfig(compilation_config=CompilationConfig(
level=CompilationLevel.PIECEWISE,
use_cudagraph=True,
use_inductor=use_inductor,
splitting_ops=["silly.attention"],
splitting_ops=splitting_ops,
use_inductor_graph_partition=use_inductor_graph_partition,
cudagraph_copy_inputs=True,
cudagraph_capture_sizes=[1, 2],
))
@ -70,11 +76,11 @@ def test_simple_piecewise_compile(use_inductor):
with compilation_counter.expect(
num_graphs_seen=1, # one graph for the model
num_piecewise_graphs_seen=5, # 2 * num_layers + 1
num_piecewise_capturable_graphs_seen=3, # 1 + num_layers
num_backend_compilations=3, # num_piecewise_capturable_graphs_seen
num_cudagraph_captured=
6, # num_cudagraph_sizes * num_piecewise_capturable_graphs_seen
num_piecewise_graphs_seen=expected_num_piecewise_graphs_seen,
num_piecewise_capturable_graphs_seen=
expected_num_piecewise_capturable_graphs_seen,
num_backend_compilations=expected_num_backend_compilations,
num_cudagraph_captured=expected_num_cudagraph_captured,
), set_forward_context(None,
vllm_config=vllm_config): # background context
# warm up with background context
@ -104,3 +110,46 @@ def test_simple_piecewise_compile(use_inductor):
output = model(input)
assert get_global_counter() == 2
assert torch.allclose(output.cpu(), torch.tensor([19.0, 19.0]))
@pytest.mark.parametrize("use_inductor", [True, False])
@torch.inference_mode()
def test_simple_piecewise_compile(use_inductor):
assert VLLM_USE_V1
_run_simple_model(
splitting_ops=["silly.attention"],
use_inductor_graph_partition=False,
use_inductor=use_inductor,
expected_num_piecewise_graphs_seen=5, # 2 * num_layers + 1
expected_num_piecewise_capturable_graphs_seen=3, # 1 + num_layers
expected_num_backend_compilations=
3, # num_piecewise_capturable_graphs_seen
expected_num_cudagraph_captured=
6, # num_cudagraph_sizes * num_piecewise_capturable_graphs_seen
)
@torch.inference_mode()
@pytest.mark.parametrize("splitting_ops", [["silly.attention"], []])
def test_simple_inductor_graph_partition(splitting_ops):
assert VLLM_USE_V1
if not is_torch_equal_or_newer("2.9.0.dev"):
pytest.skip("inductor graph partition is only available "
"in PyTorch 2.9+")
_run_simple_model(
# inductor graph partition automatically resets splitting_ops
# to be an empty list
splitting_ops=splitting_ops,
use_inductor_graph_partition=True,
use_inductor=True,
expected_num_piecewise_graphs_seen=
1, # since not splitting at fx graph level
expected_num_piecewise_capturable_graphs_seen=
1, # since not splitting at fx graph level
expected_num_backend_compilations=
1, # since not splitting at fx graph level
expected_num_cudagraph_captured=
6, # inductor graph partition still captures 6
# graph, same as fx graph partition.
)

View File

@ -60,4 +60,5 @@ direct_register_custom_op(
mutates_args=["out"],
fake_impl=silly_attention_fake,
target_lib=silly_lib,
tags=(torch._C.Tag.cudagraph_unsafe, ),
)

View File

@ -20,7 +20,6 @@ class TestSetting:
tp_size: int
attn_backend: str
method: str
fullgraph: bool
# we cannot afford testing the full Cartesian product
@ -36,7 +35,6 @@ class TestSetting:
tp_size=2,
attn_backend="FLASH_ATTN",
method="generate",
fullgraph=True,
),
# llama model with quantization
TestSetting(
@ -46,7 +44,6 @@ class TestSetting:
tp_size=1,
attn_backend="FLASH_ATTN",
method="generate",
fullgraph=True,
),
# MoE model
TestSetting(
@ -56,7 +53,6 @@ class TestSetting:
tp_size=2,
attn_backend="FLASH_ATTN",
method="generate",
fullgraph=True,
),
# embedding model
TestSetting(
@ -73,7 +69,6 @@ class TestSetting:
tp_size=1,
attn_backend="FLASH_ATTN",
method="encode",
fullgraph=True,
),
TestSetting(
model="BAAI/bge-base-en-v1.5",
@ -82,7 +77,6 @@ class TestSetting:
tp_size=1,
attn_backend="FLASH_ATTN",
method="encode",
fullgraph=True,
),
# vision language model
TestSetting(
@ -92,7 +86,6 @@ class TestSetting:
tp_size=1,
attn_backend="FLASH_ATTN",
method="generate_with_image",
fullgraph=False,
),
],
)
@ -109,9 +102,8 @@ def test_compile_correctness(
tp_size = test_setting.tp_size
attn_backend = test_setting.attn_backend
method = test_setting.method
fullgraph = test_setting.fullgraph
if cuda_device_count_stateless() != pp_size * tp_size:
pytest.skip(f"Need exactly {pp_size}*{tp_size} CUDA gpus but got "
if cuda_device_count_stateless() < pp_size * tp_size:
pytest.skip(f"Need at least {pp_size}*{tp_size} CUDA gpus but got "
f"{cuda_device_count_stateless()}")
with monkeypatch.context() as m:
@ -149,9 +141,5 @@ def test_compile_correctness(
]:
all_args.append(final_args + [f"-O{level}"])
all_envs.append({})
if level != CompilationLevel.DYNAMO_ONCE and not fullgraph:
# "DYNAMO_ONCE" will always use fullgraph
all_envs[-1][
"VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE"] = "0" # type: ignore
compare_all_settings(model, all_args * 3, all_envs, method=method)

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import logging
import tempfile
from typing import Any, Optional, Union
@ -10,9 +11,13 @@ import pytest
import torch
from tests.quantization.utils import is_quant_method_supported
from tests.v1.attention.utils import _Backend
from vllm import LLM, SamplingParams
from vllm.config import CompilationConfig, CompilationLevel, PassConfig
from vllm.attention.selector import global_force_attn_backend_context_manager
from vllm.config import (CompilationConfig, CompilationLevel, CUDAGraphMode,
PassConfig)
from vllm.platforms import current_platform
from vllm.utils import is_torch_equal_or_newer
from ..utils import create_new_process_for_each_test
@ -79,9 +84,7 @@ def test_full_graph(
):
model, model_kwargs = model_info
with monkeypatch.context() as m:
# make sure these models can be captured in full graph mode
m.setenv("VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE", "1")
with monkeypatch.context():
print(f"MODEL={model}")
run_model(optimization_level, model, model_kwargs)
@ -107,6 +110,18 @@ def test_full_graph(
(CompilationConfig(level=CompilationLevel.PIECEWISE,
debug_dump_path=tempfile.gettempdir()),
("facebook/opt-125m", {})),
] + [
# graph inductor partition
(
CompilationConfig(
level=CompilationLevel.PIECEWISE,
# inductor graph partition uses
# torch._C.Tag.cudagraph_unsafe to specify splitting ops
use_inductor_graph_partition=True,
cudagraph_mode=CUDAGraphMode.PIECEWISE,
compile_sizes=[1, 2]),
model) for model in models_list(all=False)
if is_torch_equal_or_newer("2.9.0.dev")
])
# only test some of the models
@create_new_process_for_each_test()
@ -114,11 +129,51 @@ def test_custom_compile_config(
compilation_config: CompilationConfig,
model_info: tuple[str, dict[str, Any]],
):
if (compilation_config.use_inductor_graph_partition
and not is_torch_equal_or_newer("2.9.0.dev")):
pytest.skip("inductor graph partition is only available "
"in PyTorch 2.9+")
model, model_kwargs = model_info
print(f"MODEL={model}")
run_model(compilation_config, model, model_kwargs)
def test_inductor_graph_partition_attn_fusion(caplog_vllm):
if not is_torch_equal_or_newer("2.9.0.dev"):
pytest.skip("inductor graph partition is only available "
"in PyTorch 2.9+")
model = "nvidia/Llama-4-Scout-17B-16E-Instruct-FP8"
compilation_config = CompilationConfig(
level=CompilationLevel.PIECEWISE,
use_inductor_graph_partition=True,
cudagraph_mode=CUDAGraphMode.PIECEWISE,
custom_ops=["+quant_fp8"],
pass_config=PassConfig(enable_attn_fusion=True, enable_noop=True),
)
model_kwargs = {
"kv_cache_dtype": "fp8",
"max_model_len": 1024,
}
with caplog_vllm.at_level(
logging.DEBUG), global_force_attn_backend_context_manager(
_Backend.FLASHINFER):
run_model(compilation_config, model, model_kwargs)
try:
assert ("Fused quantization onto 48 attention nodes"
in caplog_vllm.text), caplog_vllm.text
except AssertionError:
# Note: this message is only triggered when the compilation goes
# through the custom pass. Due to multiple layers of cache on
# PyTorch side, the compilation of a graph may be cached such
# that custom pass directly goes through cache. In this case,
# we go through this branch and assert that the pass is not
# triggered.
assert "Fused quantization" not in caplog_vllm.text
def run_model(compile_config: Union[int, CompilationConfig], model: str,
model_kwargs: dict[str, Any]):
prompts = [

View File

@ -27,6 +27,7 @@ from vllm.model_executor.layers.quantization.utils.quant_utils import (
from vllm.model_executor.layers.quantization.utils.w8a8_utils import (
Fp8LinearOp)
from vllm.platforms import current_platform
from vllm.utils import is_torch_equal_or_newer
from vllm.v1.kv_cache_interface import AttentionSpec
FP8_DTYPE = current_platform.fp8_dtype()
@ -339,6 +340,10 @@ else:
@pytest.mark.parametrize(
"split_attention",
[False, True] if current_platform.is_rocm() else [False])
# TODO(boyuan): test inductor graph partition on rocm
@pytest.mark.parametrize(
"use_inductor_graph_partition",
[False] if current_platform.is_rocm() else [False, True])
@pytest.mark.skipif(not current_platform.is_cuda_alike(),
reason="Only test ROCm or CUDA")
@pytest.mark.skipif(not current_platform.supports_fp8(), reason="Need FP8")
@ -352,9 +357,15 @@ def test_attention_quant_pattern(num_qo_heads: int, num_kv_heads: int,
dtype: torch.dtype, model_name: str,
model_class: type[AttentionQuantPatternModel],
backend: _Backend, split_attention: bool,
monkeypatch, dist_init):
use_inductor_graph_partition: bool,
monkeypatch, dist_init, caplog_vllm):
"""Test AttentionStaticQuantPattern fusion pass"""
if use_inductor_graph_partition and not is_torch_equal_or_newer(
"2.9.0.dev"):
pytest.skip("inductor graph partition is only available "
"in PyTorch 2.9+")
monkeypatch.setenv("VLLM_USE_V1", "1")
if split_attention:
monkeypatch.setenv("VLLM_V1_USE_PREFILL_DECODE_ATTENTION", "1")
@ -372,6 +383,7 @@ def test_attention_quant_pattern(num_qo_heads: int, num_kv_heads: int,
compilation_config=CompilationConfig(
level=CompilationLevel.PIECEWISE,
custom_ops=["+quant_fp8"],
use_inductor_graph_partition=use_inductor_graph_partition,
),
cache_config=CacheConfig(cache_dtype="fp8"))
@ -444,6 +456,7 @@ def test_attention_quant_pattern(num_qo_heads: int, num_kv_heads: int,
backend=test_backend,
fullgraph=True)
assert model_compiled.attn._o_scale_float is None
result_fused_1 = model_compiled(q, k, v)
if backend == _Backend.FLASHINFER:
@ -453,6 +466,7 @@ def test_attention_quant_pattern(num_qo_heads: int, num_kv_heads: int,
# _o_scale_float
assert model_compiled.attn._o_scale_float is not None
result_fused_2 = model_compiled(q, k, v)
assert model_compiled.attn._o_scale_float is not None
torch.testing.assert_close(result_unfused,

View File

@ -987,17 +987,7 @@ class VllmRunner:
return [req_output.outputs.score for req_output in req_outputs]
def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
if hasattr(self.llm.llm_engine, "model_executor"):
# This works either in V0 or in V1 with
# VLLM_ENABLE_V1_MULTIPROCESSING=0
executor = self.llm.llm_engine.model_executor
return executor.apply_model(func)
# This works in V1 with VLLM_ALLOW_INSECURE_SERIALIZATION=1
def _apply_model(self):
return func(self.get_model())
return self.llm.llm_engine.collective_rpc(_apply_model)
return self.llm.apply_model(func)
def get_llm(self) -> LLM:
return self.llm

View File

@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import datetime
from typing import Union
import openai # use the official client for correctness check
@ -284,3 +285,62 @@ async def test_tool_id_kimi_k2(k2_client: openai.AsyncOpenAI, model_name: str,
output.extend(chunk.choices[0].delta.tool_calls)
for o in output:
assert o.id is None or o.id == 'functions.get_current_weather:0'
@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
@pytest.mark.parametrize("arguments", ["{}", ''])
async def test_no_args_tool_call(client: openai.AsyncOpenAI, model_name: str,
arguments: str):
# Step 1: Define a tool that requires no parameters
tools = [{
"type": "function",
"function": {
"name": "get_current_time",
"description":
"Get the current date and time. No parameters needed.",
"parameters": {
"type": "object",
"properties": {}, # No parameters
"required": [] # No required fields
}
}
}]
messages = [{"role": "user", "content": "What time is it now?"}]
# Step 2: Send user message and let model decide whether to call the tool
response = await client.chat.completions.create(
model=model_name,
messages=messages,
tools=tools,
tool_choice="auto" # Let model choose automatically
)
# Step 3: Check if model wants to call a tool
message = response.choices[0].message
if message.tool_calls:
# Get the first tool call
tool_call = message.tool_calls[0]
tool_name = tool_call.function.name
# Step 4: Execute the tool locally (no parameters)
if tool_name == "get_current_time":
# Test both empty string and "{}" for no-arg tool calls
tool_call.function.arguments = arguments
messages.append(message)
current_time = datetime.datetime.now()
result = current_time.isoformat()
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result,
})
# Step 5: Send tool result back to model to continue conversation
final_response = await client.chat.completions.create(
model=model_name,
messages=messages,
)
# Output final natural language response
assert final_response.choices[0].message.content is not None
else:
# No tool called — just print model's direct reply
assert message.content is not None

View File

@ -14,9 +14,6 @@ from transformers import AutoConfig
from ...utils import RemoteOpenAIServer
pytest.skip("Skipping prompt_embeds test until V1 supports it.",
allow_module_level=True)
# any model with a chat template should work here
MODEL_NAME = "HuggingFaceH4/zephyr-7b-beta"

View File

@ -744,3 +744,18 @@ async def test_function_calling_full_history(client: OpenAI, model_name: str):
assert response_2 is not None
assert response_2.status == "completed"
assert response_2.output_text is not None
@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_output_messages_enabled(client: OpenAI, model_name: str,
server):
response = await client.responses.create(
model=model_name,
input="What is the capital of South Korea?",
extra_body={"enable_response_messages": True})
assert response is not None
assert response.status == "completed"
assert len(response.input_messages) > 0
assert len(response.output_messages) > 0

View File

@ -11,7 +11,8 @@ import pytest
import torch
import vllm.model_executor.layers.fused_moe.modular_kernel as mk
from vllm.config import VllmConfig, current_platform, set_current_vllm_config
from vllm.config import VllmConfig, set_current_vllm_config
from vllm.platforms import current_platform
from vllm.utils import has_deep_ep, has_deep_gemm, has_pplx
from vllm.utils.flashinfer import has_flashinfer_cutlass_fused_moe

View File

@ -1,21 +1,24 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import importlib
import importlib.metadata
from dataclasses import dataclass
from importlib.util import find_spec
from typing import Optional
import pytest
import torch
from packaging import version
from vllm.model_executor.layers.quantization.quark.quark import ( # noqa: E501
QuarkLinearMethod, QuarkW4A4MXFP4)
from vllm.model_executor.layers.quantization.quark.quark_moe import ( # noqa: E501
QuarkW4A4MXFp4MoEMethod)
from vllm.platforms import current_platform
from vllm.utils.flashinfer import has_flashinfer
QUARK_MXFP4_AVAILABLE = importlib.util.find_spec(
"quark") is not None and version.parse(
importlib.metadata.version("amd-quark")) >= version.parse('0.8.99')
QUARK_MXFP4_AVAILABLE = find_spec("quark") is not None and version.parse(
importlib.metadata.version("amd-quark")) >= version.parse('0.8.99')
TRTLLM_GEN_MXFP4_AVAILABLE = current_platform.is_cuda(
) and current_platform.is_device_capability(100)
@ -39,6 +42,12 @@ class ModelCase:
tp: int
@pytest.fixture(scope="function", autouse=True)
def enable_pickle(monkeypatch):
"""`LLM.apply_model` requires pickling a function."""
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
@pytest.mark.parametrize('model_case', [
ModelCase("fxmarty/qwen_1.5-moe-a2.7b-mxfp4", tp=1),
ModelCase("fxmarty/deepseek_r1_3_layers_mxfp4", tp=8),
@ -55,21 +64,19 @@ def test_mxfp4_loading_and_execution_moe(vllm_runner, model_case: ModelCase):
tensor_parallel_size=model_case.tp,
load_format="dummy") as llm:
# TODO: llm.apply_model(check_model) currently relies on V0 internals.
# Re-enable this later.
# def check_model(model):
# layer = model.model.layers[0]
def check_model(model):
layer = model.model.layers[0]
# qkv_proj = layer.self_attn.qkv_proj
qkv_proj = layer.self_attn.qkv_proj
# assert isinstance(qkv_proj.quant_method, QuarkLinearMethod)
# assert isinstance(qkv_proj.scheme, QuarkW4A4MXFP4)
assert isinstance(qkv_proj.quant_method, QuarkLinearMethod)
assert isinstance(qkv_proj.scheme, QuarkW4A4MXFP4)
# assert isinstance(layer.mlp.experts.quant_method,
# QuarkW4A4MXFp4MoEMethod)
assert isinstance(layer.mlp.experts.quant_method,
QuarkW4A4MXFp4MoEMethod)
# if model_case.model_id == "fxmarty/qwen_1.5-moe-a2.7b-mxfp4":
# llm.apply_model(check_model)
if model_case.model_id == "fxmarty/qwen_1.5-moe-a2.7b-mxfp4":
llm.apply_model(check_model)
output = llm.generate_greedy("Today I am in the French Alps and",
max_tokens=20)

View File

@ -10,6 +10,7 @@ from PIL import Image
from vllm.multimodal.image import rescale_image_size
from vllm.multimodal.video import rescale_video_size, sample_frames_from_video
from vllm.utils import set_default_torch_num_threads
from ....conftest import (IMAGE_ASSETS, VIDEO_ASSETS, PromptImageInput,
PromptVideoInput, VllmRunner)
@ -17,11 +18,9 @@ from ...utils import check_logprobs_close
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
V1 Test: batch_make_xxxxx_embeddings calls a V0 internal
"""
monkeypatch.setenv('VLLM_USE_V1', '0')
def enable_pickle(monkeypatch):
"""`LLM.apply_model` requires pickling a function."""
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
models = ["Qwen/Qwen2-VL-2B-Instruct"]
@ -126,9 +125,8 @@ def batch_make_image_embeddings(
image_grid_thw_on_device = image_grid_thw.to(visual.device,
dtype=torch.int64)
return visual(pixel_values_on_device,
grid_thw=image_grid_thw_on_device)
grid_thw=image_grid_thw_on_device).cpu()
# V1 Test: this calls a V0 internal.
image_embeds = torch.concat(llm.apply_model(get_image_embeds))
# split into original batches
@ -210,7 +208,7 @@ def batch_make_video_embeddings(
video_grid_thw_on_device = video_grid_thw.to(visual.device,
dtype=torch.int64)
return visual(pixel_values_on_device,
grid_thw=video_grid_thw_on_device)
grid_thw=video_grid_thw_on_device).cpu()
# V1 Test: this calls a V0 internal.
video_embeds = torch.concat(llm.apply_model(get_image_embeds))
@ -266,19 +264,22 @@ def run_embedding_input_test(
processor = AutoProcessor.from_pretrained(model)
# max_model_len should be greater than image_feature_size
with vllm_runner(model,
runner="generate",
max_model_len=4000,
max_num_seqs=3,
dtype=dtype,
limit_mm_per_prompt={
"image": mm_limit,
"video": mm_limit
},
tensor_parallel_size=tensor_parallel_size,
distributed_executor_backend=distributed_executor_backend
) as vllm_model:
with set_default_torch_num_threads(1):
vllm_model = vllm_runner(
model,
runner="generate",
max_model_len=4000,
max_num_seqs=3,
dtype=dtype,
limit_mm_per_prompt={
"image": mm_limit,
"video": mm_limit
},
tensor_parallel_size=tensor_parallel_size,
distributed_executor_backend=distributed_executor_backend,
)
with vllm_model:
outputs_per_case_for_original_input = [
vllm_model.generate_greedy_logprobs(prompts,
max_tokens,
@ -329,9 +330,8 @@ def run_embedding_input_test(
@pytest.mark.parametrize("max_tokens", [128])
@pytest.mark.parametrize("num_logprobs", [10])
def test_qwen2_vl_image_embeddings_input(vllm_runner, image_assets, model,
size_factors, dtype: str,
max_tokens: int,
num_logprobs: int) -> None:
size_factors, dtype, max_tokens,
num_logprobs, monkeypatch) -> None:
images = [asset.pil_image for asset in image_assets]
inputs_per_case: list[tuple[

View File

@ -112,7 +112,7 @@ def test_awq_models(vllm_runner, image_assets, source_model, quant_model,
monkeypatch) -> None:
# Test V1: this test hangs during setup on single-scale input.
# TODO: fixure out why and re-enable this on V1.
# TODO: figure out why and re-enable this on V1.
monkeypatch.setenv("VLLM_USE_V1", "0")
run_awq_test(
vllm_runner,

View File

@ -43,12 +43,9 @@ ROCM_TRITON_SCALED_MM_SUPPORTED_INT8_MODEL = [
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
This module relies on V0 internals, so set VLLM_USE_V1=0.
"""
if not current_platform.is_cpu():
monkeypatch.setenv('VLLM_USE_V1', '0')
def enable_pickle(monkeypatch):
"""`LLM.apply_model` requires pickling a function."""
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
@pytest.mark.parametrize(
@ -176,10 +173,11 @@ def test_compressed_tensors_w8a8_logprobs(
dtype = "bfloat16"
# skip language translation prompt for the static per tensor asym model
if (model_path ==
"nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Static-Per-Tensor-Asym"
): # noqa: E501
# skip language translation prompt for the static per tensor models
if model_path in (
"nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Static-Per-Tensor-Sym",
"nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Static-Per-Tensor-Asym",
):
example_prompts = example_prompts[0:-1]
with hf_runner(model_path, dtype=dtype) as hf_model:

View File

@ -60,8 +60,8 @@ def test_kv_cache_model_load_and_run(vllm_runner, model_id: str,
if use_rocm_aiter:
monkeypatch.setenv("VLLM_ROCM_USE_AITER", "1")
# vllm_runner.apply_model() relies on V0 internals.
monkeypatch.setenv("VLLM_USE_V1", "0")
# `LLM.apply_model` requires pickling a function.
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
with vllm_runner(model_id, kv_cache_dtype="fp8") as llm:
def check_model(model):
@ -104,8 +104,8 @@ def test_load_fp16_model(vllm_runner, kv_cache_dtype: str, force_marlin: bool,
if use_rocm_aiter:
monkeypatch.setenv("VLLM_ROCM_USE_AITER", "1")
# vllm_runner.apply_model() relies on V0 internals.
monkeypatch.setenv("VLLM_USE_V1", "0")
# `LLM.apply_model` requires pickling a function.
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
if force_marlin:
monkeypatch.setenv("VLLM_TEST_FORCE_FP8_MARLIN", "1")

View File

@ -31,41 +31,46 @@ MODEL_QUANT = [
@pytest.mark.parametrize("model_id, use_marlin_kernel", MODEL_QUANT)
def test_gptq_with_dynamic(vllm_runner, model_id: str, use_marlin_kernel: bool,
monkeypatch):
# vllm_runner.apply_model() relies on V0 internals.
monkeypatch.setenv("VLLM_USE_V1", "0")
vllm_model = vllm_runner(model_id, dtype=torch.float16, max_model_len=2048)
# `LLM.apply_model` requires pickling a function.
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
linear_method_cls = GPTQMarlinLinearMethod if use_marlin_kernel else (
GPTQLinearMethod)
for name, submodule in (vllm_model.llm.llm_engine.model_executor.
driver_worker.model_runner.model.named_modules()):
if name == "lm_head":
assert isinstance(submodule.quant_method, linear_method_cls)
elif name == 'model.layers.0.self_attn.qkv_proj':
# The first layer is quantized using bits=4, group_size=128
# desc_act=True
assert isinstance(submodule.quant_method, linear_method_cls)
config = submodule.quant_method.quant_config
assert config.weight_bits == 4
assert config.group_size == 128
assert config.desc_act
elif name == 'model.layers.1.self_attn.qkv_proj':
# The second layer is quantized using bits=8, group_size=32
# desc_act=False
assert isinstance(submodule.quant_method, linear_method_cls)
config = submodule.quant_method.quant_config
assert get_dynamic_override(config, layer_name=name,
key="bits") == 8
assert get_dynamic_override(config,
layer_name=name,
key="group_size") == 32
assert not get_dynamic_override(
config, layer_name=name, key="desc_act")
elif (name == 'model.layers.2.self_attn.qkv_proj'
or name == 'model.layers.2.mlp.gate_up_proj'):
# All other layers (layer index >= 2) are not quantized
assert isinstance(submodule.quant_method, UnquantizedLinearMethod)
with vllm_runner(model_id, dtype=torch.float16, max_model_len=2048) as llm:
del vllm_model
def check_model(model):
for name, submodule in model.named_modules():
if name == "lm_head":
assert isinstance(submodule.quant_method,
linear_method_cls)
elif name == 'model.layers.0.self_attn.qkv_proj':
# The first layer is quantized using bits=4, group_size=128
# desc_act=True
assert isinstance(submodule.quant_method,
linear_method_cls)
config = submodule.quant_method.quant_config
assert config.weight_bits == 4
assert config.group_size == 128
assert config.desc_act
elif name == 'model.layers.1.self_attn.qkv_proj':
# The second layer is quantized using bits=8, group_size=32
# desc_act=False
assert isinstance(submodule.quant_method,
linear_method_cls)
config = submodule.quant_method.quant_config
assert get_dynamic_override(config,
layer_name=name,
key="bits") == 8
assert get_dynamic_override(config,
layer_name=name,
key="group_size") == 32
assert not get_dynamic_override(
config, layer_name=name, key="desc_act")
elif (name == 'model.layers.2.self_attn.qkv_proj'
or name == 'model.layers.2.mlp.gate_up_proj'):
# All other layers (layer index >= 2) are not quantized
assert isinstance(submodule.quant_method,
UnquantizedLinearMethod)
llm.apply_model(check_model)

View File

@ -29,8 +29,8 @@ def test_lm_head(
lm_head_quantized: bool,
monkeypatch,
) -> None:
# vllm_runner.apply_model() relies on V0 internals.
monkeypatch.setenv("VLLM_USE_V1", "0")
# `LLM.apply_model` requires pickling a function.
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
with vllm_runner(model_id, dtype=torch.float16,
max_model_len=2048) as vllm_model:

View File

@ -11,16 +11,12 @@ import pytest
import torch
from tests.quantization.utils import is_quant_method_supported
from vllm.platforms import current_platform
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
This module relies on V0 internals, so set VLLM_USE_V1=0.
"""
if not current_platform.is_cpu():
monkeypatch.setenv('VLLM_USE_V1', '0')
def enable_pickle(monkeypatch):
"""`LLM.apply_model` requires pickling a function."""
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
@pytest.mark.skipif(not is_quant_method_supported("modelopt"),

View File

@ -13,6 +13,16 @@ from vllm.model_executor.layers.quantization.ptpc_fp8 import (
PTPCFp8LinearMethod)
from vllm.platforms import current_platform
UNSUPPORTED_STR = (
"Currently torch._scaled_mm (hipBLASLt) rowwise gemm only "
"support output dtype of bfloat16. torch.float16 is specified.")
@pytest.fixture(scope="function", autouse=True)
def enable_pickle(monkeypatch):
"""`LLM.apply_model` requires pickling a function."""
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
@pytest.mark.skipif(not is_quant_method_supported("ptpc_fp8"),
reason="PTPC FP8 is not supported on this GPU type.")
@ -21,14 +31,22 @@ from vllm.platforms import current_platform
@pytest.mark.parametrize("dtype", ["auto", "bfloat16", "float16"])
@pytest.mark.parametrize("kv_cache_dtype", ["auto", "fp8", "fp8_e4m3"])
def test_ptpc_fp8_rocm(vllm_runner, dtype: str, kv_cache_dtype: str) -> None:
try:
with vllm_runner("facebook/opt-125m",
dtype=dtype,
quantization="ptpc_fp8",
kv_cache_dtype=kv_cache_dtype) as llm:
llm = vllm_runner("facebook/opt-125m",
dtype=dtype,
quantization="ptpc_fp8",
kv_cache_dtype=kv_cache_dtype)
except AssertionError as e:
if str(e) == UNSUPPORTED_STR:
# If the error message matches, the test passes
return
else:
# If the error message does not match, re-raise the exception
raise
model = llm.model.llm_engine.model_executor.driver_worker.model_runner.model # noqa: E501
with llm:
def check_model(model):
fc1 = model.model.decoder.layers[0].fc1
assert isinstance(fc1.quant_method, PTPCFp8LinearMethod)
if kv_cache_dtype == "ptpc_fp8":
@ -40,17 +58,8 @@ def test_ptpc_fp8_rocm(vllm_runner, dtype: str, kv_cache_dtype: str) -> None:
if current_platform.has_device_capability(94):
# For GPUs with hardware support, we keep weights in fp8
assert fc1.weight.dtype == torch.float8_e4m3fnuz
else:
pytest.skip()
output = llm.generate_greedy("Hello my name is", max_tokens=20)
assert output
except AssertionError as e:
if str(
e
) == "Currently torch._scaled_mm (hipBLASLt) rowwise gemm only support output dtype of bfloat16. torch.float16 is specified.": # noqa: E501
# If the error message matches, the test passes
pass
else:
# If the error message does not match, re-raise the exception
raise
llm.apply_model(check_model)
output = llm.generate_greedy("Hello my name is", max_tokens=20)
assert output

View File

@ -7,10 +7,10 @@ Run `pytest tests/quantization/test_quark.py`.
See also `tests/kernels/moe/test_mxfp4_moe.py`.
"""
import importlib
import importlib.metadata
import os
from dataclasses import dataclass
from importlib.util import find_spec
import huggingface_hub
import lm_eval
@ -24,9 +24,8 @@ from vllm.platforms import current_platform
from .reference_mxfp4 import dq_mxfp4_torch, qdq_mxfp4_torch
QUARK_MXFP4_AVAILABLE = importlib.util.find_spec(
"quark") is not None and version.parse(
importlib.metadata.version("amd-quark")) >= version.parse('0.8.99')
QUARK_MXFP4_AVAILABLE = find_spec("quark") is not None and version.parse(
importlib.metadata.version("amd-quark")) >= version.parse('0.8.99')
if QUARK_MXFP4_AVAILABLE:
from quark.torch.export.nn.modules.realquantizer import (
@ -43,11 +42,9 @@ except huggingface_hub.errors.RepositoryNotFoundError:
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
This module relies on V0 internals, so set VLLM_USE_V1=0.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')
def enable_pickle(monkeypatch):
"""`LLM.apply_model` requires pickling a function."""
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
@pytest.mark.parametrize('kv_cache_dtype', ['auto', 'fp8'])
@ -132,13 +129,12 @@ def test_quark_fp8_parity(vllm_runner):
}
with (vllm_runner(quark_model_id, **llm_kwargs) as
quark_handle, vllm_runner(fp8_model_id, **llm_kwargs) as fp8_handle):
quark_model = (quark_handle.llm.llm_engine.model_executor.
driver_worker.model_runner.model)
quark_state_dict = quark_model.state_dict()
fp8_model = (fp8_handle.llm.llm_engine.model_executor.driver_worker.
model_runner.model)
fp8_state_dict = fp8_model.state_dict()
def get_state_dict(model):
return {k: v.cpu() for k, v in model.state_dict().items()}
quark_state_dict, = quark_handle.apply_model(get_state_dict)
fp8_state_dict, = fp8_handle.apply_model(get_state_dict)
assert fp8_state_dict.keys() == quark_state_dict.keys()

View File

@ -105,18 +105,21 @@ def test_register_quantization_config():
])
def test_custom_quant(vllm_runner, model, monkeypatch):
"""Test infer with the custom quantization method."""
# vllm_runner.apply_model() relies on V0 internals.
monkeypatch.setenv("VLLM_USE_V1", "0")
# `LLM.apply_model` requires pickling a function.
monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
with vllm_runner(model_name=model,
quantization="custom_quant",
enforce_eager=True) as llm:
model = llm.llm.llm_engine.model_executor.driver_worker.model_runner.model # noqa: E501
layer = model.model.layers[0]
qkv_proj = layer.self_attn.qkv_proj
def check_model(model):
layer = model.model.layers[0]
qkv_proj = layer.self_attn.qkv_proj
# Check the quantization method is FakeQuantLinearMethod
assert isinstance(qkv_proj.quant_method, FakeQuantLinearMethod)
# Check the quantization method is FakeQuantLinearMethod
assert isinstance(qkv_proj.quant_method, FakeQuantLinearMethod)
llm.apply_model(check_model)
output = llm.generate_greedy("Hello my name is", max_tokens=20)
assert output

View File

@ -18,12 +18,14 @@ from vllm.v1.core.kv_cache_manager import KVCacheManager
from vllm.v1.core.kv_cache_utils import (
BlockHash, FreeKVCacheBlockQueue, KVCacheBlock, PrefixCachingMetrics,
estimate_max_model_len, generate_block_hash_extra_keys,
get_kv_cache_configs, get_max_concurrency_for_kv_cache_config,
get_request_block_hasher, hash_block_tokens, init_none_hash,
is_kv_cache_type_uniform, make_block_hash_with_group_id)
generate_scheduler_kv_cache_config, get_kv_cache_configs,
get_max_concurrency_for_kv_cache_config, get_request_block_hasher,
hash_block_tokens, init_none_hash, is_kv_cache_spec_uniform,
make_block_hash_with_group_id)
from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig,
KVCacheGroupSpec, KVCacheSpec,
KVCacheTensor, SlidingWindowSpec)
KVCacheTensor, SlidingWindowSpec,
UniformTypeKVCacheSpecs)
from vllm.v1.metrics.stats import PrefixCacheStats
from vllm.v1.request import Request
@ -927,36 +929,36 @@ def test_merge_kv_cache_spec():
assert merged_layer_spec.sliding_window == 1
def test_is_kv_cache_type_uniform():
def test_is_kv_cache_spec_uniform():
kv_cache_spec = {
"layer_1": new_kv_cache_spec(num_kv_heads=32),
"layer_2": new_kv_cache_spec(num_kv_heads=32),
}
assert is_kv_cache_type_uniform(kv_cache_spec)
assert is_kv_cache_spec_uniform(kv_cache_spec)
kv_cache_spec = {
"layer_1": new_kv_cache_spec(num_kv_heads=32),
"layer_2": new_kv_cache_spec(num_kv_heads=32, sliding_window=1),
}
assert is_kv_cache_type_uniform(kv_cache_spec)
assert is_kv_cache_spec_uniform(kv_cache_spec)
kv_cache_spec = {
"layer_1": new_kv_cache_spec(num_kv_heads=32),
"layer_2": new_sliding_window_spec(num_kv_heads=32, sliding_window=1),
}
assert not is_kv_cache_type_uniform(kv_cache_spec)
assert not is_kv_cache_spec_uniform(kv_cache_spec)
kv_cache_spec = {
"layer_1": new_sliding_window_spec(num_kv_heads=32, sliding_window=1),
"layer_2": new_sliding_window_spec(num_kv_heads=32, sliding_window=1),
}
assert is_kv_cache_type_uniform(kv_cache_spec)
assert is_kv_cache_spec_uniform(kv_cache_spec)
kv_cache_spec = {
"layer_1": new_sliding_window_spec(num_kv_heads=32, sliding_window=1),
"layer_2": new_sliding_window_spec(num_kv_heads=32, sliding_window=2),
}
assert not is_kv_cache_type_uniform(kv_cache_spec)
assert not is_kv_cache_spec_uniform(kv_cache_spec)
@pytest.mark.parametrize(
@ -1286,14 +1288,28 @@ def test_get_kv_cache_config_one_worker():
],
)
# different hidden size, unimplemented
# different hidden size
kv_cache_specs_hybrid = {
'layer_1': new_kv_cache_spec(head_size=128),
'layer_2': new_kv_cache_spec(),
'layer_2': new_kv_cache_spec(head_size=64),
}
with pytest.raises(NotImplementedError):
get_kv_cache_configs(vllm_config, [kv_cache_specs_hybrid],
[mem_per_block_per_layer * 2 * 32])[0]
kv_cache_config_hybrid = get_kv_cache_configs(
vllm_config, [kv_cache_specs_hybrid],
[mem_per_block_per_layer * 3 * 32])[0]
assert kv_cache_config_hybrid == KVCacheConfig(
num_blocks=32,
kv_cache_tensors=[
KVCacheTensor(size=mem_per_block_per_layer * 32 * 2,
shared_by=["layer_1"]),
KVCacheTensor(size=mem_per_block_per_layer * 32,
shared_by=["layer_2"]),
],
kv_cache_groups=[
KVCacheGroupSpec(["layer_1", "layer_2"],
UniformTypeKVCacheSpecs(
block_size=16,
kv_cache_specs=kv_cache_specs_hybrid))
])
# Test num_gpu_blocks_override
vllm_config.cache_config.num_gpu_blocks_override = 16
@ -1324,3 +1340,75 @@ def test_get_kv_cache_configs_attention_free():
kv_cache_groups=[],
)
]
def test_generate_uniform_type_kv_cache_specs():
# All layers are full attention, can be merged
kv_cache_specs = {
'layer_1': new_kv_cache_spec(),
'layer_2': new_kv_cache_spec(head_size=128),
}
uniform_spec = UniformTypeKVCacheSpecs.from_specs(kv_cache_specs)
assert uniform_spec == UniformTypeKVCacheSpecs(
block_size=16, kv_cache_specs=kv_cache_specs)
# Full attention + sliding window, cannot be merged
kv_cache_specs = {
'layer_1': new_kv_cache_spec(),
'layer_2': new_sliding_window_spec(sliding_window=1),
}
uniform_spec = UniformTypeKVCacheSpecs.from_specs(kv_cache_specs)
assert uniform_spec is None
# different order of full attention + sliding window, cannot be merged
kv_cache_specs = {
'layer_1': new_sliding_window_spec(sliding_window=1),
'layer_2': new_kv_cache_spec(),
}
uniform_spec = UniformTypeKVCacheSpecs.from_specs(kv_cache_specs)
assert uniform_spec is None
# Same-size sliding window, can be merged
kv_cache_specs = {
'layer_1': new_sliding_window_spec(sliding_window=1),
'layer_2': new_sliding_window_spec(sliding_window=1, head_size=128),
}
uniform_spec = UniformTypeKVCacheSpecs.from_specs(kv_cache_specs)
assert uniform_spec == UniformTypeKVCacheSpecs(
block_size=16, kv_cache_specs=kv_cache_specs)
# different block sizes, cannot be merged
kv_cache_specs = {
'layer_1': new_kv_cache_spec(block_size=16),
'layer_2': new_kv_cache_spec(block_size=32),
}
uniform_spec = UniformTypeKVCacheSpecs.from_specs(kv_cache_specs)
assert uniform_spec is None
def test_generate_scheduler_kv_cache_config():
kv_cache_specs = {
'layer_1': new_kv_cache_spec(),
'layer_2': new_kv_cache_spec(head_size=128),
}
kv_cache_configs = [
KVCacheConfig(
num_blocks=10,
kv_cache_tensors=[],
kv_cache_groups=[
KVCacheGroupSpec(['layer_1', 'layer_2'],
UniformTypeKVCacheSpecs(
block_size=16,
kv_cache_specs=kv_cache_specs)),
],
)
]
scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
kv_cache_configs)
assert scheduler_kv_cache_config == KVCacheConfig(
num_blocks=10,
kv_cache_tensors=[],
kv_cache_groups=[
KVCacheGroupSpec(['layer_1', 'layer_2'], new_kv_cache_spec())
],
)

View File

@ -8,7 +8,7 @@ import time
import uuid
from dataclasses import dataclass
from threading import Thread
from typing import Optional, Union
from typing import Any, Optional, Union
from unittest.mock import MagicMock
import pytest
@ -331,6 +331,46 @@ def echo_dc(
return [val for _ in range(3)] if return_list else val
# Dummy utility function to test dict serialization with custom types.
def echo_dc_dict(
self,
msg: str,
return_dict: bool = False,
) -> Union[MyDataclass, dict[str, MyDataclass]]:
print(f"echo dc dict util function called: {msg}")
val = None if msg is None else MyDataclass(msg)
# Return dict of dataclasses to verify support for returning dicts
# with custom value types.
if return_dict:
return {"key1": val, "key2": val, "key3": val}
else:
return val
# Dummy utility function to test nested structures with custom types.
def echo_dc_nested(
self,
msg: str,
structure_type: str = "list_of_dicts",
) -> Any:
print(f"echo dc nested util function called: {msg}, "
f"structure: {structure_type}")
val = None if msg is None else MyDataclass(msg)
if structure_type == "list_of_dicts": # noqa
# Return list of dicts: [{"a": val, "b": val}, {"c": val, "d": val}]
return [{"a": val, "b": val}, {"c": val, "d": val}]
elif structure_type == "dict_of_lists":
# Return dict of lists: {"list1": [val, val], "list2": [val, val]}
return {"list1": [val, val], "list2": [val, val]}
elif structure_type == "deep_nested":
# Return deeply nested: {"outer": [{"inner": [val, val]},
# {"inner": [val]}]}
return {"outer": [{"inner": [val, val]}, {"inner": [val]}]}
else:
return val
@pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_util_method_custom_return(
monkeypatch: pytest.MonkeyPatch):
@ -384,6 +424,167 @@ async def test_engine_core_client_util_method_custom_return(
client.shutdown()
@pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_util_method_custom_dict_return(
monkeypatch: pytest.MonkeyPatch):
with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "1")
# Must set insecure serialization to allow returning custom types.
m.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
# Monkey-patch core engine utility function to test.
m.setattr(EngineCore, "echo_dc_dict", echo_dc_dict, raising=False)
engine_args = EngineArgs(model=MODEL_NAME, enforce_eager=True)
vllm_config = engine_args.create_engine_config(
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class = Executor.get_class(vllm_config)
with set_default_torch_num_threads(1):
client = EngineCoreClient.make_client(
multiprocess_mode=True,
asyncio_mode=True,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=True,
)
try:
# Test utility method returning custom / non-native data type.
core_client: AsyncMPClient = client
# Test single object return
result = await core_client.call_utility_async(
"echo_dc_dict", "testarg3", False)
assert isinstance(result,
MyDataclass) and result.message == "testarg3"
# Test dict return with custom value types
result = await core_client.call_utility_async(
"echo_dc_dict", "testarg3", True)
assert isinstance(result, dict) and len(result) == 3
for key, val in result.items():
assert key in ["key1", "key2", "key3"]
assert isinstance(val,
MyDataclass) and val.message == "testarg3"
# Test returning dict with None values
result = await core_client.call_utility_async(
"echo_dc_dict", None, True)
assert isinstance(result, dict) and len(result) == 3
for key, val in result.items():
assert key in ["key1", "key2", "key3"]
assert val is None
finally:
client.shutdown()
@pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_util_method_nested_structures(
monkeypatch: pytest.MonkeyPatch):
with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "1")
# Must set insecure serialization to allow returning custom types.
m.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
# Monkey-patch core engine utility function to test.
m.setattr(EngineCore, "echo_dc_nested", echo_dc_nested, raising=False)
engine_args = EngineArgs(model=MODEL_NAME, enforce_eager=True)
vllm_config = engine_args.create_engine_config(
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class = Executor.get_class(vllm_config)
with set_default_torch_num_threads(1):
client = EngineCoreClient.make_client(
multiprocess_mode=True,
asyncio_mode=True,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=True,
)
try:
core_client: AsyncMPClient = client
# Test list of dicts: [{"a": val, "b": val}, {"c": val, "d": val}]
result = await core_client.call_utility_async(
"echo_dc_nested", "nested1", "list_of_dicts")
assert isinstance(result, list) and len(result) == 2
for i, item in enumerate(result):
assert isinstance(item, dict)
if i == 0:
assert "a" in item and "b" in item
assert isinstance(
item["a"],
MyDataclass) and item["a"].message == "nested1"
assert isinstance(
item["b"],
MyDataclass) and item["b"].message == "nested1"
else:
assert "c" in item and "d" in item
assert isinstance(
item["c"],
MyDataclass) and item["c"].message == "nested1"
assert isinstance(
item["d"],
MyDataclass) and item["d"].message == "nested1"
# Test dict of lists: {"list1": [val, val], "list2": [val, val]}
result = await core_client.call_utility_async(
"echo_dc_nested", "nested2", "dict_of_lists")
assert isinstance(result, dict) and len(result) == 2
assert "list1" in result and "list2" in result
for key, lst in result.items():
assert isinstance(lst, list) and len(lst) == 2
for item in lst:
assert isinstance(
item, MyDataclass) and item.message == "nested2"
# Test deeply nested: {"outer": [{"inner": [val, val]},
# {"inner": [val]}]}
result = await core_client.call_utility_async(
"echo_dc_nested", "nested3", "deep_nested")
assert isinstance(result, dict) and "outer" in result
outer_list = result["outer"]
assert isinstance(outer_list, list) and len(outer_list) == 2
# First dict in outer list should have "inner" with 2 items
inner_dict1 = outer_list[0]
assert isinstance(inner_dict1, dict) and "inner" in inner_dict1
inner_list1 = inner_dict1["inner"]
assert isinstance(inner_list1, list) and len(inner_list1) == 2
for item in inner_list1:
assert isinstance(item,
MyDataclass) and item.message == "nested3"
# Second dict in outer list should have "inner" with 1 item
inner_dict2 = outer_list[1]
assert isinstance(inner_dict2, dict) and "inner" in inner_dict2
inner_list2 = inner_dict2["inner"]
assert isinstance(inner_list2, list) and len(inner_list2) == 1
assert isinstance(
inner_list2[0],
MyDataclass) and inner_list2[0].message == "nested3"
# Test with None values in nested structures
result = await core_client.call_utility_async(
"echo_dc_nested", None, "list_of_dicts")
assert isinstance(result, list) and len(result) == 2
for item in result:
assert isinstance(item, dict)
for val in item.values():
assert val is None
finally:
client.shutdown()
@pytest.mark.parametrize(
"multiprocessing_mode,publisher_config",
[(True, "tcp"), (False, "inproc")],

View File

@ -29,6 +29,10 @@ from vllm.utils import GiB_bytes, direct_register_custom_op
logger = init_logger(__name__)
USE_XFORMERS_OPS = None
try:
tag_cudagraph_unsafe = (torch._C.Tag.cudagraph_unsafe, )
except AttributeError:
tag_cudagraph_unsafe = () # type: ignore[assignment]
def check_xformers_availability():
@ -577,6 +581,7 @@ direct_register_custom_op(
mutates_args=[],
fake_impl=unified_attention_fake,
dispatch_key=current_platform.dispatch_key,
tags=tag_cudagraph_unsafe,
)
@ -627,4 +632,5 @@ direct_register_custom_op(
mutates_args=["output", "output_block_scale"],
fake_impl=unified_attention_with_output_fake,
dispatch_key=current_platform.dispatch_key,
tags=tag_cudagraph_unsafe,
)

View File

@ -31,8 +31,11 @@ logger = init_logger(__name__)
def make_compiler(compilation_config: CompilationConfig) -> CompilerInterface:
if compilation_config.use_inductor:
if envs.VLLM_USE_STANDALONE_COMPILE and is_torch_equal_or_newer(
"2.8.0.dev"):
# Use standalone compile only if requested, version is new enough,
# and the symbol actually exists in this PyTorch build.
if (envs.VLLM_USE_STANDALONE_COMPILE
and is_torch_equal_or_newer("2.8.0.dev")
and hasattr(torch._inductor, "standalone_compile")):
logger.debug("Using InductorStandaloneAdaptor")
return InductorStandaloneAdaptor()
else:
@ -326,6 +329,7 @@ class PiecewiseCompileInterpreter(torch.fx.Interpreter):
i for i, x in enumerate(args) if isinstance(x, torch.SymInt)
]
global compilation_start_time
compiled_graph_for_dynamic_shape = self.vllm_backend.\
compiler_manager.compile(
submod,
@ -336,7 +340,6 @@ class PiecewiseCompileInterpreter(torch.fx.Interpreter):
num_graphs=len(self.compile_submod_names),
runtime_shape=None)
# Lazy import here to avoid circular import
from .cuda_graph import CUDAGraphOptions
from .cuda_piecewise_backend import PiecewiseBackend
piecewise_backend = PiecewiseBackend(
@ -344,7 +347,13 @@ class PiecewiseCompileInterpreter(torch.fx.Interpreter):
len(self.compile_submod_names), sym_shape_indices,
compiled_graph_for_dynamic_shape, self.vllm_backend)
if self.compilation_config.cudagraph_mode != CUDAGraphMode.NONE:
if (self.compilation_config.cudagraph_mode != CUDAGraphMode.NONE
and
not self.compilation_config.use_inductor_graph_partition):
# We're using Dynamo-based piecewise splitting, so we wrap
# the whole subgraph with a static graph wrapper.
from .cuda_graph import CUDAGraphOptions
# resolve the static graph wrapper class (e.g. CUDAGraphWrapper
# class) as platform dependent.
static_graph_wrapper_class = resolve_obj_by_qualname(

View File

@ -1183,7 +1183,7 @@ class AllReduceFusionPass(VllmInductorPass):
self.end_and_log()
def __del__(self):
if self.disabled:
if getattr(self, "disabled", True):
return
if flashinfer_comm is not None:
flashinfer_comm.trtllm_destroy_ipc_workspace_for_all_reduce(

View File

@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import contextlib
import inspect
from typing import Callable, Optional, TypeVar, Union, overload
from unittest.mock import patch
@ -14,7 +15,7 @@ from vllm.compilation.wrapper import TorchCompileWrapperWithCustomDispatcher
from vllm.config import CompilationLevel, VllmConfig
from vllm.logger import init_logger
from vllm.sequence import IntermediateTensors
from vllm.utils import supports_dynamo
from vllm.utils import resolve_obj_by_qualname, supports_dynamo
from .monitor import start_monitoring_torch_compile
@ -301,8 +302,11 @@ def _support_torch_compile(
with patch.object(InliningInstructionTranslator, 'inline_call',
patched_inline_call), torch._dynamo.config.patch(
**dynamo_config_patches):
**dynamo_config_patches
), maybe_use_cudagraph_partition_wrapper(
self.vllm_config):
output = self.compiled_callable(*args, **kwargs)
return output
# usually, capturing the model once is enough, and then we can
@ -314,3 +318,52 @@ def _support_torch_compile(
cls.__call__ = __call__
return cls
@contextlib.contextmanager
def maybe_use_cudagraph_partition_wrapper(vllm_config: VllmConfig):
"""
Context manager to set/unset customized cudagraph partition wrappers.
If we're using Inductor-based graph partitioning, we currently have the
whole `fx.Graph` before Inductor lowering and and the piecewise
splitting happens after all graph passes and fusions. Here, we add
a custom hook for Inductor to wrap each partition with our static
graph wrapper class to maintain more control over static graph
capture and replay.
"""
from vllm.config import CUDAGraphMode
compilation_config = vllm_config.compilation_config
if (compilation_config.cudagraph_mode != CUDAGraphMode.NONE
and compilation_config.use_inductor_graph_partition):
from torch._inductor.utils import CUDAGraphWrapperMetadata
from vllm.compilation.cuda_graph import CUDAGraphOptions
from vllm.platforms import current_platform
static_graph_wrapper_class = resolve_obj_by_qualname(
current_platform.get_static_graph_wrapper_cls())
def customized_cudagraph_wrapper(f,
metadata: CUDAGraphWrapperMetadata):
partition_id = metadata.partition_index
num_partitions = metadata.num_partitions
return static_graph_wrapper_class(
runnable=f,
vllm_config=vllm_config,
runtime_mode=CUDAGraphMode.PIECEWISE,
cudagraph_options=CUDAGraphOptions(
debug_log_enable=partition_id == 0,
gc_disable=partition_id != 0,
weak_ref_output=partition_id == num_partitions - 1,
))
torch._inductor.utils.set_customized_partition_wrappers(
customized_cudagraph_wrapper)
yield
if (compilation_config.cudagraph_mode != CUDAGraphMode.NONE
and compilation_config.use_inductor_graph_partition):
torch._inductor.utils.set_customized_partition_wrappers(None)

View File

@ -10,7 +10,6 @@ from typing import Callable, Optional
import torch
import vllm.envs as envs
from vllm.config import (CompilationLevel, CUDAGraphMode,
get_current_vllm_config)
from vllm.logger import init_logger
@ -47,11 +46,10 @@ class TorchCompileWrapperWithCustomDispatcher:
options = get_current_vllm_config(
).compilation_config.inductor_compile_config
compiled_callable = torch.compile(
self.forward,
fullgraph=envs.VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE,
backend=backend,
options=options)
compiled_callable = torch.compile(self.forward,
fullgraph=True,
backend=backend,
options=options)
self.compiled_callable = compiled_callable
self.original_code_object = self.__class__.forward.__code__

View File

@ -299,6 +299,26 @@ class CompilationConfig:
minor release, i.e. v0.11.0 or v1.0.0. Please use cudagraph_mode instead.
"""
use_inductor_graph_partition: bool = False
"""Use inductor graph partition to split the graph at cudagraph_unsafe ops.
This partition happens at inductor codegen time after all passes and fusions
are finished. It generates a single `call` function which wraps
cudagraph-safe ops into partition functions and leave cudagraph-unsafe ops
outside the partition functions. For a graph with N cudagraph-unsafe ops
(e.g., Attention), there would be N+1 partitions. To mark an op as
cudagraph unsafe, we can add `tags=(torch._C.Tag.cudagraph_unsafe)` when
register the custom op.
This config supports both full cudagraph and piecewise cudagraph without
compiling twice. For piecewise cudagraph, it applies vLLM CUDAGraph wrapper
to each partition. For N+1 partitions, there would be N+1
CUDAGraph wrapper instances.
For full CUDAGraph, we always apply a single CUDAGraph wrapper outside the
inductor `call` function in the model runner. The top-level full cudagraph
capture ignores all partitioning.
"""
pass_config: PassConfig = field(default_factory=PassConfig)
"""Custom inductor passes, see PassConfig for more details"""
@ -461,6 +481,12 @@ class CompilationConfig:
"since full_cuda_graph is deprecated.")
self.cudagraph_mode = CUDAGraphMode.FULL
if (self.use_inductor_graph_partition
and not is_torch_equal_or_newer("2.9.0.dev")):
raise ValueError("use_inductor_graph_partition is only "
"supported with torch>=2.9.0.dev. Set "
"use_inductor_graph_partition=False instead.")
def init_backend(self, vllm_config: "VllmConfig") -> Union[str, Callable]:
if self.level == CompilationLevel.NO_COMPILATION:
raise ValueError("No compilation level is set.")
@ -540,19 +566,36 @@ class CompilationConfig:
"set_splitting_ops_for_v1 should only be called when "
"level is CompilationLevel.PIECEWISE")
use_inductor_graph_partition_msg = (
"When use_inductor_graph_partition=True, splitting_ops "
"are ignored and set to an empty list. Instead, "
"\"tags=(torch._C.Tag.cudagraph_unsafe, ),\" is "
"used to annotate custom ops for graph partition.")
if self.splitting_ops is None:
# NOTE: When using full cudagraph, instead of setting an empty
# list and capture the full cudagraph inside the flattened fx
# graph, we keep the piecewise fx graph structure but capture the
# full cudagraph outside the fx graph. This reduces some cpu
# overhead when the runtime batch_size is not cudagraph captured.
# see https://github.com/vllm-project/vllm/pull/20059 for details.
# make a copy to avoid mutating the class-level list via reference.
self.splitting_ops = list(self._attention_ops)
if self.use_inductor_graph_partition:
# When using inductor graph partition, we set splitting_ops
# to be empty and rely on torch._C.Tag.cudagraph_unsafe to
# annotate custom ops as splitting ops.
logger.warning_once(use_inductor_graph_partition_msg)
self.splitting_ops = []
else:
# NOTE: When using full cudagraph, instead of setting an empty
# list and capture the full cudagraph inside the flattened fx
# graph, we keep the piecewise fx graph structure but capture
# the full cudagraph outside the fx graph. This reduces some
# cpu overhead when the runtime batch_size is not cudagraph
# captured. see https://github.com/vllm-project/vllm/pull/20059
# for details. make a copy to avoid mutating the class-level
# list via reference.
self.splitting_ops = list(self._attention_ops)
elif len(self.splitting_ops) == 0:
logger.warning_once("Using piecewise compilation with empty "
"splitting_ops.")
if self.cudagraph_mode == CUDAGraphMode.PIECEWISE:
logger.warning_once(
"Using piecewise compilation with empty "
"splitting_ops and use_inductor_graph_partition"
f"={self.use_inductor_graph_partition}.")
if (self.cudagraph_mode == CUDAGraphMode.PIECEWISE
and not self.use_inductor_graph_partition):
logger.warning_once(
"When compilation level is piecewise with empty "
"splitting_ops, PIECEWISE cudagraph_mode will be "
@ -562,7 +605,26 @@ class CompilationConfig:
"any problems.")
self.cudagraph_mode = CUDAGraphMode.FULL
self.splitting_ops = []
elif self.use_inductor_graph_partition:
logger.warning_once(use_inductor_graph_partition_msg)
self.splitting_ops = []
def splitting_ops_contain_attention(self) -> bool:
return self.splitting_ops is not None and all(
op in self.splitting_ops for op in self._attention_ops)
def is_attention_compiled_piecewise(self) -> bool:
use_fx_graph_piecewise_compilation = (
self.level == CompilationLevel.PIECEWISE
and self.splitting_ops_contain_attention())
inductor_used = (self.level == CompilationLevel.PIECEWISE
and self.use_inductor) or (
self.level >= CompilationLevel.DYNAMO_AS_IS
and self.backend == "inductor")
use_inductor_piecewise_compilation = (
inductor_used and self.use_inductor_graph_partition
and not self.splitting_ops_contain_attention())
return use_fx_graph_piecewise_compilation or \
use_inductor_piecewise_compilation

View File

@ -322,8 +322,28 @@ class ModelConfig:
factors.append(self.override_generation_config)
factors.append(self.rope_scaling)
factors.append(self.rope_theta)
# hf_config can control how the model looks!
factors.append(self.hf_config.to_json_string())
try:
hf_config_json = self.hf_config.to_json_string(use_diff=False)
except TypeError:
from transformers import PretrainedConfig
from vllm.utils.jsontree import json_map_leaves
# Handle nested HF configs with unserializable values gracefully
hf_config_json = json.dumps(
json_map_leaves(
lambda v: v.to_dict()
if isinstance(v, PretrainedConfig) else str(v),
self.hf_config.to_dict(),
),
indent=2,
sort_keys=True,
) + "\n"
factors.append(hf_config_json)
str_factors = str(factors)
assert_hashable(str_factors)
return hashlib.sha256(str(factors).encode()).hexdigest()
@ -964,6 +984,9 @@ class ModelConfig:
"modelopt",
"modelopt_fp4",
"petit_nvfp4",
# Ensure heavy backends are probed last to avoid unnecessary
# imports during override detection (e.g., MXFP4 imports Triton)
"mxfp4",
]
quantization_methods = [
q for q in supported_quantization if q not in overrides

View File

@ -569,9 +569,10 @@ class NixlConnectorWorker:
def __del__(self):
"""Cleanup background threads on destruction."""
self._handshake_initiation_executor.shutdown(wait=False)
if self._nixl_handshake_listener_t:
self._nixl_handshake_listener_t.join(timeout=0)
if executor := getattr(self, "_handshake_initiation_executor", None):
executor.shutdown(wait=False)
if listener_t := getattr(self, "_nixl_handshake_listener_t", None):
listener_t.join(timeout=0)
@staticmethod
def _nixl_handshake_listener(metadata: NixlAgentMetadata,
@ -1379,4 +1380,4 @@ class NixlKVConnectorStats(KVConnectorStats):
# TODO: reduce stats to a single value, calculate latency/throughput
return {
"num_successful_transfers": self.data["num_successful_transfers"]
}
}

View File

@ -13,6 +13,7 @@ from typing import Sequence as GenericSequence
from typing import Set, Type, Union, cast
import torch
import torch.nn as nn
from typing_extensions import TypeVar
import vllm.envs as envs
@ -55,6 +56,7 @@ from vllm.usage.usage_lib import (UsageContext, is_usage_stats_enabled,
from vllm.utils import Counter, Device, resolve_obj_by_qualname, weak_bind
from vllm.version import __version__ as VLLM_VERSION
from vllm.worker.model_runner_base import InputProcessingError
from vllm.worker.worker_base import WorkerBase
logger = init_logger(__name__)
_LOCAL_LOGGING_INTERVAL_SEC = 5
@ -1817,13 +1819,16 @@ class LLMEngine:
return sampling_params
def collective_rpc(self,
method: Union[str, Callable[..., _R]],
method: Union[str, Callable[[WorkerBase], _R]],
timeout: Optional[float] = None,
args: tuple = (),
kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
return self.model_executor.collective_rpc(method, timeout, args,
kwargs)
def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
return self.collective_rpc("apply_model", args=(func, ))
if envs.is_set("VLLM_USE_V1") and envs.VLLM_USE_V1:
from vllm.v1.engine.llm_engine import LLMEngine as V1LLMEngine

View File

@ -1450,9 +1450,11 @@ def _postprocess_messages(messages: list[ConversationMessage]) -> None:
and isinstance(message["tool_calls"], list)
):
for item in message["tool_calls"]:
item["function"]["arguments"] = json.loads(
item["function"]["arguments"]
)
# if arguments is None or empty string, set to {}
if content := item["function"].get("arguments"):
item["function"]["arguments"] = json.loads(content)
else:
item["function"]["arguments"] = {}
def parse_chat_messages(

View File

@ -522,9 +522,14 @@ class LLM:
"""
Run a function directly on the model inside each worker,
returning the result for each of them.
!!! warning
To reduce the overhead of data transfer, avoid returning large
arrays or tensors from this method. If you must return them,
make sure you move them to CPU first to avoid taking up additional
VRAM!
"""
executor = self.llm_engine.model_executor
return executor.apply_model(func)
return self.llm_engine.apply_model(func)
def _get_beam_search_lora_requests(
self,

View File

@ -328,6 +328,13 @@ class ResponsesRequest(OpenAIBaseModel):
"access by 3rd parties, and long enough to be "
"unpredictable (e.g., 43 characters base64-encoded, corresponding "
"to 256 bit). Not supported by vLLM engine V0."))
enable_response_messages: bool = Field(
default=False,
description=(
"Dictates whether or not to return messages as part of the "
"response object. Currently only supported for non-streaming "
"non-background and gpt-oss only. "))
# --8<-- [end:responses-extra-params]
_DEFAULT_SAMPLING_PARAMS = {
@ -1831,6 +1838,11 @@ class ResponsesResponse(OpenAIBaseModel):
model: str
object: Literal["response"] = "response"
output: list[ResponseOutputItem]
# These are populated when enable_response_messages is set to True
# TODO: Currently an issue where content of harmony messages
# is not available when these are serialized. Metadata is available
input_messages: Optional[list[ChatCompletionMessageParam]] = None
output_messages: Optional[list[ChatCompletionMessageParam]] = None
parallel_tool_calls: bool
temperature: float
tool_choice: ToolChoice
@ -1860,6 +1872,8 @@ class ResponsesResponse(OpenAIBaseModel):
output: list[ResponseOutputItem],
status: ResponseStatus,
usage: Optional[ResponseUsage] = None,
input_messages: Optional[list[ChatCompletionMessageParam]] = None,
output_messages: Optional[list[ChatCompletionMessageParam]] = None,
) -> "ResponsesResponse":
incomplete_details: Optional[IncompleteDetails] = None
@ -1868,7 +1882,6 @@ class ResponsesResponse(OpenAIBaseModel):
# TODO: implement the other reason for incomplete_details,
# which is content_filter
# incomplete_details = IncompleteDetails(reason='content_filter')
return cls(
id=request.request_id,
created_at=created_time,
@ -1877,6 +1890,8 @@ class ResponsesResponse(OpenAIBaseModel):
metadata=request.metadata,
model=model_name,
output=output,
input_messages=input_messages,
output_messages=output_messages,
parallel_tool_calls=request.parallel_tool_calls,
temperature=sampling_params.temperature,
tool_choice=request.tool_choice,

View File

@ -475,9 +475,14 @@ class OpenAIServingResponses(OpenAIServing):
# "completed" is implemented as the "catch-all" for now.
status: ResponseStatus = "completed"
input_messages = None
output_messages = None
if self.use_harmony:
assert isinstance(context, HarmonyContext)
output = self._make_response_output_items_with_harmony(context)
if request.enable_response_messages:
input_messages = context.messages[:context.num_init_messages]
output_messages = context.messages[context.num_init_messages:]
num_tool_output_tokens = context.num_tool_output_tokens
if len(output) > 0:
if context.finish_reason == "length":
@ -496,6 +501,12 @@ class OpenAIServingResponses(OpenAIServing):
output = self._make_response_output_items(request, final_output,
tokenizer)
# TODO: context for non-gptoss models doesn't use messages
# so we can't get them out yet
if request.enable_response_messages:
raise NotImplementedError(
"enable_response_messages is currently"
" only supported for gpt-oss")
# Calculate usage.
assert final_res.prompt_token_ids is not None
num_tool_output_tokens = 0
@ -519,6 +530,8 @@ class OpenAIServingResponses(OpenAIServing):
response = ResponsesResponse.from_request(
request,
sampling_params,
input_messages=input_messages,
output_messages=output_messages,
model_name=model_name,
created_time=created_time,
output=output,

View File

@ -98,6 +98,15 @@ class Hermes2ProToolParser(ToolParser):
else:
return delta_text
def adjust_request(
self, request: ChatCompletionRequest) -> ChatCompletionRequest:
if request.tools and request.tool_choice != 'none':
# do not skip special tokens because the tool_call tokens are
# marked "special" in some models. Since they are skipped
# prior to the call to the tool parser, it breaks tool calling.
request.skip_special_tokens = False
return request
def extract_tool_calls(
self,
model_output: str,

View File

@ -32,6 +32,7 @@ if TYPE_CHECKING:
VLLM_CONFIG_ROOT: str = os.path.expanduser("~/.config/vllm")
VLLM_USAGE_STATS_SERVER: str = "https://stats.vllm.ai"
VLLM_NO_USAGE_STATS: bool = False
VLLM_DISABLE_FLASHINFER_PREFILL: bool = False
VLLM_DO_NOT_TRACK: bool = False
VLLM_USAGE_SOURCE: str = ""
VLLM_CONFIGURE_LOGGING: int = 1
@ -433,11 +434,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_FLASH_ATTN_VERSION":
lambda: maybe_convert_int(os.environ.get("VLLM_FLASH_ATTN_VERSION", None)),
# Internal flag to enable Dynamo fullgraph capture
"VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE":
lambda: bool(
os.environ.get("VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE", "1") != "0"),
# Feature flag to enable/disable Inductor standalone compile.
# In torch <= 2.7 we ignore this flag; in torch >= 2.8 this is
# enabled by default.
@ -479,6 +475,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
lambda: os.environ.get("VLLM_USAGE_STATS_SERVER", "https://stats.vllm.ai"),
"VLLM_NO_USAGE_STATS":
lambda: os.environ.get("VLLM_NO_USAGE_STATS", "0") == "1",
"VLLM_DISABLE_FLASHINFER_PREFILL":
lambda: os.environ.get("VLLM_DISABLE_FLASHINFER_PREFILL", "0") == "1",
"VLLM_DO_NOT_TRACK":
lambda: (os.environ.get("VLLM_DO_NOT_TRACK", None) or os.environ.get(
"DO_NOT_TRACK", None) or "0") == "1",

View File

@ -5,11 +5,10 @@ import asyncio
import time
from abc import ABC, abstractmethod
from functools import cached_property
from typing import (Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple,
Union)
from typing import Any, Awaitable, Callable, List, Optional, Set, Union
import torch.nn as nn
from typing_extensions import TypeVar
from typing_extensions import TypeVar, deprecated
import vllm.platforms
from vllm.config import VllmConfig
@ -63,10 +62,10 @@ class ExecutorBase(ABC):
@abstractmethod
def collective_rpc(self,
method: Union[str, Callable[..., _R]],
method: Union[str, Callable[[WorkerBase], _R]],
timeout: Optional[float] = None,
args: Tuple = (),
kwargs: Optional[Dict[str, Any]] = None) -> List[_R]:
args: tuple = (),
kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
"""
Execute an RPC call on all workers.
@ -91,7 +90,7 @@ class ExecutorBase(ABC):
"""
raise NotImplementedError
def determine_num_available_blocks(self) -> Tuple[int, int]:
def determine_num_available_blocks(self) -> tuple[int, int]:
"""Determine the number of available blocks for the GPU KV cache and
swappable CPU KV cache.
@ -99,9 +98,10 @@ class ExecutorBase(ABC):
ExecutorBase may require modification of the result, e.g. to ensure the
selected cache sizes are compatible with all workers.
Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
are blocks that are "active" on the device and can be appended to.
num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
Returns a tuple `(num_gpu_blocks, num_cpu_blocks)`, where
`num_gpu_blocks` are blocks that are "active" on the device and can be
appended to.
`num_cpu_blocks` refers to "swapped" blocks in CPU memory and cannot be
appended to.
"""
results = self.collective_rpc("determine_num_available_blocks")
@ -127,16 +127,15 @@ class ExecutorBase(ABC):
self.collective_rpc("initialize_cache",
args=(num_gpu_blocks, num_cpu_blocks))
@deprecated("`llm_engine.model_executor.apply_model` will no longer work "
"in V1 Engine. Please replace with `llm_engine.apply_model` "
"and set `VLLM_ALLOW_INSECURE_SERIALIZATION=1`.")
def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
"""
Run a function directly on the model inside each worker,
returning the result for each of them.
"""
def rpc_func(worker: WorkerBase) -> _R:
return func(worker.get_model())
return self.collective_rpc(rpc_func)
return self.collective_rpc("apply_model", args=(func, ))
@cached_property # Avoid unnecessary RPC calls
def supported_tasks(self) -> tuple[SupportedTask, ...]:
@ -235,9 +234,6 @@ class ExecutorBase(ABC):
"""Shutdown the executor."""
self.collective_rpc("shutdown")
def __del__(self):
self.shutdown()
async def execute_model_async(
self,
execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
@ -311,8 +307,8 @@ class DistributedExecutorBase(ExecutorBase):
def collective_rpc(self,
method: Union[str, Callable],
timeout: Optional[float] = None,
args: Tuple = (),
kwargs: Optional[Dict] = None) -> List[Any]:
args: tuple = (),
kwargs: Optional[dict[str, Any]] = None) -> list[Any]:
return self._run_workers(method, *args, **(kwargs or {}))
@abstractmethod

View File

@ -20,10 +20,10 @@ if has_triton_kernels():
from triton_kernels.matmul_ogs import (FnSpecs, FusedActivation,
matmul_ogs)
from triton_kernels.routing import routing
except ModuleNotFoundError:
except (ModuleNotFoundError, AttributeError) as e:
logger.error(
"Failed to import Triton kernels. Please make sure your triton "
"version is compatible.")
"version is compatible. Error: %s", e)
def triton_kernel_moe_forward(

View File

@ -241,7 +241,7 @@ class AutoRoundConfig(QuantizationConfig):
if isinstance(layer, FusedMoE):
if use_marlin:
return AWQMoEMethod(quant_args_marlin, layer.moe)
return AWQMoEMethod(quant_args_marlin, layer.moe_config)
from vllm.model_executor.layers.quantization.moe_wna16 import (
MoeWNA16Config)
@ -327,7 +327,7 @@ class AutoRoundConfig(QuantizationConfig):
if isinstance(layer, FusedMoE):
if use_marlin:
return GPTQMarlinMoEMethod(quant_args_marlin, layer.moe)
return GPTQMarlinMoEMethod(quant_args_marlin, layer.moe_config)
else:
from vllm.model_executor.layers.quantization.moe_wna16 import (
MoeWNA16Config)

View File

@ -160,6 +160,7 @@ class ModelOptFp8Config(QuantizationConfig):
def is_layer_excluded(self, prefix: str) -> bool:
"""
Check if a layer should be excluded from quantization.
Handles both exact matching (for fused layers) and substring matching.
This method handles both regular models and multimodal models that use
the language_model prefix. For multimodal models, it checks if the
@ -168,11 +169,18 @@ class ModelOptFp8Config(QuantizationConfig):
if self.exclude_modules is None:
return False
# Check if any excluded module matches the prefix
# First check exact matching with fused layer support
if is_layer_skipped(prefix, self.exclude_modules,
self.packed_modules_mapping):
return True
# Then check substring matching for patterns not caught by exact match
for module in self.exclude_modules:
if (module in prefix
or (prefix.startswith("language_model.")
and module in prefix.removeprefix("language_model."))):
# Skip exact matches already handled above
if (module != prefix and
(module in prefix or
(prefix.startswith("language_model.")
and module in prefix.removeprefix("language_model.")))):
return True
return False
@ -180,9 +188,10 @@ class ModelOptFp8Config(QuantizationConfig):
prefix: str) -> Optional["QuantizeMethodBase"]:
from vllm.attention.layer import Attention # Avoid circular import
if isinstance(layer, LinearBase):
if (is_layer_skipped(prefix, self.exclude_modules,
self.packed_modules_mapping)
or self.is_layer_excluded(prefix)):
if self.is_layer_excluded(prefix):
return UnquantizedLinearMethod()
# Check if this is a vision model layer that should not be quantized
if ("vision_tower" in prefix or "vision_model" in prefix):
return UnquantizedLinearMethod()
return ModelOptFp8LinearMethod(self)
elif isinstance(layer, Attention):
@ -778,22 +787,34 @@ class ModelOptNvFp4Config(QuantizationConfig):
return cls(is_checkpoint_nvfp4_serialized, kv_cache_quant_algo,
exclude_modules, group_size)
def is_layer_excluded(self, prefix: str,
exclude_modules: list[str]) -> bool:
def is_layer_excluded(self, prefix: str) -> bool:
"""
Check if a layer should be excluded from quantization.
Handles both exact matching (for fused layers) and pattern matching.
"""
# First check exact matching with fused layer support
if is_layer_skipped(prefix, self.exclude_modules,
self.packed_modules_mapping):
return True
# Check regex pattern matching for patterns not caught by exact match
import regex as re
for pattern in exclude_modules:
regex_str = pattern.replace('.', r'\.').replace('*', r'.*')
if re.fullmatch(regex_str, prefix):
return True
for pattern in self.exclude_modules:
# Skip patterns that would be caught by exact matching
if '*' in pattern or '.' in pattern:
regex_str = pattern.replace('.', r'\.').replace('*', r'.*')
if re.fullmatch(regex_str, prefix):
return True
return False
def get_quant_method(self, layer: torch.nn.Module,
prefix: str) -> Optional["QuantizeMethodBase"]:
from vllm.attention.layer import Attention # Avoid circular import
if isinstance(layer, LinearBase):
if (is_layer_skipped(prefix, self.exclude_modules,
self.packed_modules_mapping)
or self.is_layer_excluded(prefix, self.exclude_modules)):
if self.is_layer_excluded(prefix):
return UnquantizedLinearMethod()
# Check if this is a vision model layer that should not be quantized
if ("vision_tower" in prefix or "vision_model" in prefix):
return UnquantizedLinearMethod()
return ModelOptNvFp4LinearMethod(self)
elif isinstance(layer, Attention):

View File

@ -6,8 +6,6 @@ from typing import Optional
import torch
from vllm.model_executor.custom_op import CustomOp
from vllm.platforms import current_platform
from vllm.utils.flashinfer import has_flashinfer
from .common import apply_rotary_emb_torch
@ -32,13 +30,15 @@ class RotaryEmbedding(CustomOp):
self.base = base
self.is_neox_style = is_neox_style
self.dtype = dtype
# TODO(mgoin): disabled for now due to failures
# Flashinfer only supports head_size=64, 128, 256, 512.
# https://github.com/flashinfer-ai/flashinfer/blob/ebfd655efe830048dba5d582aaa61d61d1cf9a87/include/flashinfer/utils.cuh#L174-L202
self.use_flashinfer = (self.enabled()
and dtype in (torch.float16, torch.bfloat16)
and current_platform.is_cuda()
and has_flashinfer()
and self.head_size in [64, 128, 256, 512])
# self.use_flashinfer = (self.enabled()
# and dtype in (torch.float16, torch.bfloat16)
# and current_platform.is_cuda()
# and has_flashinfer()
# and self.head_size in [64, 128, 256, 512])
self.use_flashinfer = False
cache = self._compute_cos_sin_cache()
if not self.use_flashinfer:

View File

@ -165,7 +165,11 @@ def device_loading_context(module: torch.nn.Module,
# New parameters or parameters already on target device are untouched
def get_model_architecture(
_MODEL_ARCH_BY_HASH = dict[str, tuple[type[nn.Module], str]]()
"""Caches the outputs of `_get_model_architecture`."""
def _get_model_architecture(
model_config: ModelConfig) -> tuple[type[nn.Module], str]:
architectures = getattr(model_config.hf_config, "architectures", [])
@ -209,6 +213,17 @@ def get_model_architecture(
return model_cls, arch
def get_model_architecture(
model_config: ModelConfig) -> tuple[type[nn.Module], str]:
key = model_config.compute_hash()
if key in _MODEL_ARCH_BY_HASH:
return _MODEL_ARCH_BY_HASH[key]
model_arch = _get_model_architecture(model_config)
_MODEL_ARCH_BY_HASH[key] = model_arch
return model_arch
def get_model_cls(model_config: ModelConfig) -> type[nn.Module]:
return get_model_architecture(model_config)[0]

View File

@ -446,6 +446,22 @@ class Gemma3Model(nn.Module):
weight_loader(param, loaded_weight)
loaded_params.add(scale_name)
continue
# Check if this is a scale parameter that needs remapping first
if name.endswith(
(".k_scale", ".v_scale", ".q_scale", ".prob_scale")):
# Try to remap the scale name first
remapped_name = maybe_remap_kv_scale_name(name, params_dict)
if remapped_name is not None and remapped_name in params_dict:
# Successfully remapped, use the remapped name
param = params_dict[remapped_name]
weight_loader = getattr(param, "weight_loader",
default_weight_loader)
weight_loader(param, loaded_weight)
loaded_params.add(remapped_name)
continue
# If remapping failed, continue with normal processing
for (param_name, shard_name, shard_id) in stacked_params_mapping:
if shard_name not in name:
continue

View File

@ -148,9 +148,11 @@ class Qwen3NextSparseMoeBlock(nn.Module):
def _maybe_ignore_quant_config(self, quant_config: QuantizationConfig):
# GPTQ configs do not have a list of ignored modules, however AutoGPTQ
# seems to avoid gate quantization.
# See: https://huggingface.co/Qwen/Qwen3-30B-A3B-GPTQ-Int4
if isinstance(quant_config, (GPTQConfig, GPTQMarlinConfig)):
# seems to avoid gate quantization while AutoRound does.
if isinstance(
quant_config,
(GPTQConfig,
GPTQMarlinConfig)) and not quant_config.autoround_version:
return None
return quant_config

View File

@ -122,9 +122,10 @@ class Qwen3MoeLLMModel(Qwen3MoeModel):
def load_fused_expert_weights(self, name: str, params_dict: dict,
loaded_weight: torch.Tensor, shard_id: str,
num_experts: int):
num_experts: int) -> bool:
param = params_dict[name]
weight_loader = typing.cast(Callable[..., bool], param.weight_loader)
loaded_local_expert = False
for expert_id in range(num_experts):
curr_expert_weight = loaded_weight[expert_id]
success = weight_loader(param,
@ -133,9 +134,10 @@ class Qwen3MoeLLMModel(Qwen3MoeModel):
shard_id,
expert_id,
return_success=True)
if not success:
return False
return True
if success:
loaded_local_expert = True
return loaded_local_expert
def load_weights(self, weights: Iterable[tuple[str,
torch.Tensor]]) -> set[str]:
@ -345,4 +347,4 @@ class Qwen3VLMoeForConditionalGeneration(Qwen3VLForConditionalGeneration):
for _ in range(self.deepstack_num_level)
] if self.use_deepstack else None
self.visual_dim = config.vision_config.out_hidden_size
self.multiscale_dim = self.visual_dim * self.deepstack_num_level
self.multiscale_dim = self.visual_dim * self.deepstack_num_level

View File

@ -20,7 +20,8 @@ from vllm.model_executor.layers.linear import (ColumnParallelLinear,
from vllm.model_executor.layers.quantization import QuantizationConfig
from vllm.model_executor.layers.vocab_parallel_embedding import (
VocabParallelEmbedding)
from vllm.model_executor.model_loader.weight_utils import default_weight_loader
from vllm.model_executor.model_loader.weight_utils import (
default_weight_loader, maybe_remap_kv_scale_name)
from .vision import VisionEncoderInfo, resolve_visual_encoder_outputs
@ -506,6 +507,21 @@ class SiglipVisionModel(nn.Module):
if layer_idx >= layer_count:
continue
# Check if this is a scale parameter that needs remapping first
if name.endswith(
(".k_scale", ".v_scale", ".q_scale", ".prob_scale")):
# Try to remap the scale name first
remapped_name = maybe_remap_kv_scale_name(name, params_dict)
if remapped_name is not None and remapped_name in params_dict:
# Successfully remapped, use the remapped name
param = params_dict[remapped_name]
weight_loader = getattr(param, "weight_loader",
default_weight_loader)
weight_loader(param, loaded_weight)
loaded_params.add(remapped_name)
continue
# If remapping failed, continue with normal processing
for (param_name, weight_name, shard_id) in stacked_params_mapping:
if weight_name not in name:
continue

View File

@ -702,21 +702,45 @@ class TransformersBase(nn.Module, SupportsQuant, SupportsLoRA, SupportsPP):
class TransformersModel(TransformersBase):
hf_to_vllm_mapper = WeightsMapper(
orig_to_new_prefix={
# Handle BERT-like models
"bert": "model",
# Add `model.` prefix for base model checkpoints
"": "model.",
# Remove `model.` from places it should not be
# Remove `model.` prefix if it was already there
"model.model.": "model.",
# Pooling adapters will be adjacent to `model`
"model.pooler": "pooler",
"model.score": "score",
# Classifier adapter's classifier layer is renamed to score
"model.classifier": "score",
},
orig_to_new_suffix={
# Replace legacy suffixes used for norms
".gamma": ".weight",
".beta": ".bias",
})
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
super().__init__(vllm_config=vllm_config, prefix=prefix)
# Some encoder models have the position_ids buffer in the checkpoint
# After creating a pooling model, `pooler` will be duplicated.
# The one inside `model` comes from the Transformers modelling code.
# The one after `model` is an adapter from vLLM.
# We want to use the adapter so we nullify the original pooler.
if getattr(self.model, "pooler", None) is not None:
self.skip_prefixes.append("pooler.")
self.model.pooler = torch.nn.Identity()
# Some encoder models have the position_ids buffer in the checkpoint.
# vLLM will always pass position_ids as an argument, so we skip loading
# the buffer if it exists
self.skip_substrs.append("position_ids")
# Some encoder models have the bias of the final classifier layer
# in the checkpoint. vLLM does not use this bias, so we skip loading
# it if it exists
self.skip_substrs.append("score.bias")
def create_attention_instances(
self, attn_type: AttentionType = AttentionType.DECODER):
# TODO(hmellor): Better way to detect encoder models

View File

@ -987,8 +987,10 @@ def find_process_using_port(port: int) -> Optional[psutil.Process]:
if sys.platform.startswith("darwin"):
return None
our_pid = os.getpid()
for conn in psutil.net_connections():
if conn.laddr.port == port:
if conn.laddr.port == port and (conn.pid is not None
and conn.pid != our_pid):
try:
return psutil.Process(conn.pid)
except psutil.NoSuchProcess:

View File

@ -585,9 +585,10 @@ class FlashInferMetadataBuilder(AttentionMetadataBuilder[FlashInferMetadata]):
kv_data_type=self.kv_cache_dtype,
)
else:
attn_metadata.qo_indptr_gpu = qo_indptr_cpu.to(self.device)
attn_metadata.qo_indptr_gpu = qo_indptr_cpu.to(
self.device, non_blocking=True)
attn_metadata.paged_kv_indptr_gpu = paged_kv_indptr_cpu.to(
self.device)
self.device, non_blocking=True)
if num_decodes > 0:
pure_decode = num_prefills == 0

View File

@ -412,7 +412,8 @@ M = TypeVar("M", bound=MLACommonMetadata)
def use_flashinfer_prefill() -> bool:
# For blackwell default to flashinfer prefill if it's available since
# it is faster than FA2.
return (flashinfer_available and not envs.VLLM_USE_CUDNN_PREFILL
return (not envs.VLLM_DISABLE_FLASHINFER_PREFILL and flashinfer_available
and not envs.VLLM_USE_CUDNN_PREFILL
and current_platform.is_device_capability(100))

View File

@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""KV-Cache Utilities."""
import copy
import os
from collections import defaultdict, deque
from collections.abc import Iterable, Sequence
@ -15,7 +16,8 @@ from vllm.utils import GiB_bytes, cdiv, sha256_cbor
from vllm.v1.kv_cache_interface import (ChunkedLocalAttentionSpec,
FullAttentionSpec, KVCacheConfig,
KVCacheGroupSpec, KVCacheSpec,
KVCacheTensor, SlidingWindowSpec)
KVCacheTensor, SlidingWindowSpec,
UniformTypeKVCacheSpecs)
from vllm.v1.metrics.stats import PrefixCacheStats
from vllm.v1.request import Request
@ -750,7 +752,7 @@ def create_kv_cache_group_specs(
return kv_cache_groups
def is_kv_cache_type_uniform(kv_cache_spec: dict[str, KVCacheSpec]) -> bool:
def is_kv_cache_spec_uniform(kv_cache_spec: dict[str, KVCacheSpec]) -> bool:
"""
Whether all layers in the given KVCacheSpec have the same KV cache spec.
Note that we regard FullAttentionSpec with and without sliding window as
@ -793,6 +795,21 @@ def get_max_concurrency_for_kv_cache_config(
return max_concurrency
def may_override_num_blocks(vllm_config: VllmConfig, num_blocks: int) -> int:
"""
Override the number of kv cache blocks if `num_gpu_blocks_override` is set.
"""
if vllm_config.cache_config.num_gpu_blocks_override is not None:
num_gpu_blocks_override = \
vllm_config.cache_config.num_gpu_blocks_override
logger.info(
"Overriding num_gpu_blocks=%d with "
"num_gpu_blocks_override=%d", num_blocks, num_gpu_blocks_override)
num_blocks = num_gpu_blocks_override
return num_blocks
def get_num_blocks(vllm_config: VllmConfig, num_layers: int,
available_memory: int, page_size: int) -> int:
"""
@ -806,13 +823,7 @@ def get_num_blocks(vllm_config: VllmConfig, num_layers: int,
"""
num_blocks = int(available_memory // page_size // num_layers)
num_blocks = max(num_blocks, 0)
if vllm_config.cache_config.num_gpu_blocks_override is not None:
num_gpu_blocks_override = \
vllm_config.cache_config.num_gpu_blocks_override
logger.info(
"Overriding num_gpu_blocks=%d with "
"num_gpu_blocks_override=%d", num_blocks, num_gpu_blocks_override)
num_blocks = num_gpu_blocks_override
num_blocks = may_override_num_blocks(vllm_config, num_blocks)
return num_blocks
@ -825,11 +836,11 @@ def get_uniform_page_size(kv_cache_spec: dict[str, KVCacheSpec]) -> int:
return page_sizes.pop()
def _get_kv_cache_groups_uniform_type(
def _get_kv_cache_groups_uniform_spec(
kv_cache_specs: dict[str, KVCacheSpec]) -> list[KVCacheGroupSpec]:
"""
Generates the KV cache configuration for a model with one type of KV cache.
Divide the available memory equally among all layers.
Generates the KV cache configuration for a model with the same KV cache
spec for all layers.
Args:
kv_cache_specs: The kv cache spec of each attention layer in the model
@ -842,6 +853,22 @@ def _get_kv_cache_groups_uniform_type(
[list(kv_cache_specs.keys())])
def _get_kv_cache_groups_uniform_type(
        spec: UniformTypeKVCacheSpecs) -> list[KVCacheGroupSpec]:
    """
    Generates the KV cache configuration for a model with one type of KV cache
    but different hidden sizes: all layers are merged into a single group
    backed by ``spec``.

    Args:
        spec: The UniformTypeKVCacheSpecs of the model

    Returns:
        The generated KVCacheGroupSpecs (a single-element list)
    """
    layer_names = list(spec.kv_cache_specs.keys())
    return [KVCacheGroupSpec(layer_names, spec)]
def is_kv_cache_page_size_uniform(
kv_cache_spec: dict[str, KVCacheSpec]) -> bool:
"""
@ -1000,28 +1027,45 @@ def get_kv_cache_config_from_groups(vllm_config: VllmConfig,
)
# Determine how model runners should initialize the KV cache tensors.
# We will have group_size memory pools, each is shared by one layer from
# each group. As layers of different groups have different block table,
# they will use different parts of the shared Tensor.
# The memory layout for 3 groups (full.0, full.1), (sw.0, sw.2),
# (sw.1, padding) will be: (group_size = 2)
# full.0, sw.0, sw.1: share a Tensor with size=available_memory//2
# full.1, sw.2: share another Tensor with size=available_memory//2
group_size = max(len(group.layer_names) for group in kv_cache_groups)
if len(kv_cache_groups) == 1 and \
isinstance(kv_cache_groups[0].kv_cache_spec, UniformTypeKVCacheSpecs):
# Special case: all layers have the same type of KV cache but with
# different hidden size. Allocate different amount of memory for each
# layer based on its hidden size.
num_blocks = available_memory // kv_cache_groups[
0].kv_cache_spec.page_size_bytes
num_blocks = may_override_num_blocks(vllm_config, num_blocks)
per_layer_specs = kv_cache_groups[0].kv_cache_spec.kv_cache_specs
kv_cache_tensors = [
KVCacheTensor(size=per_layer_specs[layer_name].page_size_bytes *
num_blocks,
shared_by=[layer_name])
for layer_name in kv_cache_groups[0].layer_names
]
else:
# General case:
# We will have group_size memory pools, each is shared by one layer from
# each group. As layers of different groups have different block table,
# they will use different parts of the shared Tensor.
# The memory layout for 3 groups (full.0, full.1), (sw.0, sw.2),
# (sw.1, padding) will be: (group_size = 2)
# full.0, sw.0, sw.1: share a Tensor with size=available_memory//2
# full.1, sw.2: share another Tensor with size=available_memory//2
group_size = max(len(group.layer_names) for group in kv_cache_groups)
page_size = get_uniform_page_size(kv_cache_specs)
assert group_size > 0, "group_size must be greater than 0"
num_blocks = get_num_blocks(vllm_config, group_size, available_memory,
page_size)
per_memory_pool_size = page_size * num_blocks
kv_cache_tensors = []
for i in range(group_size):
shared_by = []
for j in range(len(kv_cache_groups)):
if i < len(kv_cache_groups[j].layer_names):
shared_by.append(kv_cache_groups[j].layer_names[i])
kv_cache_tensors.append(
KVCacheTensor(size=per_memory_pool_size, shared_by=shared_by))
page_size = get_uniform_page_size(kv_cache_specs)
assert group_size > 0, "group_size must be greater than 0"
num_blocks = get_num_blocks(vllm_config, group_size, available_memory,
page_size)
kv_cache_tensors = []
for i in range(group_size):
shared_by = []
for j in range(len(kv_cache_groups)):
if i < len(kv_cache_groups[j].layer_names):
shared_by.append(kv_cache_groups[j].layer_names[i])
kv_cache_tensors.append(
KVCacheTensor(size=page_size * num_blocks,
shared_by=shared_by))
kv_cache_config = KVCacheConfig(
num_blocks=num_blocks,
@ -1059,7 +1103,7 @@ def unify_hybrid_kv_cache_specs(kv_cache_spec: dict[str, KVCacheSpec]):
kv_cache_spec: The kv cache spec of each attention layer in the model
"""
if is_kv_cache_type_uniform(kv_cache_spec):
if is_kv_cache_spec_uniform(kv_cache_spec):
return
logger.warning(
@ -1097,7 +1141,7 @@ def unify_hybrid_kv_cache_specs(kv_cache_spec: dict[str, KVCacheSpec]):
attention_chunk_size=spec.attention_chunk_size,
)
if not is_kv_cache_type_uniform(kv_cache_spec):
if not is_kv_cache_spec_uniform(kv_cache_spec):
raise ValueError("Hybrid KV cache manager is disabled but failed to "
"convert the KV cache specs to one unified type.")
@ -1122,11 +1166,16 @@ def get_kv_cache_groups(
# This returns an empty list to allow for the KVCacheManager to handle
# attention free models.
return []
elif is_kv_cache_type_uniform(kv_cache_spec):
elif is_kv_cache_spec_uniform(kv_cache_spec):
# KV cache of all layers are the same, which is true for
# most models. Allocate the same amount of memory for
# each layer.
return _get_kv_cache_groups_uniform_type(kv_cache_spec)
return _get_kv_cache_groups_uniform_spec(kv_cache_spec)
elif uniform_spec := UniformTypeKVCacheSpecs.from_specs(kv_cache_spec):
# All layers need the same number of token slots (e.g., all layers are
# full attention, or all layers are sliding window attention with the
# same window size). Put all layers into one group.
return _get_kv_cache_groups_uniform_type(uniform_spec)
elif is_kv_cache_page_size_uniform(kv_cache_spec):
# Model contains multiple attention types, but KV cache of all layers
# have the same physical memory per block per layer. Split the layers
@ -1137,6 +1186,27 @@ def get_kv_cache_groups(
raise NotImplementedError
def generate_scheduler_kv_cache_config(
        kv_cache_configs: list[KVCacheConfig]) -> KVCacheConfig:
    """
    Generate the KV cache configuration for the scheduler.

    All workers share the same kv_cache_config except for layer names, so an
    arbitrary worker's config is copied and normalized for the scheduler.
    """
    num_blocks = kv_cache_configs[0].num_blocks
    assert all(cfg.num_blocks == num_blocks for cfg in kv_cache_configs)
    config = copy.deepcopy(kv_cache_configs[0])
    for group in config.kv_cache_groups:
        spec = group.kv_cache_spec
        if isinstance(spec, UniformTypeKVCacheSpecs):
            # All layers in a UniformTypeKVCacheSpecs share one type, so any
            # single per-layer spec is representative for the scheduler.
            group.kv_cache_spec = next(iter(spec.kv_cache_specs.values()))
    return config
def get_kv_cache_configs(vllm_config: VllmConfig,
kv_cache_specs: list[dict[str, KVCacheSpec]],
available_memory: list[int]) -> list[KVCacheConfig]:

View File

@ -2,7 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import Optional
from vllm.config import CompilationLevel, CUDAGraphMode, VllmConfig
from vllm.config import CUDAGraphMode, VllmConfig
from vllm.forward_context import BatchDescriptor
from vllm.logger import init_logger
@ -39,11 +39,15 @@ class CudagraphDispatcher:
CUDAGraphMode.FULL: set(),
}
assert not self.cudagraph_mode.requires_piecewise_compilation() or \
(self.compilation_config.level == CompilationLevel.PIECEWISE and
self.compilation_config.splitting_ops_contain_attention()), \
not_use_piecewise_compilation = (
not self.cudagraph_mode.requires_piecewise_compilation())
assert not_use_piecewise_compilation or \
self.compilation_config.is_attention_compiled_piecewise(), \
"Compilation level should be CompilationLevel.PIECEWISE when "\
"cudagraph_mode piecewise cudagraphs is used, "\
"and attention should be in splitting_ops or "\
"inductor splitting should be used. " \
f"cudagraph_mode={self.cudagraph_mode}, "\
f"compilation_level={self.compilation_config.level}, "\
f"splitting_ops={self.compilation_config.splitting_ops}"

View File

@ -29,7 +29,9 @@ from vllm.transformers_utils.config import (
maybe_register_config_serialize_by_value)
from vllm.utils import (decorate_logs, get_hash_fn_by_name, make_zmq_socket,
resolve_obj_by_qualname, set_process_title)
from vllm.v1.core.kv_cache_utils import (BlockHash, get_kv_cache_configs,
from vllm.v1.core.kv_cache_utils import (BlockHash,
generate_scheduler_kv_cache_config,
get_kv_cache_configs,
get_request_block_hasher,
init_none_hash)
from vllm.v1.core.sched.interface import SchedulerInterface
@ -196,16 +198,10 @@ class EngineCore:
kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs,
available_gpu_memory)
# All workers have the same kv_cache_config except layer names, so use
# an arbitrary one to initialize the scheduler.
assert all([
cfg.num_blocks == kv_cache_configs[0].num_blocks
for cfg in kv_cache_configs
])
num_gpu_blocks = kv_cache_configs[0].num_blocks
scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
kv_cache_configs)
num_gpu_blocks = scheduler_kv_cache_config.num_blocks
num_cpu_blocks = 0
scheduler_kv_cache_config = kv_cache_configs[0]
# Initialize kv cache and warmup the execution
self.model_executor.initialize_from_config(kv_cache_configs)

View File

@ -5,6 +5,7 @@ from collections.abc import Mapping
from copy import copy
from typing import Any, Callable, Optional, Union
import torch.nn as nn
from typing_extensions import TypeVar
import vllm.envs as envs
@ -33,6 +34,7 @@ from vllm.v1.metrics.loggers import (PrometheusStatLogger, StatLoggerBase,
StatLoggerFactory)
from vllm.v1.metrics.reader import Metric, get_metrics_snapshot
from vllm.v1.metrics.stats import IterationStats
from vllm.v1.worker.worker_base import WorkerBase
logger = init_logger(__name__)
@ -319,12 +321,15 @@ class LLMEngine:
return self.engine_core.pin_lora(lora_id)
def collective_rpc(self,
method: Union[str, Callable[..., _R]],
method: Union[str, Callable[[WorkerBase], _R]],
timeout: Optional[float] = None,
args: tuple = (),
kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
return self.engine_core.collective_rpc(method, timeout, args, kwargs)
def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
    """Run ``func`` on the worker-side model via collective RPC and return
    the per-worker results as a list."""
    return self.collective_rpc("apply_model", args=(func, ))
def __del__(self):
if dp_group := getattr(self, "dp_group", None):
stateless_destroy_torch_distributed_process_group(dp_group)

View File

@ -235,6 +235,76 @@ class CrossAttentionSpec(AttentionSpec):
return cdiv(max_encoder_len, self.block_size) * self.page_size_bytes
@dataclass(frozen=True)
class UniformTypeKVCacheSpecs(KVCacheSpec):
    """
    A KV cache spec for multiple layers with the same type of attention.
    Here, "same type" means the layers always need the same number of token
    slots. For example, sliding window attentions with different window sizes
    are not the same type and must not be merged into one
    UniformTypeKVCacheSpecs.
    """
    # Per-layer KV cache specs, keyed by layer name.
    kv_cache_specs: dict[str, KVCacheSpec]

    @property
    def page_size_bytes(self) -> int:
        # One page of the merged spec holds one page of every layer.
        return sum(spec.page_size_bytes
                   for spec in self.kv_cache_specs.values())

    def max_memory_usage_bytes(self, vllm_config: VllmConfig) -> int:
        # The layer needing the most pages dictates the page count; every
        # layer is then sized to that same number of pages.
        max_num_pages = max(
            cdiv(spec.max_memory_usage_bytes(vllm_config),
                 spec.page_size_bytes)
            for spec in self.kv_cache_specs.values())
        return max_num_pages * self.page_size_bytes

    @classmethod
    def is_uniform_type(cls, kv_cache_specs: dict[str, KVCacheSpec]) -> bool:
        """
        Whether all layers have the same type of KV cache spec.
        """
        if not kv_cache_specs:
            # No layers at all: nothing to merge.
            return False
        block_sizes = set(spec.block_size for spec in kv_cache_specs.values())
        if len(block_sizes) > 1:
            # Different block sizes, not uniform.
            return False
        one_spec = next(iter(kv_cache_specs.values()))
        if isinstance(one_spec, (FullAttentionSpec, CrossAttentionSpec)):
            return all(
                isinstance(spec, type(one_spec))
                for spec in kv_cache_specs.values())
        elif isinstance(one_spec, SlidingWindowSpec):
            return all(
                isinstance(spec, SlidingWindowSpec)
                and spec.sliding_window == one_spec.sliding_window
                for spec in kv_cache_specs.values())
        elif isinstance(one_spec, ChunkedLocalAttentionSpec):
            return all(
                isinstance(spec, ChunkedLocalAttentionSpec)
                and spec.attention_chunk_size == one_spec.attention_chunk_size
                for spec in kv_cache_specs.values())
        elif isinstance(one_spec, MambaSpec):
            return all(
                isinstance(spec, MambaSpec) and spec.num_speculative_blocks ==
                one_spec.num_speculative_blocks
                for spec in kv_cache_specs.values())
        else:
            # NOTE(Chen): Please add new branches for new KV cache spec types.
            raise NotImplementedError(
                f"Unsupported KV cache spec type: {type(one_spec)}")

    @classmethod
    def from_specs(cls, kv_cache_specs: dict[str,
                                             KVCacheSpec]) -> Optional[Self]:
        """
        Return a UniformTypeKVCacheSpecs object if all layers have the same
        type of KV cache spec. Return None if not.
        """
        if not cls.is_uniform_type(kv_cache_specs):
            return None
        block_size = next(iter(kv_cache_specs.values())).block_size
        return cls(block_size=block_size, kv_cache_specs=kv_cache_specs)
@dataclass
class KVCacheTensor:
"""

View File

@ -7,7 +7,7 @@ import pickle
from collections.abc import Sequence
from inspect import isclass
from types import FunctionType
from typing import Any, Optional, Union
from typing import Any, Callable, Optional, Union
import cloudpickle
import msgspec
@ -59,6 +59,42 @@ def _typestr(val: Any) -> Optional[tuple[str, str]]:
return t.__module__, t.__qualname__
def _encode_type_info_recursive(obj: Any) -> Any:
"""Recursively encode type information for nested structures of
lists/dicts."""
if obj is None:
return None
if type(obj) is list:
return [_encode_type_info_recursive(item) for item in obj]
if type(obj) is dict:
return {k: _encode_type_info_recursive(v) for k, v in obj.items()}
return _typestr(obj)
def _decode_type_info_recursive(
type_info: Any, data: Any, convert_fn: Callable[[Sequence[str], Any],
Any]) -> Any:
"""Recursively decode type information for nested structures of
lists/dicts."""
if type_info is None:
return data
if isinstance(type_info, dict):
assert isinstance(data, dict)
return {
k: _decode_type_info_recursive(type_info[k], data[k], convert_fn)
for k in type_info
}
if isinstance(type_info, list) and (
# Exclude serialized tensors/numpy arrays.
len(type_info) != 2 or not isinstance(type_info[0], str)):
assert isinstance(data, list)
return [
_decode_type_info_recursive(ti, d, convert_fn)
for ti, d in zip(type_info, data)
]
return convert_fn(type_info, data)
class MsgpackEncoder:
"""Encoder with custom torch tensor and numpy array serialization.
@ -129,12 +165,10 @@ class MsgpackEncoder:
result = obj.result
if not envs.VLLM_ALLOW_INSECURE_SERIALIZATION:
return None, result
# Since utility results are not strongly typed, we also encode
# the type (or a list of types in the case it's a list) to
# help with correct msgspec deserialization.
return _typestr(result) if type(result) is not list else [
_typestr(v) for v in result
], result
# Since utility results are not strongly typed, we recursively
# encode type information for nested structures of lists/dicts
# to help with correct msgspec deserialization.
return _encode_type_info_recursive(result), result
if not envs.VLLM_ALLOW_INSECURE_SERIALIZATION:
raise TypeError(f"Object of type {type(obj)} is not serializable"
@ -288,15 +322,9 @@ class MsgpackDecoder:
if not envs.VLLM_ALLOW_INSECURE_SERIALIZATION:
raise TypeError("VLLM_ALLOW_INSECURE_SERIALIZATION must "
"be set to use custom utility result types")
assert isinstance(result_type, list)
if len(result_type) == 2 and isinstance(result_type[0], str):
result = self._convert_result(result_type, result)
else:
assert isinstance(result, list)
result = [
self._convert_result(rt, r)
for rt, r in zip(result_type, result)
]
# Use recursive decoding to handle nested structures
result = _decode_type_info_recursive(result_type, result,
self._convert_result)
return UtilityResult(result)
def _convert_result(self, result_type: Sequence[str], result: Any) -> Any:

View File

@ -8,7 +8,7 @@ from collections import defaultdict
from collections.abc import Iterator
from contextlib import contextmanager
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Optional, Union, cast
from typing import TYPE_CHECKING, Any, NamedTuple, Optional, Union, cast
import numpy as np
import torch
@ -74,7 +74,8 @@ from vllm.v1.kv_cache_interface import (AttentionSpec,
EncoderOnlyAttentionSpec,
FullAttentionSpec, KVCacheConfig,
KVCacheGroupSpec, KVCacheSpec,
MambaSpec, SlidingWindowSpec)
MambaSpec, SlidingWindowSpec,
UniformTypeKVCacheSpecs)
# yapf: enable
from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput,
DraftTokenIds, LogprobsLists, LogprobsTensors,
@ -1187,7 +1188,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
common_prefix_len = self._compute_cascade_attn_prefix_len(
num_scheduled_tokens,
num_common_prefix_blocks,
kv_cache_group_spec.kv_cache_spec,
attn_group.kv_cache_spec,
builder,
)
@ -1903,7 +1904,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
**self._init_model_kwargs(num_scheduled_tokens),
**self._extract_mm_kwargs(scheduler_output),
}
elif (self.enable_prompt_embeds and get_pp_group().is_first_rank):
elif self.enable_prompt_embeds and get_pp_group().is_first_rank:
# Get the input embeddings for the tokens that are not input embeds,
# then put them into the appropriate positions.
# TODO(qthequartermasterman): Since even when prompt embeds are
@ -2125,6 +2126,21 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
invalid_req_indices,
)
@contextmanager
def synchronize_input_prep(self):
    """Guard reuse of the CPU-side input-preparation tensors.

    With async scheduling the CPU->GPU transfer happens asynchronously, so
    the prior step may still be reading the reused CPU tensors; wait on the
    recorded event before preparing new inputs and re-record it afterwards.
    No-op when no event is configured.
    """
    if self.prepare_inputs_event is None:
        yield
    else:
        # Ensure the prior step has finished with the reused CPU tensors.
        self.prepare_inputs_event.synchronize()
        try:
            yield
        finally:
            self.prepare_inputs_event.record()
@torch.inference_mode()
def execute_model(
self,
@ -2132,33 +2148,28 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
intermediate_tensors: Optional[IntermediateTensors] = None,
) -> Union[ModelRunnerOutput, AsyncModelRunnerOutput, IntermediateTensors]:
with record_function_or_nullcontext("Preprocess"):
self._update_states(scheduler_output)
if not scheduler_output.total_num_scheduled_tokens:
if not has_kv_transfer_group():
# Return empty ModelRunnerOutput if there's no work to do.
return EMPTY_MODEL_RUNNER_OUTPUT
return self.kv_connector_no_forward(scheduler_output,
self.vllm_config)
if self.cache_config.kv_sharing_fast_prefill:
assert not self.input_batch.num_prompt_logprobs, (
"--kv-sharing-fast-prefill produces incorrect logprobs for "
"prompt tokens, tokens, please disable it when the requests"
" need prompt logprobs")
with self.synchronize_input_prep():
# Update persistent batch states.
self._update_states(scheduler_output)
if not scheduler_output.total_num_scheduled_tokens:
if not has_kv_transfer_group():
# Return empty ModelRunnerOutput if no work to do.
return EMPTY_MODEL_RUNNER_OUTPUT
return self.kv_connector_no_forward(
scheduler_output, self.vllm_config)
if self.cache_config.kv_sharing_fast_prefill:
assert not self.input_batch.num_prompt_logprobs, (
"--kv-sharing-fast-prefill produces incorrect "
"logprobs for prompt tokens, tokens, please disable "
"it when the requests need prompt logprobs")
if self.prepare_inputs_event is not None:
# Ensure prior step has finished with reused CPU tensors.
self.prepare_inputs_event.synchronize()
try:
# Prepare the decoder inputs.
(attn_metadata, logits_indices, spec_decode_metadata,
num_scheduled_tokens_np, spec_decode_common_attn_metadata,
max_query_len, ubatch_slices, num_tokens_after_padding
) = self._prepare_inputs(scheduler_output)
finally:
if self.prepare_inputs_event is not None:
self.prepare_inputs_event.record()
(
num_scheduled_tokens,
num_input_tokens,
@ -2592,9 +2603,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
backend = self.vllm_config.compilation_config.init_backend(
self.vllm_config)
compilation_counter.dynamo_as_is_count += 1
self.model.compile(
fullgraph=envs.VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE,
backend=backend)
self.model.compile(fullgraph=True, backend=backend)
return
# for other compilation levels, cudagraph behavior is controlled by
# CudagraphWraper and CudagraphDispatcher of vllm.
@ -3445,12 +3454,16 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
assert len(self.attn_groups) == 0, \
"Attention backends are already initialized"
def get_attn_backends_for_layers(
layer_names: list[str]
) -> dict[type[AttentionBackend], list[str]]:
layers = get_layers_from_vllm_config(self.vllm_config,
AttentionLayerBase,
layer_names)
class AttentionGroupKey(NamedTuple):
attn_backend: type[AttentionBackend]
kv_cache_spec: KVCacheSpec
def get_attn_backends_for_group(
kv_cache_group_spec: KVCacheGroupSpec,
) -> dict[AttentionGroupKey, list[str]]:
layers = get_layers_from_vllm_config(
self.vllm_config, AttentionLayerBase,
kv_cache_group_spec.layer_names)
attn_backends = {}
attn_backend_layers = defaultdict(list)
# Dedupe based on full class name; this is a bit safer than
@ -3458,7 +3471,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
# attention backend subclasses (e.g. ChunkedLocalAttention) unless
# they are cached correctly, there will be different objects per
# layer.
for layer_name in layer_names:
for layer_name in kv_cache_group_spec.layer_names:
attn_backend = layers[layer_name].get_attn_backend()
if layer_name in self.kv_sharing_fast_prefill_eligible_layers:
@ -3467,8 +3480,14 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
attn_backend,
)
key = attn_backend.full_cls_name()
attn_backends[key] = attn_backend
full_cls_name = attn_backend.full_cls_name()
layer_kv_cache_spec = kv_cache_group_spec.kv_cache_spec
if isinstance(layer_kv_cache_spec, UniformTypeKVCacheSpecs):
layer_kv_cache_spec = layer_kv_cache_spec.kv_cache_specs[
layer_name]
key = (full_cls_name, layer_kv_cache_spec)
attn_backends[key] = AttentionGroupKey(attn_backend,
layer_kv_cache_spec)
attn_backend_layers[key].append(layer_name)
return {
attn_backends[k]: v
@ -3476,11 +3495,11 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
}
def create_attn_groups(
attn_backends_map: dict[AttentionBackend, list[str]],
kv_cache_spec: KVCacheSpec,
attn_backends_map: dict[AttentionGroupKey, list[str]],
) -> list[AttentionGroup]:
attn_groups: list[AttentionGroup] = []
for attn_backend, layer_names in attn_backends_map.items():
for (attn_backend,
kv_cache_spec), layer_names in attn_backends_map.items():
attn_metadata_builders = []
attn_metadata_builders.append(attn_backend.get_builder_cls()(
kv_cache_spec,
@ -3498,16 +3517,13 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
))
attn_group = AttentionGroup(attn_backend,
attn_metadata_builders,
layer_names)
layer_names, kv_cache_spec)
attn_groups.append(attn_group)
return attn_groups
for kv_cache_group_spec in kv_cache_config.kv_cache_groups:
kv_cache_spec = kv_cache_group_spec.kv_cache_spec
attn_backends = get_attn_backends_for_layers(
kv_cache_group_spec.layer_names)
self.attn_groups.append(
create_attn_groups(attn_backends, kv_cache_spec))
attn_backends = get_attn_backends_for_group(kv_cache_group_spec)
self.attn_groups.append(create_attn_groups(attn_backends))
# Calculate reorder batch threshold (if needed)
self.calculate_reorder_batch_threshold()
@ -3672,14 +3688,11 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
def _attn_group_iterator(self) -> Iterator[AttentionGroup]:
return itertools.chain.from_iterable(self.attn_groups)
def _kv_cache_spec_attn_group_iterator(
self) -> Iterator[tuple[KVCacheSpec, AttentionGroup]]:
def _kv_cache_spec_attn_group_iterator(self) -> Iterator[AttentionGroup]:
if not self.kv_cache_config.kv_cache_groups:
return
for kv_cache_spec_id, attn_groups in enumerate(self.attn_groups):
for attn_group in attn_groups:
yield self.kv_cache_config.kv_cache_groups[
kv_cache_spec_id].kv_cache_spec, attn_group
for attn_groups in self.attn_groups:
yield from attn_groups
def _reshape_kv_cache_tensors(
self,
@ -3699,7 +3712,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
"""
kv_caches: dict[str, torch.Tensor] = {}
has_attn, has_mamba = False, False
for kv_cache_spec, group in self._kv_cache_spec_attn_group_iterator():
for group in self._kv_cache_spec_attn_group_iterator():
kv_cache_spec = group.kv_cache_spec
attn_backend = group.backend
for layer_name in group.layer_names:
if layer_name in self.runner_only_attn_layers:
@ -3779,7 +3793,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
kv_caches: The KV cache buffer of each layer.
"""
for kv_cache_spec, group in self._kv_cache_spec_attn_group_iterator():
for group in self._kv_cache_spec_attn_group_iterator():
kv_cache_spec = group.kv_cache_spec
for layer_name in group.layer_names:
kv_cache = kv_caches[layer_name]
if (isinstance(kv_cache_spec, AttentionSpec)

View File

@ -686,8 +686,10 @@ class Worker(WorkerBase):
self.model_runner.save_tensorized_model(
tensorizer_config=tensorizer_config, )
# def shutdown(self) -> None:
# self.model_runner.ensure_kv_transfer_shutdown()
def shutdown(self) -> None:
# if runner := getattr(self, "model_runner", None):
# runner.ensure_kv_transfer_shutdown()
pass
def init_worker_distributed_environment(

View File

@ -15,7 +15,7 @@ from vllm.multimodal.registry import MultiModalRegistry
from vllm.platforms import current_platform
from vllm.v1.attention.backends.utils import AttentionMetadataBuilder
from vllm.v1.core.encoder_cache_manager import compute_mm_encoder_budget
from vllm.v1.kv_cache_interface import KVCacheGroupSpec
from vllm.v1.kv_cache_interface import KVCacheGroupSpec, KVCacheSpec
if TYPE_CHECKING:
from vllm.attention.layer import Attention
@ -132,6 +132,7 @@ class AttentionGroup:
backend: type[AttentionBackend]
metadata_builders: list[AttentionMetadataBuilder]
layer_names: list[str]
kv_cache_spec: KVCacheSpec
def get_metadata_builder(self,
ubatch_id: Optional[int] = None

View File

@ -18,7 +18,6 @@ import torch.distributed
import torch.nn as nn
from tqdm.auto import tqdm
import vllm.envs as envs
from vllm.attention import AttentionMetadata, get_attn_backend
from vllm.attention.backends.abstract import AttentionState
from vllm.attention.backends.utils import CommonAttentionState
@ -1099,10 +1098,9 @@ class GPUModelRunnerBase(ModelRunnerBase[TModelInputForGPU]):
backend = self.vllm_config.compilation_config.init_backend(
self.vllm_config)
compilation_counter.dynamo_as_is_count += 1
self.model = torch.compile(
self.model,
fullgraph=envs.VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE,
backend=backend)
self.model = torch.compile(self.model,
fullgraph=True,
backend=backend)
def get_model(self) -> nn.Module:
return self.model

View File

@ -5,7 +5,8 @@ import dataclasses
import os
import time
from abc import abstractmethod
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
from typing import (Any, Callable, Dict, List, Optional, Set, Tuple, Type,
TypeVar, Union)
import cloudpickle
import torch
@ -28,6 +29,8 @@ from vllm.worker.model_runner_base import (BroadcastableModelInput,
logger = init_logger(__name__)
_R = TypeVar("_R")
@warn_for_unimplemented_methods
class WorkerBase:
@ -70,6 +73,10 @@ class WorkerBase:
def get_model(self) -> nn.Module:
raise NotImplementedError
def apply_model(self, fn: Callable[[nn.Module], _R]) -> _R:
    """Apply a function on the model inside this worker."""
    model = self.get_model()
    return fn(model)
def load_model(self) -> None:
"""Load model onto target device."""
raise NotImplementedError