From 00570c9fac2282b56b7880ed8866c27b6a1949d6 Mon Sep 17 00:00:00 2001 From: jiangkuaixue123 Date: Mon, 15 Dec 2025 16:15:28 +0800 Subject: [PATCH] ffn dp use all2all Signed-off-by: jiangkuaixue123 --- .../afd_transfer/afd_connector/p2p_connector.py | 16 ++++++++++++++++ vllm/v1/worker/gpu_ffn_model_runner.py | 8 +++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/vllm/distributed/afd_transfer/afd_connector/p2p_connector.py b/vllm/distributed/afd_transfer/afd_connector/p2p_connector.py index d9d9730b55365..0655db693bce4 100644 --- a/vllm/distributed/afd_transfer/afd_connector/p2p_connector.py +++ b/vllm/distributed/afd_transfer/afd_connector/p2p_connector.py @@ -15,6 +15,10 @@ from vllm.distributed.parallel_state import ( init_model_parallel_group, ) from vllm.logger import init_logger +from vllm.forward_context import ( + DPMetadata, + get_forward_context, +) from .base import AFDConnectorBase from .metadata import AFDConnectorMetadata @@ -53,6 +57,7 @@ class P2PAFDConnector(AFDConnectorBase): ) self.recv_attn_output_counter: int = 0 self.recv_ffn_output_counter: int = 0 + self.dp_metadata_list: dict[int, DPMetadata] = {} def close(self) -> None: """Close the connector and release resources.""" @@ -169,6 +174,17 @@ class P2PAFDConnector(AFDConnectorBase): self._tensor_metadata_list = self._build_tensor_metadata_list( tensor_metadata, self._current_afd_connector_metadata ) + if self.config.parallel_config.data_parallel_size > 1: + logger.info("jcz recv_metadata num_of_stages:{}".format(self._current_afd_connector_metadata.num_of_stages)) + for stage_idx in range(self._current_afd_connector_metadata.num_of_stages): + num_tokens_per_ubatch = self._tensor_metadata_list[stage_idx].size[0] + self.dp_metadata_list[stage_idx] = DPMetadata.make( + self.config.parallel_config, + num_tokens_per_ubatch, + torch.tensor([num_tokens_per_ubatch] * self.config.parallel_config.data_parallel_size, + device="cpu", dtype=torch.int32), + ) + logger.info("jcz recv_metadata self.dp_metadata_list:{}".format(self.dp_metadata_list)) def _send_hidden_states( self, diff --git a/vllm/v1/worker/gpu_ffn_model_runner.py b/vllm/v1/worker/gpu_ffn_model_runner.py index e269442e74bdc..51dd62255acc7 100644 --- a/vllm/v1/worker/gpu_ffn_model_runner.py +++ b/vllm/v1/worker/gpu_ffn_model_runner.py @@ -17,7 +17,7 @@ from vllm.distributed.parallel_state import ( get_world_group, graph_capture, ) -from vllm.forward_context import set_forward_context +from vllm.forward_context import set_forward_context, get_forward_context from vllm.logger import init_logger from vllm.model_executor.model_loader import get_model_loader from vllm.utils.mem_utils import DeviceMemoryProfiler, GiB_bytes @@ -130,6 +130,10 @@ class GPUFFNModelRunner(LoRAModelRunnerMixin): try: hidden_states, recv_metadata = self.connector.recv_attn_output() + if hasattr(self.connector, 'dp_metadata_list'): + dp_metadata = self.connector.dp_metadata_list.get(recv_metadata.stage_idx, None) + else: + dp_metadata = None current_layer_idx = recv_metadata.layer_idx logger.info( f"layer {current_layer_idx} moe recv hidden states type:{type(hidden_states)}, shape:{hidden_states.shape}" ) @@ -145,6 +149,7 @@ class GPUFFNModelRunner(LoRAModelRunnerMixin): with set_forward_context( attn_metadata=None, vllm_config=self.vllm_config ): + get_forward_context().dp_metadata = dp_metadata rank_ffn_output = self._execute_with_cuda_graph( hidden_states, cuda_graph_info ) @@ -153,6 +158,7 @@ class GPUFFNModelRunner(LoRAModelRunnerMixin): with set_forward_context( attn_metadata=None, vllm_config=self.vllm_config ): + get_forward_context().dp_metadata = dp_metadata rank_ffn_output = self._execute_eager_mode( hidden_states, current_layer_idx )