diff --git a/examples/offline_inference/data_parallel.py b/examples/offline_inference/data_parallel.py
index 24a006e6968e0..9ea1b9997ad70 100644
--- a/examples/offline_inference/data_parallel.py
+++ b/examples/offline_inference/data_parallel.py
@@ -84,7 +84,7 @@ def main(args, dp_size, local_dp_rank, global_dp_rank, dp_master_ip,
         "The president of the United States is",
         "The capital of France is",
         "The future of AI is",
-    ] * 5
+    ] * 10
     # import random
     # import string
     # prompts = [''.join(random.choices(string.ascii_letters, k=128)) for _ in range(2048)]
@@ -112,7 +112,7 @@ def main(args, dp_size, local_dp_rank, global_dp_rank, dp_master_ip,
     # sampling params. here we set different max_tokens for different
     # ranks for demonstration.
     sampling_params = SamplingParams(
-        temperature=0.8, top_p=0.95, max_tokens=[16, 20][global_dp_rank % 2]
+        temperature=0.8, top_p=0.95, max_tokens=[20, 16][global_dp_rank % 2]
     )
 
     # Fixed params
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index e62f3e8d045be..1d981feb373fb 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -1367,7 +1367,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         return num_dp_pad_tokens + num_pad_tokens, num_tokens_after_padding
 
     def get_dp_padding_ubatch(self,
-            ubatch_slices: UBatchSlices) -> tuple[int, Optional[torch.Tensor]]:
+            ubatch_slices: UBatchSlices,
+            include_cudagraphs: bool = True) -> tuple[int, Optional[torch.Tensor]]:
         dp_size = self.vllm_config.parallel_config.data_parallel_size
 
         if dp_size == 1:
@@ -1387,7 +1388,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         num_tokens_unpadded = first_ubatch_num_tokens + second_ubatch_num_tokens
         num_tokens_padded = round_up(num_tokens_unpadded, 2)
 
-        if (self.use_cuda_graph
+        if (include_cudagraphs and self.use_cuda_graph
                 and num_tokens_unpadded <= self.cudagraph_batch_sizes[-1]):
             # Add padding to the batch size.
             num_tokens_padded = self.vllm_config.pad_for_cudagraph(num_tokens_unpadded)
@@ -1437,7 +1438,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
     def pad_out_ubatch_second_stage(self, ubatch_slices: UBatchSlices,
                                     num_total_tokens: int):
         # TODO Add asserts to make sure stage one ran
         padded_second_ubatch_slice = slice(ubatch_slices[1][1].start, num_total_tokens)
-        ubatch_slices[1] = (ubatch_slices[1][0], padded_second_ubatch_slice)
+        ubatch_slices[1] = (padded_second_ubatch_slice, padded_second_ubatch_slice)
 
     # Returns num_padded_tokens. This is just a number that should be added to the
@@ -1590,12 +1591,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
                                device=self.device)
 
         for i, (_, tokens_slice) in enumerate(ubatch_slices):
-            is_dummy_ubatch = tokens_slice.stop <= tokens_slice.start
-            assert not is_dummy_ubatch or i == len(
-                ubatch_slices) - 1 or is_dummy_run
-
-            num_tokens = num_dummy_tokens if is_dummy_ubatch or \
-                is_dummy_run else (tokens_slice.stop - tokens_slice.start)
+            num_tokens = (tokens_slice.stop - tokens_slice.start)
             # TODO (Sage) Instead of using this setter we should be able
             # to just create the forward context in advance and pass it
             # to the UBatchContext's __init__ method
@@ -1731,7 +1727,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         if ubatch_slices is not None:
             assert len(ubatch_slices) == 2, "Only two ubatches has been tested"
             # num_tokens = ubatch_slices[1][1].stop
-            # print(f"RUNNING UBATCH {num_tokens} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}")
+            print(f"RUNNING UBATCH {ubatch_slices} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}")
             # assert not is_dummy_run
             compute_stream = torch.cuda.Stream(device=self.device)
             ubatch_metadata = _make_ubatch_metadata(
@@ -2376,11 +2372,11 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         should_ubatch = self.should_ubatch(allow_microbatching)
         # Padding for DP
         # logger.info("PADDING DUMMY")
-        num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
-        # logger.info("PADDING DUMMY DONE")
+        num_tokens_across_dp = None
+        num_pad = 0
+        if not should_ubatch:
+            num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
         num_tokens += num_pad
-        # num_tokens_across_dp = None
-
         # Set num_scheduled_tokens based on num_tokens and max_num_seqs
         # for dummy run with LoRA so that the num_reqs collectively
         # has num_tokens in total.
@@ -2398,10 +2394,15 @@ class GPUModelRunner(LoRAModelRunnerMixin):
         ubatch_slices = None
         # We currently only microbatch if the number of tokens is
        # over a certain threshold.
+ # logger.info("PADDING DUMMY DONE") if should_ubatch: # We only support decode-only cudagraphs assert num_reqs == num_tokens assert num_tokens % 2 == 0 + num_tokens_per_ubatch = num_tokens // 2 + num_tokens_across_dp = torch.tensor([num_tokens_per_ubatch] * 2, + device="cpu", + dtype=torch.int32) ubatch_slices = [(slice(0, num_reqs // 2), slice(0, num_tokens // 2)), (slice(num_reqs // 2, num_reqs), diff --git a/vllm/v1/worker/ubatching.py b/vllm/v1/worker/ubatching.py index bcc8c03e66cb2..3defe34e06bf5 100644 --- a/vllm/v1/worker/ubatching.py +++ b/vllm/v1/worker/ubatching.py @@ -75,31 +75,31 @@ class UBatchContext: pass def _signal_comm_done(self): - assert False + # assert False self.ctx_valid_state() self.gpu_comm_done_event.record(self.comm_stream) def _signal_compute_done(self): - assert False + # assert False self.ctx_valid_state() self.gpu_compute_done_event.record(self.compute_stream) def _wait_compute_done(self): - assert False + # assert False # print(f"{self.id} Waiting on COMPUTE stream", flush=True) self.ctx_valid_state() self.comm_stream.wait_event(self.gpu_compute_done_event) # print("Compute stream done", flush=True) def _wait_comm_done(self): - assert False + # assert False # print(f"{self.id} Waiting on COMM stream", flush=True) self.ctx_valid_state() self.compute_stream.wait_event(self.gpu_comm_done_event) # print("Comm stream done", flush=True) def stream_string(self): - assert False + # assert False if current_stream() == self.compute_stream: assert self.current_stream == self.compute_stream return "COMPUTE" @@ -118,7 +118,7 @@ class UBatchContext: # print(f"UBatchContext: {self.id} resuming CPU", flush=True) def yield_and_switch_from_compute_to_comm(self): - assert False + # assert False assert current_stream() == self.compute_stream # dp_rank = get_dp_group().rank_in_group # print(f"DP: {dp_rank} UB: {self.id} " @@ -134,7 +134,7 @@ class UBatchContext: self._wait_compute_done() def yield_and_switch_from_comm_to_compute(self): - assert False + # assert False assert current_stream() == self.comm_stream # dp_rank = get_dp_group().rank_in_group # print(f"DP: {dp_rank} UB: {self.id} "