diff --git a/examples/offline_inference/data_parallel.py b/examples/offline_inference/data_parallel.py index 330e860bad0d6..2c322a42cb4a0 100644 --- a/examples/offline_inference/data_parallel.py +++ b/examples/offline_inference/data_parallel.py @@ -84,6 +84,10 @@ def main(args, dp_size, local_dp_rank, global_dp_rank, dp_master_ip, "The capital of France is", "The future of AI is", ] * 100 + # import random + # import string + # prompts = [''.join(random.choices(string.ascii_letters, k=128)) for _ in range(2048)] + # with DP, each rank should process different prompts. # usually all the DP ranks process a full dataset, @@ -177,7 +181,7 @@ if __name__ == "__main__": procs.append(proc) exit_code = 0 for proc in procs: - proc.join(timeout=300) + proc.join(timeout=1200) if proc.exitcode is None: print(f"Killing process {proc.pid} that didn't stop within 5 minutes.") proc.kill() diff --git a/vllm/forward_context.py b/vllm/forward_context.py index 7281d1906b32a..3d0adcf70c363 100644 --- a/vllm/forward_context.py +++ b/vllm/forward_context.py @@ -46,6 +46,18 @@ class DPMetadata: dist.all_reduce(num_tokens_tensor, group=get_dp_group().cpu_group) return num_tokens_tensor + @staticmethod + def should_ubatch_across_dp(should_ubatch: bool, dp_size: int, dp_rank: int) -> bool: + should_ubatch_across_dp = [0] * dp_size + should_ubatch_across_dp[dp_rank] = 1 if should_ubatch else 0 + should_ubatch_tensor = torch.tensor(should_ubatch_across_dp, + device="cpu", + dtype=torch.int32) + from vllm.distributed.parallel_state import get_dp_group + dist.all_reduce(should_ubatch_tensor, group=get_dp_group().cpu_group) + result: bool = bool(torch.all(should_ubatch_tensor == 1).item()) + return result + @staticmethod def make( parallel_config: ParallelConfig, diff --git a/vllm/logging_utils/dump_input.py b/vllm/logging_utils/dump_input.py index 47ce0ab188bd6..9a37091fe8acc 100644 --- a/vllm/logging_utils/dump_input.py +++ b/vllm/logging_utils/dump_input.py @@ -65,20 +65,21 @@ def dump_engine_exception(config: VllmConfig, def _dump_engine_exception(config: VllmConfig, scheduler_output: SchedulerOutput, scheduler_stats: Optional[SchedulerStats]): - logger.error("Dumping input data") + pass + # logger.error("Dumping input data") - logger.error( - "V1 LLM engine (v%s) with config: %s, ", - VLLM_VERSION, - config, - ) + # logger.error( + # "V1 LLM engine (v%s) with config: %s, ", + # VLLM_VERSION, + # config, + # ) - try: - dump_obj = prepare_object_to_dump(scheduler_output) - logger.error("Dumping scheduler output for model execution:") - logger.error(dump_obj) - if scheduler_stats: - logger.error(scheduler_stats) - except BaseException as exception: - logger.error("Error preparing object to dump") - logger.error(repr(exception)) + # try: + # dump_obj = prepare_object_to_dump(scheduler_output) + # logger.error("Dumping scheduler output for model execution:") + # logger.error(dump_obj) + # if scheduler_stats: + # logger.error(scheduler_stats) + # except BaseException as exception: + # logger.error("Error preparing object to dump") + # logger.error(repr(exception)) diff --git a/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py b/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py index ab1648606897e..88c007b6c8950 100644 --- a/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py +++ b/vllm/model_executor/layers/fused_moe/pplx_prepare_finalize.py @@ -125,14 +125,14 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize): ubatch_ctx = get_current_ubatch_context() ubatch_id = ubatch_ctx.id if ubatch_ctx is not None else -1 - # yield_and_switch_from_compute_to_comm_impl(schedule="default") + yield_and_switch_from_compute_to_comm_impl(schedule="default") dispatch(True) # Send # torch.cuda.synchronize() # print(f"{ubatch_id} AFTER SEND SYNC", flush=True) dispatch(False) # Recv # torch.cuda.synchronize() # print(f"{ubatch_id} AFTER RECV SYNC", flush=True) - # yield_and_switch_from_comm_to_compute_impl(schedule="default") + yield_and_switch_from_comm_to_compute_impl(schedule="default") # torch.cuda.synchronize() return expert_x, expert_x_scale, expert_num_tokens @@ -173,11 +173,11 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize): do_recv=not send, ) - # yield_and_switch_from_compute_to_comm_impl(schedule="default") + yield_and_switch_from_compute_to_comm_impl(schedule="default") combine(True) # torch.cuda.synchronize() # print(f"{ubatch_id} AFTER COMBINE SEND SYNC", flush=True) combine(False) # print(f"{ubatch_id} AFTER COMBINE RECV SYNC", flush=True) - # yield_and_switch_from_comm_to_compute_impl(schedule="default") + yield_and_switch_from_comm_to_compute_impl(schedule="default") torch.cuda.synchronize() diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index 400939f3543d1..736fd12357034 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -535,18 +535,19 @@ class GPUModelRunner(LoRAModelRunnerMixin): slice(b0_tokens_end, total_num_scheduled_tokens)), ] - if self.parallel_config.enable_microbatching and \ - self.parallel_config.always_microbatch_if_enabled: + # if self.parallel_config.enable_microbatching and \ + # self.parallel_config.always_microbatch_if_enabled: + # print(f"PREFIL RUN total_num_scheduled_tokens: {total_num_scheduled_tokens} max_num_scheduled_tokens {max_num_scheduled_tokens}") # TODO we can do something more advanced here to try to balance, # i.e. split to the left of `total_num_scheduled_tokens // 2` if it # is more balanced - req_split_id = np.argmax( - query_start_loc_np > (total_num_scheduled_tokens // 2)) - return [(slice(0, req_split_id), - slice(0, query_start_loc_np[req_split_id])), - (slice(req_split_id, num_reqs), - slice(query_start_loc_np[req_split_id], - total_num_scheduled_tokens))] + # req_split_id = np.argmax( + # query_start_loc_np > (total_num_scheduled_tokens // 2)) + # return [(slice(0, req_split_id), + # slice(0, query_start_loc_np[req_split_id])), + # (slice(req_split_id, num_reqs), + # slice(query_start_loc_np[req_split_id], + # total_num_scheduled_tokens))] return None def _get_cumsum_and_arange( @@ -655,6 +656,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): ubatch_slices: Optional[UBatchSlices] = self._ubatch_split( self.query_start_loc_np, max_num_scheduled_tokens, scheduler_output) + should_ubatch = self.should_ubatch(True if ubatch_slices else False) + # Don't attempt to microbatch unless every other DP worker is also microbatching + if not should_ubatch and ubatch_slices: + ubatch_slices = None + self.seq_lens_np[:num_reqs] = ( self.input_batch.num_computed_tokens_cpu[:num_reqs] + @@ -1212,7 +1218,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): # TODO(tms) : There are many cases where padding is enabled for # prefills, causing unnecessary and excessive padding of activations. - if dp_size == 1 or self.vllm_config.model_config.enforce_eager: + if dp_size == 1: # Early exit. return 0, None @@ -1224,6 +1230,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): device="cpu", dtype=torch.int32) return max_tokens_across_dp_cpu - num_tokens, num_tokens_after_padding + + def should_ubatch(self, should_ubatch: bool) -> bool: + dp_size = self.vllm_config.parallel_config.data_parallel_size + dp_rank = self.vllm_config.parallel_config.data_parallel_rank + return DPMetadata.should_ubatch_across_dp(should_ubatch, dp_size, dp_rank) def _get_dummy_model_inputs(self, num_tokens: int) -> tuple: # Dummy batch. (hopefully we are the last one so we can just @@ -1338,6 +1349,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): def model_inputs(tokens_slice: slice, use_dummy_input: bool) -> tuple: if use_dummy_input: + # print("MAKING DUMMY BATCH") # assert num_dummy_tokens == 1 return self._get_dummy_model_inputs(num_dummy_tokens) else: @@ -1367,12 +1379,12 @@ class GPUModelRunner(LoRAModelRunnerMixin): model_output = _run(token_slice, ubatch_ctx, use_dummy_input) if save_results: - results.append(model_output) + results.append((ubatch_ctx.id, model_output)) # print(f"Finishing Request on ubatch: {ubatch_ctx.id}", flush=True) def _run_ubatches(ubatch_slices, attn_metadata, - is_dummy_run) -> torch.Tensor: - results: list[torch.Tensor] = [] + is_dummy_run, num_tokens_across_dp) -> torch.Tensor: + results: list[tuple[int, torch.Tensor]] = [] assert len(ubatch_slices) == 2, "Only two ubatches has been tested" root_stream = current_stream() @@ -1417,14 +1429,18 @@ class GPUModelRunner(LoRAModelRunnerMixin): torch.cuda.synchronize() torch.cuda.set_stream(root_stream) - return torch.cat(results, dim=0) + sorted_results = [value for position, value in sorted(results)] + return torch.cat(sorted_results, dim=0) # run micro-batched if ubatch_slices is not None: + # num_tokens = ubatch_slices[1][1].stop + # print(f"RUNNING UBATCH {num_tokens} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}") model_output = _run_ubatches(ubatch_slices, attn_metadata, - is_dummy_run) + is_dummy_run, num_tokens_across_dp=num_tokens_across_dp) # run single batch else: + # print("RUN NORMAL") model_output = _run( slice(0, num_scheduled_tokens), set_forward_context(attn_metadata, @@ -2025,8 +2041,9 @@ class GPUModelRunner(LoRAModelRunnerMixin): ) -> torch.Tensor: # Padding for DP - num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens) - num_tokens += num_pad + # num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens) + # num_tokens += num_pad + num_tokens_across_dp = None # Set num_scheduled_tokens based on num_tokens and max_num_seqs # for dummy run with LoRA so that the num_reqs collectively @@ -2065,58 +2082,26 @@ class GPUModelRunner(LoRAModelRunnerMixin): for layer_name in kv_cache_group_spec.layer_names: attn_metadata[layer_name] = attn_metadata_i - should_microbatch = ( - allow_microbatching - and self.vllm_config.parallel_config.enable_microbatching - and self.vllm_config.parallel_config.always_microbatch_if_enabled) - dummy_microbatches = [(slice(0, 0), slice(0, 0)), - (slice(0, 0), slice(0, 0))] + # should_microbatch = ( + # allow_microbatching + # and self.vllm_config.parallel_config.enable_microbatching + # and self.vllm_config.parallel_config.always_microbatch_if_enabled) + # dummy_microbatches = [(slice(0, 0), slice(0, 0)), + # (slice(0, 0), slice(0, 0))] + should_microbatch = False + # _dummy_run doesn't go through _prepare_inputs so + # we synchronize with other DP ranks here + self.should_ubatch(should_microbatch) with self.maybe_dummy_run_with_lora(self.lora_config, num_scheduled_tokens): outputs = self._run_model( attn_metadata, num_tokens, - ubatch_slices=None - if not should_microbatch else dummy_microbatches, + ubatch_slices=None, is_dummy_run=True, + num_tokens_across_dp=num_tokens_across_dp ) - # model = self.model - # if self.is_multimodal_model: - # input_ids = None - # inputs_embeds = self.inputs_embeds[:num_tokens] - # else: - # input_ids = self.input_ids[:num_tokens] - # inputs_embeds = None - # if self.uses_mrope: - # positions = self.mrope_positions[:, :num_tokens] - # else: - # positions = self.positions[:num_tokens] - - # if get_pp_group().is_first_rank: - # intermediate_tensors = None - # else: - # if self.intermediate_tensors is None: - # self.intermediate_tensors = ( - # self.model.make_empty_intermediate_tensors( - # batch_size=self.max_num_tokens, - # dtype=self.model_config.dtype, - # device=self.device)) - - # intermediate_tensors = self.sync_and_slice_intermediate_tensors( - # num_tokens, None, False) - - # with set_forward_context( - # attn_metadata, - # self.vllm_config, - # num_tokens=num_tokens, - # num_tokens_across_dp=num_tokens_across_dp): - # outputs = model( - # input_ids=input_ids, - # positions=positions, - # intermediate_tensors=intermediate_tensors, - # inputs_embeds=inputs_embeds, - # ) if self.use_aux_hidden_state_outputs: hidden_states, _ = outputs else: diff --git a/vllm/v1/worker/ubatching.py b/vllm/v1/worker/ubatching.py index a26fb06b6c935..394a5c599df96 100644 --- a/vllm/v1/worker/ubatching.py +++ b/vllm/v1/worker/ubatching.py @@ -43,7 +43,7 @@ class UBatchContext: global _CURRENT_CONTEXT _CURRENT_CONTEXT[threading.get_ident()] = self - # self.cpu_wait_event.clear() + self.cpu_wait_event.clear() self.cpu_wait_event.wait() self.cpu_wait_event.clear() self._restore_context()