# Patch (unified git diff) for
# vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py.
# NOTE(review): the line breaks of this patch have been collapsed onto a
#   single line; it presumably has to be re-wrapped (one diff line per
#   physical line) before any patch tool will accept it -- TODO confirm.
#
# Changes visible in the hunks below:
#   * Removes the module-level constant `NIXL_NUM_WORKERS = 4` and a bare
#     `NIXL_NUM_WORKERS` expression statement that followed the NIXL import
#     guard.
#   * Adds the module-level flag VLLM_DEBUG_NIXL_XFER_TIME, which is True only
#     when the environment variable of the same name equals "1" (the default
#     used when it is unset is "0").
#   * Drops the NIXL_major / NIXL_minor / NIXL_patch parsing of NIXL_VERSION
#     from the `try` block that imports nixl.
#   * In the NixlConnectorWorker hunk, changes the fallback for the
#     VLLM_NIXL_NUM_WORKERS environment variable from "1" to "8"; the parsed
#     value is passed as `num_threads` to nixl_agent_config.
#   * In the second NixlConnectorWorker hunk, the "TIME: ..." log line for the
#     self.nixl_wrapper.transfer(handle) call is now emitted only when
#     VLLM_DEBUG_NIXL_XFER_TIME is set; before this patch it was logged
#     unconditionally.
#
# NOTE(review): `import os` is added mid-module (after the constants), while a
#   function-local `import os` is still present as a context line in the third
#   hunk -- consider a single top-of-file import instead.
# NOTE(review): int(os.getenv(...)) raises ValueError when the variable holds
#   a non-integer string -- confirm that failing at agent setup is intended.
# NOTE(review): the timer brackets only the transfer(handle) call, and the
#   retained comment says completion is checked in a future step(); the logged
#   value is presumably the cost of issuing the transfer rather than the full
#   transfer duration -- confirm before relying on it.
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 12aeb4e68b61b..8b433e2cbcc23 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -42,7 +42,9 @@ Transfer = tuple[int, float] # (xfer_handle, start_time) EngineId = str ReqId = str GET_META_MSG = b"get_meta_msg" -NIXL_NUM_WORKERS = 4 + +import os +VLLM_DEBUG_NIXL_XFER_TIME = os.getenv("VLLM_DEBUG_NIXL_XFER_TIME", "0") == "1" logger = init_logger(__name__) @@ -50,16 +52,11 @@ logger = init_logger(__name__) try: from nixl._api import nixl_agent as NixlWrapper, nixl_agent_config NIXL_VERSION = metadata.version("nixl") - NIXL_major, NIXL_minor, NIXL_patch = map(int, NIXL_VERSION.split(".")) - - except ImportError: logger.warning("NIXL is not available") NixlWrapper = None NIXL_VERSION = None -NIXL_NUM_WORKERS - class NixlAgentMetadata( msgspec.Struct, omit_defaults=True, # type: ignore[call-arg] @@ -368,7 +365,7 @@ class NixlConnectorWorker: # Agent. import os - NIXL_NUM_WORKERS = int(os.getenv("VLLM_NIXL_NUM_WORKERS", "1")) + NIXL_NUM_WORKERS = int(os.getenv("VLLM_NIXL_NUM_WORKERS", "8")) logger.info(f"Using NIXL_NUM_WORKERS={NIXL_NUM_WORKERS} for NIXL agent.") config = nixl_agent_config(enable_prog_thread=False, num_threads=NIXL_NUM_WORKERS) @@ -1038,7 +1035,9 @@ class NixlConnectorWorker: start = time.perf_counter() self.nixl_wrapper.transfer(handle) end = time.perf_counter() - logger.info(f"TIME: {end - start}") + if VLLM_DEBUG_NIXL_XFER_TIME: + # Log the time taken for the transfer. + logger.info(f"TIME: {end - start}") # Use handle to check completion in future step(). # TODO (NickLucche) surface xfer elapsed time