diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index ab037af77c7e3..a90ffbe58b98c 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -330,11 +330,12 @@ class EngineCore:
 
         # Blocking until the first result is available.
         model_output = self.execute_model_with_error_logging(
-            lambda _: future.result(), scheduler_output)
+            future.result, scheduler_output)
+        assert model_output is not None
 
         self.batch_queue.task_done()
-        engine_core_outputs = (self.scheduler.update_from_output(
-            scheduler_output, model_output))
+        engine_core_outputs = self.scheduler.update_from_output(
+            scheduler_output, model_output)
 
         return engine_core_outputs, scheduled_batch
 
@@ -980,6 +981,25 @@ class DPEngineCoreProc(EngineCoreProc):
         else:
             super()._handle_client_request(request_type, request)
 
+    def _process_engine_step(self) -> bool:
+        # Step the engine core.
+        outputs, model_executed = self.step_fn()
+        # Put EngineCoreOutputs into the output queue.
+        for output in (outputs.items() if outputs else ()):
+            self.output_queue.put_nowait(output)
+
+        if outputs and not model_executed:
+            # NOTE(woosuk): This branch is taken when the previous step_fn call
+            # updated the scheduler or worker states without actually executing
+            # the model. With asynchronous scheduling, this typically occurs
+            # every other step. To avoid unnecessary dummy runs, we give
+            # step_fn a second chance to execute the model if possible.
+            outputs, model_executed = self.step_fn()
+            for output in (outputs.items() if outputs else ()):
+                self.output_queue.put_nowait(output)
+
+        return model_executed
+
     def _maybe_publish_request_counts(self):
         if not self.publish_dp_lb_stats:
             return
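
The _process_engine_step hunk above encodes a retry-once pattern: when a step only updates scheduler/worker state (outputs are produced but the model does not run), step again immediately so the model gets a chance to execute. Below is a minimal, self-contained Python sketch of that pattern. It is not vLLM code: EngineStub and process_engine_step are hypothetical stand-ins for self.step_fn and the method added in this diff, assuming only the (outputs, model_executed) return contract visible above.

    # Minimal sketch (not vLLM code) of the "second chance" pattern in
    # _process_engine_step. EngineStub.step() is a hypothetical stand-in
    # for self.step_fn, returning (outputs, model_executed).
    import queue

    class EngineStub:
        """Alternates between a bookkeeping-only step and a model-executing
        step, mimicking the every-other-step behavior of async scheduling."""

        def __init__(self) -> None:
            self._tick = 0

        def step(self) -> tuple[dict[int, str] | None, bool]:
            self._tick += 1
            if self._tick % 2 == 1:
                # Scheduler/worker state updated, but no model execution.
                return {0: f"bookkeeping-{self._tick}"}, False
            return {0: f"model-output-{self._tick}"}, True

    def process_engine_step(engine: EngineStub,
                            output_queue: queue.Queue) -> bool:
        outputs, model_executed = engine.step()
        for output in (outputs.items() if outputs else ()):
            output_queue.put_nowait(output)

        if outputs and not model_executed:
            # Second chance: the first call only did bookkeeping, so
            # stepping again may actually execute the model and avoid
            # an unnecessary dummy run.
            outputs, model_executed = engine.step()
            for output in (outputs.items() if outputs else ()):
                output_queue.put_nowait(output)

        return model_executed

    if __name__ == "__main__":
        q: queue.Queue = queue.Queue()
        executed = process_engine_step(EngineStub(), q)
        assert executed  # the retry turned a bookkeeping step into a real one
        while not q.empty():
            print(q.get_nowait())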