diff --git a/tests/v1/engine/test_engine_core_client.py b/tests/v1/engine/test_engine_core_client.py index c82285639aee4..37eb869fe69a3 100644 --- a/tests/v1/engine/test_engine_core_client.py +++ b/tests/v1/engine/test_engine_core_client.py @@ -121,8 +121,13 @@ async def loop_until_fully_done_async(client: EngineCoreClient, outputs: dict): # Dummy utility function to monkey-patch into engine core. -def echo(self, msg: str, err_msg: Optional[str] = None) -> str: +def echo(self, + msg: str, + err_msg: Optional[str] = None, + sleep: Optional[float] = None) -> str: print(f"echo util function called: {msg}, {err_msg}") + if sleep is not None: + time.sleep(sleep) if err_msg is not None: raise ValueError(err_msg) return msg @@ -289,6 +294,23 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch): await core_client.call_utility_async("echo", None, "help!") assert str(e_info.value) == "Call to echo method failed: help!" + + # Test that cancelling the utility call doesn't destabilize the + # engine. + util_task = asyncio.create_task( + core_client.call_utility_async("echo", "testarg2", None, + 0.5)) # sleep for 0.5 sec + await asyncio.sleep(0.05) + cancelled = util_task.cancel() + assert cancelled + + # Ensure client is still functional. The engine runs utility + # methods in a single thread so this request won't be processed + # until the cancelled sleeping one is complete. + result = await asyncio.wait_for(core_client.call_utility_async( + "echo", "testarg3"), + timeout=1.0) + assert result == "testarg3" finally: client.shutdown() diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 29ee0a9dfb1e2..079dd9a7d38d1 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -574,13 +574,22 @@ class MPClient(EngineCoreClient): def _process_utility_output(output: UtilityOutput, utility_results: dict[int, AnyFuture]): - """Set the result from a utility method in the waiting future""" + """Set the result from a utility method in the waiting future.""" future = utility_results.pop(output.call_id) - if output.failure_message is not None: - future.set_exception(Exception(output.failure_message)) - else: - assert output.result is not None - future.set_result(output.result.result) + failure_message = output.failure_message + try: + if failure_message is not None: + future.set_exception(Exception(failure_message)) + else: + assert output.result is not None + future.set_result(output.result.result) + except asyncio.InvalidStateError: + # This can happen if the future is cancelled due to the + # original calling task being cancelled. + if failure_message is not None: + logger.error( + "Cancelled call to utility method failed " + "with error: %s", failure_message) class SyncMPClient(MPClient):