updated vllm

Signed-off-by: rshaw@neuralmagic.com <robertgshaw2@gmail.com>
This commit is contained in:
rshaw@neuralmagic.com 2025-07-01 00:16:07 +00:00
parent 15bc311d28
commit 1172b70b79

View File

@ -343,8 +343,8 @@ class NixlConnectorWorker:
print(f"NUM_WORKERS: {num_workers=}") print(f"NUM_WORKERS: {num_workers=}")
self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()),
None, None,
num_workers=num_workers, num_workers=None,
num_shared_workers=None) num_shared_workers=num_workers)
# Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}.
self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict)
@ -987,32 +987,28 @@ class NixlConnectorWorker:
# Prepare transfer with Nixl. # Prepare transfer with Nixl.
CHUNK_SIZE = 1000 CHUNK_SIZE = 1000
handles = [] handles = []
futures = []
# NOTE: this is a hack to make make_prepped_xfer into threads so that # NOTE: this is a hack to make make_prepped_xfer into threads so that
# different workers are allocated for each chuck. Without this change, # different workers are allocated for each chuck. Without this change,
# nixl was allocating the same worker (0) for all the chunks and the # nixl was allocating the same worker (0) for all the chunks and the
# overall launch time was >300 ms. # overall launch time was >300 ms.
with ThreadPoolExecutor() as executor: for i in range(0, len(local_block_descs_ids), CHUNK_SIZE):
for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): handle = self.nixl_wrapper.make_prepped_xfer(
future = executor.submit( "READ",
self.nixl_wrapper.make_prepped_xfer, local_xfer_side_handle,
"READ", local_block_descs_ids[i:i + CHUNK_SIZE],
local_xfer_side_handle, remote_xfer_side_handle,
local_block_descs_ids[i:i + CHUNK_SIZE], remote_block_descs_ids[i:i + CHUNK_SIZE],
remote_xfer_side_handle, skip_desc_merge=True,
remote_block_descs_ids[i:i + CHUNK_SIZE], )
skip_desc_merge=True, handles.append(handle)
)
futures.append(future)
for future in futures:
handles.append(future.result())
# Begin async xfer. # Begin async xfer.
start = time.perf_counter() start = time.perf_counter()
if USE_BATCHED: if USE_BATCHED:
print("BATCHED!")
self.nixl_wrapper.transfer_batched(handles) self.nixl_wrapper.transfer_batched(handles)
else: else:
print("NON BATCHED!")
for handle in handles: for handle in handles:
self.nixl_wrapper.transfer(handle) self.nixl_wrapper.transfer(handle)
end = time.perf_counter() end = time.perf_counter()