diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml
index 5c08843a00e19..0150721b7931f 100644
--- a/.buildkite/test-pipeline.yaml
+++ b/.buildkite/test-pipeline.yaml
@@ -179,7 +179,7 @@ steps:
   - TP_SIZE=2 DP_SIZE=2 pytest -v -s v1/test_async_llm_dp.py
   - TP_SIZE=2 DP_SIZE=2 pytest -v -s v1/test_external_lb_dp.py
   - TP_SIZE=1 DP_SIZE=4 pytest -v -s v1/test_hybrid_lb_dp.py
-  - TP_SIZE=1 DP_SIZE=4 DP_PER_NODE=2 pytest -v -s v1/test_internal_lb_dp.py
+  - TP_SIZE=1 DP_SIZE=4 pytest -v -s v1/test_internal_lb_dp.py
   - pytest -v -s v1/engine/test_engine_core_client.py::test_kv_cache_events_dp
   - pytest -v -s distributed/test_utils.py
   - pytest -v -s compile/test_basic_correctness.py
diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py
index a043dbfa0bdca..9aef4d5821e82 100644
--- a/tests/v1/test_internal_lb_dp.py
+++ b/tests/v1/test_internal_lb_dp.py
@@ -20,8 +20,8 @@
 DP_SIZE = int(os.getenv("DP_SIZE", "2"))
 # Default tensor parallel size to use
 TP_SIZE = int(os.getenv("TP_SIZE", "1"))
-# Number of data parallel ranks per node
-DP_PER_NODE = int(os.getenv("DP_PER_NODE", "1"))
+# Number of nodes to simulate
+NUM_NODES = 2
 
 
 class MultinodeInternalLBServerManager:
@@ -46,7 +46,7 @@ class MultinodeInternalLBServerManager:
 
     def __enter__(self) -> list[tuple[RemoteOpenAIServer, list[str]]]:
         """Start all server instances for multi-node internal LB mode."""
-        for rank in range(self.dp_size):
+        for rank in range(0, self.dp_size, self.dp_per_node):
             # Create server args for this specific rank
             server_args = self.base_server_args.copy()
 
@@ -88,7 +88,7 @@ class MultinodeInternalLBServerManager:
 
         # Use a thread to start each server to allow parallel initialization
         def start_server(r: int, sargs: list[str]):
-            gpu_count = self.tp_size * self.dp_per_node
+            gpus_per_node = self.tp_size * self.dp_per_node
             try:
                 # Start the server
                 server = RemoteOpenAIServer(
@@ -99,8 +99,7 @@ class MultinodeInternalLBServerManager:
                         "CUDA_VISIBLE_DEVICES":
                         ",".join(
                             str(Platform.device_id_to_physical_device_id(
-                                i)) for i in range(r * gpu_count, (r + 1) *
-                                                   gpu_count))
+                                i)) for i in range(r, r + gpus_per_node))
                     })
                 server.__enter__()
                 if r == 0:
@@ -127,7 +126,7 @@ class MultinodeInternalLBServerManager:
 
         # Give servers additional time to fully initialize and coordinate
         time.sleep(3)
-        if len(self.servers) != self.dp_size:
+        if len(self.servers) != self.dp_size // self.dp_per_node:
             raise Exception("Servers failed to start")
 
         return self.servers
@@ -283,7 +282,8 @@ def servers(request, default_server_args):
     api_server_count = request.param
     with MultinodeInternalLBServerManager(MODEL_NAME, DP_SIZE,
                                           api_server_count,
-                                          default_server_args, DP_PER_NODE,
+                                          default_server_args,
+                                          DP_SIZE // NUM_NODES,
                                           TP_SIZE) as server_list:
         yield server_list
 
@@ -386,7 +386,7 @@ async def test_multinode_dp_completion(client: openai.AsyncOpenAI,
 
     # Check request balancing via Prometheus metrics
     head_server = servers[0][0]
-    check_request_balancing(head_server, len(servers))
+    check_request_balancing(head_server, DP_SIZE)
 
 
 @pytest.mark.asyncio
@@ -474,7 +474,7 @@ async def test_multinode_dp_completion_streaming(client: openai.AsyncOpenAI,
 
     # Check request balancing via Prometheus metrics
     head_server = servers[0][0]
-    check_request_balancing(head_server, len(servers))
+    check_request_balancing(head_server, DP_SIZE)
 
 
 @pytest.mark.asyncio
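
As a sanity check of the new loop arithmetic, here is a minimal standalone sketch of the node-to-GPU mapping the reworked __enter__ loop produces, assuming the values the Buildkite step sets (TP_SIZE=1, DP_SIZE=4, so DP_SIZE // NUM_NODES == 2 ranks per simulated node). The node_gpu_assignments helper is hypothetical, written only to mirror the diff's arithmetic; it is not code from the test:

TP_SIZE = 1       # tensor parallel size per engine rank
DP_SIZE = 4       # total data parallel ranks
NUM_NODES = 2     # nodes the test simulates on one host
DP_PER_NODE = DP_SIZE // NUM_NODES  # 2 ranks per simulated node


def node_gpu_assignments(dp_size: int, dp_per_node: int, tp_size: int):
    """Yield (head_rank, gpu_ids) for each simulated node.

    Mirrors the new loop: one server per node, stepping by dp_per_node,
    with GPU ids taken from range(r, r + gpus_per_node) as in the diff.
    """
    gpus_per_node = tp_size * dp_per_node
    for rank in range(0, dp_size, dp_per_node):
        yield rank, list(range(rank, rank + gpus_per_node))


for head_rank, gpus in node_gpu_assignments(DP_SIZE, DP_PER_NODE, TP_SIZE):
    print(f"head rank {head_rank}: CUDA_VISIBLE_DEVICES="
          f"{','.join(str(g) for g in gpus)}")
# head rank 0: CUDA_VISIBLE_DEVICES=0,1
# head rank 2: CUDA_VISIBLE_DEVICES=2,3

With two simulated nodes, the manager now starts DP_SIZE // DP_PER_NODE == 2 servers rather than one per DP rank, which is why the startup check compares against that quotient and why the request-balancing assertions pass DP_SIZE instead of len(servers): balancing is still checked across all four engine ranks, not across the two server processes.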