From f015919fc8efec4c8b8d94374afeda98a1f0a81b Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 12:25:48 +0000 Subject: [PATCH] add comment about hack Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index c6c86d74ab52c..745ef4150295c 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -7,13 +7,13 @@ import time import uuid from collections import defaultdict from collections.abc import Iterator +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Optional import msgspec import torch import zmq -from concurrent.futures import ThreadPoolExecutor, as_completed from vllm import envs from vllm.attention.selector import backend_name_to_enum, get_attn_backend @@ -333,7 +333,7 @@ class NixlConnectorWorker: # Agent. import os num_workers = 16 - # setting num workers on the prefiller causes the notifs to not be recved??? + # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557": num_workers = None @@ -988,6 +988,10 @@ class NixlConnectorWorker: CHUNK_SIZE = 1000 handles = [] futures = [] + # NOTE: this is a hack to make make_prepped_xfer into threads so that + # different workers are allocated for each chunk. Without this change, + # nixl was allocating the same worker (0) for all the chunks and the + # overall launch time was >300 ms. 
with ThreadPoolExecutor() as executor: for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): future = executor.submit( @@ -1004,14 +1008,10 @@ class NixlConnectorWorker: for future in futures: handles.append(future.result()) - # Begin async xfer. start = time.perf_counter() - # IT WORKS WITH THIS: # for handle in handles: # self.nixl_wrapper.transfer(handle) - - # IT FAILS WITH THIS: self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() logger.info("======== LAUNCH TIME: %s ========", end - start)