Futher optimize rearrange

Signed-off-by: ilmarkov <markovilya197@gmail.com>
This commit is contained in:
ilmarkov 2025-12-11 11:35:10 +00:00
parent f28720db88
commit ab0ca861da

View File

@ -233,10 +233,7 @@ def move_to_buffer(
recv_dst_rows[:recv_count] = dst_rows
recv_primary_mask[dst_rows] = True
eligible_local_buffer_mask = np.logical_and(
np.logical_and(~is_unchanged, is_received_locally),
new_local_expert_ids != -1,
)
eligible_local_buffer_mask = np.logical_and(~is_unchanged, is_received_locally)
# 1. Local moves into tmp buffers
if bool(eligible_local_buffer_mask.any()) and send_count > 0:
@ -396,7 +393,7 @@ def move_from_buffer(
dest_indices = np.nonzero(dest_mask_np)[0].tolist()
for dst in dest_indices:
for w, b in zip(expert_weights, expert_weights_buffers):
w[dst].copy_(b[dst])
w[dst].copy_(b[dst], non_blocking=True)
# Duplicate remote received rows to non-primary duplicate dsts
if recv_count == 0:
@ -433,7 +430,7 @@ def move_from_buffer(
for dst, src in zip(matched_dst_rows.tolist(), matched_src_rows.tolist()):
for w in expert_weights:
w[dst].copy_(w[src])
w[dst].copy_(w[src], non_blocking=True)
async def transfer_layer(