diff --git a/vllm/entrypoints/logger.py b/vllm/entrypoints/logger.py
index d4655dd5e6ab8..ffe215fb1b7b4 100644
--- a/vllm/entrypoints/logger.py
+++ b/vllm/entrypoints/logger.py
@@ -39,11 +39,11 @@ class RequestLogger:
         if prompt_token_ids is not None:
             prompt_token_ids = prompt_token_ids[:max_log_len]
 
-        logger.info(
-            "Received request %s: prompt: %r, "
-            "params: %s, prompt_token_ids: %s, "
-            "prompt_embeds shape: %s, "
-            "lora_request: %s, prompt_adapter_request: %s.", request_id,
-            prompt, params, prompt_token_ids,
-            prompt_embeds.shape if prompt_embeds is not None else None,
-            lora_request, prompt_adapter_request)
+        # logger.info(
+        #     "Received request %s: prompt: %r, "
+        #     "params: %s, prompt_token_ids: %s, "
+        #     "prompt_embeds shape: %s, "
+        #     "lora_request: %s, prompt_adapter_request: %s.", request_id,
+        #     prompt, params, prompt_token_ids,
+        #     prompt_embeds.shape if prompt_embeds is not None else None,
+        #     lora_request, prompt_adapter_request)
diff --git a/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py b/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py
index 88c007b6c8950..ab1648606897e 100644
--- a/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py
+++ b/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py
@@ -125,14 +125,14 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
         ubatch_ctx = get_current_ubatch_context()
         ubatch_id = ubatch_ctx.id if ubatch_ctx is not None else -1
 
-        yield_and_switch_from_compute_to_comm_impl(schedule="default")
+        # yield_and_switch_from_compute_to_comm_impl(schedule="default")
         dispatch(True)  # Send
         # torch.cuda.synchronize()
         # print(f"{ubatch_id} AFTER SEND SYNC", flush=True)
         dispatch(False)  # Recv
         # torch.cuda.synchronize()
         # print(f"{ubatch_id} AFTER RECV SYNC", flush=True)
-        yield_and_switch_from_comm_to_compute_impl(schedule="default")
+        # yield_and_switch_from_comm_to_compute_impl(schedule="default")
         # torch.cuda.synchronize()
 
         return expert_x, expert_x_scale, expert_num_tokens
@@ -173,11 +173,11 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
             do_recv=not send,
         )
 
-        yield_and_switch_from_compute_to_comm_impl(schedule="default")
+        # yield_and_switch_from_compute_to_comm_impl(schedule="default")
         combine(True)
         # torch.cuda.synchronize()
         # print(f"{ubatch_id} AFTER COMBINE SEND SYNC", flush=True)
         combine(False)
         # print(f"{ubatch_id} AFTER COMBINE RECV SYNC", flush=True)
-        yield_and_switch_from_comm_to_compute_impl(schedule="default")
+        # yield_and_switch_from_comm_to_compute_impl(schedule="default")
         torch.cuda.synchronize()
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 98e3835e6dbeb..5d2a339237a08 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -1236,43 +1236,55 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         dp_size = self.vllm_config.parallel_config.data_parallel_size
         dp_rank = self.vllm_config.parallel_config.data_parallel_rank
 
+        if dp_size == 1:
+            # Early exit.
+            return 0, None
+
         first_ubatch_slice = ubatch_slices[0]
         second_ubatch_slice = ubatch_slices[1]
         first_ubatch_num_tokens = first_ubatch_slice[1].stop - first_ubatch_slice[1].start
         second_ubatch_num_tokens = second_ubatch_slice[1].stop - second_ubatch_slice[1].start
 
-        max_tokens_per_ubatch = max(first_ubatch_num_tokens, second_ubatch_num_tokens)
+        max_tokens_per_ubatch_local = first_ubatch_num_tokens + second_ubatch_num_tokens
 
-        # For DP: Don't pad when setting enforce_eager.
-        # This lets us set enforce_eager on the prefiller in a P/D setup and
-        # still use CUDA graphs (enabled by this padding) on the decoder.
-        #
-        # TODO(tms) : There are many cases where padding is enabled for
-        # prefills, causing unnecessary and excessive padding of activations.
-
-        if dp_size == 1:
-            # Early exit.
-            return 0, None
+        assert abs(first_ubatch_num_tokens - second_ubatch_num_tokens) <= 1
+        max_tokens_per_ubatch_local = max(first_ubatch_num_tokens, second_ubatch_num_tokens)
+
+        assert first_ubatch_num_tokens > 0 and second_ubatch_num_tokens > 0
 
         num_tokens_across_dp = DPMetadata.num_tokens_across_dp(
-            max_tokens_per_ubatch, dp_size, dp_rank)
-        max_tokens_across_dp_cpu = torch.max(num_tokens_across_dp).item()
-        num_tokens_after_padding = torch.tensor([max_tokens_across_dp_cpu] *
+            max_tokens_per_ubatch_local, dp_size, dp_rank)
+        max_tokens_across_dp = torch.max(num_tokens_across_dp).item()
+        num_tokens_after_padding = torch.tensor([max_tokens_across_dp] *
                                                 dp_size,
                                                 device="cpu",
                                                 dtype=torch.int32)
 
-        num_pad_tokens_first_ubatch = max_tokens_across_dp_cpu - first_ubatch_num_tokens
-        num_pad_tokens_second_ubatch = max_tokens_across_dp_cpu - second_ubatch_num_tokens
+        padded_first_ubatch_slice = slice(0, max_tokens_across_dp)
+        padded_second_ubatch_slice = slice(max_tokens_across_dp, max_tokens_across_dp * 2)
 
-        padded_first_ubatch_slice = slice(0, max_tokens_across_dp_cpu)
-        padded_second_ubatch_slice = slice(max_tokens_across_dp_cpu, 2 * max_tokens_across_dp_cpu)
+        assert max_tokens_across_dp <= 2 * max_tokens_per_ubatch_local, \
+            f"max_tokens_across_dp: {max_tokens_across_dp} max_tokens_per_ubatch{max_tokens_per_ubatch_local}"
 
-        ubatch_slices[0] = (ubatch_slices[0][0], padded_first_ubatch_slice)
-        ubatch_slices[1] = (ubatch_slices[1][0], padded_second_ubatch_slice)
+        assert padded_first_ubatch_slice.stop - padded_first_ubatch_slice.start == \
+            padded_second_ubatch_slice.stop - padded_second_ubatch_slice.start
 
-        return num_pad_tokens_first_ubatch + num_pad_tokens_second_ubatch, num_tokens_after_padding
+        ubatch_slices[0] = (padded_first_ubatch_slice, padded_first_ubatch_slice)
+        ubatch_slices[1] = (padded_first_ubatch_slice, padded_second_ubatch_slice)
+
+        # Need to assert that none of the padding is on the first ubatch
+        assert padded_first_ubatch_slice.stop - padded_first_ubatch_slice.start
+
+        # if (num_pad_tokens_first_ubatch > 0):
+        #     print(f"FIRST UBATCH PADDING {num_pad_tokens_first_ubatch} TOTAL: {max_tokens_across_dp_cpu} ORIGINAL{first_ubatch_num_tokens}")
+        # if (num_pad_tokens_second_ubatch > 0):
+        #     print(f"SECOND UBATCH PADDING {num_pad_tokens_second_ubatch} TOTAL: {max_tokens_across_dp_cpu} ORIGINAL{second_ubatch_num_tokens}")
+
+        num_pad_tokens = (max_tokens_across_dp * 2) - \
+            (first_ubatch_num_tokens + second_ubatch_num_tokens)
+        print(f"num padded tokens: {num_pad_tokens} num tokens tensor: {num_tokens_after_padding} first num_tokens: {first_ubatch_num_tokens} second num tokens {second_ubatch_num_tokens}")
+        return num_pad_tokens, num_tokens_after_padding
 
     def should_ubatch(self, should_ubatch: bool) -> bool:
         dp_size = self.vllm_config.parallel_config.data_parallel_size
@@ -1392,7 +1404,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
 
         def model_inputs(tokens_slice: slice, use_dummy_input: bool) -> tuple:
             if use_dummy_input:
-                # print("MAKING DUMMY BATCH")
+                print("MAKING DUMMY BATCH")
                 # assert num_dummy_tokens == 1
                 return self._get_dummy_model_inputs(num_dummy_tokens)
             else:
@@ -1451,7 +1463,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
                     if attn_metadata is not None else None,
                     self.vllm_config,
                     num_tokens=num_tokens,
-                    num_tokens_across_dp=num_tokens_across_dp)
+                    num_tokens_across_dp=num_tokens_across_dp if i == 1 else None)
 
         thread = threading.Thread(target=_ubatch_thread,
                                   args=(
@@ -1479,11 +1491,14 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         if ubatch_slices is not None:
             # num_tokens = ubatch_slices[1][1].stop
             # print(f"RUNNING UBATCH {num_tokens} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}")
+            assert not is_dummy_run
             model_output = _run_ubatches(ubatch_slices, attn_metadata, is_dummy_run,
                                          num_tokens_across_dp=num_tokens_across_dp)
         # run single batch
         else:
             # print("RUN NORMAL")
+            # No padding for the non ubatch case
+            assert not num_tokens_across_dp
             model_output = _run(
                 slice(0, num_scheduled_tokens),
                 set_forward_context(attn_metadata,
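For reference, below is a minimal, self-contained sketch of the per-ubatch DP padding arithmetic that the `gpu_model_runner.py` hunk above performs. The `pad_ubatch_slices` helper and the `tokens_per_rank` list are illustrative stand-ins, not vLLM APIs: the real code gathers each rank's per-ubatch token count via `DPMetadata.num_tokens_across_dp` and then pads both local ubatch slices up to the maximum across ranks.

```python
# Illustrative sketch only (assumed helper, not vLLM code): two ubatch slices
# per rank, and `tokens_per_rank` stands in for the all-gathered per-rank
# max-tokens-per-ubatch values.

def pad_ubatch_slices(first: slice, second: slice, tokens_per_rank: list[int]):
    first_num_tokens = first.stop - first.start
    second_num_tokens = second.stop - second.start

    # Every DP rank must run the same number of tokens per ubatch, so pad each
    # local ubatch up to the largest per-ubatch token count across all ranks.
    max_tokens_across_dp = max(tokens_per_rank)

    # Both ubatches become the same padded length; the second one starts right
    # after the padded first one, mirroring slice(0, max) / slice(max, 2 * max).
    padded_first = slice(0, max_tokens_across_dp)
    padded_second = slice(max_tokens_across_dp, max_tokens_across_dp * 2)

    # Total pad tokens = padded total minus the real token count.
    num_pad_tokens = (max_tokens_across_dp * 2) - (first_num_tokens + second_num_tokens)
    return num_pad_tokens, padded_first, padded_second


# Example: this rank split 9 tokens into ubatches of 5 and 4, while another
# DP rank has up to 7 tokens in one of its ubatches.
print(pad_ubatch_slices(slice(0, 5), slice(5, 9), tokens_per_rank=[5, 7, 6]))
# -> (5, slice(0, 7, None), slice(7, 14, None))
```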