[Bugfix] Make async scheduling compatible with DP

Signed-off-by: Woosuk Kwon <woosuk.kwon@berkeley.edu>
This commit is contained in:
Woosuk Kwon 2025-07-20 01:25:34 -07:00 committed by Nick Hill
parent 8f731cf3ab
commit ff6c72db76

View File

@ -330,11 +330,12 @@ class EngineCore:
# Blocking until the first result is available.
model_output = self.execute_model_with_error_logging(
lambda _: future.result(), scheduler_output)
future.result, scheduler_output)
assert model_output is not None
self.batch_queue.task_done()
engine_core_outputs = (self.scheduler.update_from_output(
scheduler_output, model_output))
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, model_output)
return engine_core_outputs, scheduled_batch
@ -980,6 +981,25 @@ class DPEngineCoreProc(EngineCoreProc):
else:
super()._handle_client_request(request_type, request)
def _process_engine_step(self) -> bool:
# Step the engine core.
outputs, model_executed = self.step_fn()
# Put EngineCoreOutputs into the output queue.
for output in (outputs.items() if outputs else ()):
self.output_queue.put_nowait(output)
if outputs and not model_executed:
# NOTE(woosuk): This branch is taken when the previous step_fn call
# updated the scheduler or worker states without actually executing
# the model. With asynchronous scheduling, this typically occurs
# every other step. To avoid unnecessary dummy runs, we give
# step_fn a second chance to execute the model if possible.
outputs, model_executed = self.step_fn()
for output in (outputs.items() if outputs else ()):
self.output_queue.put_nowait(output)
return model_executed
def _maybe_publish_request_counts(self):
if not self.publish_dp_lb_stats:
return