From 74bc397b0a8c092089bdd21e3ec9130336797471 Mon Sep 17 00:00:00 2001
From: Jun Duan
Date: Sat, 15 Mar 2025 09:28:14 -0400
Subject: [PATCH] [Core] Expose API endpoint `/is_sleeping` (#14312)

Signed-off-by: Jun Duan
---
 tests/entrypoints/openai/test_sleep.py  |  7 +++++++
 vllm/engine/async_llm_engine.py         |  3 +++
 vllm/engine/llm_engine.py               |  3 +++
 vllm/engine/multiprocessing/__init__.py | 16 +++++++++++++--
 vllm/engine/multiprocessing/client.py   | 27 +++++++++++++++++++++++--
 vllm/engine/multiprocessing/engine.py   | 13 ++++++++++++
 vllm/engine/protocol.py                 |  5 +++++
 vllm/entrypoints/openai/api_server.py   |  6 ++++++
 vllm/v1/engine/async_llm.py             |  3 +++
 vllm/v1/engine/core.py                  |  3 +++
 vllm/v1/engine/core_client.py           | 15 ++++++++++++++
 vllm/v1/engine/llm_engine.py            |  3 +++
 12 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/tests/entrypoints/openai/test_sleep.py b/tests/entrypoints/openai/test_sleep.py
index 1caa743c40185..8bdf00bcee126 100644
--- a/tests/entrypoints/openai/test_sleep.py
+++ b/tests/entrypoints/openai/test_sleep.py
@@ -28,5 +28,12 @@ def test_sleep_mode():
         response = requests.post(remote_server.url_for("/sleep"),
                                  data={"level": "1"})
         assert response.status_code == 200
+        response = requests.get(remote_server.url_for("/is_sleeping"))
+        assert response.status_code == 200
+        assert response.json().get("is_sleeping") is True
+
         response = requests.post(remote_server.url_for("/wake_up"))
         assert response.status_code == 200
+        response = requests.get(remote_server.url_for("/is_sleeping"))
+        assert response.status_code == 200
+        assert response.json().get("is_sleeping") is False
diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index 84f5528a06d02..63787590bf47a 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -1225,6 +1225,9 @@ class AsyncLLMEngine(EngineClient):
     async def wake_up(self) -> None:
         self.engine.wake_up()
 
+    async def is_sleeping(self) -> bool:
+        return self.engine.is_sleeping()
+
     async def add_lora(self, lora_request: LoRARequest) -> None:
         self.engine.add_lora(lora_request)
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 6dc0055bdfb49..ca50f08a38048 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -1948,6 +1948,9 @@ class LLMEngine:
                 "Sleep mode is not enabled in the model config")
         self.model_executor.wake_up()
 
+    def is_sleeping(self) -> bool:
+        return self.model_executor.is_sleeping
+
     def check_health(self) -> None:
         if self.tokenizer:
             self.tokenizer.check_health()
diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py
index 26dfb63c3dbf3..144dd822a177c 100644
--- a/vllm/engine/multiprocessing/__init__.py
+++ b/vllm/engine/multiprocessing/__init__.py
@@ -136,6 +136,18 @@ class RPCWakeUpRequest(Enum):
     WAKE_UP = 1
 
 
+@dataclass
+class RPCIsSleepingRequest:
+    # Set the default value of request_id to a new UUID
+    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
+
+
+@dataclass
+class RPCIsSleepingResponse:
+    request_id: str
+    is_sleeping: bool
+
+
 @dataclass
 class RPCLoadAdapterRequest:
     lora_request: LoRARequest
@@ -151,10 +163,10 @@ class RPCAdapterLoadedResponse:
 RPC_REQUEST_T = Union[RPCProcessRequest, RPCAbortRequest, RPCStartupRequest,
                       RPCUProfileRequest, RPCLoadAdapterRequest,
                       RPCResetPrefixCacheRequest, RPCSleepRequest,
-                      RPCWakeUpRequest]
+                      RPCWakeUpRequest, RPCIsSleepingRequest]
 
 REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCAdapterLoadedResponse,
-                          RPCError]
+                          RPCIsSleepingResponse, RPCError]
 
 
 def ENGINE_DEAD_ERROR(
diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py
index b1bb0fd53d67a..e2ae9486e4351 100644
--- a/vllm/engine/multiprocessing/client.py
+++ b/vllm/engine/multiprocessing/client.py
@@ -27,6 +27,8 @@ from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT,
                                          IPC_OUTPUT_EXT, RPC_REQUEST_T,
                                          VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
                                          RPCAdapterLoadedResponse, RPCError,
+                                         RPCIsSleepingRequest,
+                                         RPCIsSleepingResponse,
                                          RPCLoadAdapterRequest,
                                          RPCProcessRequest,
                                          RPCResetPrefixCacheRequest,
@@ -246,7 +248,9 @@ class MQLLMEngineClient(EngineClient):
                     if queue is not None:
                         queue.put_nowait(exception)
                 # Put each output into the appropriate queue.
-                elif isinstance(request_outputs, RPCAdapterLoadedResponse):
+                elif isinstance(
+                        request_outputs,
+                    (RPCAdapterLoadedResponse, RPCIsSleepingResponse)):
                     self._add_output(request_outputs)
                 else:
                     for request_output in request_outputs:
@@ -256,7 +260,8 @@ class MQLLMEngineClient(EngineClient):
         logger.debug("Shutting down MQLLMEngineClient output handler.")
 
     def _add_output(self, request_output: Union[RequestOutput,
-                                                RPCAdapterLoadedResponse]):
+                                                RPCAdapterLoadedResponse,
+                                                RPCIsSleepingResponse]):
         queue = self.output_queues.get(request_output.request_id)
         if queue is not None:
             queue.put_nowait(request_output)
@@ -696,6 +701,24 @@ class MQLLMEngineClient(EngineClient):
         return await self._send_one_way_rpc_request(
             request=RPCWakeUpRequest.WAKE_UP, socket=self.input_socket)
 
+    async def is_sleeping(self) -> bool:
+        """Check whether the engine is sleeping"""
+        request = RPCIsSleepingRequest()
+
+        queue: asyncio.Queue[Union[BaseException,
+                                   RPCIsSleepingResponse]] = asyncio.Queue()
+        self.output_queues[request.request_id] = queue
+
+        request_bytes = pickle.dumps(request)
+        await self.input_socket.send_multipart((request_bytes, ), copy=False)
+
+        request_output = await queue.get()
+        self.output_queues.pop(request.request_id)
+
+        if isinstance(request_output, BaseException):
+            raise request_output
+        return request_output.is_sleeping
+
     async def add_lora(self, lora_request: LoRARequest) -> None:
         """Load a new LoRA adapter into the engine for future requests."""
         # Uses the same I/O as generate requests
diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py
index 312e0e98d56b4..33b96af3018a3 100644
--- a/vllm/engine/multiprocessing/engine.py
+++ b/vllm/engine/multiprocessing/engine.py
@@ -18,6 +18,8 @@ from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT,
                                          IPC_OUTPUT_EXT, REQUEST_OUTPUTS_T,
                                          VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
                                          RPCAdapterLoadedResponse, RPCError,
+                                         RPCIsSleepingRequest,
+                                         RPCIsSleepingResponse,
                                          RPCLoadAdapterRequest,
                                          RPCProcessRequest,
                                          RPCResetPrefixCacheRequest,
@@ -271,6 +273,8 @@ class MQLLMEngine:
                     self.sleep(request.value)
                 elif isinstance(request, RPCWakeUpRequest):
                     self.wake_up()
+                elif isinstance(request, RPCIsSleepingRequest):
+                    self._handle_is_sleeping_request(request)
                 else:
                     raise ValueError("Unknown RPCRequest Type: "
                                      f"{type(request)}")
@@ -337,6 +341,12 @@ class MQLLMEngine:
         self._send_outputs(
             RPCAdapterLoadedResponse(request_id=request.request_id))
 
+    def _handle_is_sleeping_request(self, request: RPCIsSleepingRequest):
+        is_sleeping = self.is_sleeping()
+        self._send_outputs(
+            RPCIsSleepingResponse(request_id=request.request_id,
+                                  is_sleeping=is_sleeping))
+
     def _health_check(self):
         # Send unhealthy if engine has already errored
         if self._errored_with is not None:
@@ -406,6 +416,9 @@ class MQLLMEngine:
     def wake_up(self) -> None:
         self.engine.wake_up()
 
+    def is_sleeping(self) -> bool:
+        return self.engine.is_sleeping()
+
 
 def signal_handler(*_) -> None:
     raise KeyboardInterrupt("MQLLMEngine terminated")
diff --git a/vllm/engine/protocol.py b/vllm/engine/protocol.py
index ee9accd32f218..f314075b166e2 100644
--- a/vllm/engine/protocol.py
+++ b/vllm/engine/protocol.py
@@ -288,6 +288,11 @@ class EngineClient(ABC):
         """Wake up the engine"""
         ...
 
+    @abstractmethod
+    async def is_sleeping(self) -> bool:
+        """Check whether the engine is sleeping"""
+        ...
+
     @abstractmethod
     async def add_lora(self, lora_request: LoRARequest) -> None:
         """Load a new LoRA adapter into the engine for future requests."""
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 694d4f9cf1121..bc74ebd205d15 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -694,6 +694,12 @@ if envs.VLLM_SERVER_DEV_MODE:
         # is sent but does not finish yet when we return a response.
         return Response(status_code=200)
 
+    @router.get("/is_sleeping")
+    async def is_sleeping(raw_request: Request):
+        logger.info("check whether the engine is sleeping")
+        is_sleeping = await engine_client(raw_request).is_sleeping()
+        return JSONResponse(content={"is_sleeping": is_sleeping})
+
     @router.post("/invocations",
                  dependencies=[Depends(validate_json_request)])
     async def invocations(raw_request: Request):
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 7188f10b18856..d4ac9c066d50d 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -407,6 +407,9 @@ class AsyncLLM(EngineClient):
     async def wake_up(self) -> None:
         await self.engine_core.wake_up_async()
 
+    async def is_sleeping(self) -> bool:
+        return await self.engine_core.is_sleeping_async()
+
     async def add_lora(self, lora_request: LoRARequest) -> bool:
         """Load a new LoRA adapter into the engine for future requests."""
         return await self.engine_core.add_lora_async(lora_request)
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index 174d96ec43776..8f93d3c71cdf3 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -253,6 +253,9 @@ class EngineCore:
     def wake_up(self):
         self.model_executor.wake_up()
 
+    def is_sleeping(self) -> bool:
+        return self.model_executor.is_sleeping
+
     def execute_dummy_batch(self):
         self.model_executor.collective_rpc("execute_dummy_batch")
diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py
index 0f92adcc86375..5ed4645797846 100644
--- a/vllm/v1/engine/core_client.py
+++ b/vllm/v1/engine/core_client.py
@@ -89,6 +89,9 @@ class EngineCoreClient(ABC):
     def wake_up(self) -> None:
         raise NotImplementedError
 
+    def is_sleeping(self) -> bool:
+        raise NotImplementedError
+
     def execute_dummy_batch(self) -> None:
         raise NotImplementedError
 
@@ -128,6 +131,9 @@ class EngineCoreClient(ABC):
     async def wake_up_async(self) -> None:
         raise NotImplementedError
 
+    async def is_sleeping_async(self) -> bool:
+        raise NotImplementedError
+
     async def abort_requests_async(self, request_ids: list[str]) -> None:
         raise NotImplementedError
 
@@ -182,6 +188,9 @@ class InprocClient(EngineCoreClient):
     def wake_up(self) -> None:
         self.engine_core.wake_up()
 
+    def is_sleeping(self) -> bool:
+        return self.engine_core.is_sleeping()
+
     def execute_dummy_batch(self) -> None:
         self.engine_core.execute_dummy_batch()
 
@@ -433,6 +442,9 @@ class SyncMPClient(MPClient):
     def wake_up(self) -> None:
         self._call_utility("wake_up")
 
+    def is_sleeping(self) -> bool:
+        return self._call_utility("is_sleeping")
+
     def execute_dummy_batch(self) -> None:
         self._call_utility("execute_dummy_batch")
 
@@ -523,6 +535,9 @@ class AsyncMPClient(MPClient):
     async def wake_up_async(self) -> None:
         await self._call_utility_async("wake_up")
 
+    async def is_sleeping_async(self) -> bool:
+        return await self._call_utility_async("is_sleeping")
+
     async def execute_dummy_batch_async(self) -> None:
         await self._call_utility_async("execute_dummy_batch")
diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py
index cbd19d4d637be..63b0a8fca32bd 100644
--- a/vllm/v1/engine/llm_engine.py
+++ b/vllm/v1/engine/llm_engine.py
@@ -235,6 +235,9 @@ class LLMEngine:
     def wake_up(self):
         self.engine_core.wake_up()
 
+    def is_sleeping(self) -> bool:
+        return self.engine_core.is_sleeping()
+
     def get_tokenizer_group(
             self,
             group_type: type[_G] = BaseTokenizerGroup,