mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-06-09 21:15:41 +08:00
[BugFix] Handle case where async utility call is cancelled (#22996)
Signed-off-by: Nick Hill <nhill@redhat.com> Co-authored-by: Yinghai Lu <yinghai@thinkingmachines.ai>
This commit is contained in:
parent
1fc375dc05
commit
b9dc9d2607
@ -121,8 +121,13 @@ async def loop_until_fully_done_async(client: EngineCoreClient, outputs: dict):
|
|||||||
|
|
||||||
|
|
||||||
# Dummy utility function to monkey-patch into engine core.
|
# Dummy utility function to monkey-patch into engine core.
|
||||||
def echo(self, msg: str, err_msg: Optional[str] = None) -> str:
|
def echo(self,
|
||||||
|
msg: str,
|
||||||
|
err_msg: Optional[str] = None,
|
||||||
|
sleep: Optional[float] = None) -> str:
|
||||||
print(f"echo util function called: {msg}, {err_msg}")
|
print(f"echo util function called: {msg}, {err_msg}")
|
||||||
|
if sleep is not None:
|
||||||
|
time.sleep(sleep)
|
||||||
if err_msg is not None:
|
if err_msg is not None:
|
||||||
raise ValueError(err_msg)
|
raise ValueError(err_msg)
|
||||||
return msg
|
return msg
|
||||||
@ -289,6 +294,23 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch):
|
|||||||
await core_client.call_utility_async("echo", None, "help!")
|
await core_client.call_utility_async("echo", None, "help!")
|
||||||
|
|
||||||
assert str(e_info.value) == "Call to echo method failed: help!"
|
assert str(e_info.value) == "Call to echo method failed: help!"
|
||||||
|
|
||||||
|
# Test that cancelling the utility call doesn't destabilize the
|
||||||
|
# engine.
|
||||||
|
util_task = asyncio.create_task(
|
||||||
|
core_client.call_utility_async("echo", "testarg2", None,
|
||||||
|
0.5)) # sleep for 0.5 sec
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
cancelled = util_task.cancel()
|
||||||
|
assert cancelled
|
||||||
|
|
||||||
|
# Ensure client is still functional. The engine runs utility
|
||||||
|
# methods in a single thread so this request won't be processed
|
||||||
|
# until the cancelled sleeping one is complete.
|
||||||
|
result = await asyncio.wait_for(core_client.call_utility_async(
|
||||||
|
"echo", "testarg3"),
|
||||||
|
timeout=1.0)
|
||||||
|
assert result == "testarg3"
|
||||||
finally:
|
finally:
|
||||||
client.shutdown()
|
client.shutdown()
|
||||||
|
|
||||||
|
|||||||
@ -574,13 +574,22 @@ class MPClient(EngineCoreClient):
|
|||||||
|
|
||||||
def _process_utility_output(output: UtilityOutput,
|
def _process_utility_output(output: UtilityOutput,
|
||||||
utility_results: dict[int, AnyFuture]):
|
utility_results: dict[int, AnyFuture]):
|
||||||
"""Set the result from a utility method in the waiting future"""
|
"""Set the result from a utility method in the waiting future."""
|
||||||
future = utility_results.pop(output.call_id)
|
future = utility_results.pop(output.call_id)
|
||||||
if output.failure_message is not None:
|
failure_message = output.failure_message
|
||||||
future.set_exception(Exception(output.failure_message))
|
try:
|
||||||
else:
|
if failure_message is not None:
|
||||||
assert output.result is not None
|
future.set_exception(Exception(failure_message))
|
||||||
future.set_result(output.result.result)
|
else:
|
||||||
|
assert output.result is not None
|
||||||
|
future.set_result(output.result.result)
|
||||||
|
except asyncio.InvalidStateError:
|
||||||
|
# This can happen if the future is cancelled due to the
|
||||||
|
# original calling task being cancelled.
|
||||||
|
if failure_message is not None:
|
||||||
|
logger.error(
|
||||||
|
"Cancelled call to utility method failed "
|
||||||
|
"with error: %s", failure_message)
|
||||||
|
|
||||||
|
|
||||||
class SyncMPClient(MPClient):
|
class SyncMPClient(MPClient):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user