diff --git a/benchmarks/disagg_benchmarks/zmq/test_connect_server1.py b/benchmarks/disagg_benchmarks/zmq/test_connect_server1.py deleted file mode 100644 index 43a6b5c1c17f0..0000000000000 --- a/benchmarks/disagg_benchmarks/zmq/test_connect_server1.py +++ /dev/null @@ -1,71 +0,0 @@ -import time -import zmq -import zmq.asyncio -import random -import asyncio - - -async def worker_routine(worker_url: str, - context: zmq.asyncio.Context = None, i: int = 0): - """Worker routine""" - # Socket to talk to dispatcher - socket = context.socket(zmq.DEALER) - socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode()) - socket.connect(worker_url) - print(f"worker-{i} {worker_url} started") - while True: - identity, url, headers, string = await socket.recv_multipart() - print(f"worker-{i} Received request identity: [{identity} ]") - print(f"worker-{i} Received request url: [{url} ]") - print(f"worker-{i} Received request headers: [{headers} ]") - print(f"worker-{i} Received request string: [{string} ]") - streamreply = ['{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}', -'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}', -'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}' -] - for j in range(len(streamreply)): - # Do some 'work' - # time.sleep(random.randint(1, 5)) - await asyncio.sleep(random.randint(1, 5)) - # Send reply back to client - reply = f"worker-{i} reply part-{j} {string} \n {streamreply[j]}" - # reply = streamreply[j] - print(f"worker-{i} Sent reply: [{identity} {reply} ]") - await socket.send_multipart([identity, reply.encode()]) - -async def main(): - """Server routine""" - - url_worker = "inproc://workers" - url_client = "tcp://localhost:5555" - - # Prepare our context and sockets - context = zmq.asyncio.Context() - - # Socket to talk to clients - clients = context.socket(zmq.ROUTER) - clients.bind(url_client) - print("Server ROUTER started at", url_client) - # Socket to talk to workers - workers = context.socket(zmq.DEALER) - workers.bind(url_worker) - print("Worker DEALER started at", url_worker) - - tasks = [asyncio.create_task(worker_routine(url_worker, context, i)) for i in range(5)] - proxy_task = asyncio.to_thread(zmq.proxy, clients, workers) - - try: - await asyncio.gather(*tasks, proxy_task) - except KeyboardInterrupt: - print("Server interrupted") - except zmq.ZMQError as e: - print("ZMQError:", e) - finally: - # We never get here but clean up anyhow - clients.close() - workers.close() - context.term() - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/benchmarks/disagg_benchmarks/zmq/test_connect_server2.py b/benchmarks/disagg_benchmarks/zmq/test_connect_server2.py deleted file mode 100644 index f9f4d1a5ce7ee..0000000000000 --- a/benchmarks/disagg_benchmarks/zmq/test_connect_server2.py +++ /dev/null @@ -1,71 +0,0 @@ -import time -import zmq -import zmq.asyncio -import random -import asyncio - - -async def worker_routine(worker_url: str, - context: zmq.asyncio.Context = None, i: int = 0): - """Worker routine""" - # Socket to talk to dispatcher - socket = context.socket(zmq.DEALER) - socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode()) - socket.connect(worker_url) - print(f"worker-{i} {worker_url} started") - while True: - identity, url, headers, string = await socket.recv_multipart() - print(f"worker-{i} Received request identity: [{identity} ]") - print(f"worker-{i} Received request url: [{url} ]") - print(f"worker-{i} Received request headers: [{headers} ]") - print(f"worker-{i} Received request string: [{string} ]") - streamreply = ['{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}', -'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}', -'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}' -] - for j in range(len(streamreply)): - # Do some 'work' - # time.sleep(random.randint(1, 5)) - await asyncio.sleep(random.randint(1, 5)) - # Send reply back to client - reply = f"worker-{i} reply part-{j} {string} \n {streamreply[j]}" - # reply = streamreply[j] - print(f"worker-{i} Sent reply: [{identity} {reply} ]") - await socket.send_multipart([identity, reply.encode()]) - -async def main(): - """Server routine""" - - url_worker = "inproc://workers" - url_client = "tcp://localhost:5556" - - # Prepare our context and sockets - context = zmq.asyncio.Context() - - # Socket to talk to clients - clients = context.socket(zmq.ROUTER) - clients.bind(url_client) - print("Server ROUTER started at", url_client) - # Socket to talk to workers - workers = context.socket(zmq.DEALER) - workers.bind(url_worker) - print("Worker DEALER started at", url_worker) - - tasks = [asyncio.create_task(worker_routine(url_worker, context, i)) for i in range(5)] - proxy_task = asyncio.to_thread(zmq.proxy, clients, workers) - - try: - await asyncio.gather(*tasks, proxy_task) - except KeyboardInterrupt: - print("Server interrupted") - except zmq.ZMQError as e: - print("ZMQError:", e) - finally: - # We never get here but clean up anyhow - clients.close() - workers.close() - context.term() - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/benchmarks/disagg_benchmarks/zmq/test_request.py b/benchmarks/disagg_benchmarks/zmq/test_request.py index f5c247d7d3e97..57cb1e548abe2 100644 --- a/benchmarks/disagg_benchmarks/zmq/test_request.py +++ b/benchmarks/disagg_benchmarks/zmq/test_request.py @@ -1,53 +1,77 @@ import asyncio +import json import aiohttp +# test connect completions we assume prefill and decode are on the same node +# 1. node:vllm serve facebook/opt-125m --port 7001 --zmq-server-port 7010 --chat-template ~/vllm/examples/template_chatglm2.jinja +# 2. vllm connect --prefill-addr nodeIp:7010 --decode-addr nodeIp:7010 +# 3. python test_request.py -async def test_connect(session): +async def test_connect_completions(session): try: - print("Sending request") - async with session.post("http://localhost:8001/v1/connect/completions", json={ + base_url = "http://localhost:8001/v1/connect/completions" + body = { "temperature": 0.5, "top_p": 0.9, "max_tokens": 150, "frequency_penalty": 1.3, "presence_penalty": 0.2, "repetition_penalty": 1.2, - "model": "meta-llama/Llama-3.2-3B-Instruct", - "messages": [{ - "role": "assistant", - "content": "what can i help you?" - }, { - "role": "user", - "content": "tell me about us" - }], + "model": "facebook/opt-125m", + "prompt": "Can you introduce vllm?", "stream": True, "stream_options": { "include_usage": True - } -}, headers={"Content-Type": "application/json"}) as response: + }} + print(f"Sending request to {base_url}, body {body}") + async with session.post(base_url, json= body) as response: + print(response.status) + print(response.headers) + responseText = "" if response.status == 200: transfer_encoding = response.headers.get('Transfer-Encoding') if transfer_encoding == 'chunked': async for chunk in response.content.iter_chunked(1024): try: decoded_chunk = chunk.decode('utf-8') - print(decoded_chunk) + responseText += decoded_chunk except UnicodeDecodeError: print(f"Error decoding chunk: {chunk!r}") else: - print(f"Unexpected Transfer-Encoding: {transfer_encoding}") + # Print the headers and JSON response + print(f"Unexpected Transfer-Encoding: {transfer_encoding} {response.headers} {await response.json()}") else: print(f"Request failed with status code {response.status}") + print(f"baseurl {base_url} response data {extract_data(responseText)}") except aiohttp.ClientError as e: print(f"Error: {e}") +def extract_data(responseText): + reply = "" + for data in responseText.split("\n\n"): + if data.startswith('data: '): + content = data[6:] + try: + json_data = json.loads(content) + choices = json_data["choices"] + if len(choices) > 0: + content = choices[0]["text"] + reply += content + except json.JSONDecodeError: + print(f"Error: Invalid data format: {data}") + return reply + else: + print(f"Error: Invalid data format: {data}") + + return reply + async def main(): async with aiohttp.ClientSession() as session: tasks = [] - for _ in range(2): - tasks.append(test_connect(session)) + for _ in range(1): + tasks.append(test_connect_completions(session)) await asyncio.gather(*tasks)