Misc fixes. lm_eval still gets a wrong answer, but it no longer hangs.

Signed-off-by: Sage Moore <sage@neuralmagic.com>
Sage Moore 2025-06-04 22:46:18 +00:00
parent 2e3484c237
commit f8848bb201
6 changed files with 84 additions and 82 deletions

View File

@@ -84,6 +84,10 @@ def main(args, dp_size, local_dp_rank, global_dp_rank, dp_master_ip,
"The capital of France is",
"The future of AI is",
] * 100
# import random
# import string
# prompts = [''.join(random.choices(string.ascii_letters, k=128)) for _ in range(2048)]
# with DP, each rank should process different prompts.
# usually all the DP ranks process a full dataset,
@@ -177,7 +181,7 @@ if __name__ == "__main__":
procs.append(proc)
exit_code = 0
for proc in procs:
proc.join(timeout=300)
proc.join(timeout=1200)
if proc.exitcode is None:
print(f"Killing process {proc.pid} that didn't stop within 5 minutes.")
proc.kill()
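
The comment in the first hunk notes that with data parallelism each rank should process different prompts. A minimal, hypothetical sketch of that kind of round-robin sharding (plain Python, not the example script's actual variables):

# Hypothetical helper: give each DP rank a disjoint slice of the prompt list.
def shard_prompts(prompts: list[str], dp_rank: int, dp_size: int) -> list[str]:
    # Round-robin by rank index: rank 0 takes prompts 0, dp_size, 2*dp_size, ...
    return prompts[dp_rank::dp_size]

# With dp_size=2, rank 0 gets prompts 0, 2, 4, ... and rank 1 gets 1, 3, 5, ...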

View File

@@ -46,6 +46,18 @@ class DPMetadata:
dist.all_reduce(num_tokens_tensor, group=get_dp_group().cpu_group)
return num_tokens_tensor
@staticmethod
def should_ubatch_across_dp(should_ubatch: bool, dp_size: int, dp_rank: int) -> bool:
should_ubatch_across_dp = [0] * dp_size
should_ubatch_across_dp[dp_rank] = 1 if should_ubatch else 0
should_ubatch_tensor = torch.tensor(should_ubatch_across_dp,
device="cpu",
dtype=torch.int32)
from vllm.distributed.parallel_state import get_dp_group
dist.all_reduce(should_ubatch_tensor, group=get_dp_group().cpu_group)
result: bool = bool(torch.all(should_ubatch_tensor == 1).item())
return result
@staticmethod
def make(
parallel_config: ParallelConfig,
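
The new should_ubatch_across_dp helper is essentially an all-reduce vote: each rank writes a 0/1 flag into its own slot, the SUM all-reduce fills in every other rank's slot, and microbatching only proceeds if every slot ends up 1. A standalone sketch of the same pattern, assuming an already-initialized torch.distributed CPU (gloo) process group rather than vLLM's get_dp_group():

import torch
import torch.distributed as dist

def all_ranks_agree(flag: bool, rank: int, world_size: int) -> bool:
    # Each rank contributes its vote in its own slot; all other slots stay 0.
    votes = torch.zeros(world_size, dtype=torch.int32)
    votes[rank] = int(flag)
    # After the (default SUM) all-reduce, slot i holds rank i's vote.
    dist.all_reduce(votes)
    # Agree only if every rank voted 1.
    return bool(torch.all(votes == 1).item())

Because this is a collective, every DP rank has to call it on each step even when it does not want to microbatch; that is presumably why _dummy_run below also calls self.should_ubatch(...), so ranks running dummy batches still participate instead of leaving their peers blocked in the all-reduce.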

View File

@@ -65,20 +65,21 @@ def dump_engine_exception(config: VllmConfig,
def _dump_engine_exception(config: VllmConfig,
scheduler_output: SchedulerOutput,
scheduler_stats: Optional[SchedulerStats]):
logger.error("Dumping input data")
pass
# logger.error("Dumping input data")
logger.error(
"V1 LLM engine (v%s) with config: %s, ",
VLLM_VERSION,
config,
)
# logger.error(
# "V1 LLM engine (v%s) with config: %s, ",
# VLLM_VERSION,
# config,
# )
try:
dump_obj = prepare_object_to_dump(scheduler_output)
logger.error("Dumping scheduler output for model execution:")
logger.error(dump_obj)
if scheduler_stats:
logger.error(scheduler_stats)
except BaseException as exception:
logger.error("Error preparing object to dump")
logger.error(repr(exception))
# try:
# dump_obj = prepare_object_to_dump(scheduler_output)
# logger.error("Dumping scheduler output for model execution:")
# logger.error(dump_obj)
# if scheduler_stats:
# logger.error(scheduler_stats)
# except BaseException as exception:
# logger.error("Error preparing object to dump")
# logger.error(repr(exception))

View File

@@ -125,14 +125,14 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
ubatch_ctx = get_current_ubatch_context()
ubatch_id = ubatch_ctx.id if ubatch_ctx is not None else -1
# yield_and_switch_from_compute_to_comm_impl(schedule="default")
yield_and_switch_from_compute_to_comm_impl(schedule="default")
dispatch(True) # Send
# torch.cuda.synchronize()
# print(f"{ubatch_id} AFTER SEND SYNC", flush=True)
dispatch(False) # Recv
# torch.cuda.synchronize()
# print(f"{ubatch_id} AFTER RECV SYNC", flush=True)
# yield_and_switch_from_comm_to_compute_impl(schedule="default")
yield_and_switch_from_comm_to_compute_impl(schedule="default")
# torch.cuda.synchronize()
return expert_x, expert_x_scale, expert_num_tokens
@@ -173,11 +173,11 @@ class PplxPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
do_recv=not send,
)
# yield_and_switch_from_compute_to_comm_impl(schedule="default")
yield_and_switch_from_compute_to_comm_impl(schedule="default")
combine(True)
# torch.cuda.synchronize()
# print(f"{ubatch_id} AFTER COMBINE SEND SYNC", flush=True)
combine(False)
# print(f"{ubatch_id} AFTER COMBINE RECV SYNC", flush=True)
# yield_and_switch_from_comm_to_compute_impl(schedule="default")
yield_and_switch_from_comm_to_compute_impl(schedule="default")
torch.cuda.synchronize()

View File

@@ -535,18 +535,19 @@ class GPUModelRunner(LoRAModelRunnerMixin):
slice(b0_tokens_end, total_num_scheduled_tokens)),
]
if self.parallel_config.enable_microbatching and \
self.parallel_config.always_microbatch_if_enabled:
# if self.parallel_config.enable_microbatching and \
# self.parallel_config.always_microbatch_if_enabled:
# print(f"PREFIL RUN total_num_scheduled_tokens: {total_num_scheduled_tokens} max_num_scheduled_tokens {max_num_scheduled_tokens}")
# TODO we can do something more advanced here to try to balance,
# i.e. split to the left of `total_num_scheduled_tokens // 2` if it
# is more balanced
req_split_id = np.argmax(
query_start_loc_np > (total_num_scheduled_tokens // 2))
return [(slice(0, req_split_id),
slice(0, query_start_loc_np[req_split_id])),
(slice(req_split_id, num_reqs),
slice(query_start_loc_np[req_split_id],
total_num_scheduled_tokens))]
# req_split_id = np.argmax(
# query_start_loc_np > (total_num_scheduled_tokens // 2))
# return [(slice(0, req_split_id),
# slice(0, query_start_loc_np[req_split_id])),
# (slice(req_split_id, num_reqs),
# slice(query_start_loc_np[req_split_id],
# total_num_scheduled_tokens))]
return None
def _get_cumsum_and_arange(
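
For reference, the (now commented-out) prefill split above picks the first request whose cumulative token offset crosses the halfway point; np.argmax on a boolean array returns the index of the first True. A self-contained illustration with made-up numbers (variable names mirror the code above, values are invented):

import numpy as np

# Cumulative token offsets for 4 requests with 3, 5, 2, and 6 tokens each.
query_start_loc_np = np.array([0, 3, 8, 10, 16])
total_num_scheduled_tokens = 16
num_reqs = 4

# First boundary strictly past the midpoint (8) is index 3 (offset 10).
req_split_id = np.argmax(query_start_loc_np > (total_num_scheduled_tokens // 2))

ubatch_0 = (slice(0, req_split_id), slice(0, query_start_loc_np[req_split_id]))
ubatch_1 = (slice(req_split_id, num_reqs),
            slice(query_start_loc_np[req_split_id], total_num_scheduled_tokens))
# -> requests 0-2 / tokens 0-10 in one microbatch, request 3 / tokens 10-16 in the other.
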
@@ -655,6 +656,11 @@ class GPUModelRunner(LoRAModelRunnerMixin):
ubatch_slices: Optional[UBatchSlices] = self._ubatch_split(
self.query_start_loc_np, max_num_scheduled_tokens,
scheduler_output)
should_ubatch = self.should_ubatch(bool(ubatch_slices))
# Don't attempt to microbatch unless all other DP workers are also microbatching
if not should_ubatch and ubatch_slices:
ubatch_slices = None
self.seq_lens_np[:num_reqs] = (
self.input_batch.num_computed_tokens_cpu[:num_reqs] +
@@ -1212,7 +1218,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# TODO(tms) : There are many cases where padding is enabled for
# prefills, causing unnecessary and excessive padding of activations.
if dp_size == 1 or self.vllm_config.model_config.enforce_eager:
if dp_size == 1:
# Early exit.
return 0, None
@@ -1224,6 +1230,11 @@ class GPUModelRunner(LoRAModelRunnerMixin):
device="cpu",
dtype=torch.int32)
return max_tokens_across_dp_cpu - num_tokens, num_tokens_after_padding
def should_ubatch(self, should_ubatch: bool) -> bool:
dp_size = self.vllm_config.parallel_config.data_parallel_size
dp_rank = self.vllm_config.parallel_config.data_parallel_rank
return DPMetadata.should_ubatch_across_dp(should_ubatch, dp_size, dp_rank)
def _get_dummy_model_inputs(self, num_tokens: int) -> tuple:
# Dummy batch. (hopefully we are the last one so we can just
@@ -1338,6 +1349,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
def model_inputs(tokens_slice: slice, use_dummy_input: bool) -> tuple:
if use_dummy_input:
# print("MAKING DUMMY BATCH")
# assert num_dummy_tokens == 1
return self._get_dummy_model_inputs(num_dummy_tokens)
else:
@@ -1367,12 +1379,12 @@ class GPUModelRunner(LoRAModelRunnerMixin):
model_output = _run(token_slice, ubatch_ctx, use_dummy_input)
if save_results:
results.append(model_output)
results.append((ubatch_ctx.id, model_output))
# print(f"Finishing Request on ubatch: {ubatch_ctx.id}", flush=True)
def _run_ubatches(ubatch_slices, attn_metadata,
is_dummy_run) -> torch.Tensor:
results: list[torch.Tensor] = []
is_dummy_run, num_tokens_across_dp) -> torch.Tensor:
results: list[tuple[int, torch.Tensor]] = []
assert len(ubatch_slices) == 2, "Only two ubatches have been tested"
root_stream = current_stream()
@@ -1417,14 +1429,18 @@ class GPUModelRunner(LoRAModelRunnerMixin):
torch.cuda.synchronize()
torch.cuda.set_stream(root_stream)
return torch.cat(results, dim=0)
sorted_results = [value for position, value in sorted(results)]
return torch.cat(sorted_results, dim=0)
# run micro-batched
if ubatch_slices is not None:
# num_tokens = ubatch_slices[1][1].stop
# print(f"RUNNING UBATCH {num_tokens} is_dummy_run: {is_dummy_run} num_tokens_across_dp{num_tokens_across_dp}")
model_output = _run_ubatches(ubatch_slices, attn_metadata,
is_dummy_run)
is_dummy_run, num_tokens_across_dp=num_tokens_across_dp)
# run single batch
else:
# print("RUN NORMAL")
model_output = _run(
slice(0, num_scheduled_tokens),
set_forward_context(attn_metadata,
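
The result-handling change in this hunk tags each microbatch output with its ubatch id and sorts before concatenating: the two ubatches can finish in either order, but the caller expects rows back in the original token order. A stripped-down sketch of that pattern with hypothetical tensors (not the runner's real outputs):

import torch

# Outputs arrive tagged with the ubatch id that produced them; completion
# order is not guaranteed, so ubatch 1 may land in the list first.
results: list[tuple[int, torch.Tensor]] = [
    (1, torch.full((3, 4), 1.0)),  # second half of the tokens
    (0, torch.full((5, 4), 0.0)),  # first half of the tokens
]

# Sort by ubatch id, drop the tag, then concatenate along the token dimension.
sorted_results = [value for _, value in sorted(results)]
model_output = torch.cat(sorted_results, dim=0)  # rows 0-4 from ubatch 0, 5-7 from ubatch 1
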
@@ -2025,8 +2041,9 @@ class GPUModelRunner(LoRAModelRunnerMixin):
) -> torch.Tensor:
# Padding for DP
num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
num_tokens += num_pad
# num_pad, num_tokens_across_dp = self.get_dp_padding(num_tokens)
# num_tokens += num_pad
num_tokens_across_dp = None
# Set num_scheduled_tokens based on num_tokens and max_num_seqs
# for dummy run with LoRA so that the num_reqs collectively
@@ -2065,58 +2082,26 @@ class GPUModelRunner(LoRAModelRunnerMixin):
for layer_name in kv_cache_group_spec.layer_names:
attn_metadata[layer_name] = attn_metadata_i
should_microbatch = (
allow_microbatching
and self.vllm_config.parallel_config.enable_microbatching
and self.vllm_config.parallel_config.always_microbatch_if_enabled)
dummy_microbatches = [(slice(0, 0), slice(0, 0)),
(slice(0, 0), slice(0, 0))]
# should_microbatch = (
# allow_microbatching
# and self.vllm_config.parallel_config.enable_microbatching
# and self.vllm_config.parallel_config.always_microbatch_if_enabled)
# dummy_microbatches = [(slice(0, 0), slice(0, 0)),
# (slice(0, 0), slice(0, 0))]
should_microbatch = False
# _dummy_run doesn't go through _prepare_inputs so
# we synchronize with other DP ranks here
self.should_ubatch(should_microbatch)
with self.maybe_dummy_run_with_lora(self.lora_config,
num_scheduled_tokens):
outputs = self._run_model(
attn_metadata,
num_tokens,
ubatch_slices=None
if not should_microbatch else dummy_microbatches,
ubatch_slices=None,
is_dummy_run=True,
num_tokens_across_dp=num_tokens_across_dp
)
# model = self.model
# if self.is_multimodal_model:
# input_ids = None
# inputs_embeds = self.inputs_embeds[:num_tokens]
# else:
# input_ids = self.input_ids[:num_tokens]
# inputs_embeds = None
# if self.uses_mrope:
# positions = self.mrope_positions[:, :num_tokens]
# else:
# positions = self.positions[:num_tokens]
# if get_pp_group().is_first_rank:
# intermediate_tensors = None
# else:
# if self.intermediate_tensors is None:
# self.intermediate_tensors = (
# self.model.make_empty_intermediate_tensors(
# batch_size=self.max_num_tokens,
# dtype=self.model_config.dtype,
# device=self.device))
# intermediate_tensors = self.sync_and_slice_intermediate_tensors(
# num_tokens, None, False)
# with set_forward_context(
# attn_metadata,
# self.vllm_config,
# num_tokens=num_tokens,
# num_tokens_across_dp=num_tokens_across_dp):
# outputs = model(
# input_ids=input_ids,
# positions=positions,
# intermediate_tensors=intermediate_tensors,
# inputs_embeds=inputs_embeds,
# )
if self.use_aux_hidden_state_outputs:
hidden_states, _ = outputs
else:

View File

@@ -43,7 +43,7 @@ class UBatchContext:
global _CURRENT_CONTEXT
_CURRENT_CONTEXT[threading.get_ident()] = self
# self.cpu_wait_event.clear()
self.cpu_wait_event.clear()
self.cpu_wait_event.wait()
self.cpu_wait_event.clear()
self._restore_context()
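
The UBatchContext hunk above is the CPU-side handoff between the two ubatch threads: a thread parks on its event until the peer signals it, then clears the event so the next wait blocks again; the clear() the hunk appears to enable right before the wait additionally discards any signal set earlier, so the thread only resumes on a fresh one. A toy two-thread ping-pong using the same threading.Event idiom (standalone, not vLLM's actual context-switching code):

import threading

def worker(name: str, my_event: threading.Event,
           peer_event: threading.Event, steps: int) -> None:
    for i in range(steps):
        my_event.wait()    # park until the peer yields to us
        my_event.clear()   # consume the signal so the next wait blocks again
        print(f"{name} running step {i}")
        peer_event.set()   # yield: wake the peer up

a_event, b_event = threading.Event(), threading.Event()
t_a = threading.Thread(target=worker, args=("ubatch-0", a_event, b_event, 3))
t_b = threading.Thread(target=worker, args=("ubatch-1", b_event, a_event, 3))
t_a.start(); t_b.start()
a_event.set()              # kick off ubatch-0 first
t_a.join(); t_b.join()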