[core] Bump ray to use _overlap_gpu_communication in compiled graph tests (#10410)

Signed-off-by: Rui Qiao <ubuntu@ip-172-31-15-128.us-west-2.compute.internal>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Co-authored-by: Rui Qiao <ubuntu@ip-172-31-15-128.us-west-2.compute.internal>
This commit is contained in:
Rui Qiao 2024-12-11 11:36:35 -08:00 committed by GitHub
parent 66aaa7722d
commit 72ff3a9686
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 20 additions and 9 deletions

View File

@@ -13,7 +13,7 @@ einops # required for MPT, qwen-vl and Mamba
httpx
librosa # required for audio tests
peft
ray[adag]==2.35
ray[adag]==2.40.0
sentence-transformers # required for embedding tests
soundfile # required for audio tests
timm # required for internvl test

View File

@@ -410,7 +410,7 @@ pyyaml==6.0.2
# ray
# timm
# transformers
ray[adag]==2.35.0
ray[adag]==2.40.0
# via -r requirements-test.in
redis==5.2.0
# via tensorizer

View File

@@ -45,6 +45,7 @@ if TYPE_CHECKING:
VLLM_USE_RAY_SPMD_WORKER: bool = False
VLLM_USE_RAY_COMPILED_DAG: bool = False
VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: bool = True
VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = True
VLLM_WORKER_MULTIPROC_METHOD: str = "fork"
VLLM_ASSETS_CACHE: str = os.path.join(VLLM_CACHE_ROOT, "assets")
VLLM_IMAGE_FETCH_TIMEOUT: int = 5
@@ -337,6 +338,13 @@ environment_variables: Dict[str, Callable[[], Any]] = {
lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL", "1"))
),
# If the env var is set, it enables GPU communication overlap in
# Ray's compiled DAG. This flag is ignored if
# VLLM_USE_RAY_COMPILED_DAG is not set.
"VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM":
lambda: bool(int(os.getenv("VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM", "1"))
),
# Use dedicated multiprocess context for workers.
# Both spawn and fork work
"VLLM_WORKER_MULTIPROC_METHOD":

View File

@@ -414,12 +414,10 @@ class RayGPUExecutor(DistributedGPUExecutor):
import pkg_resources
from packaging import version
required_version = version.parse("2.35")
required_version = version.parse("2.40")
current_version = version.parse(
pkg_resources.get_distribution("ray").version)
# TODO: update the constraint once we adapt to the backward
# incompatible API change from ray 2.36
if current_version != required_version:
if current_version < required_version:
raise ValueError(f"Ray version {required_version} is "
f"required, but found {current_version}")
@@ -445,6 +443,8 @@ class RayGPUExecutor(DistributedGPUExecutor):
logger.info("VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL = %s",
envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL)
logger.info("VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM = %s",
envs.VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM)
with InputNode() as input_data:
# Example DAG: PP=2, TP=4
# (ExecuteModelReq, None) -> 0 -> (ExecuteModelReq, IntermediateOutput) -> 4 -> SamplerOutput # noqa: E501
@@ -480,7 +480,10 @@
forward_dag = MultiOutputNode(outputs)
return forward_dag.experimental_compile(enable_asyncio=enable_asyncio)
return forward_dag.experimental_compile(
enable_asyncio=enable_asyncio,
_overlap_gpu_communication=envs.
VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM)
def __del__(self):
self.shutdown()
@@ -507,8 +510,8 @@ class RayGPUExecutorAsync(RayGPUExecutor, DistributedGPUExecutorAsync):
serialized_data = self.input_encoder.encode(execute_model_req)
dag_future = await self.forward_dag.execute_async(serialized_data)
outputs = await dag_future
return self.output_decoder.decode(outputs[0])
output = await dag_future[0]
return self.output_decoder.decode(output)
async def _driver_execute_model_async(
self,