diff --git a/docs/features/nixl_connector_usage.md b/docs/features/nixl_connector_usage.md index f0e25e31aa0b3..84c8f9e77d6d3 100644 --- a/docs/features/nixl_connector_usage.md +++ b/docs/features/nixl_connector_usage.md @@ -146,6 +146,8 @@ python tests/v1/kv_connector/nixl_integration/toy_proxy_server.py \ --decoder-ports 8000 8000 ``` +For multi-host DP deployment, only need to provide the host/port of the head instances. + ### KV Role Options - **kv_producer**: For prefiller instances that generate KV caches diff --git a/examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_proxy_server.py b/examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_proxy_server.py index 5d8e38c73b89a..c8965e050ff0b 100644 --- a/examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_proxy_server.py +++ b/examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_proxy_server.py @@ -26,9 +26,21 @@ async def lifespan(app: FastAPI): ) app.state.prefill_client = httpx.AsyncClient( - timeout=None, base_url=prefiller_base_url + timeout=None, + base_url=prefiller_base_url, + limits=httpx.Limits( + max_connections=None, + max_keepalive_connections=None, + ), + ) + app.state.decode_client = httpx.AsyncClient( + timeout=None, + base_url=decoder_base_url, + limits=httpx.Limits( + max_connections=None, + max_keepalive_connections=None, + ), ) - app.state.decode_client = httpx.AsyncClient(timeout=None, base_url=decoder_base_url) yield @@ -105,6 +117,11 @@ async def send_request_to_service( headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"} response = await client.post(endpoint, json=req_data, headers=headers) response.raise_for_status() + + # read/consume the response body to release the connection + # otherwise, it would http.ReadError + await response.aread() + return response diff --git a/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py b/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py index 5768fcdb57ceb..b92d3fcd6fb8b 100644 --- a/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py +++ b/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py @@ -30,7 +30,14 @@ async def lifespan(app: FastAPI): prefiller_base_url = f"http://{host}:{port}/v1" app.state.prefill_clients.append( { - "client": httpx.AsyncClient(timeout=None, base_url=prefiller_base_url), + "client": httpx.AsyncClient( + timeout=None, + base_url=prefiller_base_url, + limits=httpx.Limits( + max_connections=None, + max_keepalive_connections=None, + ), + ), "host": host, "port": port, "id": i, @@ -42,7 +49,14 @@ async def lifespan(app: FastAPI): decoder_base_url = f"http://{host}:{port}/v1" app.state.decode_clients.append( { - "client": httpx.AsyncClient(timeout=None, base_url=decoder_base_url), + "client": httpx.AsyncClient( + timeout=None, + base_url=decoder_base_url, + limits=httpx.Limits( + max_connections=None, + max_keepalive_connections=None, + ), + ), "host": host, "port": port, "id": i, @@ -169,6 +183,10 @@ async def send_request_to_service( ) response.raise_for_status() + # read/consume the response body to release the connection + # otherwise, it would http.ReadError + await response.aread() + return response @@ -206,6 +224,7 @@ async def _handle_completions(api: str, request: Request): # Extract the needed fields response_json = response.json() + await response.aclose() # CRITICAL: Release connection back to pool kv_transfer_params = response_json.get("kv_transfer_params", {}) if kv_transfer_params: req_data["kv_transfer_params"] = kv_transfer_params