From 0cb2e05256dbed57518429bc2a1d44cf0042d73c Mon Sep 17 00:00:00 2001 From: clark Date: Tue, 4 Feb 2025 20:27:13 +0800 Subject: [PATCH] change log level and fix some comments Signed-off-by: clark --- vllm/entrypoints/disagg_connector.py | 31 ++++++++++++++-------------- vllm/entrypoints/launcher.py | 6 ++++++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/vllm/entrypoints/disagg_connector.py b/vllm/entrypoints/disagg_connector.py index c6a7d83c7a3ad..ed42b566b2438 100644 --- a/vllm/entrypoints/disagg_connector.py +++ b/vllm/entrypoints/disagg_connector.py @@ -84,22 +84,22 @@ async def execute_task_async(route: str, headers: dict, request: dict, headersJson.encode(), requestBody.encode()]), timeout=time_out) - logger.info("Sent end") + logger.debug("Sent end") while True: - logger.info("Waiting for reply") + logger.debug("Waiting for reply") [contentType, reply] = await asyncio.wait_for(sock.recv_multipart(), timeout=time_out) contentType_str = contentType.decode() reply_str = reply.decode() - logger.info("Received result: %s, %s", contentType_str, reply_str) + logger.debug("Received result: %s, %s", contentType_str, reply_str) yield (contentType_str, reply_str) if context_type_json == contentType_str: - logger.info("Received %s message, return socket", + logger.debug("Received %s message, return socket", contentType_str) break if "[DONE]" in reply_str: - logger.info("Received stop signal, return socket") + logger.debug("Received stop signal, return socket") break except asyncio.TimeoutError: logger.error(traceback.format_exc()) @@ -122,7 +122,7 @@ async def prefill(route: str, header: dict, original_request_data: dict): generator = execute_task_async(route, header, original_request_data, app.state.sockets_prefill) async for contentType, reply in generator: - logger.info("contentType: %s, reply: %s", contentType, reply) + logger.debug("contentType: %s, reply: %s", contentType, reply) if context_type_error == contentType: response = JSONResponse({"error": reply}) response.status_code = 500 @@ -136,7 +136,7 @@ async def decode(route: str, header: dict, original_request_data: dict): app.state.sockets_decode) async for contentType, reply in generator: - logger.info("contentType: %s, reply: %s", contentType, reply) + logger.debug("contentType: %s, reply: %s", contentType, reply) if context_type_error == contentType: response = JSONResponse({"error": reply}) response.status_code = 500 @@ -158,20 +158,21 @@ async def chat_completions(request: Request): if header.get("X-Request-Id") is None: logger.info("add X-Request-Id: %s", x_request_id) header["X-Request-Id"] = x_request_id - original_request_data = await request.json() - logger.info("Received request: %s header: %s", original_request_data, + request_data = await request.json() + logger.info("Received request: %s header: %s", request_data, header) - prefill_request = original_request_data.copy() + original_max_tokens = request_data['max_tokens'] # change max_tokens = 1 to let it only do prefill - prefill_request['max_tokens'] = 1 + request_data['max_tokens'] = 1 route = "/v1/completions" # finish prefill try: - prefill_response = await prefill(route, header, prefill_request) + prefill_response = await prefill(route, header, request_data) if isinstance(prefill_response, JSONResponse): return prefill_response logger.info("finish prefill start decode") - response = await decode(route, header, original_request_data) + request_data['max_tokens'] = original_max_tokens + response = await decode(route, header, request_data) logger.info("finish decode") except Exception as e: logger.error("Error occurred in disagg prefill proxy server, %s", @@ -231,6 +232,4 @@ if __name__ == "__main__": args = parser.parse_args() - uvloop.run(run_disagg_connector(args)) - - # uvicorn.run(app, host="0.0.0.0", port=fastapi_port) + uvloop.run(run_disagg_connector(args)) \ No newline at end of file diff --git a/vllm/entrypoints/launcher.py b/vllm/entrypoints/launcher.py index 869dbae466410..e9c865b953401 100644 --- a/vllm/entrypoints/launcher.py +++ b/vllm/entrypoints/launcher.py @@ -97,8 +97,14 @@ async def serve_zmq(arg, zmq_server_port: int, app: FastAPI) -> None: # thread safety proxy create socket in the background: # https://pyzmq.readthedocs.io/en/latest/api/zmq.devices.html#proxy-devices thread_proxy = zmq.devices.ThreadProxy(zmq.ROUTER, zmq.DEALER) + # unlimited HWM + hwm_limit = 0 thread_proxy.bind_in(clients_addr) + thread_proxy.setsockopt_in(zmq.SNDHWM, hwm_limit) + thread_proxy.setsockopt_in(zmq.RCVHWM, hwm_limit) thread_proxy.bind_out(workers_addr) + thread_proxy.setsockopt_out(zmq.SNDHWM, hwm_limit) + thread_proxy.setsockopt_out(zmq.RCVHWM, hwm_limit) thread_proxy.start() await asyncio.gather(*tasks) except KeyboardInterrupt: