mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-01-26 11:24:28 +08:00
[Feat] Allow custom naming of vLLM processes (#21445)
Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
This commit is contained in:
parent
73e3949d07
commit
6da0078523
@ -48,3 +48,4 @@ scipy # Required for phi-4-multimodal-instruct
|
||||
ninja # Required for xgrammar, rocm, tpu, xpu
|
||||
pybase64 # fast base64 implementation
|
||||
cbor2 # Required for cross-language serialization of hashable objects
|
||||
setproctitle # Used to set process names for better debugging and monitoring
|
||||
|
||||
@ -22,6 +22,7 @@ pillow
|
||||
psutil
|
||||
pybase64
|
||||
pydantic
|
||||
setproctitle
|
||||
torch
|
||||
transformers
|
||||
zmq
|
||||
|
||||
@ -21,7 +21,7 @@ from vllm.entrypoints.utils import (VLLM_SUBCMD_PARSER_EPILOG,
|
||||
from vllm.executor.multiproc_worker_utils import _add_prefix
|
||||
from vllm.logger import init_logger
|
||||
from vllm.usage.usage_lib import UsageContext
|
||||
from vllm.utils import FlexibleArgumentParser, get_tcp_uri
|
||||
from vllm.utils import FlexibleArgumentParser, bind_process_name, get_tcp_uri
|
||||
from vllm.v1.engine.core import EngineCoreProc
|
||||
from vllm.v1.engine.utils import CoreEngineProcManager, launch_core_engines
|
||||
from vllm.v1.executor.abstract import Executor
|
||||
@ -77,7 +77,7 @@ def run_headless(args: argparse.Namespace):
|
||||
|
||||
if args.api_server_count > 1:
|
||||
raise ValueError("api_server_count can't be set in headless mode")
|
||||
|
||||
bind_process_name("APIServer_Headless")
|
||||
# Create the EngineConfig.
|
||||
engine_args = vllm.AsyncEngineArgs.from_cli_args(args)
|
||||
usage_context = UsageContext.OPENAI_API_SERVER
|
||||
|
||||
@ -101,8 +101,9 @@ from vllm.transformers_utils.config import (
|
||||
maybe_register_config_serialize_by_value)
|
||||
from vllm.transformers_utils.tokenizer import MistralTokenizer
|
||||
from vllm.usage.usage_lib import UsageContext
|
||||
from vllm.utils import (Device, FlexibleArgumentParser, get_open_zmq_ipc_path,
|
||||
is_valid_ipv6_address, set_ulimit)
|
||||
from vllm.utils import (Device, FlexibleArgumentParser, bind_process_name,
|
||||
get_open_zmq_ipc_path, is_valid_ipv6_address,
|
||||
set_ulimit)
|
||||
from vllm.v1.metrics.prometheus import get_prometheus_registry
|
||||
from vllm.version import __version__ as VLLM_VERSION
|
||||
|
||||
@ -1804,7 +1805,7 @@ async def run_server_worker(listen_address,
|
||||
ToolParserManager.import_tool_parser(args.tool_parser_plugin)
|
||||
|
||||
server_index = client_config.get("client_index", 0) if client_config else 0
|
||||
|
||||
bind_process_name("APIServer", str(server_index))
|
||||
# Load logging config for uvicorn if specified
|
||||
log_config = load_log_config(args.log_config_file)
|
||||
if log_config is not None:
|
||||
|
||||
@ -985,6 +985,12 @@ environment_variables: dict[str, Callable[[], Any]] = {
|
||||
# Used to force set up loopback IP
|
||||
"VLLM_LOOPBACK_IP":
|
||||
lambda: os.getenv("VLLM_LOOPBACK_IP", ""),
|
||||
|
||||
# Used to set the process name prefix for vLLM processes.
|
||||
# This is useful for debugging and monitoring purposes.
|
||||
# The default value is "VLLM".
|
||||
"VLLM_PROCESS_NAME_PREFIX":
|
||||
lambda: os.getenv("VLLM_PROCESS_NAME_PREFIX", "VLLM"),
|
||||
}
|
||||
|
||||
# --8<-- [end:env-vars-definition]
|
||||
|
||||
@ -58,6 +58,7 @@ import numpy as np
|
||||
import numpy.typing as npt
|
||||
import psutil
|
||||
import regex as re
|
||||
import setproctitle
|
||||
import torch
|
||||
import torch.types
|
||||
import yaml
|
||||
@ -3278,3 +3279,16 @@ def has_deep_gemm() -> bool:
|
||||
"""Whether the optional `deep_gemm` package is available."""
|
||||
|
||||
return _has_module("deep_gemm")
|
||||
|
||||
|
||||
def bind_process_name(name: str, suffix: str = "") -> None:
|
||||
"""Bind the process name to a specific name with an optional suffix.
|
||||
|
||||
Args:
|
||||
name: The base name to bind the process to.
|
||||
suffix: An optional suffix to append to the base name.
|
||||
"""
|
||||
name = f"{envs.VLLM_PROCESS_NAME_PREFIX}::{name}"
|
||||
if suffix:
|
||||
name = f"{name}_{suffix}"
|
||||
setproctitle.setproctitle(name)
|
||||
|
||||
@ -13,7 +13,8 @@ from vllm.logger import init_logger
|
||||
from vllm.utils import get_mp_context, make_zmq_socket
|
||||
from vllm.v1.engine import EngineCoreOutputs, EngineCoreRequestType
|
||||
from vllm.v1.serial_utils import MsgpackDecoder
|
||||
from vllm.v1.utils import get_engine_client_zmq_addr, shutdown
|
||||
from vllm.v1.utils import (bind_process_name, get_engine_client_zmq_addr,
|
||||
shutdown)
|
||||
|
||||
logger = init_logger(__name__)
|
||||
|
||||
@ -79,7 +80,7 @@ class DPCoordinator:
|
||||
|
||||
context = get_mp_context()
|
||||
self.proc: multiprocessing.Process = context.Process(
|
||||
target=CoordinatorProc.run_coordinator,
|
||||
target=DPCoordinatorProc.run_coordinator,
|
||||
name="VLLM_DP_Coordinator",
|
||||
kwargs={
|
||||
"engine_count": parallel_config.data_parallel_size,
|
||||
@ -113,12 +114,12 @@ class EngineState:
|
||||
self.request_counts = [0, 0] # [waiting, running]
|
||||
|
||||
|
||||
class CoordinatorProc:
|
||||
class DPCoordinatorProc:
|
||||
|
||||
def __init__(self,
|
||||
engine_count: int,
|
||||
min_stats_update_interval_ms: int = 100):
|
||||
|
||||
bind_process_name(self.__class__.__name__)
|
||||
self.ctx = zmq.Context()
|
||||
|
||||
self.engines = [EngineState() for _ in range(engine_count)]
|
||||
@ -137,7 +138,7 @@ class CoordinatorProc:
|
||||
back_publish_address: str,
|
||||
min_stats_update_interval_ms: int = 100,
|
||||
):
|
||||
coordinator = CoordinatorProc(
|
||||
coordinator = DPCoordinatorProc(
|
||||
engine_count=engine_count,
|
||||
min_stats_update_interval_ms=min_stats_update_interval_ms)
|
||||
try:
|
||||
|
||||
@ -25,7 +25,8 @@ from vllm.logging_utils.dump_input import dump_engine_exception
|
||||
from vllm.lora.request import LoRARequest
|
||||
from vllm.transformers_utils.config import (
|
||||
maybe_register_config_serialize_by_value)
|
||||
from vllm.utils import make_zmq_socket, resolve_obj_by_qualname
|
||||
from vllm.utils import (bind_process_name, make_zmq_socket,
|
||||
resolve_obj_by_qualname)
|
||||
from vllm.v1.core.kv_cache_utils import (get_kv_cache_config,
|
||||
unify_kv_cache_configs)
|
||||
from vllm.v1.core.sched.interface import SchedulerInterface
|
||||
@ -411,6 +412,7 @@ class EngineCoreProc(EngineCore):
|
||||
client_handshake_address: Optional[str] = None,
|
||||
engine_index: int = 0,
|
||||
):
|
||||
bind_process_name(self.__class__.__name__, f"{engine_index}")
|
||||
self.input_queue = queue.Queue[tuple[EngineCoreRequestType, Any]]()
|
||||
self.output_queue = queue.Queue[Union[tuple[int, EngineCoreOutputs],
|
||||
bytes]]()
|
||||
|
||||
@ -30,8 +30,8 @@ from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator
|
||||
from vllm.executor.multiproc_worker_utils import (
|
||||
_add_prefix, set_multiprocessing_worker_envs)
|
||||
from vllm.logger import init_logger
|
||||
from vllm.utils import (get_distributed_init_method, get_loopback_ip,
|
||||
get_mp_context, get_open_port)
|
||||
from vllm.utils import (bind_process_name, get_distributed_init_method,
|
||||
get_loopback_ip, get_mp_context, get_open_port)
|
||||
from vllm.v1.executor.abstract import Executor, FailureCallback
|
||||
from vllm.v1.outputs import ModelRunnerOutput
|
||||
from vllm.worker.worker_base import WorkerWrapperBase
|
||||
@ -365,7 +365,10 @@ class WorkerProc:
|
||||
}
|
||||
wrapper.init_worker(all_kwargs)
|
||||
self.worker = wrapper
|
||||
|
||||
bind_process_name(
|
||||
self.worker.worker.__class__.__name__,
|
||||
f"TP{self.rank}_DP{vllm_config.parallel_config.data_parallel_rank}"
|
||||
)
|
||||
pid = os.getpid()
|
||||
_add_prefix(sys.stdout, f"VllmWorker rank={rank}", pid)
|
||||
_add_prefix(sys.stderr, f"VllmWorker rank={rank}", pid)
|
||||
|
||||
@ -15,8 +15,8 @@ import torch
|
||||
from vllm.logger import init_logger
|
||||
from vllm.usage.usage_lib import (UsageContext, is_usage_stats_enabled,
|
||||
usage_message)
|
||||
from vllm.utils import (get_open_port, get_open_zmq_ipc_path, get_tcp_uri,
|
||||
kill_process_tree)
|
||||
from vllm.utils import (bind_process_name, get_open_port,
|
||||
get_open_zmq_ipc_path, get_tcp_uri, kill_process_tree)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from vllm.v1.engine.coordinator import DPCoordinator
|
||||
@ -144,7 +144,7 @@ class APIServerProcessManager:
|
||||
self.listen_address = listen_address
|
||||
self.sock = sock
|
||||
self.args = args
|
||||
|
||||
bind_process_name(self.__class__.__name__)
|
||||
# Start API servers
|
||||
spawn_context = multiprocessing.get_context("spawn")
|
||||
self.processes: list[BaseProcess] = []
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user