import time
import zmq
import zmq.asyncio
import random
import asyncio

# Canned OpenAI-style chat-completion stream chunks that every worker replays.
# Hoisted to module scope so the list is built once, not on every request.
_STREAM_REPLY = [
    '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}',
    '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}',
    '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}',
]


async def worker_routine(worker_url: str,
                         context: zmq.asyncio.Context = None, i: int = 0):
    """Worker routine: receive [identity, payload] frames from the in-process
    backend and stream the canned reply chunks back to that client.

    Args:
        worker_url: inproc endpoint the backend DEALER is bound to.
        context: shared zmq.asyncio.Context (required at call time; the None
            default is kept only for signature compatibility).
        i: worker index, used for logging and to build a unique socket identity.
    """
    # Socket to talk to dispatcher; unique IDENTITY so replies are traceable.
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode())
    socket.connect(worker_url)
    print(f"worker-{i} {worker_url} started")
    try:
        while True:
            identity, string = await socket.recv_multipart()
            print(f"worker-{i} Received request: [{identity} {string} ]")
            for j, chunk in enumerate(_STREAM_REPLY):
                # Simulate 'work'; asyncio.sleep keeps the event loop free
                # (a blocking time.sleep here would stall every worker).
                await asyncio.sleep(random.randint(1, 5))
                # Send one streamed reply part back to the originating client.
                reply = f"worker-{i} reply part-{j} {string} \n {chunk}"
                print(f"worker-{i} Sent reply: [{identity} {reply} ]")
                await socket.send_multipart([identity, reply.encode()])
    finally:
        # Close the per-worker socket if the task is ever cancelled so the
        # context can terminate cleanly.
        socket.close()


async def main():
    """Server routine: ROUTER frontend for clients, inproc DEALER backend
    feeding five worker tasks, bridged by a blocking zmq proxy in a thread."""
    url_worker = "inproc://workers"
    url_client = "tcp://localhost:5555"

    # Prepare our context and sockets
    context = zmq.asyncio.Context()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)
    print("Server ROUTER started at", url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)
    print("Worker DEALER started at", url_worker)

    tasks = [
        asyncio.create_task(worker_routine(url_worker, context, i))
        for i in range(5)
    ]
    # zmq.proxy blocks forever, so run it off the event loop in a thread.
    proxy_task = asyncio.to_thread(zmq.proxy, clients, workers)

    try:
        await asyncio.gather(*tasks, proxy_task)
    except KeyboardInterrupt:
        print("Server interrupted")
    except zmq.ZMQError as e:
        print("ZMQError:", e)
    finally:
        # Normally unreachable (the proxy never returns) but clean up anyhow.
        clients.close()
        workers.close()
        context.term()


if __name__ == "__main__":
    asyncio.run(main())
import time
import zmq
import zmq.asyncio
import random
import asyncio

# Canned OpenAI-style chat-completion stream chunks that every worker replays.
# Hoisted to module scope so the list is built once, not on every request.
_STREAM_REPLY = [
    '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}',
    '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}',
    '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}',
]


async def worker_routine(worker_url: str,
                         context: zmq.asyncio.Context = None, i: int = 0):
    """Worker routine: receive [identity, payload] frames from the in-process
    backend and stream the canned reply chunks back to that client.

    Args:
        worker_url: inproc endpoint the backend DEALER is bound to.
        context: shared zmq.asyncio.Context (required at call time; the None
            default is kept only for signature compatibility).
        i: worker index, used for logging and to build a unique socket identity.
    """
    # Socket to talk to dispatcher; unique IDENTITY so replies are traceable.
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode())
    socket.connect(worker_url)
    print(f"worker-{i} {worker_url} started")
    try:
        while True:
            identity, string = await socket.recv_multipart()
            print(f"worker-{i} Received request: [{identity} {string} ]")
            for j, chunk in enumerate(_STREAM_REPLY):
                # Simulate 'work'; asyncio.sleep keeps the event loop free
                # (a blocking time.sleep here would stall every worker).
                await asyncio.sleep(random.randint(1, 5))
                # Send one streamed reply part back to the originating client.
                reply = f"worker-{i} reply part-{j} {string} \n {chunk}"
                print(f"worker-{i} Sent reply: [{identity} {reply} ]")
                await socket.send_multipart([identity, reply.encode()])
    finally:
        # Close the per-worker socket if the task is ever cancelled so the
        # context can terminate cleanly.
        socket.close()


async def main():
    """Server routine: identical to server1 but listening on port 5556, so
    two disaggregated endpoints can run side by side for the benchmark."""
    url_worker = "inproc://workers"
    url_client = "tcp://localhost:5556"

    # Prepare our context and sockets
    context = zmq.asyncio.Context()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)
    print("Server ROUTER started at", url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)
    print("Worker DEALER started at", url_worker)

    tasks = [
        asyncio.create_task(worker_routine(url_worker, context, i))
        for i in range(5)
    ]
    # zmq.proxy blocks forever, so run it off the event loop in a thread.
    proxy_task = asyncio.to_thread(zmq.proxy, clients, workers)

    try:
        await asyncio.gather(*tasks, proxy_task)
    except KeyboardInterrupt:
        print("Server interrupted")
    except zmq.ZMQError as e:
        print("ZMQError:", e)
    finally:
        # Normally unreachable (the proxy never returns) but clean up anyhow.
        clients.close()
        workers.close()
        context.term()


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
import aiohttp

# Streaming chat-completion request body; hoisted so it is built once and
# shared by every concurrent request.
_REQUEST_BODY = {
    "temperature": 0.5,
    "top_p": 0.9,
    "max_tokens": 150,
    "frequency_penalty": 1.3,
    "presence_penalty": 0.2,
    "repetition_penalty": 1.2,
    "model": "meta-llama/Llama-3.2-3B-Instruct",
    "messages": [{
        "role": "assistant",
        "content": "what can i help you?"
    }, {
        "role": "user",
        "content": "tell me about us"
    }],
    "stream": True,
    "stream_options": {
        "include_usage": True
    }
}


async def test_connect(session):
    """POST one streaming chat request and print each chunk as it arrives.

    Args:
        session: an open aiohttp.ClientSession shared across requests.
    """
    try:
        print("Sending request")
        async with session.post(
                "http://localhost:8001/v1/connect/completions",
                json=_REQUEST_BODY) as response:
            print(response.status)
            # Guard clauses keep the streaming loop at a shallow nesting level.
            if response.status != 200:
                print(f"Request failed with status code {response.status}")
                return
            transfer_encoding = response.headers.get('Transfer-Encoding')
            if transfer_encoding != 'chunked':
                print(f"Unexpected Transfer-Encoding: {transfer_encoding}")
                return
            async for chunk in response.content.iter_chunked(1024):
                try:
                    print(chunk.decode('utf-8'))
                except UnicodeDecodeError:
                    # A chunk boundary can split a multi-byte sequence.
                    print(f"Error decoding chunk: {chunk!r}")
    except aiohttp.ClientError as e:
        print(f"Error: {e}")


async def main():
    """Fire two concurrent streaming requests over one shared session."""
    async with aiohttp.ClientSession() as session:
        tasks = [test_connect(session) for _ in range(2)]
        await asyncio.gather(*tasks)


# Guarded entry point so importing this module does not fire requests.
if __name__ == "__main__":
    asyncio.run(main())