[EPLB] Reduce EPLB Inference Overhead (#24573)

Signed-off-by: Bowen Wang <abmfy@icloud.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Tyler Michael Smith <tyler@neuralmagic.com>
This commit is contained in:
Bowen Wang 2025-09-22 09:31:05 -07:00 committed by GitHub
parent 175811e3b5
commit 06a41334c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 92 additions and 50 deletions

View File

@ -1017,6 +1017,79 @@ def grouped_topk(
return topk_weights.to(torch.float32), topk_ids.to(torch.int32) return topk_weights.to(torch.float32), topk_ids.to(torch.int32)
@torch.compile(dynamic=True, backend=current_platform.simple_compile_backend)
def eplb_map_to_physical_and_record(
topk_ids: torch.Tensor,
expert_load_view: torch.Tensor,
logical_to_physical_map: torch.Tensor,
logical_replica_count: torch.Tensor,
indices_type: Optional[torch.dtype] = None) -> torch.Tensor:
'''
Map the logical expert ids to physical expert ids
and record the expert load metrics.
This will select a pseudo-random replica for each logical expert.
Only used for EPLB.
Args:
topk_ids: The logical expert ids.
expert_load_view: The expert load view.
logical_to_physical_map: The logical to physical map.
logical_replica_count: The logical replica count.
indices_type: The indices type.
Returns:
The physical expert ids.
'''
# 1. Convert the logical expert ids to physical expert ids
# Directly select a random replica for each logical expert
# In case `indices_type` is not `torch.long` or `torch.int`,
# e.g. `torch.uint32` as required by dispatch/combine kernels
topk_ids_long = topk_ids.long()
# Use (token position) modulo (replica count)
# to deterministically choose a replica
replica_count = logical_replica_count[topk_ids_long]
# Flatten-position based index, reshaped back to `topk_ids` shape
pos_indices = torch.arange(topk_ids.numel(),
device=topk_ids.device,
dtype=torch.long).reshape_as(topk_ids)
# Compute pseudo-random indices by modulo
replica_indices = (pos_indices % replica_count).unsqueeze(-1)
physical_ids = logical_to_physical_map[topk_ids_long].gather(
-1, replica_indices).squeeze(-1)
topk_ids = physical_ids
# 2. Record expert load metrics.
# TODO(bowen): When using `FusedMoEModularKernel`, this
# can be done in a more unified way, since
# `FusedMoEPrepareAndFinalize` will return the expert
# token count, in some cases directly from the kernel.
# However, now there are many code paths not using
# the modular kernel, e.g. calling `fused_experts`,
# so we decide to keep the logic here.
#
# If later refactor moved all the MoE kernel calls
# to the modular kernel, we can move this logic there
# to achieve better efficiency.
# `expert_load_view`: (num_physical_experts,)
# `torch.bincount` is not compilable, so use `scatter_add_` instead.
topk_ids_flatten = topk_ids.flatten()
expert_load_view.scatter_add_(
dim=0,
index=topk_ids_flatten.long(),
src=torch.ones_like(topk_ids_flatten).to(expert_load_view))
if indices_type is not None:
topk_ids = topk_ids.to(dtype=indices_type)
return topk_ids
def fused_grouped_topk( def fused_grouped_topk(
hidden_states: torch.Tensor, hidden_states: torch.Tensor,
gating_output: torch.Tensor, gating_output: torch.Tensor,

View File

@ -43,7 +43,8 @@ from vllm.v1.worker.ubatching import dbo_current_ubatch_id
if current_platform.is_cuda_alike(): if current_platform.is_cuda_alike():
from .fused_batched_moe import BatchedTritonExperts from .fused_batched_moe import BatchedTritonExperts
from .fused_moe import TritonExperts, fused_experts from .fused_moe import (TritonExperts, eplb_map_to_physical_and_record,
fused_experts)
if has_pplx(): if has_pplx():
from .pplx_prepare_finalize import (PplxPrepareAndFinalize, from .pplx_prepare_finalize import (PplxPrepareAndFinalize,
pplx_hidden_dim_scale_bytes) pplx_hidden_dim_scale_bytes)
@ -55,6 +56,16 @@ else:
fused_experts = None # type: ignore fused_experts = None # type: ignore
FusedMoEPermuteExpertsUnpermute = None # type: ignore FusedMoEPermuteExpertsUnpermute = None # type: ignore
FusedMoEPrepareAndFinalize = None # type: ignore FusedMoEPrepareAndFinalize = None # type: ignore
def eplb_map_to_physical_and_record(
topk_ids: torch.Tensor, expert_load_view: torch.Tensor,
logical_to_physical_map: torch.Tensor,
logical_replica_count: torch.Tensor,
indices_type: Optional[torch.dtype]) -> torch.Tensor:
# CPU fallback: no EPLB so just return as is
return topk_ids
if is_rocm_aiter_moe_enabled(): if is_rocm_aiter_moe_enabled():
from vllm.model_executor.layers.fused_moe.rocm_aiter_fused_moe import ( # noqa: E501 from vllm.model_executor.layers.fused_moe.rocm_aiter_fused_moe import ( # noqa: E501
rocm_aiter_grouped_topk as grouped_topk) rocm_aiter_grouped_topk as grouped_topk)
@ -1616,55 +1627,13 @@ class FusedMoE(CustomOp):
assert logical_to_physical_map is not None assert logical_to_physical_map is not None
assert logical_replica_count is not None assert logical_replica_count is not None
# 1. Convert the logical expert ids to physical expert ids topk_ids = eplb_map_to_physical_and_record(
# Directly select a random replica for each logical expert topk_ids=topk_ids,
expert_load_view=expert_load_view,
# TODO: maybe optimize this by using specified kernels, logical_to_physical_map=logical_to_physical_map,
# or compute pseudo-random indices by modulo logical_replica_count=logical_replica_count,
indices_type=indices_type,
# In case `indices_type` is not `torch.long` or `torch.int`, )
# e.g. `torch.uint32` as required by dispatch/combine kernels
topk_ids_long = topk_ids.long()
replica_indices = (
torch.rand_like(topk_ids, dtype=torch.float) *
logical_replica_count[topk_ids_long]).long().unsqueeze(-1)
physical_ids = logical_to_physical_map[topk_ids_long].gather(
-1, replica_indices).squeeze(-1)
topk_ids = physical_ids
# 2. Record expert load metrics.
# TODO(bowen): When using `FusedMoEModularKernel`, this
# can be done in a more unified way, since
# `FusedMoEPrepareAndFinalize` will return the expert
# token count, in some cases directly from the kernel.
# However, now there are many code paths not using
# the modular kernel, e.g. calling `fused_experts`,
# so we decide to keep the logic here.
#
# If later refactor moved all the MoE kernel calls
# to the modular kernel, we can move this logic there
# to achieve better efficiency.
# `expert_load_view`: (num_physical_experts,)
topk_ids_flatten = topk_ids.flatten()
# Performance optimization:
# `masked_fill` is significantly faster than `masked_select`
invalid_mask = topk_ids_flatten < 0
# Replace invalid expert ids with 0 (just a dummy position)
# to avoid out-of-bounds errors in scatter_add_
index = topk_ids_flatten.masked_fill_(invalid_mask, 0)
# `src` is the valid mask, which is 1 for valid and 0 for invalid
src = ~invalid_mask
expert_load_view.scatter_add_(dim=0,
index=index.long(),
src=src.to(expert_load_view))
topk_ids = topk_ids.to(dtype=indices_type)
assert topk_ids.dtype == indices_type or indices_type is None assert topk_ids.dtype == indices_type or indices_type is None