mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-04-06 13:37:03 +08:00
update disagg_connect test_request.py
Signed-off-by: clark <panf2333@gmail.com>
This commit is contained in:
parent
6e1fba8a73
commit
b7ffb43792
@ -1,71 +0,0 @@
|
||||
import time
|
||||
import zmq
|
||||
import zmq.asyncio
|
||||
import random
|
||||
import asyncio
|
||||
|
||||
|
||||
async def worker_routine(worker_url: str,
                         context: zmq.asyncio.Context = None, i: int = 0):
    """Worker routine: reply to each request with a canned streamed answer.

    Args:
        worker_url: in-process DEALER endpoint to connect to.
        context: shared ``zmq.asyncio.Context`` (required; no context is
            created here despite the ``None`` default).
        i: worker index, used in the socket identity and log prefixes.
    """
    # Socket to talk to dispatcher; timestamp in the identity keeps
    # restarted workers distinct on the ROUTER side.
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode())
    socket.connect(worker_url)
    print(f"worker-{i} {worker_url} started")
    while True:
        # Frames forwarded by the proxy: client identity + request parts.
        identity, url, headers, string = await socket.recv_multipart()
        print(f"worker-{i} Received request identity: [{identity} ]")
        print(f"worker-{i} Received request url: [{url} ]")
        print(f"worker-{i} Received request headers: [{headers} ]")
        print(f"worker-{i} Received request string: [{string} ]")
        # Canned OpenAI-style chat.completion.chunk stream:
        # role prelude, one content delta, then the stop chunk.
        streamreply = ['{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}',
                       '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}',
                       '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}'
                       ]
        # enumerate instead of range(len(...)): same order, same indices.
        for j, part in enumerate(streamreply):
            # Do some 'work' -- simulate generation latency without
            # blocking the event loop.
            await asyncio.sleep(random.randint(1, 5))
            # Send reply back to client, tagged with the originating identity
            reply = f"worker-{i} reply part-{j} {string} \n {part}"
            print(f"worker-{i} Sent reply: [{identity} {reply} ]")
            await socket.send_multipart([identity, reply.encode()])
|
||||
|
||||
async def main():
    """Server routine"""

    # In-process endpoint shared between the DEALER backend and workers.
    url_worker = "inproc://workers"
    # NOTE(review): binding to "localhost" restricts the ROUTER to the
    # loopback interface -- confirm tcp://*:5555 was not intended.
    url_client = "tcp://localhost:5555"

    # Prepare our context and sockets
    context = zmq.asyncio.Context()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)
    print("Server ROUTER started at", url_client)
    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)
    print("Worker DEALER started at", url_worker)

    # Five worker coroutines share the one asyncio context.
    tasks = [asyncio.create_task(worker_routine(url_worker, context, i)) for i in range(5)]
    # NOTE(review): zmq.proxy is a blocking call run in a thread here, but
    # it is handed zmq.asyncio sockets -- presumably this works because the
    # underlying libzmq sockets are shared; confirm this is safe in pyzmq.
    proxy_task = asyncio.to_thread(zmq.proxy, clients, workers)

    try:
        # Runs forever: workers loop and the proxy never returns normally.
        await asyncio.gather(*tasks, proxy_task)
    except KeyboardInterrupt:
        print("Server interrupted")
    except zmq.ZMQError as e:
        print("ZMQError:", e)
    finally:
        # We never get here but clean up anyhow
        clients.close()
        workers.close()
        context.term()


if __name__ == "__main__":
    asyncio.run(main())
|
||||
@ -1,71 +0,0 @@
|
||||
import time
|
||||
import zmq
|
||||
import zmq.asyncio
|
||||
import random
|
||||
import asyncio
|
||||
|
||||
|
||||
async def worker_routine(worker_url: str,
                         context: zmq.asyncio.Context = None, i: int = 0):
    """Worker routine: reply to each request with a canned streamed answer.

    Args:
        worker_url: in-process DEALER endpoint to connect to.
        context: shared ``zmq.asyncio.Context`` (required; no context is
            created here despite the ``None`` default).
        i: worker index, used in the socket identity and log prefixes.
    """
    # Socket to talk to dispatcher; timestamp in the identity keeps
    # restarted workers distinct on the ROUTER side.
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.IDENTITY, f"worker-{i}-{time.time()}".encode())
    socket.connect(worker_url)
    print(f"worker-{i} {worker_url} started")
    while True:
        # Frames forwarded by the proxy: client identity + request parts.
        identity, url, headers, string = await socket.recv_multipart()
        print(f"worker-{i} Received request identity: [{identity} ]")
        print(f"worker-{i} Received request url: [{url} ]")
        print(f"worker-{i} Received request headers: [{headers} ]")
        print(f"worker-{i} Received request string: [{string} ]")
        # Canned OpenAI-style chat.completion.chunk stream:
        # role prelude, one content delta, then the stop chunk.
        streamreply = ['{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}',
                       '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}',
                       '{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}'
                       ]
        # enumerate instead of range(len(...)): same order, same indices.
        for j, part in enumerate(streamreply):
            # Do some 'work' -- simulate generation latency without
            # blocking the event loop.
            await asyncio.sleep(random.randint(1, 5))
            # Send reply back to client, tagged with the originating identity
            reply = f"worker-{i} reply part-{j} {string} \n {part}"
            print(f"worker-{i} Sent reply: [{identity} {reply} ]")
            await socket.send_multipart([identity, reply.encode()])
|
||||
|
||||
async def main():
    """Server routine"""

    # In-process endpoint shared between the DEALER backend and workers.
    url_worker = "inproc://workers"
    # NOTE(review): binding to "localhost" restricts the ROUTER to the
    # loopback interface -- confirm tcp://*:5556 was not intended.
    url_client = "tcp://localhost:5556"

    # Prepare our context and sockets
    context = zmq.asyncio.Context()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)
    print("Server ROUTER started at", url_client)
    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)
    print("Worker DEALER started at", url_worker)

    # Five worker coroutines share the one asyncio context.
    tasks = [asyncio.create_task(worker_routine(url_worker, context, i)) for i in range(5)]
    # NOTE(review): zmq.proxy is a blocking call run in a thread here, but
    # it is handed zmq.asyncio sockets -- presumably this works because the
    # underlying libzmq sockets are shared; confirm this is safe in pyzmq.
    proxy_task = asyncio.to_thread(zmq.proxy, clients, workers)

    try:
        # Runs forever: workers loop and the proxy never returns normally.
        await asyncio.gather(*tasks, proxy_task)
    except KeyboardInterrupt:
        print("Server interrupted")
    except zmq.ZMQError as e:
        print("ZMQError:", e)
    finally:
        # We never get here but clean up anyhow
        clients.close()
        workers.close()
        context.term()


if __name__ == "__main__":
    asyncio.run(main())
|
||||
@ -1,53 +1,77 @@
|
||||
import asyncio
|
||||
import json
|
||||
import aiohttp
|
||||
|
||||
# test connect completions we assume prefill and decode are on the same node
|
||||
# 1. node:vllm serve facebook/opt-125m --port 7001 --zmq-server-port 7010 --chat-template ~/vllm/examples/template_chatglm2.jinja
|
||||
# 2. vllm connect --prefill-addr nodeIp:7010 --decode-addr nodeIp:7010
|
||||
# 3. python test_request.py
|
||||
|
||||
# NOTE(review): this region was recovered from a collapsed old/new diff view
# (two `def` headers and duplicate "model" keys were interleaved). The split
# below between test_connect and test_connect_completions is a best-effort
# reconstruction -- confirm against the committed file.


async def _stream_response(session, base_url, body):
    """POST *body* to *base_url* and consume the chunked streaming reply.

    Prints status, headers and each decoded chunk as it arrives; returns the
    accumulated response text ("" on error, non-200 status, or a reply that
    is not Transfer-Encoding: chunked).
    """
    print(f"Sending request to {base_url}, body {body}")
    responseText = ""
    try:
        async with session.post(base_url, json=body) as response:
            print(response.status)
            print(response.headers)
            if response.status == 200:
                transfer_encoding = response.headers.get('Transfer-Encoding')
                if transfer_encoding == 'chunked':
                    async for chunk in response.content.iter_chunked(1024):
                        try:
                            decoded_chunk = chunk.decode('utf-8')
                            print(decoded_chunk)
                            responseText += decoded_chunk
                        except UnicodeDecodeError:
                            print(f"Error decoding chunk: {chunk!r}")
                else:
                    # Print the headers and JSON response for diagnosis
                    print(f"Unexpected Transfer-Encoding: {transfer_encoding} {response.headers} {await response.json()}")
            else:
                print(f"Request failed with status code {response.status}")
    except aiohttp.ClientError as e:
        print(f"Error: {e}")
    return responseText


async def test_connect(session):
    """Chat-style streaming request against the connect endpoint."""
    base_url = "http://localhost:8001/v1/connect/completions"
    body = {
        "temperature": 0.5,
        "top_p": 0.9,
        "max_tokens": 150,
        "frequency_penalty": 1.3,
        "presence_penalty": 0.2,
        "repetition_penalty": 1.2,
        "model": "meta-llama/Llama-3.2-3B-Instruct",
        "messages": [{
            "role": "assistant",
            "content": "what can i help you?"
        }, {
            "role": "user",
            "content": "tell me about us"
        }],
        "stream": True,
        "stream_options": {
            "include_usage": True
        }
    }
    await _stream_response(session, base_url, body)


async def test_connect_completions(session):
    """Completions-style streaming request; prints the extracted text."""
    base_url = "http://localhost:8001/v1/connect/completions"
    body = {
        "temperature": 0.5,
        "top_p": 0.9,
        "max_tokens": 150,
        "frequency_penalty": 1.3,
        "presence_penalty": 0.2,
        "repetition_penalty": 1.2,
        "model": "facebook/opt-125m",
        "prompt": "Can you introduce vllm?",
        "stream": True,
        "stream_options": {
            "include_usage": True
        }
    }
    responseText = await _stream_response(session, base_url, body)
    print(f"baseurl {base_url} response data {extract_data(responseText)}")
|
||||
|
||||
def extract_data(responseText):
    """Concatenate the ``choices[0]["text"]`` fields of an SSE stream.

    Args:
        responseText: raw body of a streamed completions response, i.e.
            blank-line separated ``data: <json>`` events.

    Returns:
        The concatenated text of the first choice across ALL events (the
        original merged-diff version could return after the first event).
        Malformed events are reported and skipped.
    """
    reply = ""
    for data in responseText.split("\n\n"):
        if not data:
            # trailing empty segment after the final event separator
            continue
        if data.startswith('data: '):
            content = data[6:]
            if content.strip() == "[DONE]":
                # OpenAI-style end-of-stream sentinel carries no payload
                continue
            try:
                json_data = json.loads(content)
                # .get guards events without choices/text (e.g. usage-only
                # final chunks produced by include_usage)
                choices = json_data.get("choices", [])
                if len(choices) > 0:
                    reply += choices[0].get("text", "")
            except json.JSONDecodeError:
                print(f"Error: Invalid data format: {data}")
        else:
            print(f"Error: Invalid data format: {data}")
    return reply
|
||||
|
||||
|
||||
async def main():
    """Fire the connect test requests concurrently over one HTTP session."""
    async with aiohttp.ClientSession() as session:
        # two chat-style requests plus one completions-style request
        tasks = [test_connect(session) for _ in range(2)]
        tasks += [test_connect_completions(session) for _ in range(1)]
        await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user