diff --git a/vllm/v1/engine/coordinator.py b/vllm/v1/engine/coordinator.py index 4f6ba099c650c..031e9b85f24c6 100644 --- a/vllm/v1/engine/coordinator.py +++ b/vllm/v1/engine/coordinator.py @@ -183,11 +183,12 @@ class CoordinatorProc: # engines are paused, so that we can wake the other # engines. engine_to_exclude, wave = msgspec.msgpack.decode(buffer) - if wave < self.current_wave: - # If the wave number is stale, ensure the message is - # handled by all the engines. - engine_to_exclude = None if not self.engines_running: + if wave < self.current_wave: + # If the wave number is stale, ensure the message + # is handled by all the engines. + engine_to_exclude = None + self.engines_running = True self.stats_changed = True self._send_start_wave(publish_back, self.current_wave, @@ -203,22 +204,24 @@ class CoordinatorProc: assert outputs.utility_output is None eng_index = outputs.engine_index - if outputs.scheduler_stats: + scheduler_stats = outputs.scheduler_stats + if scheduler_stats: # 1. Updated request load stats - update our local # state with these. stats = self.engines[eng_index].request_counts - stats[0] = outputs.scheduler_stats.num_waiting_reqs - stats[1] = outputs.scheduler_stats.num_running_reqs + stats[0] = scheduler_stats.num_waiting_reqs + stats[1] = scheduler_stats.num_running_reqs self.stats_changed = True if (wave := outputs.wave_complete) is not None: # 2. Notification from rank 0 engine that we've # moved into the global paused state - # (engines_running==False) + # (engines_running==False). if self.current_wave <= wave: + new_wave = wave + 1 logger.debug("Moving DP wave from %d to %d.", - self.current_wave, wave) - self.current_wave = wave + 1 + self.current_wave, new_wave) + self.current_wave = new_wave self.engines_running = False self.stats_changed = True elif (wave := outputs.start_wave) is not None and (