[Misc] Don't log shm dequeue delay warning on worker side (#25720)

Signed-off-by: Nick Hill <nhill@redhat.com>
This commit is contained in:
Nick Hill 2025-09-25 18:08:30 -07:00 committed by GitHub
parent 9fe4c2bdb9
commit 8b77328ffe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 32 deletions

View File

@ -387,23 +387,21 @@ class MessageQueue:
# Release the processor to other threads
sched_yield()
# if we wait for a long time, log a message
if (time.monotonic() - start_time
> VLLM_RINGBUFFER_WARNING_INTERVAL * n_warning):
logger.info(
("No available shared memory broadcast block found"
" in %s seconds. This typically happens when some"
" processes are hanging, doing some time-consuming"
" work (e.g. compilation), or sitting idle."),
VLLM_RINGBUFFER_WARNING_INTERVAL,
)
n_warning += 1
# if we time out, raise an exception
if (timeout is not None
and time.monotonic() - start_time > timeout):
elapsed = time.monotonic() - start_time
if timeout is not None and elapsed > timeout:
raise TimeoutError
# if we wait for a long time, log a message
if elapsed > VLLM_RINGBUFFER_WARNING_INTERVAL * n_warning:
logger.info(
"No available shared memory broadcast block found"
" in %s seconds. This typically happens when some"
" processes are hanging or doing some"
" time-consuming work (e.g. compilation)",
VLLM_RINGBUFFER_WARNING_INTERVAL)
n_warning += 1
continue
# found a block that is either
# (1) not written
@ -432,7 +430,8 @@ class MessageQueue:
@contextmanager
def acquire_read(self,
timeout: Optional[float] = None,
cancel: Optional[Event] = None):
cancel: Optional[Event] = None,
indefinite: bool = False):
assert self._is_local_reader, "Only readers can acquire read"
start_time = time.monotonic()
n_warning = 1
@ -452,26 +451,26 @@ class MessageQueue:
# Release the processor to other threads
self._read_spin_timer.spin()
# if we wait for a long time, log a message
if (time.monotonic() - start_time
> VLLM_RINGBUFFER_WARNING_INTERVAL * n_warning):
logger.info(
("No available shared memory broadcast block found"
" in %s seconds. This typically happens when some"
" processes are hanging, doing some time-consuming"
" work (e.g. compilation), or sitting idle."),
VLLM_RINGBUFFER_WARNING_INTERVAL,
)
n_warning += 1
if cancel is not None and cancel.is_set():
raise RuntimeError("cancelled")
# if we time out, raise an exception
if (timeout is not None
and time.monotonic() - start_time > timeout):
elapsed = time.monotonic() - start_time
if timeout is not None and elapsed > timeout:
raise TimeoutError
# if we wait for a long time, log a message
if not indefinite and (elapsed
> VLLM_RINGBUFFER_WARNING_INTERVAL *
n_warning):
logger.info(
"No available shared memory broadcast block found"
" in %s seconds. This typically happens when some"
" processes are hanging or doing some"
" time-consuming work (e.g. compilation).",
VLLM_RINGBUFFER_WARNING_INTERVAL)
n_warning += 1
continue
# found a block that is not read by this reader
# let caller read from the buffer
@ -505,10 +504,11 @@ class MessageQueue:
def dequeue(self,
timeout: Optional[float] = None,
cancel: Optional[Event] = None):
cancel: Optional[Event] = None,
indefinite: bool = False):
""" Read from message queue with optional timeout (in seconds) """
if self._is_local_reader:
with self.acquire_read(timeout, cancel) as buf:
with self.acquire_read(timeout, cancel, indefinite) as buf:
overflow = buf[0] == 1
if not overflow:
# no need to know the size of serialized object

View File

@ -653,7 +653,7 @@ class WorkerProc:
"""Main busy loop for Multiprocessing Workers"""
while True:
method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
cancel=cancel)
cancel=cancel, indefinite=True)
try:
if isinstance(method, str):
func = getattr(self.worker, method)