From 4db44264047c2d30d9ba4eac55acaef821ae18b8 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Wed, 10 Sep 2025 13:53:21 -0700 Subject: [PATCH] [CI] Fail subprocess tests with root-cause error (#23795) Signed-off-by: Nick Hill --- requirements/test.in | 1 + requirements/test.txt | 4 +- tests/async_engine/test_api_server.py | 26 +++++ tests/conftest.py | 10 ++ tests/utils.py | 120 +++++++++++++++++----- vllm/executor/ray_distributed_executor.py | 10 +- 6 files changed, 138 insertions(+), 33 deletions(-) diff --git a/requirements/test.in b/requirements/test.in index 1bbf0074a8881..744cfbe885278 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -21,6 +21,7 @@ ray[cgraph,default]>=2.48.0 # Ray Compiled Graph, required by pipeline paralleli sentence-transformers # required for embedding tests soundfile # required for audio tests jiwer # required for audio tests +tblib # for pickling test exceptions timm >=1.0.17 # required for internvl and gemma3n-mm test torch==2.8.0 torchaudio==2.8.0 diff --git a/requirements/test.txt b/requirements/test.txt index 65ef7c3c64ba6..5eebdc788aa3d 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -137,7 +137,7 @@ contourpy==1.3.0 # via matplotlib cramjam==2.9.0 # via fastparquet -cupy-cuda12x==13.3.0 +cupy-cuda12x==13.6.0 # via ray cycler==0.12.1 # via matplotlib @@ -1032,6 +1032,8 @@ tabledata==1.3.3 # via pytablewriter tabulate==0.9.0 # via sacrebleu +tblib==3.1.0 + # via -r requirements/test.in tcolorpy==0.1.6 # via pytablewriter tenacity==9.0.0 diff --git a/tests/async_engine/test_api_server.py b/tests/async_engine/test_api_server.py index 90f63e7ea17db..07370a8803291 100644 --- a/tests/async_engine/test_api_server.py +++ b/tests/async_engine/test_api_server.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import copyreg import os import subprocess import sys @@ -10,6 +11,30 @@ from pathlib import Path import pytest import requests +import urllib3.exceptions + + +def _pickle_new_connection_error(obj): + """Custom pickler for NewConnectionError to fix tblib compatibility.""" + # Extract the original message by removing the "conn: " prefix + full_message = obj.args[0] if obj.args else "" + if ': ' in full_message: + # Split off the connection part and keep the actual message + _, actual_message = full_message.split(': ', 1) + else: + actual_message = full_message + return _unpickle_new_connection_error, (actual_message, ) + + +def _unpickle_new_connection_error(message): + """Custom unpickler for NewConnectionError.""" + # Create with None as conn and the actual message + return urllib3.exceptions.NewConnectionError(None, message) + + +# Register the custom pickle/unpickle functions for tblib compatibility +copyreg.pickle(urllib3.exceptions.NewConnectionError, + _pickle_new_connection_error) def _query_server(prompt: str, max_tokens: int = 5) -> dict: @@ -52,6 +77,7 @@ def api_server(distributed_executor_backend: str): uvicorn_process.terminate() +@pytest.mark.timeout(300) @pytest.mark.parametrize("distributed_executor_backend", ["mp", "ray"]) def test_api_server(api_server, distributed_executor_backend: str): """ diff --git a/tests/conftest.py b/tests/conftest.py index 1052aeb35bac7..0440e859fe02d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,15 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +# ruff: noqa + +from tblib import pickling_support + +# Install support for pickling exceptions so that we can nicely propagate +# failures from tests running in a subprocess. +# This should be run before any custom exception subclasses are defined. +pickling_support.install() + import http.server import json import math diff --git a/tests/utils.py b/tests/utils.py index e47235002657d..16e1e60393290 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import asyncio +import contextlib import copy import functools import importlib @@ -13,7 +14,7 @@ import sys import tempfile import time import warnings -from contextlib import contextmanager, suppress +from contextlib import ExitStack, contextmanager, suppress from multiprocessing import Process from pathlib import Path from typing import Any, Callable, Literal, Optional, Union @@ -800,43 +801,106 @@ _P = ParamSpec("_P") def fork_new_process_for_each_test( - f: Callable[_P, None]) -> Callable[_P, None]: + func: Callable[_P, None]) -> Callable[_P, None]: """Decorator to fork a new process for each test function. See https://github.com/vllm-project/vllm/issues/7053 for more details. """ - @functools.wraps(f) + @functools.wraps(func) def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: # Make the process the leader of its own process group # to avoid sending SIGTERM to the parent process os.setpgrp() from _pytest.outcomes import Skipped - pid = os.fork() - print(f"Fork a new process to run a test {pid}") - if pid == 0: - try: - f(*args, **kwargs) - except Skipped as e: - # convert Skipped to exit code 0 - print(str(e)) - os._exit(0) - except Exception: - import traceback - traceback.print_exc() - os._exit(1) + + # Create a unique temporary file to store exception info from child + # process. Use test function name and process ID to avoid collisions. + with tempfile.NamedTemporaryFile( + delete=False, + mode='w+b', + prefix=f"vllm_test_{func.__name__}_{os.getpid()}_", + suffix=".exc") as exc_file, ExitStack() as delete_after: + exc_file_path = exc_file.name + delete_after.callback(os.remove, exc_file_path) + + pid = os.fork() + print(f"Fork a new process to run a test {pid}") + if pid == 0: + # Parent process responsible for deleting, don't delete + # in child. + delete_after.pop_all() + try: + func(*args, **kwargs) + except Skipped as e: + # convert Skipped to exit code 0 + print(str(e)) + os._exit(0) + except Exception as e: + import traceback + tb_string = traceback.format_exc() + + # Try to serialize the exception object first + exc_to_serialize: dict[str, Any] + try: + # First, try to pickle the actual exception with + # its traceback. + exc_to_serialize = {'pickled_exception': e} + # Test if it can be pickled + cloudpickle.dumps(exc_to_serialize) + except (Exception, KeyboardInterrupt): + # Fall back to string-based approach. + exc_to_serialize = { + 'exception_type': type(e).__name__, + 'exception_msg': str(e), + 'traceback': tb_string, + } + try: + with open(exc_file_path, 'wb') as f: + cloudpickle.dump(exc_to_serialize, f) + except Exception: + # Fallback: just print the traceback. + print(tb_string) + os._exit(1) + else: + os._exit(0) else: - os._exit(0) - else: - pgid = os.getpgid(pid) - _pid, _exitcode = os.waitpid(pid, 0) - # ignore SIGTERM signal itself - old_signal_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) - # kill all child processes - os.killpg(pgid, signal.SIGTERM) - # restore the signal handler - signal.signal(signal.SIGTERM, old_signal_handler) - assert _exitcode == 0, (f"function {f} failed when called with" - f" args {args} and kwargs {kwargs}") + pgid = os.getpgid(pid) + _pid, _exitcode = os.waitpid(pid, 0) + # ignore SIGTERM signal itself + old_signal_handler = signal.signal(signal.SIGTERM, + signal.SIG_IGN) + # kill all child processes + os.killpg(pgid, signal.SIGTERM) + # restore the signal handler + signal.signal(signal.SIGTERM, old_signal_handler) + if _exitcode != 0: + # Try to read the exception from the child process + exc_info = {} + if os.path.exists(exc_file_path): + with contextlib.suppress(Exception), \ + open(exc_file_path, 'rb') as f: + exc_info = cloudpickle.load(f) + + if (original_exception := + exc_info.get('pickled_exception')) is not None: + # Re-raise the actual exception object if it was + # successfully pickled. + assert isinstance(original_exception, Exception) + raise original_exception + + if (original_tb := exc_info.get("traceback")) is not None: + # Use string-based traceback for fallback case + raise AssertionError( + f"Test {func.__name__} failed when called with" + f" args {args} and kwargs {kwargs}" + f" (exit code: {_exitcode}):\n{original_tb}" + ) from None + + # Fallback to the original generic error + raise AssertionError( + f"function {func.__name__} failed when called with" + f" args {args} and kwargs {kwargs}" + f" (exit code: {_exitcode})") from None return wrapper diff --git a/vllm/executor/ray_distributed_executor.py b/vllm/executor/ray_distributed_executor.py index 37c3fe59c65dd..78d0ee6c1e3fc 100644 --- a/vllm/executor/ray_distributed_executor.py +++ b/vllm/executor/ray_distributed_executor.py @@ -117,10 +117,12 @@ class RayDistributedExecutor(DistributedExecutorBase): self.driver_worker.execute_method) def shutdown(self) -> None: - logger.info( - "Shutting down Ray distributed executor. If you see error log " - "from logging.cc regarding SIGTERM received, please ignore because " - "this is the expected termination process in Ray.") + if logger: + # Somehow logger can be None here. + logger.info( + "Shutting down Ray distributed executor. If you see error log " + "from logging.cc regarding SIGTERM received, please ignore " + "because this is the expected termination process in Ray.") if hasattr(self, "forward_dag") and self.forward_dag is not None: self.forward_dag.teardown() import ray