From cbae7af5522a5cb49da9d2798ff438bb62f444a9 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Sun, 23 Feb 2025 13:07:43 -0800 Subject: [PATCH] [V1][BugFix] Fix engine core client shutdown hangs (#13298) Even though ZMQ context.destroy() is meant to close open sockets before terminating the context, it appears to be necessary to do this explicitly or else it can hang in the context.term() method. Close zmq sockets explicitly before terminating context, make shutdown of client resource more robust, shut down engine core process prior to terminating zmq context. Signed-off-by: Nick Hill --- tests/v1/engine/test_engine_core_client.py | 4 +- vllm/v1/engine/core_client.py | 51 ++++++++++++++++------ 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/tests/v1/engine/test_engine_core_client.py b/tests/v1/engine/test_engine_core_client.py index 828d7eed309f..a7c02322ff02 100644 --- a/tests/v1/engine/test_engine_core_client.py +++ b/tests/v1/engine/test_engine_core_client.py @@ -3,7 +3,6 @@ import asyncio import time import uuid -from contextlib import ExitStack from typing import Dict, List, Optional import pytest @@ -178,7 +177,7 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool): @pytest.mark.asyncio(loop_scope="function") async def test_engine_core_client_asyncio(monkeypatch): - with monkeypatch.context() as m, ExitStack() as after: + with monkeypatch.context() as m: m.setenv("VLLM_USE_V1", "1") # Monkey-patch core engine utility function to test. @@ -195,7 +194,6 @@ async def test_engine_core_client_asyncio(monkeypatch): executor_class=executor_class, log_stats=True, ) - after.callback(client.shutdown) MAX_TOKENS = 20 params = SamplingParams(max_tokens=MAX_TOKENS) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 527aa72833ba..5ffaf63e6cec 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -8,6 +8,7 @@ import uuid import weakref from abc import ABC, abstractmethod from concurrent.futures import Future +from dataclasses import dataclass from threading import Thread from typing import Any, Dict, List, Optional, Type, Union @@ -169,6 +170,31 @@ class InprocClient(EngineCoreClient): self.engine_core.add_lora(lora_request) +@dataclass +class BackgroundResources: + """Used as a finalizer for clean shutdown, avoiding + circular reference back to the client object.""" + + ctx: Union[zmq.Context, zmq.asyncio.Context] = None + output_socket: Union[zmq.Socket, zmq.asyncio.Socket] = None + input_socket: Union[zmq.Socket, zmq.asyncio.Socket] = None + proc_handle: Optional[BackgroundProcHandle] = None + + def __call__(self): + """Clean up background resources.""" + + if self.proc_handle is not None: + self.proc_handle.shutdown() + # ZMQ context termination can hang if the sockets + # aren't explicitly closed first. + if self.output_socket is not None: + self.output_socket.close(linger=0) + if self.input_socket is not None: + self.input_socket.close(linger=0) + if self.ctx is not None: + self.ctx.destroy(linger=0) + + class MPClient(EngineCoreClient): """ MPClient: base client for multi-proc EngineCore. @@ -212,21 +238,22 @@ class MPClient(EngineCoreClient): zmq.asyncio.Context() # type: ignore[attr-defined] if asyncio_mode else zmq.Context()) # type: ignore[attr-defined] - # Note(rob): shutdown function cannot be a bound method, - # else the gc cannot collect the object. - self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0), - self.ctx) + # This will ensure resources created so far are closed + # when the client is garbage collected, even if an + # exception is raised mid-construction. + resources = BackgroundResources(ctx=self.ctx) + self._finalizer = weakref.finalize(self, resources) # Paths and sockets for IPC. output_path = get_open_zmq_ipc_path() input_path = get_open_zmq_ipc_path() - self.output_socket = make_zmq_socket(self.ctx, output_path, - zmq.constants.PULL) - self.input_socket = make_zmq_socket(self.ctx, input_path, - zmq.constants.PUSH) + resources.output_socket = make_zmq_socket(self.ctx, output_path, + zmq.constants.PULL) + resources.input_socket = make_zmq_socket(self.ctx, input_path, + zmq.constants.PUSH) # Start EngineCore in background process. - self.proc_handle = BackgroundProcHandle( + resources.proc_handle = BackgroundProcHandle( input_path=input_path, output_path=output_path, process_name="EngineCore", @@ -237,13 +264,11 @@ class MPClient(EngineCoreClient): "log_stats": log_stats, }) + self.output_socket = resources.output_socket + self.input_socket = resources.input_socket self.utility_results: Dict[int, AnyFuture] = {} def shutdown(self): - """Clean up background resources.""" - if hasattr(self, "proc_handle"): - self.proc_handle.shutdown() - self._finalizer()