[Tests] Harden DP tests (#21508)

Signed-off-by: Nick Hill <nhill@redhat.com>
Author: Nick Hill, 2025-07-25 10:27:24 +01:00 (committed by GitHub)
parent 40d86ee412
commit e38e96a3c0
3 changed files with 82 additions and 55 deletions
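
The change repeated across all three test files swaps the hardcoded
"CUDA_VISIBLE_DEVICES" key (and the Platform class) for the
current_platform instance, so the tests also pin servers to devices
correctly on non-CUDA hardware. A minimal sketch of the resulting
pattern, using only the attributes visible in the diff (the helper
name visible_device_env is hypothetical, for illustration only):

    from vllm.platforms import current_platform

    def visible_device_env(device_ids) -> dict[str, str]:
        # Pin a server to a slice of devices via the platform's own
        # device-control variable (CUDA_VISIBLE_DEVICES on NVIDIA,
        # the analogous variable on other platforms).
        return {
            current_platform.device_control_env_var:
            ",".join(
                str(current_platform.device_id_to_physical_device_id(i))
                for i in device_ids)
        }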

File 1 of 3

@@ -11,7 +11,7 @@ import pytest
 import pytest_asyncio

 from tests.utils import RemoteOpenAIServer
-from vllm.platforms import Platform
+from vllm.platforms import current_platform

 MODEL_NAME = "ibm-research/PowerMoE-3b"
@@ -70,10 +70,11 @@ class ExternalLBServerManager:
                 sargs,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(
-                            i))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
                         for i in range(r * TP_SIZE, (r + 1) * TP_SIZE))
                 })
             server.__enter__()

File 2 of 3

@@ -12,7 +12,7 @@ import pytest_asyncio
 from tests.utils import RemoteOpenAIServer
 from tests.v1.test_utils import check_request_balancing
-from vllm.platforms import Platform
+from vllm.platforms import current_platform

 MODEL_NAME = "ibm-research/PowerMoE-3b"
@@ -92,10 +92,12 @@ class HybridLBServerManager:
                 sargs,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(
-                            i)) for i in range(gpu_start, gpu_end))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
+                        for i in range(gpu_start, gpu_end))
                 })
             server.__enter__()
             print(f"Hybrid LB node {node} started successfully with "
@@ -180,7 +182,7 @@ async def test_hybrid_lb_completion(clients: list[openai.AsyncOpenAI],
     completion = await client.completions.create(
         model=model_name,
         prompt="Hello, my name is",
-        max_tokens=10,
+        max_tokens=5,
         temperature=1.0)

     assert completion.id is not None
@@ -212,27 +214,28 @@ async def test_hybrid_lb_completion(clients: list[openai.AsyncOpenAI],
     await asyncio.sleep(0.5)

     # Send requests to all nodes - each should balance within its local DP ranks
-    num_requests_per_node = 25  # Total 50 requests across 2 nodes
+    num_requests = 200  # Total 200 requests across 2 nodes
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [make_request(client) for _ in range(num_requests_per_node)]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_request(client)))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(completion is not None for completion in results)

     await asyncio.sleep(0.5)

     # Second burst of requests
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [make_request(client) for _ in range(num_requests_per_node)]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_request(client)))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(completion is not None for completion in results)

     _, server_args = servers[0]
@@ -309,33 +312,28 @@ async def test_hybrid_lb_completion_streaming(clients: list[
     await asyncio.sleep(0.5)

     # Send streaming requests to all nodes
-    num_requests_per_node = 25  # Total 50 requests across 2 nodes
+    num_requests = 200  # Total 200 requests across 2 nodes
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [
-            make_streaming_request(client)
-            for _ in range(num_requests_per_node)
-        ]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_streaming_request(client)))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(results), "Not all streaming requests completed successfully."

     await asyncio.sleep(0.5)

     # Second burst of streaming requests
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [
-            make_streaming_request(client)
-            for _ in range(num_requests_per_node)
-        ]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_streaming_request(client)))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(results), "Not all streaming requests completed successfully."

     _, server_args = servers[0]

File 3 of 3

@@ -11,7 +11,7 @@ import pytest_asyncio
 from tests.utils import RemoteOpenAIServer
 from tests.v1.test_utils import check_request_balancing
-from vllm.platforms import Platform
+from vllm.platforms import current_platform

 MODEL_NAME = "ibm-research/PowerMoE-3b"
@@ -96,10 +96,12 @@ class MultinodeInternalLBServerManager:
                 sargs,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(
-                            i)) for i in range(r, r + gpus_per_node))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
+                        for i in range(r, r + gpus_per_node))
                 })
             server.__enter__()
             if r == 0:
@@ -219,9 +221,11 @@ class APIOnlyServerManager:
                 engines_server_args,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(i))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
                         for i in range(self.dp_size * self.tp_size))
                 })
             server.__enter__()
@@ -330,7 +334,7 @@ async def test_multinode_dp_completion(client: openai.AsyncOpenAI,
     completion = await client.completions.create(
         model=model_name,
         prompt="Hello, my name is",
-        max_tokens=10,
+        max_tokens=5,
         temperature=1.0)

     assert completion.id is not None
@@ -361,8 +365,11 @@ async def test_multinode_dp_completion(client: openai.AsyncOpenAI,
     await asyncio.sleep(0.5)

     # Send multiple requests - internal LB should distribute across DP ranks
-    num_requests = 50
-    all_tasks = [make_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -371,7 +378,10 @@ async def test_multinode_dp_completion(client: openai.AsyncOpenAI,
     await asyncio.sleep(0.5)

     # Second burst of requests
-    all_tasks = [make_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -449,8 +459,11 @@ async def test_multinode_dp_completion_streaming(client: openai.AsyncOpenAI,
     # Send multiple streaming requests - internal LB should distribute across
     # DP ranks
-    num_requests = 50
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -459,7 +472,10 @@ async def test_multinode_dp_completion_streaming(client: openai.AsyncOpenAI,
     await asyncio.sleep(0.5)

     # Second burst of streaming requests
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -492,7 +508,7 @@ async def test_api_only_multinode_dp_completion(
     completion = await api_only_client.completions.create(
         model=model_name,
         prompt="Hello, my name is",
-        max_tokens=10,
+        max_tokens=5,
         temperature=1.0)

     assert completion.id is not None
@@ -522,8 +538,11 @@ async def test_api_only_multinode_dp_completion(
     # Send multiple requests - should be distributed across engines on
     # headless server
-    num_requests = 50
-    all_tasks = [make_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -532,7 +551,10 @@ async def test_api_only_multinode_dp_completion(
     await asyncio.sleep(0.5)

     # Second burst of requests
-    all_tasks = [make_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -610,8 +632,11 @@ async def test_api_only_multinode_dp_completion_streaming(
     await asyncio.sleep(0.5)

     # Send multiple streaming requests - should be distributed across engines
-    num_requests = 50
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -620,7 +645,10 @@ async def test_api_only_multinode_dp_completion_streaming(
     await asyncio.sleep(0.5)

     # Second burst of streaming requests
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)

     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
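
The other hardening change, repeated in every test above: the old code
fired 50 requests as one list of coroutines handed straight to
asyncio.gather, i.e. a single instantaneous spike. The new code submits
200 requests as individual tasks spaced ~10 ms apart, so the load
balancer sees a steady stream it can actually distribute across DP
ranks. A minimal sketch of the pattern, assuming an async
make_request() coroutine like the test helpers (send_burst is a
hypothetical name):

    import asyncio

    async def send_burst(make_request, num_requests: int = 200) -> list:
        # Submit each request as its own task, staggered by 10 ms,
        # then wait for the whole burst to complete.
        tasks = []
        for _ in range(num_requests):
            tasks.append(asyncio.create_task(make_request()))
            await asyncio.sleep(0.01)
        return await asyncio.gather(*tasks)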