[V1][BugFix] Exit properly if engine core fails during startup (#16137)

Signed-off-by: Nick Hill <nhill@redhat.com>
This commit is contained in:
Nick Hill 2025-04-07 15:30:15 -07:00 committed by GitHub
parent 3147586ebd
commit 7f6d47c1a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 67 additions and 14 deletions

View File

@ -5,6 +5,7 @@ pytest-forked
pytest-asyncio pytest-asyncio
pytest-rerunfailures pytest-rerunfailures
pytest-shard pytest-shard
pytest-timeout
# testing utils # testing utils
awscli awscli

View File

@ -444,6 +444,7 @@ pytest==8.3.3
# pytest-mock # pytest-mock
# pytest-rerunfailures # pytest-rerunfailures
# pytest-shard # pytest-shard
# pytest-timeout
pytest-asyncio==0.24.0 pytest-asyncio==0.24.0
# via -r requirements/test.in # via -r requirements/test.in
pytest-forked==1.6.0 pytest-forked==1.6.0
@ -454,6 +455,8 @@ pytest-rerunfailures==14.0
# via -r requirements/test.in # via -r requirements/test.in
pytest-shard==0.1.2 pytest-shard==0.1.2
# via -r requirements/test.in # via -r requirements/test.in
pytest-timeout==2.3.1
# via -r requirements/test.in
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
# via # via
# botocore # botocore

View File

@ -3,8 +3,10 @@
import asyncio import asyncio
import time import time
import uuid import uuid
from threading import Thread
from typing import Optional from typing import Optional
import psutil
import pytest import pytest
from transformers import AutoTokenizer from transformers import AutoTokenizer
@ -245,3 +247,42 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch):
await core_client.call_utility_async("echo", None, "help!") await core_client.call_utility_async("echo", None, "help!")
assert str(e_info.value) == "Call to echo method failed: help!" assert str(e_info.value) == "Call to echo method failed: help!"
@pytest.mark.timeout(10)
def test_startup_failure(monkeypatch: pytest.MonkeyPatch):
with monkeypatch.context() as m, pytest.raises(Exception) as e_info:
m.setenv("VLLM_USE_V1", "1")
engine_args = EngineArgs(model=MODEL_NAME)
vllm_config = engine_args.create_engine_config(
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class = Executor.get_class(vllm_config)
# Start another thread to wait for engine core process to start
# and kill it - simulate fatal uncaught process exit.
this_proc = psutil.Process()
children_before = set(this_proc.children())
def kill_first_child():
while True:
time.sleep(0.5)
children = set(this_proc.children()) - children_before
if children:
child = children.pop()
print("Killing child core process", child.pid)
child.kill()
break
Thread(target=kill_first_child, daemon=True).start()
_core_client = EngineCoreClient.make_client(
multiprocess_mode=True,
asyncio_mode=True,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=True,
)
assert "Engine core initialization failed" in str(e_info.value)

View File

@ -411,10 +411,21 @@ class MPClient(EngineCoreClient):
# Wait for engine core process(es) to send ready messages. # Wait for engine core process(es) to send ready messages.
identities = set(eng.index for eng in self.resources.core_engines) identities = set(eng.index for eng in self.resources.core_engines)
poller = zmq.Poller()
poller.register(sync_input_socket, zmq.POLLIN)
for eng in self.resources.core_engines:
poller.register(eng.proc_handle, zmq.POLLIN)
while identities: while identities:
while not sync_input_socket.poll(timeout=STARTUP_POLL_PERIOD_MS): events = poller.poll(STARTUP_POLL_PERIOD_MS)
logger.info("Waiting for %d core engine proc(s) to start: %s", if not events:
len(identities), identities) logger.debug("Waiting for %d core engine proc(s) to start: %s",
len(identities), identities)
continue
if len(events) > 1 or events[0][0] != sync_input_socket:
# One of the core processes exited.
raise RuntimeError("Engine core initialization failed. "
"See root cause above.")
eng_id_bytes, msg = sync_input_socket.recv_multipart() eng_id_bytes, msg = sync_input_socket.recv_multipart()
eng_id = int.from_bytes(eng_id_bytes, byteorder="little") eng_id = int.from_bytes(eng_id_bytes, byteorder="little")
if eng_id not in identities: if eng_id not in identities:
@ -424,12 +435,6 @@ class MPClient(EngineCoreClient):
logger.info("Core engine process %d ready.", eng_id) logger.info("Core engine process %d ready.", eng_id)
identities.discard(eng_id) identities.discard(eng_id)
# Double check that the process are running.
for engine in self.resources.core_engines:
proc = engine.proc_handle.proc
if proc.exitcode is not None:
raise RuntimeError(f"Engine proc {proc.name} not running")
def _init_core_engines( def _init_core_engines(
self, self,
vllm_config: VllmConfig, vllm_config: VllmConfig,

View File

@ -1,10 +1,10 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import multiprocessing
import os import os
import weakref import weakref
from collections import defaultdict from collections import defaultdict
from collections.abc import Sequence from collections.abc import Sequence
from multiprocessing import Process
from typing import (TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar, from typing import (TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar,
Union, overload) Union, overload)
@ -112,20 +112,23 @@ class BackgroundProcHandle:
process_kwargs["output_path"] = output_path process_kwargs["output_path"] = output_path
# Run busy loop in background process. # Run busy loop in background process.
self.proc = context.Process(target=target_fn, self.proc: Process = context.Process(target=target_fn,
kwargs=process_kwargs, kwargs=process_kwargs,
name=process_name) name=process_name)
self._finalizer = weakref.finalize(self, shutdown, self.proc, self._finalizer = weakref.finalize(self, shutdown, self.proc,
input_path, output_path) input_path, output_path)
self.proc.start() self.proc.start()
def fileno(self):
return self.proc.sentinel
def shutdown(self): def shutdown(self):
self._finalizer() self._finalizer()
# Note(rob): shutdown function cannot be a bound method, # Note(rob): shutdown function cannot be a bound method,
# else the gc cannot collect the object. # else the gc cannot collect the object.
def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str): def shutdown(proc: Process, input_path: str, output_path: str):
# Shutdown the process. # Shutdown the process.
if proc.is_alive(): if proc.is_alive():
proc.terminate() proc.terminate()