On environments where numa cannot be detected we get 0 (#21115)

Signed-off-by: Eric Curtin <ecurtin@redhat.com>
This commit is contained in:
Eric Curtin 2025-07-17 19:52:17 +01:00 committed by GitHub
parent a3a6c695f4
commit ac9fb732a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -13,12 +13,20 @@ from vllm.logger import init_logger
from vllm.model_executor.utils import set_random_seed from vllm.model_executor.utils import set_random_seed
from vllm.platforms import CpuArchEnum, current_platform from vllm.platforms import CpuArchEnum, current_platform
from vllm.sequence import IntermediateTensors from vllm.sequence import IntermediateTensors
from vllm.utils import PlaceholderModule
from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.cpu_model_runner import CPUModelRunner from vllm.v1.worker.cpu_model_runner import CPUModelRunner
from vllm.v1.worker.gpu_worker import (Worker, from vllm.v1.worker.gpu_worker import (Worker,
init_worker_distributed_environment) init_worker_distributed_environment)
try:
import psutil
from numa import info
except ImportError:
psutil = PlaceholderModule("psutil") # type: ignore[assignment]
numa = PlaceholderModule("numa") # type: ignore[assignment]
logger = init_logger(__name__) logger = init_logger(__name__)
@ -37,6 +45,8 @@ class CPUWorker(Worker):
is_driver_worker=is_driver_worker) is_driver_worker=is_driver_worker)
self.parallel_config.disable_custom_all_reduce = True self.parallel_config.disable_custom_all_reduce = True
self.manually_bind_threads_suggestion = (
"To get better performance, please try to manually bind threads.")
def init_device(self): def init_device(self):
# Setup OpenMP threads affinity. # Setup OpenMP threads affinity.
@ -112,50 +122,111 @@ class CPUWorker(Worker):
assert isinstance(output, ModelRunnerOutput) assert isinstance(output, ModelRunnerOutput)
return output if self.is_driver_worker else None return output if self.is_driver_worker else None
def warn_inability_to_detect_numa(self) -> None:
logger.warning(
"Auto thread-binding failed due to the "
"inability to detect numa nodes. %s",
self.manually_bind_threads_suggestion)
def warn_lack_of_numa_and_psutil(self) -> None:
logger.warning(
"Auto thread-binding failed due to "
"the lack of package numa and psutil. %s",
self.manually_bind_threads_suggestion)
def warn_world_size_too_large(self, world_size: int,
node_to_cpus_len: int) -> None:
logger.warning(
"Auto thread-binding failed due to "
"world size: %d being larger than "
"allowed NUMA nodes number: %d. %s", world_size, node_to_cpus_len,
self.manually_bind_threads_suggestion)
def get_cpus_allow_list_and_numa_size(self):
cpus_allow_list = psutil.Process().cpu_affinity()
numa_size = info.get_num_configured_nodes()
return cpus_allow_list, numa_size
def auto_thread_binding_based_on_numa_nodes(self, world_size: int,
rank_to_cpus: str) -> str:
cpu_count = psutil.cpu_count(logical=False)
cpus_allow_list, numa_size = self.get_cpus_allow_list_and_numa_size()
if not numa_size:
self.warn_inability_to_detect_numa()
return rank_to_cpus
cpu_count_per_numa = cpu_count // numa_size
num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU,
cpu_count_per_numa // 2)
node_to_cpus = []
for i in range(numa_size):
node_intersect = set(
info.node_to_cpus(i)).intersection(cpus_allow_list)
if bool(node_intersect):
node_to_cpus.append(list(node_intersect))
node_to_cpus_len = len(node_to_cpus)
if world_size > node_to_cpus_len:
self.warn_world_size_too_large(world_size, node_to_cpus_len)
else:
end = cpu_count_per_numa - num_of_reserved_cpu
rank_to_cpus_list = node_to_cpus[self.rank][:end]
rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
logger.info("auto thread-binding list: %s", rank_to_cpus)
return rank_to_cpus
def libnuma_and_psutil_found(self) -> bool:
libnuma_found = util.find_spec("numa") is not None
psutil_found = util.find_spec("psutil") is not None
return libnuma_found and psutil_found
def get_cpus_id_binding_based_on_numa_nodes(self) -> str: def get_cpus_id_binding_based_on_numa_nodes(self) -> str:
"""Return CPUs id binding based on NUMA nodes. """Return CPUs id binding based on NUMA nodes.
""" """
rank_to_cpus = self.local_omp_cpuid rank_to_cpus = self.local_omp_cpuid
# Setup OpenMP thread affinity based on NUMA nodes automatically # Setup OpenMP thread affinity based on NUMA nodes automatically
world_size = self.vllm_config.parallel_config.world_size world_size = self.vllm_config.parallel_config.world_size
libnuma_found = util.find_spec("numa") is not None if self.libnuma_and_psutil_found():
psutil_found = util.find_spec("psutil") is not None rank_to_cpus = self.auto_thread_binding_based_on_numa_nodes(
if libnuma_found and psutil_found: world_size, rank_to_cpus)
import psutil else:
from numa import info self.warn_lack_of_numa_and_psutil()
cpu_count = psutil.cpu_count(logical=False) return rank_to_cpus
cpus_allow_list = psutil.Process().cpu_affinity()
numa_size = info.get_num_configured_nodes() def select_threads_per_power_core(self,
cpu_count_per_numa = cpu_count // numa_size node_cpu_ids: list[int]) -> list[int]:
return [cpu for cpu in node_cpu_ids if cpu % 8 < 4]
def auto_thread_binding_based_on_numa_nodes_ppc64le(
self, world_size: int, rank_to_cpus: str) -> str:
cpus_allow_list, numa_size = self.get_cpus_allow_list_and_numa_size()
if not numa_size:
self.warn_inability_to_detect_numa()
return rank_to_cpus
node_to_cpus = []
for i in range(numa_size):
node_intersect = set(
info.node_to_cpus(i)).intersection(cpus_allow_list)
if bool(node_intersect):
node_to_cpus.append(sorted(list(node_intersect)))
node_to_cpus_len = len(node_to_cpus)
if world_size > node_to_cpus_len:
self.warn_world_size_too_large(world_size, node_to_cpus_len)
else:
node_cpus_this_rank = node_to_cpus[self.rank]
node_cpus_this_rank = self.select_threads_per_power_core(
node_cpus_this_rank)
cpu_count_per_numa = len(node_cpus_this_rank)
num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU, num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU,
cpu_count_per_numa // 2) cpu_count_per_numa // 2)
end = cpu_count_per_numa - num_of_reserved_cpu
# check allow node_to_cpus list rank_to_cpus_list = node_cpus_this_rank[:end]
node_to_cpus = [] rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
for i in range(numa_size): logger.info("ppc64le thread-binding list: %s", rank_to_cpus)
node_intersect = set(
info.node_to_cpus(i)).intersection(cpus_allow_list)
if bool(node_intersect):
node_to_cpus.append(list(node_intersect))
if world_size > len(node_to_cpus):
logger.error(
"Auto thread-binding failed due to "
"world size: %d is larger than "
"allowed NUMA nodes number: %d."
"Please try to bind threads manually.", world_size,
len(node_to_cpus))
else:
end = cpu_count_per_numa - num_of_reserved_cpu
rank_to_cpus_list = node_to_cpus[self.rank][:end]
rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
logger.info("auto thread-binding list: %s", rank_to_cpus)
else:
logger.warning(
"Auto thread-binding is not supported due to "
"the lack of package numa and psutil,"
"fallback to no thread-binding. To get better performance,"
"please try to manually bind threads.")
return rank_to_cpus return rank_to_cpus
def get_cpus_id_binding_based_on_numa_nodes_ppc64le(self) -> str: def get_cpus_id_binding_based_on_numa_nodes_ppc64le(self) -> str:
@ -166,48 +237,11 @@ class CPUWorker(Worker):
performance by avoiding oversubscription of logical CPUs on Power. performance by avoiding oversubscription of logical CPUs on Power.
""" """
def select_threads_per_power_core(node_cpu_ids):
return [cpu for cpu in node_cpu_ids if cpu % 8 < 4]
rank_to_cpus = self.local_omp_cpuid rank_to_cpus = self.local_omp_cpuid
world_size = self.vllm_config.parallel_config.world_size world_size = self.vllm_config.parallel_config.world_size
libnuma_found = util.find_spec("numa") is not None if self.libnuma_and_psutil_found():
psutil_found = util.find_spec("psutil") is not None rank_to_cpus = self.auto_thread_binding_based_on_numa_nodes_ppc64le(
if libnuma_found and psutil_found: world_size, rank_to_cpus)
import psutil
from numa import info
cpus_allow_list = psutil.Process().cpu_affinity()
numa_size = info.get_num_configured_nodes()
node_to_cpus = []
for i in range(numa_size):
node_intersect = set(
info.node_to_cpus(i)).intersection(cpus_allow_list)
if bool(node_intersect):
node_to_cpus.append(sorted(list(node_intersect)))
if world_size > len(node_to_cpus):
logger.error(
"Auto thread-binding failed due to "
"world size: %d is larger than "
"allowed NUMA nodes number: %d."
"Please try to bind threads manually.", world_size,
len(node_to_cpus))
else:
node_cpus_this_rank = node_to_cpus[self.rank]
node_cpus_this_rank = select_threads_per_power_core(
node_cpus_this_rank)
cpu_count_per_numa = len(node_cpus_this_rank)
num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU,
cpu_count_per_numa // 2)
end = cpu_count_per_numa - num_of_reserved_cpu
rank_to_cpus_list = node_cpus_this_rank[:end]
rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
logger.info("ppc64le thread-binding list: %s", rank_to_cpus)
else: else:
logger.warning( self.warn_lack_of_numa_and_psutil()
"Auto thread-binding is not supported due to "
"the lack of package numa and psutil,"
"fallback to no thread-binding. To get better performance,"
"please try to manually bind threads.")
return rank_to_cpus return rank_to_cpus