add test py

Signed-off-by: clark <panf2333@gmail.com>
This commit is contained in:
clark 2025-01-06 13:52:17 +08:00
parent 2b22290ce0
commit 2a0cb78016
3 changed files with 191 additions and 0 deletions

View File

@ -0,0 +1,68 @@
import time
import zmq
import zmq.asyncio
import random
import asyncio
async def worker_routine(worker_url: str,
context: zmq.asyncio.Context = None, i: int = 0):
"""Worker routine"""
# Socket to talk to dispatcher
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode())
socket.connect(worker_url)
print(f"worker-{i} {worker_url} started")
while True:
identity, string = await socket.recv_multipart()
print(f"worker-{i} Received request: [{identity} {string} ]")
streamreply = ['{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}',
'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}',
'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}'
]
for j in range(len(streamreply)):
# Do some 'work'
# time.sleep(random.randint(1, 5))
await asyncio.sleep(random.randint(1, 5))
# Send reply back to client
reply = f"worker-{i} reply part-{j} {string} \n {streamreply[j]}"
# reply = streamreply[j]
print(f"worker-{i} Sent reply: [{identity} {reply} ]")
await socket.send_multipart([identity, reply.encode()])
async def main():
"""Server routine"""
url_worker = "inproc://workers"
url_client = "tcp://localhost:5555"
# Prepare our context and sockets
context = zmq.asyncio.Context()
# Socket to talk to clients
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)
print("Server ROUTER started at", url_client)
# Socket to talk to workers
workers = context.socket(zmq.DEALER)
workers.bind(url_worker)
print("Worker DEALER started at", url_worker)
tasks = [asyncio.create_task(worker_routine(url_worker, context, i)) for i in range(5)]
proxy_task = asyncio.to_thread(zmq.proxy, clients, workers)
try:
await asyncio.gather(*tasks, proxy_task)
except KeyboardInterrupt:
print("Server interrupted")
except zmq.ZMQError as e:
print("ZMQError:", e)
finally:
# We never get here but clean up anyhow
clients.close()
workers.close()
context.term()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,68 @@
import time
import zmq
import zmq.asyncio
import random
import asyncio
async def worker_routine(worker_url: str,
context: zmq.asyncio.Context = None, i: int = 0):
"""Worker routine"""
# Socket to talk to dispatcher
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode())
socket.connect(worker_url)
print(f"worker-{i} {worker_url} started")
while True:
identity, string = await socket.recv_multipart()
print(f"worker-{i} Received request: [{identity} {string} ]")
streamreply = ['{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}',
'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}',
'{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}'
]
for j in range(len(streamreply)):
# Do some 'work'
# time.sleep(random.randint(1, 5))
await asyncio.sleep(random.randint(1, 5))
# Send reply back to client
reply = f"worker-{i} reply part-{j} {string} \n {streamreply[j]}"
# reply = streamreply[j]
print(f"worker-{i} Sent reply: [{identity} {reply} ]")
await socket.send_multipart([identity, reply.encode()])
async def main():
"""Server routine"""
url_worker = "inproc://workers"
url_client = "tcp://localhost:5556"
# Prepare our context and sockets
context = zmq.asyncio.Context()
# Socket to talk to clients
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)
print("Server ROUTER started at", url_client)
# Socket to talk to workers
workers = context.socket(zmq.DEALER)
workers.bind(url_worker)
print("Worker DEALER started at", url_worker)
tasks = [asyncio.create_task(worker_routine(url_worker, context, i)) for i in range(5)]
proxy_task = asyncio.to_thread(zmq.proxy, clients, workers)
try:
await asyncio.gather(*tasks, proxy_task)
except KeyboardInterrupt:
print("Server interrupted")
except zmq.ZMQError as e:
print("ZMQError:", e)
finally:
# We never get here but clean up anyhow
clients.close()
workers.close()
context.term()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,55 @@
import asyncio
import aiohttp
async def test_connect(session):
try:
print("Sending request")
async with session.post("http://localhost:8001/v1/connect/completions", json={
"temperature": 0.5,
"top_p": 0.9,
"max_tokens": 150,
"frequency_penalty": 1.3,
"presence_penalty": 0.2,
"repetition_penalty": 1.2,
"model": "meta-llama/Llama-3.2-3B-Instruct",
"messages": [{
"role": "assistant",
"content": "what can i help you?"
}, {
"role": "user",
"content": "tell me about us"
}],
"stream": True,
"stream_options": {
"include_usage": True
}
}) as response:
print(response.status)
if response.status == 200:
transfer_encoding = response.headers.get('Transfer-Encoding')
if transfer_encoding == 'chunked':
async for chunk in response.content.iter_chunked(1024):
try:
decoded_chunk = chunk.decode('utf-8')
print(decoded_chunk)
except UnicodeDecodeError:
print(f"Error decoding chunk: {chunk!r}")
else:
print(f"Unexpected Transfer-Encoding: {transfer_encoding}")
else:
print(f"Request failed with status code {response.status}")
except aiohttp.ClientError as e:
print(f"Error: {e}")
async def main():
async with aiohttp.ClientSession() as session:
tasks = []
for _ in range(2):
tasks.append(test_connect(session))
await asyncio.gather(*tasks)
asyncio.run(main())