Add threading for load-balancing to different workers

This commit is contained in:
Pravein Govindan Kannan 2025-06-30 14:40:18 +05:30
parent 5d8b665366
commit 17546dc79f

View File

@@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Optional
import msgspec
import torch
import zmq
from concurrent.futures import ThreadPoolExecutor, as_completed
from vllm import envs
from vllm.attention.selector import backend_name_to_enum, get_attn_backend
@@ -986,16 +987,23 @@ class NixlConnectorWorker:
# Prepare transfer with Nixl.
CHUNK_SIZE = 100
handles = []
for i in range(0, len(local_block_descs_ids), CHUNK_SIZE):
handles.append(
self.nixl_wrapper.make_prepped_xfer(
futures = []
with ThreadPoolExecutor() as executor:
for i in range(0, len(local_block_descs_ids), CHUNK_SIZE):
future = executor.submit(
self.nixl_wrapper.make_prepped_xfer,
"READ",
local_xfer_side_handle,
local_block_descs_ids[i:i + CHUNK_SIZE],
remote_xfer_side_handle,
remote_block_descs_ids[i:i + CHUNK_SIZE],
skip_desc_merge=True,
))
)
futures.append(future)
for future in futures:
handles.append(future.result())
# Begin async xfer.
start = time.perf_counter()