diff --git a/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py b/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py
index e795c8545773a..cd5ec2f4cbf2d 100644
--- a/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py
+++ b/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py
@@ -192,4 +192,4 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
             combine(False)
             # print(f"{ubatch_id} AFTER COMBINE RECV SYNC", flush=True)
             yield_and_switch_from_comm_to_compute_impl(schedule="default")
-            torch.cuda.synchronize()
+            # torch.cuda.synchronize()
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 15067ab0d3d8e..311656f364674 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -1286,10 +1286,34 @@ class GPUModelRunner(LoRAModelRunnerMixin):
                                                 dtype=torch.int32)
         return max_tokens_across_dp_cpu - num_tokens, num_tokens_after_padding
 
+    def get_padding(self,
+            num_tokens_unpadded: int) -> tuple[int, Optional[torch.Tensor]]:
+
+        num_tokens_padded = num_tokens_unpadded
+
+        if (self.use_cuda_graph
+                and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]):
+            # Use piecewise CUDA graphs.
+            # Add padding to the batch size.
+            num_tokens_padded = self.vllm_config.pad_for_cudagraph(num_tokens_unpadded)
+        else:
+            # Eager mode.
+            # Pad tokens to multiple of tensor_parallel_size when
+            # enabled collective fusion for SP
+            tp_size = self.vllm_config.parallel_config.tensor_parallel_size
+            if self.vllm_config.compilation_config.pass_config. \
+                enable_sequence_parallelism and tp_size > 1:
+                from vllm.utils import round_up
+                num_tokens_padded = round_up(num_tokens_unpadded, tp_size)
+
+        num_pad_tokens = num_tokens_padded - num_tokens_unpadded
+        num_dp_pad_tokens, num_tokens_after_padding = self.get_dp_padding(num_tokens_padded)
+
+        return num_dp_pad_tokens + num_pad_tokens, num_tokens_after_padding
+
     def get_dp_padding_ubatch(self,
             ubatch_slices: UBatchSlices) -> tuple[int, Optional[torch.Tensor]]:
         dp_size = self.vllm_config.parallel_config.data_parallel_size
-        dp_rank = self.vllm_config.parallel_config.data_parallel_rank
 
         if dp_size == 1:
             # Early exit.
@@ -1300,26 +1324,35 @@ class GPUModelRunner(LoRAModelRunnerMixin):
 
         first_ubatch_num_tokens = first_ubatch_slice[1].stop - first_ubatch_slice[1].start
         second_ubatch_num_tokens = second_ubatch_slice[1].stop - second_ubatch_slice[1].start
-
-        max_tokens_per_ubatch_local = first_ubatch_num_tokens + second_ubatch_num_tokens
-
+        # We don't support prefills yet so the two ubatches should only differ
+        # by at most one token
         assert abs(first_ubatch_num_tokens - second_ubatch_num_tokens) <= 1
-        max_tokens_per_ubatch_local = max(first_ubatch_num_tokens, second_ubatch_num_tokens)
+
+        from vllm.utils import round_up
+
+        num_tokens_unpadded = first_ubatch_num_tokens + second_ubatch_num_tokens
+        num_tokens_padded = round_up(num_tokens_unpadded, 2)
+        if (self.use_cuda_graph
+                and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]):
+            # Use piecewise CUDA graphs.
+            # Add padding to the batch size.
+            num_tokens_padded = self.vllm_config.pad_for_cudagraph(num_tokens_unpadded)
+        else:
+            # Eager mode.
+            # Pad tokens to multiple of tensor_parallel_size when
+            # enabled collective fusion for SP
+            tp_size = self.vllm_config.parallel_config.tensor_parallel_size
+            if self.vllm_config.compilation_config.pass_config. \
+                enable_sequence_parallelism and tp_size > 1:
+                num_tokens_padded = round_up(num_tokens_unpadded, tp_size)
+
+        num_tokens_per_ubatch = num_tokens_padded // 2
+
+        # Note that we compute the number of padded tokens per ubatch
+        num_pad_tokens, num_tokens_after_padding = self.get_dp_padding(num_tokens_per_ubatch)
-        assert first_ubatch_num_tokens > 0 and second_ubatch_num_tokens > 0
-
-        num_tokens_across_dp = DPMetadata.num_tokens_across_dp(
-            max_tokens_per_ubatch_local, dp_size, dp_rank)
-        max_tokens_across_dp = torch.max(num_tokens_across_dp).item()
-        num_tokens_after_padding = torch.tensor([max_tokens_across_dp] *
-                                                dp_size,
-                                                device="cpu",
-                                                dtype=torch.int32)
-
-        # assert max_tokens_across_dp <= 2 * max_tokens_per_ubatch_local, \
-        #     f"max_tokens_across_dp: {max_tokens_across_dp} max_tokens_per_ubatch{max_tokens_per_ubatch_local}"
-        num_pad_tokens = (max_tokens_across_dp * 2) - \
-            (first_ubatch_num_tokens + second_ubatch_num_tokens)
+        num_pad_tokens = ((num_pad_tokens + num_tokens_per_ubatch) * 2) - \
+            num_tokens_unpadded
         return num_pad_tokens, num_tokens_after_padding
 
     # This doesn't actually pad the ubatch slices. It just shifts the
@@ -1352,6 +1385,14 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         padded_second_ubatch_slice = slice(ubatch_slices[1][1].start,
                                            num_total_tokens)
         ubatch_slices[1] = (ubatch_slices[1][0], padded_second_ubatch_slice)
+
+    # Returns num_padded_tokens. This is just a number that should be added to the
+    # current number of tokens. It is a sum of the number of padded tokens from DP
+    # padding along with the number of padded tokens from cudagraph padding.
+    # The second tensor object is None when DP is disabled. When DP is enabled,
+    # it contains the number of tokens on each dp rank.
+    def compute_padding(self,) -> tuple[int, Optional[torch.Tensor]]:
+        return (0, torch.Tensor())
 
     def should_ubatch(self, should_ubatch: bool) -> bool:
         dp_size = self.vllm_config.parallel_config.data_parallel_size
@@ -1395,27 +1436,27 @@ class GPUModelRunner(LoRAModelRunnerMixin):
                 tokens_slice = slice(tokens_slice.start,
                                      tokens_slice.start + 1)
                 num_tokens = 1
 
-            if (self.use_cuda_graph
-                    and num_tokens <= self.cudagraph_batch_sizes[-1]):
-                # Use piecewise CUDA graphs.
-                # Add padding to the batch size.
-                tokens_slice = \
-                    slice(tokens_slice.start, tokens_slice.start+
-                          self.vllm_config.pad_for_cudagraph(num_tokens))
-            else:
-                # Eager mode.
-                # Pad tokens to multiple of tensor_parallel_size when
-                # enabled collective fusion for SP
-                tp_size = self.vllm_config.parallel_config.tensor_parallel_size
-                if self.vllm_config.compilation_config.pass_config. \
-                    enable_sequence_parallelism and tp_size > 1:
-                    from vllm.utils import round_up
-                    tokens_slice = slice(
-                        tokens_slice.start,
-                        tokens_slice.start + round_up(num_tokens, tp_size))
+            # if (self.use_cuda_graph
+            #         and num_tokens <= self.cudagraph_batch_sizes[-1]):
+            #     # Use piecewise CUDA graphs.
+            #     # Add padding to the batch size.
+            #     tokens_slice = \
+            #         slice(tokens_slice.start, tokens_slice.start+
+            #               self.vllm_config.pad_for_cudagraph(num_tokens))
+            # else:
+            #     # Eager mode.
+            #     # Pad tokens to multiple of tensor_parallel_size when
+            #     # enabled collective fusion for SP
+            #     tp_size = self.vllm_config.parallel_config.tensor_parallel_size
+            #     if self.vllm_config.compilation_config.pass_config. \
+            #         enable_sequence_parallelism and tp_size > 1:
+            #         from vllm.utils import round_up
+            #         tokens_slice = slice(
+            #             tokens_slice.start,
+            #             tokens_slice.start + round_up(num_tokens, tp_size))
             # update num tokens for padding
-            num_tokens = tokens_slice.stop - tokens_slice.start
+            # num_tokens = tokens_slice.stop - tokens_slice.start
 
         # _prepare_inputs may reorder the batch, so we must gather multi
         # modal outputs after that to ensure the correct order
@@ -1566,8 +1607,6 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         # run single batch
         else:
             # print("RUN NORMAL")
-            # No padding for the non ubatch case
-            assert num_tokens_across_dp is None
            model_output = _run(
                slice(0, num_scheduled_tokens),
                set_forward_context(attn_metadata,
@@ -1599,16 +1638,20 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         attn_metadata, logits_indices, spec_decode_metadata,
         ubatch_slices, num_pad_tokens, num_tokens_after_padding = (
             self._prepare_inputs(scheduler_output))
         num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
+        num_input_tokens = num_scheduled_tokens
         if ubatch_slices and num_pad_tokens > 0:
-            num_scheduled_tokens += num_pad_tokens
-            self.pad_out_ubatch_second_stage(ubatch_slices, num_scheduled_tokens)
+            num_input_tokens += num_pad_tokens
+            self.pad_out_ubatch_second_stage(ubatch_slices, num_input_tokens)
+        elif ubatch_slices is None:
+            num_pad, num_tokens_after_padding = self.get_padding(num_input_tokens)
+            num_input_tokens += num_pad
         # Run the decoder.
         # Use persistent buffers for CUDA graphs.
         self.maybe_setup_kv_connector(scheduler_output)
         model_output = self._run_model(
             attn_metadata=attn_metadata,
-            num_scheduled_tokens=num_scheduled_tokens,
+            num_scheduled_tokens=num_input_tokens,
             ubatch_slices=ubatch_slices,
             scheduler_output=scheduler_output,
             num_tokens_across_dp=num_tokens_after_padding
@@ -1616,87 +1659,6 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         self.maybe_wait_for_kv_save()
         finished_sending, finished_recving = (
             self.get_finished_kv_transfers(scheduler_output))
-        # if (self.use_cuda_graph
-        #         and num_scheduled_tokens <= self.cudagraph_batch_sizes[-1]):
-        #     # Use piecewise CUDA graphs.
-        #     # Add padding to the batch size.
-        #     num_input_tokens = self.vllm_config.pad_for_cudagraph(
-        #         num_scheduled_tokens)
-        # else:
-        #     # Eager mode.
-        #     # Pad tokens to multiple of tensor_parallel_size when
-        #     # enabled collective fusion for SP
-        #     tp_size = self.vllm_config.parallel_config.tensor_parallel_size
-        #     if self.vllm_config.compilation_config.pass_config. \
-        #         enable_sequence_parallelism and tp_size > 1:
-        #         from vllm.utils import round_up
-        #         num_input_tokens = round_up(num_scheduled_tokens, tp_size)
-        #     else:
-        #         num_input_tokens = num_scheduled_tokens
-
-        # # Padding for DP
-        # num_pad, num_tokens_across_dp = self.get_dp_padding(num_input_tokens)
-        # num_input_tokens += num_pad
-
-        # # _prepare_inputs may reorder the batch, so we must gather multi
-        # # modal outputs after that to ensure the correct order
-        # if self.is_multimodal_model:
-        #     # Run the multimodal encoder if any.
-        #     self._execute_mm_encoder(scheduler_output)
-        #     mm_embeds = self._gather_mm_embeddings(scheduler_output)
-        # else:
-        #     mm_embeds = []
-
-        # if self.is_multimodal_model and get_pp_group().is_first_rank:
-        #     # NOTE(woosuk): To unify token ids and soft tokens (vision
-        #     # embeddings), we always use embeddings (rather than token ids)
-        #     # as input to the multimodal model, even when the input is text.
-        #     input_ids = self.input_ids[:num_scheduled_tokens]
-        #     if mm_embeds:
-        #         inputs_embeds = self.model.get_input_embeddings(
-        #             input_ids, mm_embeds)
-        #     else:
-        #         inputs_embeds = self.model.get_input_embeddings(input_ids)
-        #     # TODO(woosuk): Avoid the copy. Optimize.
-        #     self.inputs_embeds[:num_scheduled_tokens].copy_(inputs_embeds)
-        #     inputs_embeds = self.inputs_embeds[:num_input_tokens]
-        #     input_ids = None
-        # else:
-        #     # For text-only models, we use token ids as input.
-        #     # While it is possible to use embeddings as input just like the
-        #     # multimodal models, it is not desirable for performance since
-        #     # then the embedding layer is not included in the CUDA graph.
-        #     input_ids = self.input_ids[:num_input_tokens]
-        #     inputs_embeds = None
-        # if self.uses_mrope:
-        #     positions = self.mrope_positions[:, :num_input_tokens]
-        # else:
-        #     positions = self.positions[:num_input_tokens]
-
-        # if get_pp_group().is_first_rank:
-        #     intermediate_tensors = None
-        # else:
-        #     intermediate_tensors = self.sync_and_slice_intermediate_tensors(
-        #         num_input_tokens, intermediate_tensors, True)
-
-        # # Run the decoder.
-        # # Use persistent buffers for CUDA graphs.
-        # with set_forward_context(attn_metadata,
-        #                          self.vllm_config,
-        #                          num_tokens=num_input_tokens,
-        #                          num_tokens_across_dp=num_tokens_across_dp):
-        #     self.maybe_setup_kv_connector(scheduler_output)
-
-        #     model_output = self.model(
-        #         input_ids=input_ids,
-        #         positions=positions,
-        #         intermediate_tensors=intermediate_tensors,
-        #         inputs_embeds=inputs_embeds,
-        #     )
-
-        #     self.maybe_wait_for_kv_save()
-        #     finished_sending, finished_recving = (
-        #         self.get_finished_kv_transfers(scheduler_output))
 
         if self.use_aux_hidden_state_outputs:
             hidden_states, aux_hidden_states = model_output
@@ -2174,9 +2136,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
     ) -> torch.Tensor:
 
         # Padding for DP
-        # num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
-        # num_tokens += num_pad
-        num_tokens_across_dp = None
+        num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
+        num_tokens += num_pad
 
         # Set num_scheduled_tokens based on num_tokens and max_num_seqs
         # for dummy run with LoRA so that the num_reqs collectively
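The sketch below is not part of the patch; it only illustrates the padding arithmetic that the new get_padding() and get_dp_padding_ubatch() helpers perform. It is a minimal standalone approximation: round_up mimics vllm.utils.round_up, pad_for_cudagraph is a stand-in for vllm_config.pad_for_cudagraph that rounds up to the smallest captured batch size, other_ranks_tokens replaces the collective that get_dp_padding() uses to learn the token counts on the other DP ranks, and the sequence-parallel eager branch is omitted. The capture sizes and token counts in the example are made up.

def round_up(x: int, multiple: int) -> int:
    # Same behavior as vllm.utils.round_up.
    return ((x + multiple - 1) // multiple) * multiple


def pad_for_cudagraph(num_tokens: int, capture_sizes: list[int]) -> int:
    # Stand-in for vllm_config.pad_for_cudagraph: round up to the smallest
    # captured cudagraph batch size that can hold the batch.
    return min(size for size in capture_sizes if size >= num_tokens)


def get_padding_sketch(num_tokens_unpadded: int, capture_sizes: list[int],
                       other_ranks_tokens: list[int]) -> tuple[int, list[int]]:
    # Step 1: local padding (the cudagraph branch of get_padding()).
    num_tokens_padded = num_tokens_unpadded
    if num_tokens_unpadded <= capture_sizes[-1]:
        num_tokens_padded = pad_for_cudagraph(num_tokens_unpadded, capture_sizes)
    # Step 2: DP padding (get_dp_padding()): every rank must run the same
    # number of tokens, so pad up to the largest locally padded rank.
    max_across_dp = max([num_tokens_padded] + other_ranks_tokens)
    num_tokens_after_padding = [max_across_dp] * (1 + len(other_ranks_tokens))
    # Total padding to add on top of the unpadded count.
    return max_across_dp - num_tokens_unpadded, num_tokens_after_padding


def ubatch_padding_sketch(first_ubatch_tokens: int, second_ubatch_tokens: int,
                          capture_sizes: list[int],
                          other_ranks_tokens_per_ubatch: list[int]) -> int:
    # Mirrors get_dp_padding_ubatch(): pad the combined token count, split it
    # evenly across the two micro-batches, then apply DP padding per ubatch.
    num_tokens_unpadded = first_ubatch_tokens + second_ubatch_tokens
    num_tokens_padded = round_up(num_tokens_unpadded, 2)
    if num_tokens_unpadded <= capture_sizes[-1]:
        num_tokens_padded = pad_for_cudagraph(num_tokens_unpadded, capture_sizes)
    num_tokens_per_ubatch = num_tokens_padded // 2
    dp_pad = max([num_tokens_per_ubatch]
                 + other_ranks_tokens_per_ubatch) - num_tokens_per_ubatch
    # Same arithmetic as the hunk above:
    # ((dp_pad + tokens_per_ubatch) * 2) - unpadded total.
    return ((dp_pad + num_tokens_per_ubatch) * 2) - num_tokens_unpadded


if __name__ == "__main__":
    # 37 scheduled tokens locally, one other DP rank already padded to 48,
    # capture sizes up to 128: 37 -> 64, then DP keeps 64 on both ranks.
    print(get_padding_sketch(37, [8, 16, 32, 64, 128], [48]))   # (27, [64, 64])
    # Ubatches of 31 and 30 tokens -> 61 -> 64 -> 32 per ubatch; the other rank
    # runs 40 per ubatch, so each local ubatch pads to 40: 80 - 61 = 19.
    print(ubatch_padding_sketch(31, 30, [8, 16, 32, 64, 128], [40]))  # 19

In both variants the net effect is that the total padding equals the lockstep token count across DP ranks minus the locally scheduled tokens, which is why the per-ubatch version multiplies (dp_pad + tokens_per_ubatch) by 2 before subtracting the unpadded total.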