random push

Signed-off-by: Sage Moore <sage@neuralmagic.com>
This commit is contained in:
Sage Moore 2025-06-30 17:08:51 +00:00
parent 4672c72f44
commit d833982e48
3 changed files with 24 additions and 23 deletions

View File

@ -84,7 +84,7 @@ def main(args, dp_size, local_dp_rank, global_dp_rank, dp_master_ip,
"The president of the United States is", "The president of the United States is",
"The capital of France is", "The capital of France is",
"The future of AI is", "The future of AI is",
] * 5 ] * 10
# import random # import random
# import string # import string
# prompts = [''.join(random.choices(string.ascii_letters, k=128)) for _ in range(2048)] # prompts = [''.join(random.choices(string.ascii_letters, k=128)) for _ in range(2048)]
@ -112,7 +112,7 @@ def main(args, dp_size, local_dp_rank, global_dp_rank, dp_master_ip,
# sampling params. here we set different max_tokens for different # sampling params. here we set different max_tokens for different
# ranks for demonstration. # ranks for demonstration.
sampling_params = SamplingParams( sampling_params = SamplingParams(
temperature=0.8, top_p=0.95, max_tokens=[16, 20][global_dp_rank % 2] temperature=0.8, top_p=0.95, max_tokens=[20, 16][global_dp_rank % 2]
) )
# Fixed params # Fixed params

View File

@ -1367,7 +1367,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
return num_dp_pad_tokens + num_pad_tokens, num_tokens_after_padding return num_dp_pad_tokens + num_pad_tokens, num_tokens_after_padding
def get_dp_padding_ubatch(self, def get_dp_padding_ubatch(self,
ubatch_slices: UBatchSlices) -> tuple[int, Optional[torch.Tensor]]: ubatch_slices: UBatchSlices,
include_cudagraphs: bool = True) -> tuple[int, Optional[torch.Tensor]]:
dp_size = self.vllm_config.parallel_config.data_parallel_size dp_size = self.vllm_config.parallel_config.data_parallel_size
if dp_size == 1: if dp_size == 1:
@ -1387,7 +1388,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
num_tokens_unpadded = first_ubatch_num_tokens + second_ubatch_num_tokens num_tokens_unpadded = first_ubatch_num_tokens + second_ubatch_num_tokens
num_tokens_padded = round_up(num_tokens_unpadded, 2) num_tokens_padded = round_up(num_tokens_unpadded, 2)
if (self.use_cuda_graph if (include_cudagraphs and self.use_cuda_graph
and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]): and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]):
# Add padding to the batch size. # Add padding to the batch size.
num_tokens_padded = self.vllm_config.pad_for_cudagraph(num_tokens_unpadded) num_tokens_padded = self.vllm_config.pad_for_cudagraph(num_tokens_unpadded)
@ -1437,7 +1438,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
def pad_out_ubatch_second_stage(self, ubatch_slices: UBatchSlices, num_total_tokens: int): def pad_out_ubatch_second_stage(self, ubatch_slices: UBatchSlices, num_total_tokens: int):
# TODO Add asserts to make sure stage one ran # TODO Add asserts to make sure stage one ran
padded_second_ubatch_slice = slice(ubatch_slices[1][1].start, num_total_tokens) padded_second_ubatch_slice = slice(ubatch_slices[1][1].start, num_total_tokens)
ubatch_slices[1] = (ubatch_slices[1][0], padded_second_ubatch_slice) ubatch_slices[1] = (padded_second_ubatch_slice, padded_second_ubatch_slice)
# Returns num_padded_tokens. This is just a number that should be added to the # Returns num_padded_tokens. This is just a number that should be added to the
@ -1590,12 +1591,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
device=self.device) device=self.device)
for i, (_, tokens_slice) in enumerate(ubatch_slices): for i, (_, tokens_slice) in enumerate(ubatch_slices):
is_dummy_ubatch = tokens_slice.stop <= tokens_slice.start num_tokens = (tokens_slice.stop - tokens_slice.start)
assert not is_dummy_ubatch or i == len(
ubatch_slices) - 1 or is_dummy_run
num_tokens = num_dummy_tokens if is_dummy_ubatch or \
is_dummy_run else (tokens_slice.stop - tokens_slice.start)
# TODO (Sage) Instead of using this setter we should be able # TODO (Sage) Instead of using this setter we should be able
# to just create the forward context in advance and pass it # to just create the forward context in advance and pass it
# to the UBatchContext's __init__ method # to the UBatchContext's __init__ method
@ -1731,7 +1727,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
if ubatch_slices is not None: if ubatch_slices is not None:
assert len(ubatch_slices) == 2, "Only two ubatches has been tested" assert len(ubatch_slices) == 2, "Only two ubatches has been tested"
# num_tokens = ubatch_slices[1][1].stop # num_tokens = ubatch_slices[1][1].stop
# print(f"RUNNING UBATCH {num_tokens} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}") print(f"RUNNING UBATCH {ubatch_slices} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}")
# assert not is_dummy_run # assert not is_dummy_run
compute_stream = torch.cuda.Stream(device=self.device) compute_stream = torch.cuda.Stream(device=self.device)
ubatch_metadata = _make_ubatch_metadata( ubatch_metadata = _make_ubatch_metadata(
@ -2376,11 +2372,11 @@ class GPUModelRunner(LoRAModelRunnerMixin):
should_ubatch = self.should_ubatch(allow_microbatching) should_ubatch = self.should_ubatch(allow_microbatching)
# Padding for DP # Padding for DP
# logger.info("PADDING DUMMY") # logger.info("PADDING DUMMY")
num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens) num_tokens_across_dp = None
# logger.info("PADDING DUMMY DONE") num_pad = 0
if not should_ubatch:
num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
num_tokens += num_pad num_tokens += num_pad
# num_tokens_across_dp = None
# Set num_scheduled_tokens based on num_tokens and max_num_seqs # Set num_scheduled_tokens based on num_tokens and max_num_seqs
# for dummy run with LoRA so that the num_reqs collectively # for dummy run with LoRA so that the num_reqs collectively
# has num_tokens in total. # has num_tokens in total.
@ -2398,10 +2394,15 @@ class GPUModelRunner(LoRAModelRunnerMixin):
ubatch_slices = None ubatch_slices = None
# We currently only microbatch if the number of tokens is # We currently only microbatch if the number of tokens is
# over a certain threshold. # over a certain threshold.
# logger.info("PADDING DUMMY DONE")
if should_ubatch: if should_ubatch:
# We only support decode-only cudagraphs # We only support decode-only cudagraphs
assert num_reqs == num_tokens assert num_reqs == num_tokens
assert num_tokens % 2 == 0 assert num_tokens % 2 == 0
num_tokens_per_ubatch = num_tokens // 2
num_tokens_across_dp = torch.tensor([num_tokens_per_ubatch] * 2,
device="cpu",
dtype=torch.int32)
ubatch_slices = [(slice(0, num_reqs // 2), ubatch_slices = [(slice(0, num_reqs // 2),
slice(0, num_tokens // 2)), slice(0, num_tokens // 2)),
(slice(num_reqs // 2, num_reqs), (slice(num_reqs // 2, num_reqs),

View File

@ -75,31 +75,31 @@ class UBatchContext:
pass pass
def _signal_comm_done(self): def _signal_comm_done(self):
assert False # assert False
self.ctx_valid_state() self.ctx_valid_state()
self.gpu_comm_done_event.record(self.comm_stream) self.gpu_comm_done_event.record(self.comm_stream)
def _signal_compute_done(self): def _signal_compute_done(self):
assert False # assert False
self.ctx_valid_state() self.ctx_valid_state()
self.gpu_compute_done_event.record(self.compute_stream) self.gpu_compute_done_event.record(self.compute_stream)
def _wait_compute_done(self): def _wait_compute_done(self):
assert False # assert False
# print(f"{self.id} Waiting on COMPUTE stream", flush=True) # print(f"{self.id} Waiting on COMPUTE stream", flush=True)
self.ctx_valid_state() self.ctx_valid_state()
self.comm_stream.wait_event(self.gpu_compute_done_event) self.comm_stream.wait_event(self.gpu_compute_done_event)
# print("Compute stream done", flush=True) # print("Compute stream done", flush=True)
def _wait_comm_done(self): def _wait_comm_done(self):
assert False # assert False
# print(f"{self.id} Waiting on COMM stream", flush=True) # print(f"{self.id} Waiting on COMM stream", flush=True)
self.ctx_valid_state() self.ctx_valid_state()
self.compute_stream.wait_event(self.gpu_comm_done_event) self.compute_stream.wait_event(self.gpu_comm_done_event)
# print("Comm stream done", flush=True) # print("Comm stream done", flush=True)
def stream_string(self): def stream_string(self):
assert False # assert False
if current_stream() == self.compute_stream: if current_stream() == self.compute_stream:
assert self.current_stream == self.compute_stream assert self.current_stream == self.compute_stream
return "COMPUTE" return "COMPUTE"
@ -118,7 +118,7 @@ class UBatchContext:
# print(f"UBatchContext: {self.id} resuming CPU", flush=True) # print(f"UBatchContext: {self.id} resuming CPU", flush=True)
def yield_and_switch_from_compute_to_comm(self): def yield_and_switch_from_compute_to_comm(self):
assert False # assert False
assert current_stream() == self.compute_stream assert current_stream() == self.compute_stream
# dp_rank = get_dp_group().rank_in_group # dp_rank = get_dp_group().rank_in_group
# print(f"DP: {dp_rank} UB: {self.id} " # print(f"DP: {dp_rank} UB: {self.id} "
@ -134,7 +134,7 @@ class UBatchContext:
self._wait_compute_done() self._wait_compute_done()
def yield_and_switch_from_comm_to_compute(self): def yield_and_switch_from_comm_to_compute(self):
assert False # assert False
assert current_stream() == self.comm_stream assert current_stream() == self.comm_stream
# dp_rank = get_dp_group().rank_in_group # dp_rank = get_dp_group().rank_in_group
# print(f"DP: {dp_rank} UB: {self.id} " # print(f"DP: {dp_rank} UB: {self.id} "