diff --git a/vllm/envs.py b/vllm/envs.py index 8a03ba329b028..b34c2df81698f 100644 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -48,7 +48,7 @@ if TYPE_CHECKING: VLLM_FUSED_MOE_CHUNK_SIZE: int = 64 * 1024 VLLM_USE_RAY_SPMD_WORKER: bool = False VLLM_USE_RAY_COMPILED_DAG: bool = False - VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: bool = True + VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: str = "auto" VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = False VLLM_WORKER_MULTIPROC_METHOD: str = "fork" VLLM_ASSETS_CACHE: str = os.path.join(VLLM_CACHE_ROOT, "assets") @@ -380,15 +380,21 @@ environment_variables: dict[str, Callable[[], Any]] = { # (previously known as ADAG) API which optimizes the # control plane overhead. # Run vLLM with VLLM_USE_RAY_COMPILED_DAG=1 to enable it. + # Note that this variable is set to 1 in V1 by default + # when ray distributed executor is used. "VLLM_USE_RAY_COMPILED_DAG": lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG", "0"))), - # If the env var is set, it uses NCCL for communication in - # Ray's Compiled Graph. This flag is ignored if - # VLLM_USE_RAY_COMPILED_DAG is not set. - "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": - lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL", "1")) - ), + # If the env var is set, Ray Compiled Graph uses the specified + # channel type to communicate between workers belonging to + # different pipeline-parallel stages. + # Available options: + # - "auto": use the default channel type + # - "nccl": use NCCL for communication + # - "shm": use shared memory and gRPC for communication + # This flag is ignored if VLLM_USE_RAY_COMPILED_DAG is not set. + "VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE": + lambda: os.getenv("VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE", "auto"), # If the env var is set, it enables GPU communication overlap # (experimental feature) in Ray's Compiled Graph. 
This flag is ignored if VLLM_USE_RAY_COMPILED_DAG is not set. diff --git a/vllm/executor/ray_distributed_executor.py b/vllm/executor/ray_distributed_executor.py index c823ab5bf9698..9b0b98731e033 100644 --- a/vllm/executor/ray_distributed_executor.py +++ b/vllm/executor/ray_distributed_executor.py @@ -79,7 +79,7 @@ class RayDistributedExecutor(DistributedExecutorBase): # For TPU, avoid compiling NVIDIA's NCCL if current_platform.is_tpu(): - os.environ["VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL"] = "0" + os.environ["VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE"] = "shm" # If the env var is set, it uses the Ray's compiled DAG API # which optimizes the control plane overhead. @@ -546,10 +546,11 @@ class RayDistributedExecutor(DistributedExecutorBase): "Run `pip install ray[cgraph]` to install it.") cupy_spec = importlib.util.find_spec("cupy") - if cupy_spec is None and envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: + if (cupy_spec is None + and envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE == "nccl"): raise ValueError( "cupy is not installed but required since " - "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL is set. " + "VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE is set to 'nccl'. " "Run `pip install ray[cgraph]` and check cupy installation.") def _compiled_ray_dag(self, enable_asyncio: bool): @@ -557,10 +558,17 @@ class RayDistributedExecutor(DistributedExecutorBase): self._check_ray_cgraph_installation() from ray.dag import InputNode, MultiOutputNode - logger.info("VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL = %s", - envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL) + logger.info("VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE = %s", + envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE) logger.info("VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM = %s", envs.VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM) + + channel_type = envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE + if channel_type not in ("auto", "nccl", "shm"): + raise ValueError( + "Invalid value for VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: " + f"{channel_type}. "
"Valid values are: 'auto', 'nccl', or 'shm'.") + # Enlarge the default value of "RAY_CGRAPH_get_timeout" to 300 seconds # (it is 10 seconds by default). This is a Ray environment variable to # control the timeout of getting result from a compiled graph execution, @@ -605,13 +613,12 @@ class RayDistributedExecutor(DistributedExecutorBase): ] last_pp_rank = len(self.pp_tp_workers) - 1 - if pp_rank < last_pp_rank: + if (pp_rank < last_pp_rank and + envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE != "shm"): # Specify how intermediate tensors should be passed # between pp stages, no need to specify for the last - # pp stage. - transport = "nccl" \ if envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL \ else "auto" + # pp stage or when using shared memory (the default). + transport = envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE outputs = [ output.with_tensor_transport(transport=transport) for output in outputs