diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index ca8fd83314ae..ba0cdee461a2 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -528,6 +528,13 @@ class _AsyncLLMEngine(LLMEngine):
     async def check_health_async(self) -> None:
         self.model_executor.check_health()
 
+    async def collective_rpc_async(self,
+                                   method: str,
+                                   timeout: Optional[float] = None,
+                                   args: tuple = (),
+                                   kwargs: Optional[dict] = None):
+        raise NotImplementedError
+
 
 async def build_guided_decoding_logits_processor_async(
         sampling_params: SamplingParams, tokenizer: AnyTokenizer,
@@ -1236,6 +1243,17 @@ class AsyncLLMEngine(EngineClient):
     async def add_lora(self, lora_request: LoRARequest) -> None:
         self.engine.add_lora(lora_request)
 
+    async def collective_rpc(self,
+                             method: str,
+                             timeout: Optional[float] = None,
+                             args: tuple = (),
+                             kwargs: Optional[dict] = None):
+        """
+        Perform a collective RPC call on all workers.
+        """
+        return await self.engine.collective_rpc_async(method, timeout, args,
+                                                      kwargs)
+
 
 # TODO(v1): Remove this class proxy when V1 goes default.
 if envs.is_set("VLLM_USE_V1") and envs.VLLM_USE_V1:
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index fdda812c2853..54f0232da2b2 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -492,6 +492,17 @@ class AsyncLLM(EngineClient):
         """Prevent an adapter from being evicted."""
         return await self.engine_core.pin_lora_async(lora_id)
 
+    async def collective_rpc(self,
+                             method: str,
+                             timeout: Optional[float] = None,
+                             args: tuple = (),
+                             kwargs: Optional[dict] = None):
+        """
+        Perform a collective RPC call on all workers.
+        """
+        return await self.engine_core.collective_rpc_async(
+            method, timeout, args, kwargs)
+
     @property
     def is_running(self) -> bool:
         # Is None before the loop is started.
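
For context (not part of the diff): a minimal sketch of how a caller might exercise the new async `collective_rpc` entry point. The model name and the worker-side method name `get_device_uuid` are illustrative assumptions, and since `_AsyncLLMEngine.collective_rpc_async` is only stubbed out in the V0 path here, the sketch targets the V1 `AsyncLLM`:

```python
import asyncio

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.v1.engine.async_llm import AsyncLLM


async def main():
    # Build an async engine client; the model choice is illustrative.
    engine = AsyncLLM.from_engine_args(
        AsyncEngineArgs(model="facebook/opt-125m"))

    # Broadcast a method call to every worker and await the gathered
    # results. "get_device_uuid" is a hypothetical worker-side method
    # used only for illustration; it is not defined by this diff.
    results = await engine.collective_rpc("get_device_uuid", timeout=5.0)
    print(results)


asyncio.run(main())
```

Presumably this mirrors the synchronous `LLMEngine.collective_rpc`, returning one result per worker as a list.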