From cda10fa3e2bb69ea276d663e5369ba16ec42cebb Mon Sep 17 00:00:00 2001 From: jennyyyyzhen <47012288+jennyyyyzhen@users.noreply.github.com> Date: Sun, 8 Jun 2025 06:39:12 -0700 Subject: [PATCH] [Multi Modal] Add an env var for message queue max chunk bytes (#19242) Signed-off-by: yZhen Co-authored-by: yZhen --- vllm/envs.py | 7 +++++++ vllm/v1/executor/multiproc_executor.py | 6 +++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/vllm/envs.py b/vllm/envs.py index ffb630079a84..9d18a1389579 100644 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -123,6 +123,7 @@ if TYPE_CHECKING: VLLM_MAX_TOKENS_PER_EXPERT_FP4_MOE: int = 163840 VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1 VLLM_SLEEP_WHEN_IDLE: bool = False + VLLM_MQ_MAX_CHUNK_BYTES_MB: int = 16 def get_default_cache_root(): @@ -847,6 +848,12 @@ environment_variables: dict[str, Callable[[], Any]] = { # latency penalty when a request eventually comes. "VLLM_SLEEP_WHEN_IDLE": lambda: bool(int(os.getenv("VLLM_SLEEP_WHEN_IDLE", "0"))), + + # Control the max chunk bytes (in MB) for the rpc message queue. + # Objects larger than this threshold will be broadcast to worker + # processes via zmq. 
+ "VLLM_MQ_MAX_CHUNK_BYTES_MB": + lambda: int(os.getenv("VLLM_MQ_MAX_CHUNK_BYTES_MB", "16")), } # --8<-- [end:env-vars-definition] diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 0bd7383b5f0e..2148680d5f56 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -20,6 +20,7 @@ from typing import Any, Callable, Optional, Union, cast import cloudpickle +import vllm.envs as envs from vllm.config import VllmConfig from vllm.distributed import (destroy_distributed_environment, destroy_model_parallel) @@ -72,7 +73,10 @@ class MultiprocExecutor(Executor): # Initialize worker and set up message queues for SchedulerOutputs # and ModelRunnerOutputs - self.rpc_broadcast_mq = MessageQueue(self.world_size, self.world_size) + max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024 + self.rpc_broadcast_mq = MessageQueue(self.world_size, + self.world_size, + max_chunk_bytes=max_chunk_bytes) scheduler_output_handle = self.rpc_broadcast_mq.export_handle() # Create workers