mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2025-12-21 23:05:34 +08:00
[Misc] Use envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE (#15831)
Signed-off-by: Rui Qiao <ruisearch42@gmail.com> Co-authored-by: Cody Yu <hao.yu.cody@gmail.com> Co-authored-by: DarkLight1337 <tlleungac@connect.ust.hk>
This commit is contained in:
parent
0a298ea418
commit
8dd41d6bcc
20
vllm/envs.py
20
vllm/envs.py
@@ -48,7 +48,7 @@ if TYPE_CHECKING:
|
|||||||
VLLM_FUSED_MOE_CHUNK_SIZE: int = 64 * 1024
|
VLLM_FUSED_MOE_CHUNK_SIZE: int = 64 * 1024
|
||||||
VLLM_USE_RAY_SPMD_WORKER: bool = False
|
VLLM_USE_RAY_SPMD_WORKER: bool = False
|
||||||
VLLM_USE_RAY_COMPILED_DAG: bool = False
|
VLLM_USE_RAY_COMPILED_DAG: bool = False
|
||||||
VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: bool = True
|
VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: str = "auto"
|
||||||
VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = False
|
VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = False
|
||||||
VLLM_WORKER_MULTIPROC_METHOD: str = "fork"
|
VLLM_WORKER_MULTIPROC_METHOD: str = "fork"
|
||||||
VLLM_ASSETS_CACHE: str = os.path.join(VLLM_CACHE_ROOT, "assets")
|
VLLM_ASSETS_CACHE: str = os.path.join(VLLM_CACHE_ROOT, "assets")
|
||||||
@@ -380,15 +380,21 @@ environment_variables: dict[str, Callable[[], Any]] = {
|
|||||||
# (previously known as ADAG) API which optimizes the
|
# (previously known as ADAG) API which optimizes the
|
||||||
# control plane overhead.
|
# control plane overhead.
|
||||||
# Run vLLM with VLLM_USE_RAY_COMPILED_DAG=1 to enable it.
|
# Run vLLM with VLLM_USE_RAY_COMPILED_DAG=1 to enable it.
|
||||||
|
# Note that this variable is set to 1 in V1 by default
|
||||||
|
# when ray distributed executor is used.
|
||||||
"VLLM_USE_RAY_COMPILED_DAG":
|
"VLLM_USE_RAY_COMPILED_DAG":
|
||||||
lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG", "0"))),
|
lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG", "0"))),
|
||||||
|
|
||||||
# If the env var is set, it uses NCCL for communication in
|
# If the env var is set, Ray Compiled Graph uses the specified
|
||||||
# Ray's Compiled Graph. This flag is ignored if
|
# channel type to communicate between workers belonging to
|
||||||
# VLLM_USE_RAY_COMPILED_DAG is not set.
|
# different pipeline-parallel stages.
|
||||||
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL":
|
# Available options:
|
||||||
lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL", "1"))
|
# - "auto": use the default channel type
|
||||||
),
|
# - "nccl": use NCCL for communication
|
||||||
|
# - "shm": use shared memory and gRPC for communication
|
||||||
|
# This flag is ignored if VLLM_USE_RAY_COMPILED_DAG is not set.
|
||||||
|
"VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE":
|
||||||
|
lambda: os.getenv("VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE", "auto"),
|
||||||
|
|
||||||
# If the env var is set, it enables GPU communication overlap
|
# If the env var is set, it enables GPU communication overlap
|
||||||
# (experimental feature) in Ray's Compiled Graph. This flag is ignored if
|
# (experimental feature) in Ray's Compiled Graph. This flag is ignored if
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
|||||||
|
|
||||||
# For TPU, avoid compiling NVIDIA's NCCL
|
# For TPU, avoid compiling NVIDIA's NCCL
|
||||||
if current_platform.is_tpu():
|
if current_platform.is_tpu():
|
||||||
os.environ["VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL"] = "0"
|
os.environ["VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE"] = "shm"
|
||||||
|
|
||||||
# If the env var is set, it uses the Ray's compiled DAG API
|
# If the env var is set, it uses the Ray's compiled DAG API
|
||||||
# which optimizes the control plane overhead.
|
# which optimizes the control plane overhead.
|
||||||
@@ -546,10 +546,11 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
|||||||
"Run `pip install ray[cgraph]` to install it.")
|
"Run `pip install ray[cgraph]` to install it.")
|
||||||
|
|
||||||
cupy_spec = importlib.util.find_spec("cupy")
|
cupy_spec = importlib.util.find_spec("cupy")
|
||||||
if cupy_spec is None and envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL:
|
if (cupy_spec is None
|
||||||
|
and envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE == "nccl"):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"cupy is not installed but required since "
|
"cupy is not installed but required since "
|
||||||
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL is set. "
|
"VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE is set to 'nccl'. "
|
||||||
"Run `pip install ray[cgraph]` and check cupy installation.")
|
"Run `pip install ray[cgraph]` and check cupy installation.")
|
||||||
|
|
||||||
def _compiled_ray_dag(self, enable_asyncio: bool):
|
def _compiled_ray_dag(self, enable_asyncio: bool):
|
||||||
@@ -557,10 +557,17 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
|||||||
self._check_ray_cgraph_installation()
|
self._check_ray_cgraph_installation()
|
||||||
from ray.dag import InputNode, MultiOutputNode
|
from ray.dag import InputNode, MultiOutputNode
|
||||||
|
|
||||||
logger.info("VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL = %s",
|
logger.info("VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE = %s",
|
||||||
envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL)
|
envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE)
|
||||||
logger.info("VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM = %s",
|
logger.info("VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM = %s",
|
||||||
envs.VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM)
|
envs.VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM)
|
||||||
|
|
||||||
|
channel_type = envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE
|
||||||
|
if channel_type not in ("auto", "nccl", "shm"):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid value for VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: "
|
||||||
|
f"{channel_type}. Valid values are: 'auto', 'nccl', or 'shm'.")
|
||||||
|
|
||||||
# Enlarge the default value of "RAY_CGRAPH_get_timeout" to 300 seconds
|
# Enlarge the default value of "RAY_CGRAPH_get_timeout" to 300 seconds
|
||||||
# (it is 10 seconds by default). This is a Ray environment variable to
|
# (it is 10 seconds by default). This is a Ray environment variable to
|
||||||
# control the timeout of getting result from a compiled graph execution,
|
# control the timeout of getting result from a compiled graph execution,
|
||||||
@@ -605,13 +613,12 @@ class RayDistributedExecutor(DistributedExecutorBase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
last_pp_rank = len(self.pp_tp_workers) - 1
|
last_pp_rank = len(self.pp_tp_workers) - 1
|
||||||
if pp_rank < last_pp_rank:
|
if (pp_rank < last_pp_rank and
|
||||||
|
envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE != "shm"):
|
||||||
# Specify how intermediate tensors should be passed
|
# Specify how intermediate tensors should be passed
|
||||||
# between pp stages, no need to specify for the last
|
# between pp stages, no need to specify for the last
|
||||||
# pp stage.
|
# pp stage or when using shared memory (the default).
|
||||||
transport = "nccl" \
|
transport = envs.VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE
|
||||||
if envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL \
|
|
||||||
else "auto"
|
|
||||||
outputs = [
|
outputs = [
|
||||||
output.with_tensor_transport(transport=transport)
|
output.with_tensor_transport(transport=transport)
|
||||||
for output in outputs
|
for output in outputs
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user