diff --git a/vllm/distributed/eplb/async_worker.py b/vllm/distributed/eplb/async_worker.py index 7ea4bf269db23..fcad45b9e2449 100644 --- a/vllm/distributed/eplb/async_worker.py +++ b/vllm/distributed/eplb/async_worker.py @@ -78,8 +78,8 @@ def run_rebalance_experts( eplb_stats.num_gpus, model_state.physical_to_logical_map, ) + assert new_physical_to_logical_map.device == torch.device("cpu") - # Move map to cpu model_state.new_physical_to_logical_map = new_physical_to_logical_map max_slots = model_state.logical_to_physical_map.shape[-1] @@ -109,6 +109,10 @@ async def transfer_run_periodically( continue if not model_state.new_indices_computed: run_rebalance_experts(model_state, state) + logger.info( + "Async worker computed new indices for model %s", + model_state.model_name, + ) current_num_layers = model_state.model.num_moe_layers while ( diff --git a/vllm/distributed/eplb/eplb_state.py b/vllm/distributed/eplb/eplb_state.py index e4a3d9c160453..d169e42df577d 100644 --- a/vllm/distributed/eplb/eplb_state.py +++ b/vllm/distributed/eplb/eplb_state.py @@ -917,7 +917,9 @@ class EplbState: ) else: eplb_model_state.eplb_stats = EplbStats( - global_expert_load_window=global_expert_load_window, + # We copy the tensor to snapshot the workload on the main + # thread to be used on the async thread. + global_expert_load_window=global_expert_load_window.clone(), num_replicas=num_replicas, num_groups=num_groups, num_nodes=num_nodes,