more hacking

Signed-off-by: Sage Moore <sage@neuralmagic.com>
This commit is contained in:
Sage Moore 2025-06-12 20:36:13 +00:00
parent d682f5e1bd
commit b74c731342
3 changed files with 51 additions and 20 deletions

View File

@ -44,9 +44,9 @@ class DPMetadata:
device="cpu",
dtype=torch.int32)
from vllm.distributed.parallel_state import get_dp_group
# print("STARTING AR num_tokens_across_dp")
# logger.info("STARTING AR num_tokens_across_dp")
dist.all_reduce(num_tokens_tensor, group=get_dp_group().cpu_group)
# print("finishing num_tokens_across_dp")
# logger.info("finishing num_tokens_across_dp")
return num_tokens_tensor
@staticmethod
@ -57,7 +57,14 @@ class DPMetadata:
device="cpu",
dtype=torch.int32)
from vllm.distributed.parallel_state import get_dp_group
# logger.info(f"should_ubatch_tensor before ar {should_ubatch_tensor}")
dist.all_reduce(should_ubatch_tensor, group=get_dp_group().cpu_group)
# logger.info(f"should_ubatch_tensor after ar {should_ubatch_tensor}")
# If there's an incorrect ordering of ARs across DP ranks, this tensor
# can end up containing the number of padded tokens for a DP rank
assert torch.all(should_ubatch_tensor <= 1)
result: bool = bool(torch.all(should_ubatch_tensor == 1).item())
# print(f"FINISHING AR should_ubatch_across_dp {result} {should_ubatch_tensor}")
return result

View File

@ -134,14 +134,14 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
do_recv=not send,
)
yield_and_switch_from_compute_to_comm_impl(schedule="default")
# yield_and_switch_from_compute_to_comm_impl(schedule="default")
dispatch(True) # Send
# torch.cuda.synchronize()
# print(f"{ubatch_id} AFTER SEND SYNC", flush=True)
dispatch(False) # Recv
# torch.cuda.synchronize()
# print(f"{ubatch_id} AFTER RECV SYNC", flush=True)
yield_and_switch_from_comm_to_compute_impl(schedule="default")
# yield_and_switch_from_comm_to_compute_impl(schedule="default")
# torch.cuda.synchronize()
if expert_x_scale is not None:
expert_x_scale = expert_x_scale[:, :, 0:1]
@ -185,11 +185,11 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
do_recv=not send,
)
yield_and_switch_from_compute_to_comm_impl(schedule="default")
# yield_and_switch_from_compute_to_comm_impl(schedule="default")
combine(True)
# torch.cuda.synchronize()
# print(f"{ubatch_id} AFTER COMBINE SEND SYNC", flush=True)
combine(False)
# print(f"{ubatch_id} AFTER COMBINE RECV SYNC", flush=True)
yield_and_switch_from_comm_to_compute_impl(schedule="default")
# yield_and_switch_from_comm_to_compute_impl(schedule="default")
# torch.cuda.synchronize()

View File

@ -686,34 +686,48 @@ class GPUModelRunner(LoRAModelRunnerMixin):
self.query_start_loc_np, max_num_scheduled_tokens,
scheduler_output)
should_ubatch = self.should_ubatch(True if ubatch_slices else False)
if should_ubatch:
assert ubatch_slices
# Don't attempt to microbatch unless every other DP worker is also microbatching
if not should_ubatch and ubatch_slices:
if not should_ubatch:
ubatch_slices = None
num_pad_tokens = 0
num_tokens_after_padding = None
ubatch_bailout = False
if ubatch_slices:
# logger.info(f"ATTEMPTING TO PAD UBATCH {should_ubatch}")
assert should_ubatch
num_pad_tokens, num_tokens_after_padding = self.get_dp_padding_ubatch(ubatch_slices)
# logger.info("UBATCH PADDING DONE")
if num_pad_tokens > 0:
if num_pad_tokens < scheduler_output.total_num_scheduled_tokens:
self.pad_out_ubatch_first_stage(ubatch_slices, num_pad_tokens)
else:
assert False
# We bail out of ubatching here. This accounts for the case where
# the padding would result in an "empty" second ubatch.
# TODO: just make the second ubatch a dummy ubatch
ubatch_slices = None
logger.info("FALLING BACK AND DISABLING UBATCHING")
ubatch_bailout = True
# Note that if we are attempting to ubatch by this point then we know that no
# DP ranks are doing dummy runs
# if ubatch_slices:
# should_ubatch = self.should_ubatch(False if ubatch_bailout else True)
# if not should_ubatch:
# logger.info("SUCCESSFULLY BAILED OUT")
# num_pad_tokens = 0
# num_tokens_after_padding = None
# ubatch_slices = None
# This AR is only necessary in the case described above where
# the second ubatch ends up being empty. NOte if you delete this go delete
# the second should_ubatch call in _dummy_run
should_ubatch = self.should_ubatch(True if ubatch_slices else False)
if not should_ubatch:
num_pad_tokens = 0
num_tokens_after_padding = None
ubatch_slices = None
# should_ubatch = self.should_ubatch(True if ubatch_slices else False)
# if not should_ubatch:
# num_pad_tokens = 0
# num_tokens_after_padding = None
# ubatch_slices = None
@ -1643,9 +1657,12 @@ class GPUModelRunner(LoRAModelRunnerMixin):
num_input_tokens += num_pad_tokens
self.pad_out_ubatch_second_stage(ubatch_slices, num_input_tokens)
elif ubatch_slices is None:
# logger.info("ATTEMPTING TO PAD NORMAL BATCH")
num_pad, num_tokens_after_padding = self.get_padding(num_input_tokens)
# logger.info("NORMAL BATCH DONE")
num_input_tokens += num_pad
# logger.info("RUNNING MODEL")
# Run the decoder.
# Use persistent buffers for CUDA graphs.
self.maybe_setup_kv_connector(scheduler_output)
@ -2135,9 +2152,16 @@ class GPUModelRunner(LoRAModelRunnerMixin):
allow_microbatching: bool = False,
) -> torch.Tensor:
should_microbatch = False
# _dummy_run doesn't go through _prepare_inputs so
# we synchronize with other DP ranks here
self.should_ubatch(should_microbatch)
# Padding for DP
# logger.info("PADDING DUMMY")
num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
# logger.info("PADDING DUMMY DONE")
num_tokens += num_pad
# num_tokens_across_dp = None
# Set num_scheduled_tokens based on num_tokens and max_num_seqs
# for dummy run with LoRA so that the num_reqs collectively
@ -2187,11 +2211,6 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# and self.vllm_config.parallel_config.always_microbatch_if_enabled)
# dummy_microbatches = [(slice(0, 0), slice(0, 0)),
# (slice(0, 0), slice(0, 0))]
should_microbatch = False
# _dummy_run doesn't go through _prepare_inputs so
# we synchronize with other DP ranks here
self.should_ubatch(should_microbatch)
self.should_ubatch(should_microbatch)
with self.maybe_dummy_run_with_lora(self.lora_config,
num_scheduled_tokens):
@ -2212,6 +2231,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
self.drafter.dummy_run(num_tokens)
logit_indices = np.cumsum(num_scheduled_tokens) - 1
# logger.info("DUMMY RUN RETURNING HIDDEN STATES")
return hidden_states[logit_indices]
@torch.inference_mode()
@ -2360,7 +2380,9 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# Cache the dummy encoder outputs.
self.encoder_cache["tmp"] = dict(enumerate(dummy_encoder_outputs))
# logger.info("STARTING HIDDEN STATES")
hidden_states = self._dummy_run(self.max_num_tokens)
# logger.info("HIDDEN STATES")
if get_pp_group().is_last_rank:
sampler_output = self._dummy_sampler_run(hidden_states)
else:
@ -2389,7 +2411,9 @@ class GPUModelRunner(LoRAModelRunnerMixin):
for _ in range(self.vllm_config.compilation_config.
cudagraph_num_of_warmups):
self._dummy_run(num_tokens, skip_attn=skip_attn)
# print("CUDAGRAPH CAPTURE START")
self._dummy_run(num_tokens, skip_attn=skip_attn)
# print("CUDAGRAPH CAPTURE END")
end_time = time.perf_counter()
end_free_gpu_memory = torch.cuda.mem_get_info()[0]