mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-04-29 08:47:09 +08:00
[Bugfix]check health for engine core process exiting unexpectedly (#21728)
Signed-off-by: wuhang <wuhang6@huawei.com>
This commit is contained in:
parent
1395dd9c28
commit
bccc43c033
@ -2,6 +2,7 @@
|
|||||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import multiprocessing
|
||||||
import queue
|
import queue
|
||||||
import sys
|
import sys
|
||||||
import uuid
|
import uuid
|
||||||
@ -476,6 +477,9 @@ class MPClient(EngineCoreClient):
|
|||||||
# underlying data.
|
# underlying data.
|
||||||
self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()
|
self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()
|
||||||
|
|
||||||
|
# Start monitoring engine core processes for unexpected failures
|
||||||
|
self.start_engine_core_monitor()
|
||||||
|
|
||||||
success = True
|
success = True
|
||||||
finally:
|
finally:
|
||||||
if not success:
|
if not success:
|
||||||
@ -505,6 +509,41 @@ class MPClient(EngineCoreClient):
|
|||||||
def dp_engines_running(self) -> bool:
|
def dp_engines_running(self) -> bool:
|
||||||
return self.engines_running
|
return self.engines_running
|
||||||
|
|
||||||
|
def start_engine_core_monitor(self):
|
||||||
|
"""Start a monitor thread for engine core processes."""
|
||||||
|
engine_manager = self.resources.engine_manager
|
||||||
|
if (engine_manager is None or not hasattr(engine_manager, 'processes')
|
||||||
|
or not engine_manager.processes):
|
||||||
|
# No engine processes to monitor
|
||||||
|
return
|
||||||
|
|
||||||
|
engine_processes = engine_manager.processes
|
||||||
|
self_ref = weakref.ref(self)
|
||||||
|
|
||||||
|
# Monitor engine core process liveness. If any die unexpectedly,
|
||||||
|
# logs an error, shuts down the client and invokes the failure
|
||||||
|
# callback to inform the engine.
|
||||||
|
def monitor_engine_cores():
|
||||||
|
sentinels = [proc.sentinel for proc in engine_processes]
|
||||||
|
died = multiprocessing.connection.wait(sentinels)
|
||||||
|
_self = self_ref()
|
||||||
|
if not _self or _self.resources.engine_dead:
|
||||||
|
return
|
||||||
|
_self.resources.engine_dead = True
|
||||||
|
proc_name = next(proc.name for proc in engine_processes
|
||||||
|
if proc.sentinel == died[0])
|
||||||
|
logger.error(
|
||||||
|
"Engine core proc %s died unexpectedly, "
|
||||||
|
"shutting down client.", proc_name)
|
||||||
|
_self.shutdown()
|
||||||
|
# Note: For MPClient, we don't have a failure callback mechanism
|
||||||
|
# like MultiprocExecutor, but we set engine_dead flag which will
|
||||||
|
# cause subsequent operations to raise EngineDeadError
|
||||||
|
|
||||||
|
Thread(target=monitor_engine_cores,
|
||||||
|
daemon=True,
|
||||||
|
name="MPClientEngineMonitor").start()
|
||||||
|
|
||||||
|
|
||||||
def _process_utility_output(output: UtilityOutput,
|
def _process_utility_output(output: UtilityOutput,
|
||||||
utility_results: dict[int, AnyFuture]):
|
utility_results: dict[int, AnyFuture]):
|
||||||
@ -749,6 +788,8 @@ class AsyncMPClient(MPClient):
|
|||||||
outputs_queue.put_nowait(outputs)
|
outputs_queue.put_nowait(outputs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
outputs_queue.put_nowait(e)
|
outputs_queue.put_nowait(e)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
outputs_queue.put_nowait(EngineDeadError())
|
||||||
|
|
||||||
resources.output_queue_task = asyncio.create_task(
|
resources.output_queue_task = asyncio.create_task(
|
||||||
process_outputs_socket(), name="EngineCoreOutputQueueTask")
|
process_outputs_socket(), name="EngineCoreOutputQueueTask")
|
||||||
|
|||||||
@ -104,8 +104,12 @@ class MultiprocExecutor(Executor):
|
|||||||
finally:
|
finally:
|
||||||
if not success:
|
if not success:
|
||||||
# Clean up the worker procs if there was a failure.
|
# Clean up the worker procs if there was a failure.
|
||||||
|
# Close death_writers first to signal workers to exit
|
||||||
|
for uw in unready_workers:
|
||||||
|
if uw.death_writer is not None:
|
||||||
|
uw.death_writer.close()
|
||||||
self._ensure_worker_termination(
|
self._ensure_worker_termination(
|
||||||
[w.proc for w in unready_workers])
|
[uw.proc for uw in unready_workers])
|
||||||
|
|
||||||
# For pipeline parallel, we use a thread pool for asynchronous
|
# For pipeline parallel, we use a thread pool for asynchronous
|
||||||
# execute_model.
|
# execute_model.
|
||||||
@ -282,6 +286,10 @@ class MultiprocExecutor(Executor):
|
|||||||
|
|
||||||
if workers := getattr(self, 'workers', None):
|
if workers := getattr(self, 'workers', None):
|
||||||
for w in workers:
|
for w in workers:
|
||||||
|
# Close death_writer to signal child processes to exit
|
||||||
|
if w.death_writer is not None:
|
||||||
|
w.death_writer.close()
|
||||||
|
w.death_writer = None
|
||||||
w.worker_response_mq = None
|
w.worker_response_mq = None
|
||||||
self._ensure_worker_termination([w.proc for w in workers])
|
self._ensure_worker_termination([w.proc for w in workers])
|
||||||
|
|
||||||
@ -316,6 +324,7 @@ class UnreadyWorkerProcHandle:
|
|||||||
proc: BaseProcess
|
proc: BaseProcess
|
||||||
rank: int
|
rank: int
|
||||||
ready_pipe: Connection
|
ready_pipe: Connection
|
||||||
|
death_writer: Optional[Connection] = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -323,6 +332,7 @@ class WorkerProcHandle:
|
|||||||
proc: BaseProcess
|
proc: BaseProcess
|
||||||
rank: int
|
rank: int
|
||||||
worker_response_mq: MessageQueue # The worker process writes to this MQ
|
worker_response_mq: MessageQueue # The worker process writes to this MQ
|
||||||
|
death_writer: Optional[Connection] = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_unready_handle(
|
def from_unready_handle(
|
||||||
@ -332,6 +342,7 @@ class WorkerProcHandle:
|
|||||||
proc=unready_handle.proc,
|
proc=unready_handle.proc,
|
||||||
rank=unready_handle.rank,
|
rank=unready_handle.rank,
|
||||||
worker_response_mq=worker_response_mq,
|
worker_response_mq=worker_response_mq,
|
||||||
|
death_writer=unready_handle.death_writer,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -396,6 +407,9 @@ class WorkerProc:
|
|||||||
# (reader, writer)
|
# (reader, writer)
|
||||||
reader, writer = context.Pipe(duplex=False)
|
reader, writer = context.Pipe(duplex=False)
|
||||||
|
|
||||||
|
# Create death pipe to detect parent process exit
|
||||||
|
death_reader, death_writer = context.Pipe(duplex=False)
|
||||||
|
|
||||||
process_kwargs = {
|
process_kwargs = {
|
||||||
"vllm_config": vllm_config,
|
"vllm_config": vllm_config,
|
||||||
"local_rank": local_rank,
|
"local_rank": local_rank,
|
||||||
@ -403,6 +417,7 @@ class WorkerProc:
|
|||||||
"distributed_init_method": distributed_init_method,
|
"distributed_init_method": distributed_init_method,
|
||||||
"input_shm_handle": input_shm_handle,
|
"input_shm_handle": input_shm_handle,
|
||||||
"ready_pipe": (reader, writer),
|
"ready_pipe": (reader, writer),
|
||||||
|
"death_pipe": death_reader,
|
||||||
}
|
}
|
||||||
# Run EngineCore busy loop in background process.
|
# Run EngineCore busy loop in background process.
|
||||||
proc = context.Process(target=WorkerProc.worker_main,
|
proc = context.Process(target=WorkerProc.worker_main,
|
||||||
@ -412,7 +427,9 @@ class WorkerProc:
|
|||||||
|
|
||||||
proc.start()
|
proc.start()
|
||||||
writer.close()
|
writer.close()
|
||||||
return UnreadyWorkerProcHandle(proc, rank, reader)
|
# Keep death_writer open in parent - when parent exits,
|
||||||
|
# death_reader in child will get EOFError
|
||||||
|
return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def wait_for_ready(
|
def wait_for_ready(
|
||||||
@ -483,6 +500,28 @@ class WorkerProc:
|
|||||||
worker = None
|
worker = None
|
||||||
# tuple[Connection, Connection]
|
# tuple[Connection, Connection]
|
||||||
reader, ready_writer = kwargs.pop("ready_pipe")
|
reader, ready_writer = kwargs.pop("ready_pipe")
|
||||||
|
death_pipe = kwargs.pop("death_pipe", None)
|
||||||
|
|
||||||
|
# Start death monitoring thread if death_pipe is provided
|
||||||
|
if death_pipe is not None:
|
||||||
|
|
||||||
|
def monitor_parent_death():
|
||||||
|
try:
|
||||||
|
# This will block until parent process exits (pipe closes)
|
||||||
|
death_pipe.recv()
|
||||||
|
except EOFError:
|
||||||
|
# Parent process has exited, terminate this worker
|
||||||
|
logger.info("Parent process exited, terminating worker")
|
||||||
|
# Send signal to self to trigger clean shutdown
|
||||||
|
os.kill(os.getpid(), signal.SIGTERM)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Death monitoring error: %s", e)
|
||||||
|
|
||||||
|
death_monitor = Thread(target=monitor_parent_death,
|
||||||
|
daemon=True,
|
||||||
|
name="WorkerDeathMonitor")
|
||||||
|
death_monitor.start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reader.close()
|
reader.close()
|
||||||
worker = WorkerProc(*args, **kwargs)
|
worker = WorkerProc(*args, **kwargs)
|
||||||
@ -523,6 +562,8 @@ class WorkerProc:
|
|||||||
finally:
|
finally:
|
||||||
if ready_writer is not None:
|
if ready_writer is not None:
|
||||||
ready_writer.close()
|
ready_writer.close()
|
||||||
|
if death_pipe is not None:
|
||||||
|
death_pipe.close()
|
||||||
# Clean up once worker exits busy loop
|
# Clean up once worker exits busy loop
|
||||||
if worker is not None:
|
if worker is not None:
|
||||||
worker.shutdown()
|
worker.shutdown()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user