change log level and fix some comments

Signed-off-by: clark <panf2333@gmail.com>
This commit is contained in:
clark 2025-02-04 20:27:13 +08:00
parent d6945ecdf0
commit 0cb2e05256
2 changed files with 21 additions and 16 deletions

View File

@ -84,22 +84,22 @@ async def execute_task_async(route: str, headers: dict, request: dict,
headersJson.encode(), headersJson.encode(),
requestBody.encode()]), requestBody.encode()]),
timeout=time_out) timeout=time_out)
logger.info("Sent end") logger.debug("Sent end")
while True: while True:
logger.info("Waiting for reply") logger.debug("Waiting for reply")
[contentType, [contentType,
reply] = await asyncio.wait_for(sock.recv_multipart(), reply] = await asyncio.wait_for(sock.recv_multipart(),
timeout=time_out) timeout=time_out)
contentType_str = contentType.decode() contentType_str = contentType.decode()
reply_str = reply.decode() reply_str = reply.decode()
logger.info("Received result: %s, %s", contentType_str, reply_str) logger.debug("Received result: %s, %s", contentType_str, reply_str)
yield (contentType_str, reply_str) yield (contentType_str, reply_str)
if context_type_json == contentType_str: if context_type_json == contentType_str:
logger.info("Received %s message, return socket", logger.debug("Received %s message, return socket",
contentType_str) contentType_str)
break break
if "[DONE]" in reply_str: if "[DONE]" in reply_str:
logger.info("Received stop signal, return socket") logger.debug("Received stop signal, return socket")
break break
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@ -122,7 +122,7 @@ async def prefill(route: str, header: dict, original_request_data: dict):
generator = execute_task_async(route, header, original_request_data, generator = execute_task_async(route, header, original_request_data,
app.state.sockets_prefill) app.state.sockets_prefill)
async for contentType, reply in generator: async for contentType, reply in generator:
logger.info("contentType: %s, reply: %s", contentType, reply) logger.debug("contentType: %s, reply: %s", contentType, reply)
if context_type_error == contentType: if context_type_error == contentType:
response = JSONResponse({"error": reply}) response = JSONResponse({"error": reply})
response.status_code = 500 response.status_code = 500
@ -136,7 +136,7 @@ async def decode(route: str, header: dict, original_request_data: dict):
app.state.sockets_decode) app.state.sockets_decode)
async for contentType, reply in generator: async for contentType, reply in generator:
logger.info("contentType: %s, reply: %s", contentType, reply) logger.debug("contentType: %s, reply: %s", contentType, reply)
if context_type_error == contentType: if context_type_error == contentType:
response = JSONResponse({"error": reply}) response = JSONResponse({"error": reply})
response.status_code = 500 response.status_code = 500
@ -158,20 +158,21 @@ async def chat_completions(request: Request):
if header.get("X-Request-Id") is None: if header.get("X-Request-Id") is None:
logger.info("add X-Request-Id: %s", x_request_id) logger.info("add X-Request-Id: %s", x_request_id)
header["X-Request-Id"] = x_request_id header["X-Request-Id"] = x_request_id
original_request_data = await request.json() request_data = await request.json()
logger.info("Received request: %s header: %s", original_request_data, logger.info("Received request: %s header: %s", request_data,
header) header)
prefill_request = original_request_data.copy() original_max_tokens = request_data['max_tokens']
# change max_tokens = 1 to let it only do prefill # change max_tokens = 1 to let it only do prefill
prefill_request['max_tokens'] = 1 request_data['max_tokens'] = 1
route = "/v1/completions" route = "/v1/completions"
# finish prefill # finish prefill
try: try:
prefill_response = await prefill(route, header, prefill_request) prefill_response = await prefill(route, header, request_data)
if isinstance(prefill_response, JSONResponse): if isinstance(prefill_response, JSONResponse):
return prefill_response return prefill_response
logger.info("finish prefill start decode") logger.info("finish prefill start decode")
response = await decode(route, header, original_request_data) request_data['max_tokens'] = original_max_tokens
response = await decode(route, header, request_data)
logger.info("finish decode") logger.info("finish decode")
except Exception as e: except Exception as e:
logger.error("Error occurred in disagg prefill proxy server, %s", logger.error("Error occurred in disagg prefill proxy server, %s",
@ -231,6 +232,4 @@ if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
uvloop.run(run_disagg_connector(args)) uvloop.run(run_disagg_connector(args))
# uvicorn.run(app, host="0.0.0.0", port=fastapi_port)

View File

@ -97,8 +97,14 @@ async def serve_zmq(arg, zmq_server_port: int, app: FastAPI) -> None:
# thread safety proxy create socket in the background: # thread safety proxy create socket in the background:
# https://pyzmq.readthedocs.io/en/latest/api/zmq.devices.html#proxy-devices # https://pyzmq.readthedocs.io/en/latest/api/zmq.devices.html#proxy-devices
thread_proxy = zmq.devices.ThreadProxy(zmq.ROUTER, zmq.DEALER) thread_proxy = zmq.devices.ThreadProxy(zmq.ROUTER, zmq.DEALER)
# unlimited HWM
hwm_limit = 0
thread_proxy.bind_in(clients_addr) thread_proxy.bind_in(clients_addr)
thread_proxy.setsockopt_in(zmq.SNDHWM, hwm_limit)
thread_proxy.setsockopt_in(zmq.RCVHWM, hwm_limit)
thread_proxy.bind_out(workers_addr) thread_proxy.bind_out(workers_addr)
thread_proxy.setsockopt_out(zmq.SNDHWM, hwm_limit)
thread_proxy.setsockopt_out(zmq.RCVHWM, hwm_limit)
thread_proxy.start() thread_proxy.start()
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
except KeyboardInterrupt: except KeyboardInterrupt: