diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index bdbe71b832f4..0305354a66e8 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -2,9 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import argparse -import os import signal -import sys from typing import Optional import uvloop @@ -18,10 +16,9 @@ from vllm.entrypoints.openai.cli_args import (make_arg_parser, validate_parsed_serve_args) from vllm.entrypoints.utils import (VLLM_SUBCMD_PARSER_EPILOG, show_filtered_argument_or_group_from_help) -from vllm.executor.multiproc_worker_utils import _add_prefix from vllm.logger import init_logger from vllm.usage.usage_lib import UsageContext -from vllm.utils import FlexibleArgumentParser, get_tcp_uri +from vllm.utils import FlexibleArgumentParser, decorate_logs, get_tcp_uri from vllm.v1.engine.core import EngineCoreProc from vllm.v1.engine.utils import CoreEngineProcManager, launch_core_engines from vllm.v1.executor.abstract import Executor @@ -229,11 +226,7 @@ def run_api_server_worker_proc(listen_address, """Entrypoint for individual API server worker processes.""" # Add process-specific prefix to stdout and stderr. - from multiprocessing import current_process - process_name = current_process().name - pid = os.getpid() - _add_prefix(sys.stdout, process_name, pid) - _add_prefix(sys.stderr, process_name, pid) + decorate_logs() uvloop.run( run_server_worker(listen_address, sock, args, client_config, diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 05d9a69a65f8..26db1357da4d 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -11,7 +11,6 @@ import multiprocessing import os import signal import socket -import sys import tempfile import uuid from argparse import Namespace @@ -95,15 +94,15 @@ from vllm.entrypoints.openai.serving_transcription import ( from vllm.entrypoints.openai.tool_parsers import ToolParserManager from vllm.entrypoints.utils import (cli_env_setup, load_aware_call, log_non_default_args, with_cancellation) -from vllm.executor.multiproc_worker_utils import _add_prefix from vllm.logger import init_logger from vllm.reasoning import ReasoningParserManager from vllm.transformers_utils.config import ( maybe_register_config_serialize_by_value) from vllm.transformers_utils.tokenizer import MistralTokenizer from vllm.usage.usage_lib import UsageContext -from vllm.utils import (Device, FlexibleArgumentParser, get_open_zmq_ipc_path, - is_valid_ipv6_address, set_process_title, set_ulimit) +from vllm.utils import (Device, FlexibleArgumentParser, decorate_logs, + get_open_zmq_ipc_path, is_valid_ipv6_address, + set_process_title, set_ulimit) from vllm.v1.metrics.prometheus import get_prometheus_registry from vllm.version import __version__ as VLLM_VERSION @@ -1808,10 +1807,7 @@ async def run_server(args, **uvicorn_kwargs) -> None: """Run a single-worker API server.""" # Add process-specific prefix to stdout and stderr. - process_name = "APIServer" - pid = os.getpid() - _add_prefix(sys.stdout, process_name, pid) - _add_prefix(sys.stderr, process_name, pid) + decorate_logs("APIServer") listen_address, sock = setup_server(args) await run_server_worker(listen_address, sock, args, **uvicorn_kwargs) diff --git a/vllm/executor/multiproc_worker_utils.py b/vllm/executor/multiproc_worker_utils.py index a6c172beff7b..48b3479ed799 100644 --- a/vllm/executor/multiproc_worker_utils.py +++ b/vllm/executor/multiproc_worker_utils.py @@ -3,21 +3,20 @@ import asyncio import os -import sys import threading import uuid from dataclasses import dataclass from multiprocessing import Queue from multiprocessing.connection import wait from multiprocessing.process import BaseProcess -from typing import (Any, Callable, Dict, Generic, List, Optional, TextIO, - TypeVar, Union) +from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union import torch from vllm.config import VllmConfig from vllm.logger import init_logger -from vllm.utils import _maybe_force_spawn, get_mp_context, run_method +from vllm.utils import (_maybe_force_spawn, decorate_logs, get_mp_context, + run_method) logger = init_logger(__name__) @@ -25,10 +24,6 @@ T = TypeVar('T') _TERMINATE = "TERMINATE" # sentinel -# ANSI color codes -CYAN = '\033[1;36m' -RESET = '\033[0;0m' - JOIN_TIMEOUT_S = 2 @@ -213,9 +208,7 @@ def _run_worker_process( # Add process-specific prefix to stdout and stderr process_name = get_mp_context().current_process().name - pid = os.getpid() - _add_prefix(sys.stdout, process_name, pid) - _add_prefix(sys.stderr, process_name, pid) + decorate_logs(process_name) # Initialize worker worker = worker_factory(vllm_config, rank) @@ -260,33 +253,6 @@ def _run_worker_process( logger.info("Worker exiting") -def _add_prefix(file: TextIO, worker_name: str, pid: int) -> None: - """Prepend each output line with process-specific prefix""" - - prefix = f"{CYAN}({worker_name} pid={pid}){RESET} " - file_write = file.write - - def write_with_prefix(s: str): - if not s: - return - if file.start_new_line: # type: ignore[attr-defined] - file_write(prefix) - idx = 0 - while (next_idx := s.find('\n', idx)) != -1: - next_idx += 1 - file_write(s[idx:next_idx]) - if next_idx == len(s): - file.start_new_line = True # type: ignore[attr-defined] - return - file_write(prefix) - idx = next_idx - file_write(s[idx:]) - file.start_new_line = False # type: ignore[attr-defined] - - file.start_new_line = True # type: ignore[attr-defined] - file.write = write_with_prefix # type: ignore[method-assign] - - def set_multiprocessing_worker_envs(parallel_config): """ Set up environment variables that should be used when there are workers in a multiprocessing environment. This should be called by the parent diff --git a/vllm/utils/__init__.py b/vllm/utils/__init__.py index a7f579b0c9c2..d5d8d9dad73a 100644 --- a/vllm/utils/__init__.py +++ b/vllm/utils/__init__.py @@ -47,7 +47,7 @@ from dataclasses import dataclass, field from functools import cache, lru_cache, partial, wraps from types import MappingProxyType from typing import (TYPE_CHECKING, Any, Callable, Generic, Literal, NamedTuple, - Optional, Tuple, TypeVar, Union, cast, overload) + Optional, TextIO, Tuple, TypeVar, Union, cast, overload) from urllib.parse import urlparse from uuid import uuid4 @@ -167,6 +167,10 @@ GB_bytes = 1_000_000_000 GiB_bytes = 1 << 30 """The number of bytes in one gibibyte (GiB).""" +# ANSI color codes +CYAN = '\033[1;36m' +RESET = '\033[0;0m' + STR_DTYPE_TO_TORCH_DTYPE = { "half": torch.half, "bfloat16": torch.bfloat16, @@ -3258,3 +3262,52 @@ def set_process_title(name: str, else: name = f"{envs.VLLM_PROCESS_NAME_PREFIX}::{name}" setproctitle.setproctitle(name) + + +def _add_prefix(file: TextIO, worker_name: str, pid: int) -> None: + """Prepend each output line with process-specific prefix""" + + prefix = f"{CYAN}({worker_name} pid={pid}){RESET} " + file_write = file.write + + def write_with_prefix(s: str): + if not s: + return + if file.start_new_line: # type: ignore[attr-defined] + file_write(prefix) + idx = 0 + while (next_idx := s.find('\n', idx)) != -1: + next_idx += 1 + file_write(s[idx:next_idx]) + if next_idx == len(s): + file.start_new_line = True # type: ignore[attr-defined] + return + file_write(prefix) + idx = next_idx + file_write(s[idx:]) + file.start_new_line = False # type: ignore[attr-defined] + + file.start_new_line = True # type: ignore[attr-defined] + file.write = write_with_prefix # type: ignore[method-assign] + + +def decorate_logs(process_name: Optional[str] = None) -> None: + """ + Adds a process-specific prefix to each line of output written to stdout and + stderr. + + This function is intended to be called before initializing the api_server, + engine_core, or worker classes, so that all subsequent output from the + process is prefixed with the process name and PID. This helps distinguish + log output from different processes in multi-process environments. + + Args: + process_name: Optional; the name of the process to use in the prefix. + If not provided, the current process name from the multiprocessing + context is used. + """ + if process_name is None: + process_name = get_mp_context().current_process().name + pid = os.getpid() + _add_prefix(sys.stdout, process_name, pid) + _add_prefix(sys.stderr, process_name, pid) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index f9a6315df8af..6ae5736df98b 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -3,7 +3,6 @@ import os import queue import signal -import sys import threading import time from collections import deque @@ -19,15 +18,14 @@ import zmq from vllm.config import ParallelConfig, VllmConfig from vllm.distributed import stateless_destroy_torch_distributed_process_group -from vllm.executor.multiproc_worker_utils import _add_prefix from vllm.logger import init_logger from vllm.logging_utils.dump_input import dump_engine_exception from vllm.lora.request import LoRARequest from vllm.tasks import POOLING_TASKS, SupportedTask from vllm.transformers_utils.config import ( maybe_register_config_serialize_by_value) -from vllm.utils import (make_zmq_socket, resolve_obj_by_qualname, - set_process_title) +from vllm.utils import (decorate_logs, make_zmq_socket, + resolve_obj_by_qualname, set_process_title) from vllm.v1.core.kv_cache_utils import (get_kv_cache_config, unify_kv_cache_configs) from vllm.v1.core.sched.interface import SchedulerInterface @@ -649,12 +647,14 @@ class EngineCoreProc(EngineCore): "vllm_config"].parallel_config if parallel_config.data_parallel_size > 1 or dp_rank > 0: set_process_title("DPEngineCore", str(dp_rank)) + decorate_logs() # Set data parallel rank for this engine process. parallel_config.data_parallel_rank = dp_rank parallel_config.data_parallel_rank_local = local_dp_rank engine_core = DPEngineCoreProc(*args, **kwargs) else: set_process_title("EngineCore") + decorate_logs() engine_core = EngineCoreProc(*args, **kwargs) engine_core.run_busy_loop() @@ -905,8 +905,6 @@ class DPEngineCoreProc(EngineCoreProc): log_stats: bool, client_handshake_address: Optional[str] = None, ): - self._decorate_logs() - # Counts forward-passes of the model so that we can synchronize # finished with DP peers every N steps. self.counter = 0 @@ -919,15 +917,6 @@ class DPEngineCoreProc(EngineCoreProc): executor_class, log_stats, client_handshake_address, dp_rank) - def _decorate_logs(self): - # Add process-specific prefix to stdout and stderr before - # we initialize the engine. - from multiprocessing import current_process - process_name = current_process().name - pid = os.getpid() - _add_prefix(sys.stdout, process_name, pid) - _add_prefix(sys.stderr, process_name, pid) - def _init_data_parallel(self, vllm_config: VllmConfig): # Configure GPUs and stateless process group for data parallel. @@ -1149,9 +1138,6 @@ class DPEngineCoreActor(DPEngineCoreProc): f"{(local_dp_rank + 1) * world_size}) " f"base value: \"{os.getenv(device_control_env_var)}\"") from e - def _decorate_logs(self): - pass - @contextmanager def _perform_handshakes(self, handshake_address: str, identity: bytes, local_client: bool, vllm_config: VllmConfig, diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 827038505385..d90051c3224f 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -4,7 +4,6 @@ import multiprocessing import os import pickle import signal -import sys import threading import time import traceback @@ -28,10 +27,11 @@ from vllm.distributed.device_communicators.shm_broadcast import (Handle, MessageQueue) from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator from vllm.executor.multiproc_worker_utils import ( - _add_prefix, set_multiprocessing_worker_envs) + set_multiprocessing_worker_envs) from vllm.logger import init_logger -from vllm.utils import (get_distributed_init_method, get_loopback_ip, - get_mp_context, get_open_port, set_process_title) +from vllm.utils import (decorate_logs, get_distributed_init_method, + get_loopback_ip, get_mp_context, get_open_port, + set_process_title) from vllm.v1.executor.abstract import Executor, FailureCallback from vllm.v1.outputs import ModelRunnerOutput from vllm.worker.worker_base import WorkerWrapperBase @@ -382,11 +382,11 @@ class WorkerProc: pp_str = f"PP{rank // tp_size}" if pp_size > 1 else "" tp_str = f"TP{rank % tp_size}" if tp_size > 1 else "" suffix = f"{pp_str}{'_' if pp_str and tp_str else ''}{tp_str}" + process_name = "VllmWorker" if suffix: set_process_title(suffix, append=True) - pid = os.getpid() - _add_prefix(sys.stdout, f"VllmWorker rank={rank}", pid) - _add_prefix(sys.stderr, f"VllmWorker rank={rank}", pid) + process_name = f"{process_name} {suffix}" + decorate_logs(process_name) # Initialize MessageQueue for receiving SchedulerOutput self.rpc_broadcast_mq = MessageQueue.create_from_handle(