ffn dp: use all2all

Cache one DPMetadata per stage on the P2P AFD connector and install it
into the forward context before the FFN executes, so that MoE all2all
dispatch sees the token counts of all DP ranks.

Signed-off-by: jiangkuaixue123 <jiangxiaozhou111@163.com>
commit 00570c9fac (parent eb2355c600)
Author: jiangkuaixue123
Date:   2025-12-15 16:15:28 +08:00
2 changed files with 23 additions and 1 deletion
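The motivation for carrying DPMetadata to the FFN side: all2all dispatch
can only size its send and receive splits when every DP rank knows the
token counts of all ranks. A minimal, self-contained sketch of that
dependency (plain torch.distributed; the helper and its equal-slice
split are hypothetical, not code from this patch, and per-rank token
counts are assumed divisible by the world size):

import torch
import torch.distributed as dist

def all2all_exchange(hidden_states: torch.Tensor,
                     num_tokens_across_dp: torch.Tensor) -> torch.Tensor:
    # Send an equal slice of the local tokens to every peer; real MoE
    # dispatch splits by expert assignment instead.
    world_size = dist.get_world_size()
    input_splits = [hidden_states.shape[0] // world_size] * world_size
    # Receive-side splits can only be computed because every rank knows
    # the token counts of all DP ranks -- the job DPMetadata does below.
    output_splits = [int(n) // world_size for n in num_tokens_across_dp]
    output = hidden_states.new_empty(
        (sum(output_splits), hidden_states.shape[1]))
    dist.all_to_all_single(output, hidden_states,
                           output_split_sizes=output_splits,
                           input_split_sizes=input_splits)
    return output

The first diff teaches the P2P AFD connector to build one DPMetadata per
stage as soon as the tensor metadata arrives: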


@@ -15,6 +15,10 @@ from vllm.distributed.parallel_state import (
     init_model_parallel_group,
 )
 from vllm.logger import init_logger
+from vllm.forward_context import (
+    DPMetadata,
+    get_forward_context,
+)
 from .base import AFDConnectorBase
 from .metadata import AFDConnectorMetadata
@@ -53,6 +57,7 @@ class P2PAFDConnector(AFDConnectorBase):
         )
         self.recv_attn_output_counter: int = 0
         self.recv_ffn_output_counter: int = 0
+        self.dp_metadata_list: dict[int, DPMetadata] = {}

     def close(self) -> None:
         """Close the connector and release resources."""
@@ -169,6 +174,17 @@ class P2PAFDConnector(AFDConnectorBase):
         self._tensor_metadata_list = self._build_tensor_metadata_list(
             tensor_metadata, self._current_afd_connector_metadata
         )
+        if self.config.parallel_config.data_parallel_size > 1:
+            logger.debug("recv_metadata num_of_stages: %s", self._current_afd_connector_metadata.num_of_stages)
+            for stage_idx in range(self._current_afd_connector_metadata.num_of_stages):
+                num_tokens_per_ubatch = self._tensor_metadata_list[stage_idx].size[0]
+                self.dp_metadata_list[stage_idx] = DPMetadata.make(
+                    self.config.parallel_config,
+                    num_tokens_per_ubatch,
+                    torch.tensor([num_tokens_per_ubatch] * self.config.parallel_config.data_parallel_size,
+                                 device="cpu", dtype=torch.int32),
+                )
+            logger.debug("recv_metadata dp_metadata_list: %s", self.dp_metadata_list)

     def _send_hidden_states(
         self,

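In plain form, the connector-side hunk caches one DPMetadata per stage,
with the across-DP token-count vector built by replicating the local
count (i.e. the patch assumes every DP rank sees the same number of
tokens per ubatch). A sketch of that flow, assuming the
DPMetadata.make(parallel_config, num_tokens, num_tokens_across_dp)
calling convention used at the call site above rather than any upstream
signature:

import torch
from vllm.forward_context import DPMetadata

def build_dp_metadata_list(parallel_config, tensor_metadata_list,
                           num_of_stages):
    # One DPMetadata per stage, keyed by stage index.
    dp_metadata_list: dict[int, DPMetadata] = {}
    dp_size = parallel_config.data_parallel_size
    for stage_idx in range(num_of_stages):
        # Per-ubatch token count comes from the first dimension of the
        # stage's tensor metadata.
        num_tokens = tensor_metadata_list[stage_idx].size[0]
        # Uniform counts across DP ranks: replicate the local count.
        num_tokens_across_dp = torch.tensor([num_tokens] * dp_size,
                                            device="cpu",
                                            dtype=torch.int32)
        dp_metadata_list[stage_idx] = DPMetadata.make(
            parallel_config, num_tokens, num_tokens_across_dp)
    return dp_metadata_list

The second diff is against the GPU FFN model runner, which looks the
cached metadata up by stage and installs it before executing the FFN: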

@@ -17,7 +17,7 @@ from vllm.distributed.parallel_state import (
     get_world_group,
     graph_capture,
 )
-from vllm.forward_context import set_forward_context
+from vllm.forward_context import set_forward_context, get_forward_context
 from vllm.logger import init_logger
 from vllm.model_executor.model_loader import get_model_loader
 from vllm.utils.mem_utils import DeviceMemoryProfiler, GiB_bytes
@@ -130,6 +130,10 @@ class GPUFFNModelRunner(LoRAModelRunnerMixin):
         try:
             hidden_states, recv_metadata = self.connector.recv_attn_output()
+            # Per-stage DPMetadata is only cached by connectors that
+            # support it; fall back to None otherwise.
+            dp_metadata = getattr(self.connector, "dp_metadata_list",
+                                  {}).get(recv_metadata.stage_idx)
             current_layer_idx = recv_metadata.layer_idx
             logger.info(
                 f"layer {current_layer_idx} moe recv hidden states type:{type(hidden_states)}, shape:{hidden_states.shape}"
@@ -145,6 +149,7 @@ class GPUFFNModelRunner(LoRAModelRunnerMixin):
                 with set_forward_context(
                     attn_metadata=None, vllm_config=self.vllm_config
                 ):
+                    get_forward_context().dp_metadata = dp_metadata
                     rank_ffn_output = self._execute_with_cuda_graph(
                         hidden_states, cuda_graph_info
                     )
@@ -153,6 +158,7 @@ class GPUFFNModelRunner(LoRAModelRunnerMixin):
                 with set_forward_context(
                     attn_metadata=None, vllm_config=self.vllm_config
                 ):
+                    get_forward_context().dp_metadata = dp_metadata
                     rank_ffn_output = self._execute_eager_mode(
                         hidden_states, current_layer_idx
                     )
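Taken together, the runner-side hunks amount to the flow below (a
condensed sketch, not the verbatim patch; the eager path stands in for
both execution paths, which get identical dp_metadata treatment):

from vllm.forward_context import get_forward_context, set_forward_context

def run_ffn_stage(self, hidden_states, recv_metadata):
    # Fetch the per-stage DPMetadata cached by the connector, if any.
    dp_metadata = getattr(self.connector, "dp_metadata_list", {}).get(
        recv_metadata.stage_idx)
    with set_forward_context(attn_metadata=None,
                             vllm_config=self.vllm_config):
        # set_forward_context builds the context without DP info here,
        # so the cached metadata is patched in before the FFN runs and
        # MoE all2all kernels can read it from the forward context.
        get_forward_context().dp_metadata = dp_metadata
        return self._execute_eager_mode(hidden_states,
                                        recv_metadata.layer_idx)

Patching get_forward_context().dp_metadata after entering the context
manager keeps the change local; if this tree's set_forward_context
already accepts token counts across DP, passing them there directly may
be the cleaner long-term shape.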