diff --git a/vllm/distributed/device_communicators/shm_broadcast.py b/vllm/distributed/device_communicators/shm_broadcast.py index 499c6927f2f9..0ee432fea15e 100644 --- a/vllm/distributed/device_communicators/shm_broadcast.py +++ b/vllm/distributed/device_communicators/shm_broadcast.py @@ -387,23 +387,21 @@ class MessageQueue: # Release the processor to other threads sched_yield() - # if we wait for a long time, log a message - if (time.monotonic() - start_time - > VLLM_RINGBUFFER_WARNING_INTERVAL * n_warning): - logger.info( - ("No available shared memory broadcast block found" - " in %s seconds. This typically happens when some" - " processes are hanging, doing some time-consuming" - " work (e.g. compilation), or sitting idle."), - VLLM_RINGBUFFER_WARNING_INTERVAL, - ) - n_warning += 1 - # if we time out, raise an exception - if (timeout is not None - and time.monotonic() - start_time > timeout): + elapsed = time.monotonic() - start_time + if timeout is not None and elapsed > timeout: raise TimeoutError + # if we wait for a long time, log a message + if elapsed > VLLM_RINGBUFFER_WARNING_INTERVAL * n_warning: + logger.info( + "No available shared memory broadcast block found" + " in %s seconds. This typically happens when some" + " processes are hanging or doing some" + " time-consuming work (e.g. compilation)", + VLLM_RINGBUFFER_WARNING_INTERVAL) + n_warning += 1 + continue # found a block that is either # (1) not written @@ -432,7 +430,8 @@ class MessageQueue: @contextmanager def acquire_read(self, timeout: Optional[float] = None, - cancel: Optional[Event] = None): + cancel: Optional[Event] = None, + indefinite: bool = False): assert self._is_local_reader, "Only readers can acquire read" start_time = time.monotonic() n_warning = 1 @@ -452,26 +451,26 @@ class MessageQueue: # Release the processor to other threads self._read_spin_timer.spin() - # if we wait for a long time, log a message - if (time.monotonic() - start_time - > VLLM_RINGBUFFER_WARNING_INTERVAL * n_warning): - logger.info( - ("No available shared memory broadcast block found" - " in %s seconds. This typically happens when some" - " processes are hanging, doing some time-consuming" - " work (e.g. compilation), or sitting idle."), - VLLM_RINGBUFFER_WARNING_INTERVAL, - ) - n_warning += 1 - if cancel is not None and cancel.is_set(): raise RuntimeError("cancelled") # if we time out, raise an exception - if (timeout is not None - and time.monotonic() - start_time > timeout): + elapsed = time.monotonic() - start_time + if timeout is not None and elapsed > timeout: raise TimeoutError + # if we wait for a long time, log a message + if not indefinite and (elapsed + > VLLM_RINGBUFFER_WARNING_INTERVAL * + n_warning): + logger.info( + "No available shared memory broadcast block found" + " in %s seconds. This typically happens when some" + " processes are hanging or doing some" + " time-consuming work (e.g. compilation).", + VLLM_RINGBUFFER_WARNING_INTERVAL) + n_warning += 1 + continue # found a block that is not read by this reader # let caller read from the buffer @@ -505,10 +504,11 @@ class MessageQueue: def dequeue(self, timeout: Optional[float] = None, - cancel: Optional[Event] = None): + cancel: Optional[Event] = None, + indefinite: bool = False): """ Read from message queue with optional timeout (in seconds) """ if self._is_local_reader: - with self.acquire_read(timeout, cancel) as buf: + with self.acquire_read(timeout, cancel, indefinite) as buf: overflow = buf[0] == 1 if not overflow: # no need to know the size of serialized object diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 2aa732f34bcc..ef90af263664 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -653,7 +653,7 @@ class WorkerProc: """Main busy loop for Multiprocessing Workers""" while True: method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue( - cancel=cancel) + cancel=cancel, indefinite=True) try: if isinstance(method, str): func = getattr(self.worker, method)