Merge branch 'main' into pynccl_symm_fix

Amir Samani 2025-11-25 15:15:19 -08:00 committed by GitHub
commit aeac905e6f
10 changed files with 64 additions and 48 deletions

View File

@@ -133,7 +133,7 @@ def main(args):
         tensor_parallel_size=args.tp,
         enable_chunked_prefill=args.enable_chunked_prefill,
         enforce_eager=args.enforce_eager,
-        gpu_memory_utilization=0.8,
+        gpu_memory_utilization=0.9,
         speculative_config=speculative_config,
         disable_log_stats=False,
         max_model_len=args.max_model_len,

View File

@ -87,6 +87,11 @@ def test_with_spec_decoding(monkeypatch: pytest.MonkeyPatch):
# Set small draft model len to force doesn't-fit-in-drafter case. # Set small draft model len to force doesn't-fit-in-drafter case.
spec_config_short = spec_config | {"max_model_len": 50} spec_config_short = spec_config | {"max_model_len": 50}
test_sampling_params = [
dict(),
dict(logprobs=2),
]
# test_preemption, executor, async_scheduling, # test_preemption, executor, async_scheduling,
# spec_config, test_prefill_chunking # spec_config, test_prefill_chunking
test_configs = [ test_configs = [
@ -103,7 +108,7 @@ def test_with_spec_decoding(monkeypatch: pytest.MonkeyPatch):
(True, "uni", True, spec_config_short, True), (True, "uni", True, spec_config_short, True),
] ]
run_tests(monkeypatch, MTP_MODEL, test_configs, [{}]) run_tests(monkeypatch, MTP_MODEL, test_configs, test_sampling_params)
@dynamo_config.patch(cache_size_limit=16) @dynamo_config.patch(cache_size_limit=16)

View File

@@ -11,6 +11,7 @@ import pprint
 import time
 from collections.abc import Callable, Sequence
 from contextlib import contextmanager
+from copy import deepcopy
 from functools import partial
 from typing import Any
@@ -429,7 +430,7 @@ class PiecewiseCompileInterpreter(torch.fx.Interpreter):
         self.vllm_backend.compiler_manager.compile(
             submod,
             args,
-            self.compilation_config.inductor_compile_config,
+            self.vllm_backend.inductor_config,
             self.compilation_config,
             graph_index=index,
             num_graphs=len(self.compile_submod_names),
@@ -531,6 +532,9 @@ class VllmBackend:
     sym_tensor_indices: list[int]
     input_buffers: list[torch.Tensor]
     compiler_manager: CompilerManager
+    # Copy of CompilationConfig.inductor_compile_config +
+    # an entry for PostGradPassManager
+    inductor_config: dict[str, Any]
 
     def __init__(
         self,
@@ -561,25 +565,30 @@ class VllmBackend:
             self.compilation_config
         )
 
+        # Deepcopy the inductor config to detach the post-grad custom pass
+        # from CompilationConfig.
+        # We want to avoid PostGradPassManager in CompilationConfig because
+        # in future we need PostGradPassManager.uuid() to be executed
+        # only at compile time.
+        self.inductor_config = deepcopy(self.compilation_config.inductor_compile_config)
+
         # `torch.compile` is JIT compiled, so we don't need to
         # do anything here
 
     def configure_post_pass(self):
-        config = self.compilation_config
         self.pass_manager.configure(self.vllm_config)
 
         # Post-grad custom passes are run using the post_grad_custom_post_pass
         # hook. If a pass for that hook exists, add it to the pass manager.
-        inductor_config = config.inductor_compile_config
-        if self.pass_key in inductor_config:
-            if isinstance(inductor_config[self.pass_key], PostGradPassManager):
-                # PassManager already added to config, make sure it's correct
-                assert inductor_config[self.pass_key].uuid() == self.pass_manager.uuid()
+        if self.pass_key in self.inductor_config:
+            if isinstance(self.inductor_config[self.pass_key], PostGradPassManager):
+                raise ValueError(
+                    "PostGradPassManager can not be kept in CompilationConfig."
+                )
             else:
                 # Config should automatically wrap all inductor passes
-                assert isinstance(inductor_config[self.pass_key], InductorPass)
-                self.pass_manager.add(inductor_config[self.pass_key])
-            inductor_config[self.pass_key] = self.pass_manager
+                assert isinstance(self.inductor_config[self.pass_key], InductorPass)
+                self.pass_manager.add(self.inductor_config[self.pass_key])
+            self.inductor_config[self.pass_key] = self.pass_manager
 
     def __call__(
         self, graph: fx.GraphModule, example_inputs
@@ -638,9 +647,7 @@ class VllmBackend:
         self.compilation_config.local_cache_dir = local_cache_dir
 
         # Honors opt-outs such as CompilationMode.NONE or VLLM_DISABLE_COMPILE_CACHE.
-        disable_cache = not is_compile_cache_enabled(
-            self.compilation_config.inductor_compile_config
-        )
+        disable_cache = not is_compile_cache_enabled(self.inductor_config)
 
         if disable_cache:
             logger.info_once("vLLM's torch.compile cache is disabled.", scope="local")
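The change above hinges on `deepcopy` giving `VllmBackend` its own inductor config dict, so installing the pass manager under the post-grad hook key never leaks back into `CompilationConfig`. A minimal sketch of that detachment, using plain dicts and placeholder names rather than vLLM's real classes:

    # Sketch only: plain dicts and stand-in objects, not vLLM's actual classes.
    from copy import deepcopy

    PASS_KEY = "post_grad_custom_post_pass"  # stand-in for VllmBackend.pass_key

    compilation_config = {"inductor_compile_config": {"max_autotune": True}}

    # The backend keeps its own deep copy of the inductor config ...
    inductor_config = deepcopy(compilation_config["inductor_compile_config"])

    # ... so installing the pass manager only mutates the copy.
    inductor_config[PASS_KEY] = object()  # stand-in for the PostGradPassManager

    assert PASS_KEY not in compilation_config["inductor_compile_config"]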

View File

@@ -107,7 +107,7 @@ class PiecewiseBackend:
         entry.runnable = self.vllm_backend.compiler_manager.compile(
             self.graph,
             args,
-            self.compilation_config.inductor_compile_config,
+            self.vllm_backend.inductor_config,
             self.compilation_config,
             graph_index=self.piecewise_compile_index,
             num_graphs=self.total_piecewise_compiles,

View File

@@ -167,8 +167,6 @@ class CacheConfig:
         "num_gpu_blocks_override",
         "enable_prefix_caching",
         "prefix_caching_hash_algo",
-        # `cpu_offload_gb` does not use `torch.compile` yet.
-        "cpu_offload_gb",
         "cpu_kvcache_space_bytes",
         "mamba_page_size_padded",
         # Post-init/derived counters

View File

@@ -345,7 +345,6 @@ class ModelConfig:
         "logprobs_mode",
         "disable_cascade_attn",
         "skip_tokenizer_init",
-        "enable_prompt_embeds",
         "served_model_name",
         "config_format",
         "hf_token",

View File

@@ -196,9 +196,10 @@ class Mxfp4Config(QuantizationConfig):
             # TODO: Add support for MXFP4 Linear Method.
             # MXFP4 LinearMethod is available in AMD-Quark, refer to that implementation
             # if you are interested in enabling MXFP4 here.
-            logger.warning_once(
+            logger.debug_once(
                 "MXFP4 linear layer is not implemented - falling back to "
-                "UnquantizedLinearMethod."
+                "UnquantizedLinearMethod.",
+                scope="local",
             )
             return UnquantizedLinearMethod()
         elif isinstance(layer, FusedMoE):
@@ -208,9 +209,10 @@ class Mxfp4Config(QuantizationConfig):
             return Mxfp4MoEMethod(layer.moe_config)
         elif isinstance(layer, Attention):
             # TODO: Add support for MXFP4 Attention.
-            logger.warning_once(
+            logger.debug_once(
                 "MXFP4 attention layer is not implemented. "
-                "Skipping quantization for this layer."
+                "Skipping quantization for this layer.",
+                scope="local",
             )
             return None

View File

@@ -1089,8 +1089,6 @@ class Scheduler(SchedulerInterface):
                 and request.sampling_params.logprobs is not None
                 and logprobs
             ):
-                # NOTE: once we support N tokens per step (spec decode),
-                # the outer lists can be of length > 1.
                 new_logprobs = logprobs.slice(req_index, req_index + 1)
 
             if new_token_ids and self.structured_output_manager.should_advance(request):

View File

@@ -1,6 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 
+from collections.abc import Sequence
 from dataclasses import replace
 
 import torch
@@ -204,7 +205,9 @@ class RejectionSampler(nn.Module):
     def parse_output(
         output_token_ids: torch.Tensor,
         vocab_size: int,
-    ) -> list[list[int]]:
+        discard_req_indices: Sequence[int] = (),
+        return_cu_num_tokens: bool = False,
+    ) -> tuple[list[list[int]], list[int] | None]:
         """Parse the output of the rejection sampler.
 
         Args:
             output_token_ids: The sampled token IDs in shape
@@ -212,6 +215,8 @@ class RejectionSampler(nn.Module):
                 replaced with `PLACEHOLDER_TOKEN_ID` by the rejection sampler
                 and will be filtered out in this function.
             vocab_size: The size of the vocabulary.
+            discard_req_indices: Optional row indices to discard tokens in.
+            return_cu_num_tokens: Whether to also return cumulative token counts.
 
         Returns:
             A list of lists of token IDs.
         """
@@ -220,10 +225,15 @@ class RejectionSampler(nn.Module):
         valid_mask = (output_token_ids_np != PLACEHOLDER_TOKEN_ID) & (
             output_token_ids_np < vocab_size
         )
+        cu_num_tokens = None
+        if return_cu_num_tokens:
+            cu_num_tokens = [0] + valid_mask.sum(axis=1).cumsum().tolist()
+        if len(discard_req_indices) > 0:
+            valid_mask[discard_req_indices] = False
         outputs = [
             row[valid_mask[i]].tolist() for i, row in enumerate(output_token_ids_np)
         ]
-        return outputs
+        return outputs, cu_num_tokens
 
     def apply_logits_processors(
         self,
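For reference, a simplified NumPy-only sketch of the new `parse_output` contract: the cumulative counts are taken before the discard rows are masked out, so they still describe every token the sampler kept per request. This is an illustration with assumed placeholder values, not the real static method (which operates on a torch tensor inside `RejectionSampler`):

    import numpy as np

    PLACEHOLDER_TOKEN_ID = -1  # assumed sentinel for rejected draft positions

    def parse_output_sketch(token_ids, vocab_size, discard_req_indices=(), return_cu_num_tokens=False):
        valid = (token_ids != PLACEHOLDER_TOKEN_ID) & (token_ids < vocab_size)
        cu_num_tokens = None
        if return_cu_num_tokens:
            # Computed before discarding, so logprob rows still line up per request.
            cu_num_tokens = [0] + valid.sum(axis=1).cumsum().tolist()
        if len(discard_req_indices) > 0:
            valid[list(discard_req_indices)] = False
        outputs = [row[valid[i]].tolist() for i, row in enumerate(token_ids)]
        return outputs, cu_num_tokens

    tokens = np.array([[5, 7, PLACEHOLDER_TOKEN_ID], [3, PLACEHOLDER_TOKEN_ID, PLACEHOLDER_TOKEN_ID]])
    print(parse_output_sketch(tokens, vocab_size=100, discard_req_indices=[1], return_cu_num_tokens=True))
    # ([[5, 7], []], [0, 2, 3])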

View File

@@ -183,7 +183,7 @@ class AsyncGPUModelRunnerOutput(AsyncModelRunnerOutput):
         self,
         model_runner_output: ModelRunnerOutput,
         sampled_token_ids: torch.Tensor,
-        logprobs_tensors: torch.Tensor | None,
+        logprobs_tensors: LogprobsTensors | None,
         invalid_req_indices: list[int],
         async_output_copy_stream: torch.cuda.Stream,
         vocab_size: int,
@@ -219,28 +219,29 @@ class AsyncGPUModelRunnerOutput(AsyncModelRunnerOutput):
         This function blocks until the copy is finished.
         """
-        max_gen_len = self.sampled_token_ids_cpu.shape[-1]
         self.async_copy_ready_event.synchronize()
 
         # Release the device tensors once the copy has completed.
         del self._logprobs_tensors
         del self._sampled_token_ids
 
+        max_gen_len = self.sampled_token_ids_cpu.shape[-1]
         if max_gen_len == 1:
             valid_sampled_token_ids = self.sampled_token_ids_cpu.tolist()
+            for i in self._invalid_req_indices:
+                valid_sampled_token_ids[i].clear()
+            cu_num_tokens = None
         else:
-            valid_sampled_token_ids = RejectionSampler.parse_output(
+            valid_sampled_token_ids, cu_num_tokens = RejectionSampler.parse_output(
                 self.sampled_token_ids_cpu,
                 self.vocab_size,
+                self._invalid_req_indices,
+                return_cu_num_tokens=self._logprobs_tensors_cpu is not None,
             )
-        for i in self._invalid_req_indices:
-            valid_sampled_token_ids[i].clear()
 
         output = self._model_runner_output
         output.sampled_token_ids = valid_sampled_token_ids
         if self._logprobs_tensors_cpu:
-            # NOTE(nick): this will need to be updated to use cu_num_accepted_tokens
-            # for async sched + spec decode + logprobs compatibility.
-            output.logprobs = self._logprobs_tensors_cpu.tolists()
+            output.logprobs = self._logprobs_tensors_cpu.tolists(cu_num_tokens)
         return output
@@ -2597,28 +2598,24 @@ class GPUModelRunner(
         sampled_token_ids = sampler_output.sampled_token_ids
         logprobs_tensors = sampler_output.logprobs_tensors
         invalid_req_indices = []
-        cu_num_new_tokens: list[int] | None = None
+        cu_num_tokens: list[int] | None = None
         if not self.use_async_scheduling:
             # Get the valid generated tokens.
             max_gen_len = sampled_token_ids.shape[-1]
             if max_gen_len == 1:
                 # No spec decode tokens.
                 valid_sampled_token_ids = self._to_list(sampled_token_ids)
+                # Mask out the sampled tokens that should not be sampled.
+                for i in discard_sampled_tokens_req_indices:
+                    valid_sampled_token_ids[int(i)].clear()
             else:
                 # Includes spec decode tokens.
-                valid_sampled_token_ids = self.rejection_sampler.parse_output(
+                valid_sampled_token_ids, cu_num_tokens = RejectionSampler.parse_output(
                     sampled_token_ids,
                     self.input_batch.vocab_size,
+                    discard_sampled_tokens_req_indices,
+                    return_cu_num_tokens=logprobs_tensors is not None,
                 )
-            if logprobs_tensors:
-                # Needed for extracting logprobs when spec decoding.
-                # This must be done prior to discarding sampled tokens.
-                cu_num_new_tokens = [0]
-                for toks in valid_sampled_token_ids:
-                    cu_num_new_tokens.append(cu_num_new_tokens[-1] + len(toks))
-            # Mask out the sampled tokens that should not be sampled.
-            for i in discard_sampled_tokens_req_indices:
-                valid_sampled_token_ids[int(i)].clear()
         else:
             valid_sampled_token_ids = []
             invalid_req_indices = discard_sampled_tokens_req_indices.tolist()
@@ -2672,7 +2669,7 @@ class GPUModelRunner(
             req_state.output_token_ids.extend(sampled_ids)
 
         logprobs_lists = (
-            logprobs_tensors.tolists(cu_num_new_tokens)
+            logprobs_tensors.tolists(cu_num_tokens)
             if not self.use_async_scheduling and logprobs_tensors is not None
             else None
         )
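The cumulative counts exist so that flattened per-token logprob rows can be mapped back to the requests that produced them. A rough standalone sketch of that kind of mapping with plain lists (illustrative only, under the assumption that this is how the counts are consumed; it is not the actual `LogprobsTensors.tolists` implementation, which works on tensors and also carries token IDs and ranks):

    # Sketch only: split flat per-token rows into per-request groups using
    # cumulative counts, e.g. [0, 2, 3] -> request 0 gets rows 0:2, request 1 gets row 2:3.
    def split_by_cu_num_tokens(flat_rows, cu_num_tokens):
        return [
            flat_rows[start:end]
            for start, end in zip(cu_num_tokens, cu_num_tokens[1:])
        ]

    logprob_rows = [[-0.1, -2.3], [-0.4, -1.9], [-0.2, -3.0]]
    print(split_by_cu_num_tokens(logprob_rows, [0, 2, 3]))
    # [[[-0.1, -2.3], [-0.4, -1.9]], [[-0.2, -3.0]]]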