From 28e7c30b017168f9116e11c0d62c838959205878 Mon Sep 17 00:00:00 2001 From: yewentao256 Date: Mon, 11 Aug 2025 14:06:25 +0000 Subject: [PATCH] Fix pre-commit error Signed-off-by: yewentao256 --- vllm/v1/worker/gpu_model_runner.py | 134 +++++++++++++++-------------- 1 file changed, 68 insertions(+), 66 deletions(-) diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index baf070ca58144..0eb29bebdd2f9 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -47,7 +47,7 @@ from vllm.sequence import IntermediateTensors, PoolerOutput from vllm.tasks import GenerationTask, PoolingTask, SupportedTask from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, DeviceMemoryProfiler, GiB_bytes, LazyLoader, check_use_alibi, get_dtype_size, - is_pin_memory_available, round_up) + is_pin_memory_available, round_up, supports_dynamo) from vllm.v1.attention.backends.flash_attn import FlashAttentionMetadata from vllm.v1.attention.backends.mamba_selectors import get_mamba_attn_backend from vllm.v1.attention.backends.utils import ( @@ -113,12 +113,14 @@ class UbatchMetadata: intermediate_tensors: Optional[IntermediateTensors] num_tokens: int + @dataclasses.dataclass class CUDAGraphMetaData: cudagraph: torch.cuda.CUDAGraph ubatch_metadata: UbatchMetadata outputs: Optional[Any] = None + class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): def __init__( @@ -251,8 +253,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): is_spec_decode=bool(self.vllm_config.speculative_config), ) - can_use_cudagraphs = (self.vllm_config.compilation_config.level - == CompilationLevel.PIECEWISE + can_use_cudagraphs = (self.vllm_config.compilation_config.level + == CompilationLevel.PIECEWISE or self.compilation_config.full_cuda_graph) self.use_cuda_graph = ( can_use_cudagraphs @@ -266,7 +268,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): self.cudagraph_batch_sizes = list( reversed(self.compilation_config.cudagraph_capture_sizes)) self.full_cuda_graph = self.compilation_config.full_cuda_graph - self.cudagraphs = {} + self.cudagraphs = {} # type: ignore # Cache the device properties. self._init_device_properties() @@ -362,7 +364,10 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): self.kv_sharing_fast_prefill_logits_indices = torch.zeros( self.max_num_tokens, dtype=torch.int32, device=self.device) - def get_builder(self, index: int, ubatch_id: Optional[int] = None) -> AttentionMetadataBuilder: + def get_builder( + self, + index: int, + ubatch_id: Optional[int] = None) -> AttentionMetadataBuilder: if ubatch_id is None: return self.attn_metadata_builders[index][0] else: @@ -386,8 +391,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): if len(self.kv_cache_config.kv_cache_groups) == 0: return - self.get_builder(0).reorder_batch(self.input_batch, - scheduler_output) + self.get_builder(0).reorder_batch(self.input_batch, scheduler_output) # For models with multiple KV cache groups, the groups should agree on # the same order of requests. We ensure this by only allowing the first @@ -966,8 +970,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): for ubid, common_attn_metadata in enumerate( common_attn_metadata_list): assert common_attn_metadata.max_query_len == 1 - attn_metadata_i = ( - self.get_builder(kv_cache_group_id, ubatch_id=ubid).build( + attn_metadata_i = (self.get_builder( + kv_cache_group_id, ubatch_id=ubid).build( common_prefix_len=common_prefix_len, common_attn_metadata=common_attn_metadata)) for layer_name in kv_cache_group_spec.layer_names: @@ -996,7 +1000,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): ) for layer_name in kv_cache_group_spec.layer_names: - if (self.cache_config.kv_sharing_fast_prefill and layer_name + if (self.cache_config.kv_sharing_fast_prefill + and layer_name in self.kv_sharing_fast_prefill_eligible_layers): attn_metadata[layer_name] = fast_prefill_metadata continue @@ -1554,9 +1559,10 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): num_tokens_padded = num_tokens_unpadded - if (self.use_cuda_graph and not self.parallel_config.enable_microbatching + if (self.use_cuda_graph + and not self.parallel_config.enable_microbatching and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]): - # if False: + # if False: # Use piecewise CUDA graphs. # Add padding to the batch size. num_tokens_padded = self.vllm_config.pad_for_cudagraph( @@ -1602,7 +1608,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): if (self.full_cuda_graph and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]): # Add padding to the batch size. - num_tokens_padded = self.vllm_config.pad_for_cudagraph(num_tokens_unpadded) + num_tokens_padded = self.vllm_config.pad_for_cudagraph( + num_tokens_unpadded) else: # Eager mode. # Pad tokens to multiple of tensor_parallel_size when @@ -1678,8 +1685,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): if get_pp_group().is_first_rank: intermediate_tensors = None else: - assert False - + raise RuntimeError("It is not first rank") return input_ids, positions, inputs_embeds, intermediate_tensors @@ -1730,8 +1736,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): tokens_slice, intermediate_tensors, True) return input_ids, positions, inputs_embeds, intermediate_tensors - def model_inputs(self, tokens_slice: slice, - scheduler_output: Optional["SchedulerOutput"], + def model_inputs(self, tokens_slice: slice, + scheduler_output: Optional["SchedulerOutput"], use_dummy_input: bool) -> tuple: if use_dummy_input: # print("MAKING DUMMY BATCH") @@ -1739,10 +1745,11 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): return self._get_dummy_model_inputs(tokens_slice) else: return self._get_model_inputs(tokens_slice, scheduler_output) + def _make_ubatch_metadata(self, ubatch_slices, attn_metadata, compute_stream, num_tokens_across_dp, - skip_cuda_graphs, - scheduler_output, is_dummy_run) -> list[UbatchMetadata]: + skip_cuda_graphs, scheduler_output, + is_dummy_run) -> list[UbatchMetadata]: # Create one forward context per ubatch forward_contexts = [] @@ -1761,7 +1768,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): comm_stream=self.comm_stream, compute_stream=compute_stream, forward_contexts=forward_contexts, - device=self.device, + device=self.device, enable_async_comms=self.parallel_config.enable_async_comms) ubatch_metadata: list[UbatchMetadata] = [] @@ -1774,11 +1781,13 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): positions=positions, inputs_embeds=inputs_embeds, intermediate_tensors=intermediate_tensors, - num_tokens=tokens_slice.stop - tokens_slice.start)) + num_tokens=tokens_slice.stop - + tokens_slice.start)) return ubatch_metadata def _capture_ubatches(self, ubatch_metadata, model) -> torch.Tensor: + def _capture_ubatch_thread(results, ubatch_metadata, start_signal): # print(f"Starting Request on ubatch: {ubatch_ctx.id}", flush=True) context = ubatch_metadata.context @@ -1799,7 +1808,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): results: list[tuple[int, torch.Tensor]] = [] compute_stream = ubatch_metadata[0].context.compute_stream - num_tokens = ubatch_metadata[0].num_tokens + ubatch_metadata[1].num_tokens + num_tokens = ubatch_metadata[0].num_tokens + ubatch_metadata[ + 1].num_tokens # Ubatches will manually manage the forward context, so we override # it to None here so we can have it restored correctly later @@ -1809,11 +1819,11 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): for metadata in ubatch_metadata: start_signal = threading.Event() thread = threading.Thread(target=_capture_ubatch_thread, - args=( - results, - metadata, - start_signal, - )) + args=( + results, + metadata, + start_signal, + )) ubatch_threads.append(thread) thread.start() start_signals.append(start_signal) @@ -1824,8 +1834,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): cudagraph=torch.cuda.CUDAGraph(), ubatch_metadata=ubatch_metadata, ) - with torch.cuda.graph(cudagraph_metadata.cudagraph, - stream=compute_stream): + with torch.cuda.graph(cudagraph_metadata.cudagraph, + stream=compute_stream): # logger.info("STARTING WAKEUP LOOP") for start_signal in start_signals: start_signal.set() @@ -1837,7 +1847,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): result = torch.cat(sorted_results, dim=0) cudagraph_metadata.outputs = result # if is_global_first_rank(): - # logger.info(f"IN UBATCH RUNNER: Capturing for {num_tokens} tokens") + # logger.info(f"IN UBATCH RUNNER: " + # f"Capturing for {num_tokens} tokens") self.cudagraphs[num_tokens] = cudagraph_metadata return cudagraph_metadata.outputs @@ -1906,8 +1917,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): # if is_global_first_rank(): # logger.info(f"CAPTURING {num_scheduled_tokens}") return self._capture_ubatches(ubatch_metadata, self.model) - elif num_scheduled_tokens in self.cudagraphs and not skip_cuda_graphs: - # assert False + elif num_scheduled_tokens in self.cudagraphs \ + and not skip_cuda_graphs: cudagraph_metadata = self.cudagraphs[num_scheduled_tokens] # if is_global_first_rank(): # logger.info(f"UBATCH REPLAY {num_scheduled_tokens}") @@ -1920,7 +1931,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): # run normal batch else: input_ids, positions, inputs_embeds, intermediate_tensors = \ - self.model_inputs(slice(0, num_scheduled_tokens), + self.model_inputs(slice(0, num_scheduled_tokens), scheduler_output, is_dummy_run) # if is_global_first_rank(): # logger.info(f"RUNNING FULL BATCH {num_scheduled_tokens}") @@ -2017,7 +2028,6 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): # If attention doesn't support CUDA Graphs for this batch, but we # compiled with full CUDA graphs, we have to skip them entirely. skip_cuda_graphs = self.full_cuda_graph and not attention_cuda_graphs - # print(f"SKIPPING CUDA GRAPHS: {skip_cuda_graphs} {self.full_cuda_graph}") # Run the model. # Use persistent buffers for CUDA graphs. @@ -2629,14 +2639,12 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): is_profile: bool = False, ) -> tuple[torch.Tensor, torch.Tensor]: - # if allow_microbatching: # logger.info("ATTEMPTING TO UBATCH THE DUMMY RUN") - - # TODO(Sage) We need some more code to properly handle + # TODO(Sage) We need some more code to properly handle # mixing normal and dummy runs. The DP padding needs to - # be properly setup. Since we only support microbatching + # be properly setup. Since we only support microbatching # in cuda graph capture it's fine to ignore the DP padding # for now. ubatch_enabled = self.parallel_config.enable_microbatching @@ -2646,7 +2654,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): self.parallel_config.microbatching_token_threshold and \ allow_microbatching and capture_attn_cudagraph should_ubatch = self.should_ubatch(should_ubatch) - # _dummy_run doesn't go through _prepare_inputs so + # _dummy_run doesn't go through _prepare_inputs so # we synchronize with other DP ranks here # logger.info(f"NUM TOKENS {num_tokens} SHOULD UBATCH {should_ubatch}") # Padding for DP @@ -2671,25 +2679,24 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): dtype=np.int32) ubatch_slices = None - # We currently only microbatch if the number of tokens is - # over a certain threshold. + # We currently only microbatch if the number of tokens is + # over a certain threshold. # logger.info("PADDING DUMMY DONE") if should_ubatch: # We only support decode-only cudagraphs assert num_reqs == num_tokens assert num_tokens % 2 == 0 num_tokens_per_ubatch = num_tokens // 2 - num_tokens_across_dp = torch.tensor([num_tokens_per_ubatch] * 2, - device="cpu", + num_tokens_across_dp = torch.tensor([num_tokens_per_ubatch] * 2, + device="cpu", dtype=torch.int32) - ubatch_slices = [(slice(0, num_reqs // 2), - slice(0, num_tokens // 2)), - (slice(num_reqs // 2, num_reqs), + ubatch_slices = [(slice(0, + num_reqs // 2), slice(0, num_tokens // 2)), + (slice(num_reqs // 2, num_reqs), slice(num_tokens // 2, num_tokens))] - # attn_metadata: Optional[dict[str, Any]] = None - attn_metadata: Optional[PerLayerAttnMetadata]= None + attn_metadata: Optional[PerLayerAttnMetadata] = None if capture_attn_cudagraph: attn_metadata = {} if ubatch_slices is not None: @@ -2704,7 +2711,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): max_query_len = num_tokens if ubatch_slices is not None: max_query_len = 1 - for kv_cache_group_id, kv_cache_group_spec in enumerate( + for kv_cache_group_id, kv_cache_group_spec in enumerate( self.kv_cache_config.kv_cache_groups): common_attn_metadata = CommonAttentionMetadata( query_start_loc=self.query_start_loc[:num_reqs + 1], @@ -2723,19 +2730,15 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): block_table[kv_cache_group_id].slot_mapping[:num_tokens], causal=True) - if ubatch_slices is not None: common_attn_metadata_list = split_attn_metadata( - ubatch_slices, - common_attn_metadata - ) - for ubid, common_attn_metadata in enumerate(common_attn_metadata_list): - attn_metadata_i = ( - self.get_builder(kv_cache_group_id, ubatch_id=ubid). - build( + ubatch_slices, common_attn_metadata) + for ubid, common_attn_metadata in enumerate( + common_attn_metadata_list): + attn_metadata_i = (self.get_builder( + kv_cache_group_id, ubatch_id=ubid).build( common_prefix_len=0, - common_attn_metadata=common_attn_metadata - )) + common_attn_metadata=common_attn_metadata)) for layer_name in kv_cache_group_spec.layer_names: assert type(attn_metadata) is list attn_metadata[ubid][layer_name] = attn_metadata_i @@ -2744,8 +2747,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): kv_cache_group_id).build_for_cudagraph_capture( common_attn_metadata) for layer_name in kv_cache_group_spec.layer_names: - attn_metadata[layer_name] = attn_metadata_i - + attn_metadata[ + layer_name] = attn_metadata_i # type: ignore with self.maybe_dummy_run_with_lora(self.lora_config, num_scheduled_tokens): @@ -2755,8 +2758,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): ubatch_slices=ubatch_slices, is_dummy_run=True, num_tokens_across_dp=num_tokens_across_dp, - build_cuda_graph=build_cuda_graph - ) + build_cuda_graph=build_cuda_graph) if self.use_aux_hidden_state_outputs: hidden_states, _ = outputs else: @@ -3051,7 +3053,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): for num_tokens in compilation_cases: # We skip EPLB here since we don't want to record dummy metrics # if is_global_first_rank(): - # logger.info(f"CAPTURE SIZE {num_tokens} WARMING UP {self.compilation_config.cudagraph_num_of_warmups}") + # logger.info(f"CAPTURE SIZE {num_tokens} WARMING UP " + # f"{self.compilation_config.cudagraph_num_of_warmups}") for _ in range( self.compilation_config.cudagraph_num_of_warmups): self._dummy_run(num_tokens, @@ -3120,7 +3123,6 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): ) builders.append(attn_metadata_builder_2) - if (self.full_cuda_graph and not attn_metadata_builder_i.full_cudagraph_supported): raise ValueError( @@ -3552,7 +3554,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): causal=False, ) - return common_metadata, builder.build( + return common_metadata, builder.build( # type: ignore common_prefix_len=0, # No cascade for encoder common_attn_metadata=common_metadata, )