add comment about hack

Signed-off-by: rshaw@neuralmagic.com <robertgshaw2@gmail.com>
This commit is contained in:
rshaw@neuralmagic.com 2025-06-30 12:25:48 +00:00
parent 39e6bd19fd
commit f015919fc8

View File

@ -7,13 +7,13 @@ import time
import uuid import uuid
from collections import defaultdict from collections import defaultdict
from collections.abc import Iterator from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional from typing import TYPE_CHECKING, Any, Optional
import msgspec import msgspec
import torch import torch
import zmq import zmq
from concurrent.futures import ThreadPoolExecutor, as_completed
from vllm import envs from vllm import envs
from vllm.attention.selector import backend_name_to_enum, get_attn_backend from vllm.attention.selector import backend_name_to_enum, get_attn_backend
@ -333,7 +333,7 @@ class NixlConnectorWorker:
# Agent. # Agent.
import os import os
num_workers = 16 num_workers = 16
# setting num workers on the prefiller causes the notifs to not be recved??? # setting num_workers on the prefiller causes no notifs to be recved???
# this is a hack to make sure we set num workers on the prefiller to 1. # this is a hack to make sure we set num workers on the prefiller to 1.
if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557": if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557":
num_workers = None num_workers = None
@ -988,6 +988,10 @@ class NixlConnectorWorker:
CHUNK_SIZE = 1000 CHUNK_SIZE = 1000
handles = [] handles = []
futures = [] futures = []
# NOTE: this is a hack that runs make_prepped_xfer in separate threads so
# that a different worker is allocated for each chunk. Without this change,
# nixl was allocating the same worker (0) for all the chunks and the
# overall launch time was >300 ms.
with ThreadPoolExecutor() as executor: with ThreadPoolExecutor() as executor:
for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): for i in range(0, len(local_block_descs_ids), CHUNK_SIZE):
future = executor.submit( future = executor.submit(
@ -1004,14 +1008,10 @@ class NixlConnectorWorker:
for future in futures: for future in futures:
handles.append(future.result()) handles.append(future.result())
# Begin async xfer. # Begin async xfer.
start = time.perf_counter() start = time.perf_counter()
# IT WORKS WITH THIS:
# for handle in handles: # for handle in handles:
# self.nixl_wrapper.transfer(handle) # self.nixl_wrapper.transfer(handle)
# IT FAILS WITH THIS:
self.nixl_wrapper.transfer_batched(handles) self.nixl_wrapper.transfer_batched(handles)
end = time.perf_counter() end = time.perf_counter()
logger.info("======== LAUNCH TIME: %s ========", end - start) logger.info("======== LAUNCH TIME: %s ========", end - start)