[Frontend] error suppression cleanup (#7786)

Signed-off-by: Joe Runde <Joseph.Runde@ibm.com>
This commit is contained in:
Joe Runde 2024-08-22 15:50:21 -06:00 committed by GitHub
parent a152246428
commit b903e1ba7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 7 deletions

View File

@ -75,11 +75,12 @@ async def test_client_aborts_use_timeouts(monkeypatch, dummy_server,
m.setattr(dummy_server, "abort", lambda x: None) m.setattr(dummy_server, "abort", lambda x: None)
m.setattr(client, "_data_timeout", 10) m.setattr(client, "_data_timeout", 10)
# Ensure the client doesn't hang # The client should suppress timeouts on `abort`s
# and return normally, assuming the server will eventually
# abort the request.
client_task = asyncio.get_running_loop().create_task( client_task = asyncio.get_running_loop().create_task(
client.abort("test request id")) client.abort("test request id"))
with pytest.raises(TimeoutError, match="Server didn't reply within"): await asyncio.wait_for(client_task, timeout=0.05)
await asyncio.wait_for(client_task, timeout=0.05)
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@ -6,7 +6,7 @@ import os
import re import re
import tempfile import tempfile
from argparse import Namespace from argparse import Namespace
from contextlib import asynccontextmanager, suppress from contextlib import asynccontextmanager
from http import HTTPStatus from http import HTTPStatus
from typing import AsyncIterator, Optional, Set from typing import AsyncIterator, Optional, Set
@ -83,8 +83,7 @@ async def lifespan(app: FastAPI):
async def _force_log(): async def _force_log():
while True: while True:
await asyncio.sleep(10) await asyncio.sleep(10)
with suppress(Exception): await async_engine_client.do_log_stats()
await async_engine_client.do_log_stats()
if not engine_args.disable_log_stats: if not engine_args.disable_log_stats:
task = asyncio.create_task(_force_log()) task = asyncio.create_task(_force_log())

View File

@ -335,7 +335,18 @@ class AsyncEngineRPCClient:
async def abort(self, request_id: str): async def abort(self, request_id: str):
"""Send an ABORT_REQUEST signal to the RPC Server""" """Send an ABORT_REQUEST signal to the RPC Server"""
with suppress(RPCClientClosedError):
# Suppress timeouts as well.
# In cases where the server is busy processing requests and a very
# large volume of abort requests arrive, it is likely that the server
# will not be able to ack all of them in time. We have seen this when
# we abort 20k requests at once while another 2k are processing- many
# of them time out, but we see the server successfully abort all of the
# requests.
# In this case we assume that the server has received or will receive
# these abort requests, and ignore the timeout. This prevents a massive
# wall of `TimeoutError` stack traces.
with suppress(RPCClientClosedError, TimeoutError):
await self._send_one_way_rpc_request( await self._send_one_way_rpc_request(
request=RPCAbortRequest(request_id), request=RPCAbortRequest(request_id),
error_message=f"RPCAbortRequest {request_id} failed") error_message=f"RPCAbortRequest {request_id} failed")