diff --git a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py
index 98fefad1ff4a..4a5c47fead58 100644
--- a/tests/v1/test_external_lb_dp.py
+++ b/tests/v1/test_external_lb_dp.py
@@ -11,7 +11,7 @@
 import pytest
 import pytest_asyncio
 
 from tests.utils import RemoteOpenAIServer
-from vllm.platforms import Platform
+from vllm.platforms import current_platform
 
 MODEL_NAME = "ibm-research/PowerMoE-3b"
@@ -70,10 +70,11 @@ class ExternalLBServerManager:
                     sargs,
                     auto_port=False,
                     env_dict={
-                        "CUDA_VISIBLE_DEVICES":
+                        current_platform.device_control_env_var:
                         ",".join(
-                            str(Platform.device_id_to_physical_device_id(
-                                i))
+                            str(
+                                current_platform.
+                                device_id_to_physical_device_id(i))
                             for i in range(r * TP_SIZE, (r + 1) * TP_SIZE))
                     })
                 server.__enter__()
diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py
index 74708b61765c..293b1257be6b 100644
--- a/tests/v1/test_hybrid_lb_dp.py
+++ b/tests/v1/test_hybrid_lb_dp.py
@@ -12,7 +12,7 @@
 import pytest_asyncio
 
 from tests.utils import RemoteOpenAIServer
 from tests.v1.test_utils import check_request_balancing
-from vllm.platforms import Platform
+from vllm.platforms import current_platform
 
 MODEL_NAME = "ibm-research/PowerMoE-3b"
@@ -92,10 +92,12 @@ class HybridLBServerManager:
                     sargs,
                     auto_port=False,
                     env_dict={
-                        "CUDA_VISIBLE_DEVICES":
+                        current_platform.device_control_env_var:
                         ",".join(
-                            str(Platform.device_id_to_physical_device_id(
-                                i)) for i in range(gpu_start, gpu_end))
+                            str(
+                                current_platform.
+                                device_id_to_physical_device_id(i))
+                            for i in range(gpu_start, gpu_end))
                     })
                 server.__enter__()
                 print(f"Hybrid LB node {node} started successfully with "
@@ -180,7 +182,7 @@ async def test_hybrid_lb_completion(clients: list[openai.AsyncOpenAI],
     completion = await client.completions.create(
         model=model_name,
         prompt="Hello, my name is",
-        max_tokens=10,
+        max_tokens=5,
         temperature=1.0)
 
     assert completion.id is not None
@@ -212,27 +214,28 @@ async def test_hybrid_lb_completion(clients: list[openai.AsyncOpenAI],
     await asyncio.sleep(0.5)
 
     # Send requests to all nodes - each should balance within its local DP ranks
-    num_requests_per_node = 25  # Total 50 requests across 2 nodes
+    num_requests = 200  # Total 200 requests across 2 nodes
     all_tasks = []
-
-    for i, client in enumerate(clients):
-        tasks = [make_request(client) for _ in range(num_requests_per_node)]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(completion is not None for completion in results)
 
     await asyncio.sleep(0.5)
 
     # Second burst of requests
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [make_request(client) for _ in range(num_requests_per_node)]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(completion is not None for completion in results)
 
     _, server_args = servers[0]
@@ -309,33 +312,28 @@ async def test_hybrid_lb_completion_streaming(clients: list[
     await asyncio.sleep(0.5)
 
     # Send streaming requests to all nodes
-    num_requests_per_node = 25  # Total 50 requests across 2 nodes
+    num_requests = 200  # Total 200 requests across 2 nodes
     all_tasks = []
-
-    for i, client in enumerate(clients):
-        tasks = [
-            make_streaming_request(client)
-            for _ in range(num_requests_per_node)
-        ]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_streaming_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(results), "Not all streaming requests completed successfully."
 
     await asyncio.sleep(0.5)
 
     # Second burst of streaming requests
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [
-            make_streaming_request(client)
-            for _ in range(num_requests_per_node)
-        ]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_streaming_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(results), "Not all streaming requests completed successfully."
 
     _, server_args = servers[0]
diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py
index 9aef4d5821e8..ca80d3a4949d 100644
--- a/tests/v1/test_internal_lb_dp.py
+++ b/tests/v1/test_internal_lb_dp.py
@@ -11,7 +11,7 @@
 import pytest_asyncio
 
 from tests.utils import RemoteOpenAIServer
 from tests.v1.test_utils import check_request_balancing
-from vllm.platforms import Platform
+from vllm.platforms import current_platform
 
 MODEL_NAME = "ibm-research/PowerMoE-3b"
@@ -96,10 +96,12 @@ class MultinodeInternalLBServerManager:
                     sargs,
                     auto_port=False,
                     env_dict={
-                        "CUDA_VISIBLE_DEVICES":
+                        current_platform.device_control_env_var:
                         ",".join(
-                            str(Platform.device_id_to_physical_device_id(
-                                i)) for i in range(r, r + gpus_per_node))
+                            str(
+                                current_platform.
+                                device_id_to_physical_device_id(i))
+                            for i in range(r, r + gpus_per_node))
                     })
                 server.__enter__()
                 if r == 0:
@@ -219,9 +221,11 @@ class APIOnlyServerManager:
                 engines_server_args,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(i))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
                         for i in range(self.dp_size * self.tp_size))
                 })
             server.__enter__()
@@ -330,7 +334,7 @@ async def test_multinode_dp_completion(client: openai.AsyncOpenAI,
     completion = await client.completions.create(
         model=model_name,
         prompt="Hello, my name is",
-        max_tokens=10,
+        max_tokens=5,
         temperature=1.0)
 
     assert completion.id is not None
@@ -361,8 +365,11 @@ async def test_multinode_dp_completion(client: openai.AsyncOpenAI,
     await asyncio.sleep(0.5)
 
     # Send multiple requests - internal LB should distribute across DP ranks
-    num_requests = 50
-    all_tasks = [make_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -371,7 +378,10 @@
     await asyncio.sleep(0.5)
 
     # Second burst of requests
-    all_tasks = [make_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -449,8 +459,11 @@ async def test_multinode_dp_completion_streaming(client: openai.AsyncOpenAI,
 
     # Send multiple streaming requests - internal LB should distribute across
     # DP ranks
-    num_requests = 50
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -459,7 +472,10 @@
     await asyncio.sleep(0.5)
 
     # Second burst of streaming requests
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -492,7 +508,7 @@ async def test_api_only_multinode_dp_completion(
     completion = await api_only_client.completions.create(
         model=model_name,
         prompt="Hello, my name is",
-        max_tokens=10,
+        max_tokens=5,
         temperature=1.0)
 
     assert completion.id is not None
@@ -522,8 +538,11 @@ async def test_api_only_multinode_dp_completion(
 
     # Send multiple requests - should be distributed across engines on
     # headless server
-    num_requests = 50
-    all_tasks = [make_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -532,7 +551,10 @@
     await asyncio.sleep(0.5)
 
     # Second burst of requests
-    all_tasks = [make_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -610,8 +632,11 @@ async def test_api_only_multinode_dp_completion_streaming(
     await asyncio.sleep(0.5)
 
     # Send multiple streaming requests - should be distributed across engines
-    num_requests = 50
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -620,7 +645,10 @@
     await asyncio.sleep(0.5)
 
     # Second burst of streaming requests
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
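
Note on the device-visibility change: the server managers previously hardcoded CUDA_VISIBLE_DEVICES, and now ask the active platform for its device-control variable and its logical-to-physical device-id mapping, so the tests can pin accelerators on non-CUDA backends as well. The standalone sketch below mirrors the env_dict construction used in all three managers; visible_device_env is a hypothetical helper name introduced here for illustration, not a function this diff adds.

from vllm.platforms import current_platform


def visible_device_env(device_ids: list[int]) -> dict[str, str]:
    # Hypothetical helper mirroring the env_dict construction in the tests:
    # translate logical device ids to physical ids and join them under the
    # platform's device-control variable (CUDA_VISIBLE_DEVICES on CUDA).
    return {
        current_platform.device_control_env_var:
        ",".join(
            str(current_platform.device_id_to_physical_device_id(i))
            for i in device_ids)
    }


# e.g. giving DP rank r a contiguous block of TP_SIZE devices:
#   env = visible_device_env(list(range(r * TP_SIZE, (r + 1) * TP_SIZE)))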
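Note on the load pattern: each burst now issues 200 requests rather than 50, submitting them as tasks with a 10 ms stagger instead of gathering a pre-built batch, so requests overlap in flight while arriving as a steady stream that the load balancer can spread across DP ranks (and max_tokens drops from 10 to 5, presumably to keep test runtime in check). A minimal runnable sketch of the pattern, with fake_request and the latency values as illustrative stand-ins for the real OpenAI client calls:

import asyncio
import random


async def fake_request(i: int) -> str:
    # Stand-in for the OpenAI completion/streaming call made in the tests.
    await asyncio.sleep(random.uniform(0.05, 0.2))  # simulated server latency
    return f"completion-{i}"


async def staggered_burst(num_requests: int = 20,
                          stagger_s: float = 0.01) -> list[str]:
    # Submit one request at a time with a short pause between submissions,
    # as the updated tests do: work overlaps in flight, but arrivals are
    # spread out enough for the balancer to see a realistic request stream.
    tasks = []
    for i in range(num_requests):
        tasks.append(asyncio.create_task(fake_request(i)))
        await asyncio.sleep(stagger_s)
    results = await asyncio.gather(*tasks)
    assert len(results) == num_requests
    return results


if __name__ == "__main__":
    print(len(asyncio.run(staggered_burst())))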