From 7f6d47c1a2a74bd68a69103149ca729560076b9e Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Mon, 7 Apr 2025 15:30:15 -0700 Subject: [PATCH] [V1][BugFix] Exit properly if engine core fails during startup (#16137) Signed-off-by: Nick Hill --- requirements/test.in | 1 + requirements/test.txt | 3 ++ tests/v1/engine/test_engine_core_client.py | 41 ++++++++++++++++++++++ vllm/v1/engine/core_client.py | 23 +++++++----- vllm/v1/utils.py | 13 ++++--- 5 files changed, 67 insertions(+), 14 deletions(-) diff --git a/requirements/test.in b/requirements/test.in index ac7f451e96a8..73bd3e4a4f57 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -5,6 +5,7 @@ pytest-forked pytest-asyncio pytest-rerunfailures pytest-shard +pytest-timeout # testing utils awscli diff --git a/requirements/test.txt b/requirements/test.txt index 39d6ed1acff0..88b5ab51d26e 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -444,6 +444,7 @@ pytest==8.3.3 # pytest-mock # pytest-rerunfailures # pytest-shard + # pytest-timeout pytest-asyncio==0.24.0 # via -r requirements/test.in pytest-forked==1.6.0 @@ -454,6 +455,8 @@ pytest-rerunfailures==14.0 # via -r requirements/test.in pytest-shard==0.1.2 # via -r requirements/test.in +pytest-timeout==2.3.1 + # via -r requirements/test.in python-dateutil==2.9.0.post0 # via # botocore diff --git a/tests/v1/engine/test_engine_core_client.py b/tests/v1/engine/test_engine_core_client.py index 68844b877c17..8ebdaf63b484 100644 --- a/tests/v1/engine/test_engine_core_client.py +++ b/tests/v1/engine/test_engine_core_client.py @@ -3,8 +3,10 @@ import asyncio import time import uuid +from threading import Thread from typing import Optional +import psutil import pytest from transformers import AutoTokenizer @@ -245,3 +247,42 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch): await core_client.call_utility_async("echo", None, "help!") assert str(e_info.value) == "Call to echo method failed: help!" + + +@pytest.mark.timeout(10) +def test_startup_failure(monkeypatch: pytest.MonkeyPatch): + + with monkeypatch.context() as m, pytest.raises(Exception) as e_info: + m.setenv("VLLM_USE_V1", "1") + + engine_args = EngineArgs(model=MODEL_NAME) + vllm_config = engine_args.create_engine_config( + usage_context=UsageContext.UNKNOWN_CONTEXT) + executor_class = Executor.get_class(vllm_config) + + # Start another thread to wait for engine core process to start + # and kill it - simulate fatal uncaught process exit. + this_proc = psutil.Process() + children_before = set(this_proc.children()) + + def kill_first_child(): + while True: + time.sleep(0.5) + children = set(this_proc.children()) - children_before + if children: + child = children.pop() + print("Killing child core process", child.pid) + child.kill() + break + + Thread(target=kill_first_child, daemon=True).start() + + _core_client = EngineCoreClient.make_client( + multiprocess_mode=True, + asyncio_mode=True, + vllm_config=vllm_config, + executor_class=executor_class, + log_stats=True, + ) + + assert "Engine core initialization failed" in str(e_info.value) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index b94b0aa75386..2e5f9021f100 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -411,10 +411,21 @@ class MPClient(EngineCoreClient): # Wait for engine core process(es) to send ready messages. identities = set(eng.index for eng in self.resources.core_engines) + poller = zmq.Poller() + poller.register(sync_input_socket, zmq.POLLIN) + for eng in self.resources.core_engines: + poller.register(eng.proc_handle, zmq.POLLIN) while identities: - while not sync_input_socket.poll(timeout=STARTUP_POLL_PERIOD_MS): - logger.info("Waiting for %d core engine proc(s) to start: %s", - len(identities), identities) + events = poller.poll(STARTUP_POLL_PERIOD_MS) + if not events: + logger.debug("Waiting for %d core engine proc(s) to start: %s", + len(identities), identities) + continue + if len(events) > 1 or events[0][0] != sync_input_socket: + # One of the core processes exited. + raise RuntimeError("Engine core initialization failed. " + "See root cause above.") + eng_id_bytes, msg = sync_input_socket.recv_multipart() eng_id = int.from_bytes(eng_id_bytes, byteorder="little") if eng_id not in identities: @@ -424,12 +435,6 @@ class MPClient(EngineCoreClient): logger.info("Core engine process %d ready.", eng_id) identities.discard(eng_id) - # Double check that the process are running. - for engine in self.resources.core_engines: - proc = engine.proc_handle.proc - if proc.exitcode is not None: - raise RuntimeError(f"Engine proc {proc.name} not running") - def _init_core_engines( self, vllm_config: VllmConfig, diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index fed5761b04b6..32d8101f681d 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -1,10 +1,10 @@ # SPDX-License-Identifier: Apache-2.0 -import multiprocessing import os import weakref from collections import defaultdict from collections.abc import Sequence +from multiprocessing import Process from typing import (TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar, Union, overload) @@ -112,20 +112,23 @@ class BackgroundProcHandle: process_kwargs["output_path"] = output_path # Run busy loop in background process. - self.proc = context.Process(target=target_fn, - kwargs=process_kwargs, - name=process_name) + self.proc: Process = context.Process(target=target_fn, + kwargs=process_kwargs, + name=process_name) self._finalizer = weakref.finalize(self, shutdown, self.proc, input_path, output_path) self.proc.start() + def fileno(self): + return self.proc.sentinel + def shutdown(self): self._finalizer() # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. -def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str): +def shutdown(proc: Process, input_path: str, output_path: str): # Shutdown the process. if proc.is_alive(): proc.terminate()