diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 6554cda6b637b..4263fb27265f6 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -188,8 +188,14 @@ class RayGPUExecutor(DistributedGPUExecutor): self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip) # Get the set of GPU IDs used on each node. - worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", - use_dummy_driver=True) + worker_node_and_gpu_ids = [] + for worker in [self.driver_dummy_worker] + self.workers: + if worker is None: + # driver_dummy_worker can be None when using ray spmd worker. + continue + worker_node_and_gpu_ids.append( + ray.get(worker.get_node_and_gpu_ids.remote()) \ + ) # type: ignore node_workers = defaultdict(list) # node id -> list of worker ranks node_gpus = defaultdict(list) # node id -> list of gpu ids @@ -329,7 +335,6 @@ class RayGPUExecutor(DistributedGPUExecutor): async_run_tensor_parallel_workers_only: bool = False, all_args: Optional[List[Tuple[Any, ...]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, - use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, **kwargs, ) -> Any: @@ -389,18 +394,10 @@ class RayGPUExecutor(DistributedGPUExecutor): driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] # Start the driver worker after all the ray workers. - if not use_dummy_driver: - driver_worker_output = [ - self.driver_worker.execute_method(method, *driver_args, - **driver_kwargs) - ] - else: - assert self.driver_dummy_worker is not None - driver_worker_output = [ - ray.get( - self.driver_dummy_worker.execute_method.remote( - method, *driver_args, **driver_kwargs)) - ] + driver_worker_output = [ + self.driver_worker.execute_method(method, *driver_args, + **driver_kwargs) + ] # Get the results of the ray workers. if self.workers: diff --git a/vllm/executor/ray_hpu_executor.py b/vllm/executor/ray_hpu_executor.py index 91c84d9214a60..f3025cb537ab8 100644 --- a/vllm/executor/ray_hpu_executor.py +++ b/vllm/executor/ray_hpu_executor.py @@ -163,9 +163,14 @@ class RayHPUExecutor(DistributedGPUExecutor): # node will be placed first. self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip) - # Get the set of GPU IDs used on each node. - worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", - use_dummy_driver=True) + worker_node_and_gpu_ids = [] + for worker in [self.driver_dummy_worker] + self.workers: + if worker is None: + # driver_dummy_worker can be None when using ray spmd worker. + continue + worker_node_and_gpu_ids.append( + ray.get(worker.get_node_and_gpu_ids.remote()) \ + ) # type: ignore node_workers = defaultdict(list) # node id -> list of worker ranks node_gpus = defaultdict(list) # node id -> list of gpu ids @@ -296,7 +301,6 @@ class RayHPUExecutor(DistributedGPUExecutor): async_run_tensor_parallel_workers_only: bool = False, all_args: Optional[List[Tuple[Any, ...]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, - use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, **kwargs, ) -> Any: @@ -356,18 +360,10 @@ class RayHPUExecutor(DistributedGPUExecutor): driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] # Start the driver worker after all the ray workers. - if not use_dummy_driver: - driver_worker_output = [ - self.driver_worker.execute_method(method, *driver_args, - **driver_kwargs) - ] - else: - assert self.driver_dummy_worker is not None - driver_worker_output = [ - ray.get( - self.driver_dummy_worker.execute_method.remote( - method, *driver_args, **driver_kwargs)) - ] + driver_worker_output = [ + self.driver_worker.execute_method(method, *driver_args, + **driver_kwargs) + ] # Get the results of the ray workers. if self.workers: diff --git a/vllm/executor/ray_tpu_executor.py b/vllm/executor/ray_tpu_executor.py index 3ee59397bf4c9..5118c13934f0d 100644 --- a/vllm/executor/ray_tpu_executor.py +++ b/vllm/executor/ray_tpu_executor.py @@ -137,8 +137,14 @@ class RayTPUExecutor(TPUExecutor): self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip) # Get the set of TPU IDs used on each node. - worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", - use_dummy_driver=True) + worker_node_and_gpu_ids = [] + for worker in [self.driver_dummy_worker] + self.workers: + if worker is None: + # driver_dummy_worker can be None when using ray spmd worker. + continue + worker_node_and_gpu_ids.append( + ray.get(worker.get_node_and_gpu_ids.remote()) \ + ) # type: ignore node_workers = defaultdict(list) for i, (node_id, _) in enumerate(worker_node_and_gpu_ids): @@ -199,7 +205,6 @@ class RayTPUExecutor(TPUExecutor): async_run_remote_workers_only: bool = False, all_args: Optional[List[Tuple[Any, ...]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, - use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, use_ray_compiled_dag: bool = False, **kwargs, @@ -241,14 +246,8 @@ class RayTPUExecutor(TPUExecutor): driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] # Start the driver worker after all the ray workers. - if not use_dummy_driver: - driver_worker_output = self.driver_worker.execute_method( - method, *driver_args, **driver_kwargs) - else: - assert self.driver_dummy_worker is not None - driver_worker_output = ray.get( - self.driver_dummy_worker.execute_method.remote( - method, *driver_args, **driver_kwargs)) + driver_worker_output = self.driver_worker.execute_method( + method, *driver_args, **driver_kwargs) # Get the results of the ray workers. if self.workers: ray_worker_outputs = ray.get(ray_worker_outputs) diff --git a/vllm/executor/ray_xpu_executor.py b/vllm/executor/ray_xpu_executor.py index 61f5d6a65e999..d2086f5fef26c 100644 --- a/vllm/executor/ray_xpu_executor.py +++ b/vllm/executor/ray_xpu_executor.py @@ -1,6 +1,8 @@ import asyncio from typing import List, Optional +import ray + import vllm.envs as envs from vllm.executor.ray_gpu_executor import RayGPUExecutor, RayGPUExecutorAsync from vllm.executor.xpu_executor import XPUExecutor @@ -14,8 +16,13 @@ class RayXPUExecutor(RayGPUExecutor, XPUExecutor): def _get_env_vars_to_be_updated(self): # Get the set of GPU IDs used on each node. - worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", - use_dummy_driver=True) + worker_node_and_gpu_ids = [] + for worker in [self.driver_dummy_worker] + self.workers: + if worker is None: + # driver_dummy_worker can be None when using ray spmd worker. + continue + worker_node_and_gpu_ids.append( + ray.get(worker.get_node_and_gpu_ids.remote())) # type: ignore # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({