From 9f7fb5ec846e6349eb5beedd698954ec95058c56 Mon Sep 17 00:00:00 2001
From: Robert Shaw
Date: Sat, 22 Mar 2025 13:22:00 -0400
Subject: [PATCH] updated

Signed-off-by: Robert Shaw
---
 .../disaggregated/{connector.py => engine.py} |  1 -
 vllm/entrypoints/openai/api_server.py         | 24 ++++++++++++++-----
 2 files changed, 18 insertions(+), 7 deletions(-)
 rename vllm/entrypoints/disaggregated/{connector.py => engine.py} (99%)

diff --git a/vllm/entrypoints/disaggregated/connector.py b/vllm/entrypoints/disaggregated/engine.py
similarity index 99%
rename from vllm/entrypoints/disaggregated/connector.py
rename to vllm/entrypoints/disaggregated/engine.py
index ce2e3311eec19..b3992234ecc25 100644
--- a/vllm/entrypoints/disaggregated/connector.py
+++ b/vllm/entrypoints/disaggregated/engine.py
@@ -298,7 +298,6 @@ if __name__ == "__main__":
         required=True,
         help="The zmq ipc decode address")
     parser = make_arg_parser(parser)
-
     args = parser.parse_args()
     validate_parsed_serve_args(args)
 
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index f9b1d69a31d8c..297f51ccf69e6 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -36,6 +36,7 @@ from vllm.engine.multiprocessing.client import MQLLMEngineClient
 from vllm.engine.multiprocessing.engine import run_mp_engine
 from vllm.engine.protocol import EngineClient
 from vllm.entrypoints.chat_utils import load_chat_template
+from vllm.entrypoints.disaggregated.engine import PDEngine
 from vllm.entrypoints.launcher import serve_http
 from vllm.entrypoints.logger import RequestLogger
 from vllm.entrypoints.openai.cli_args import (make_arg_parser,
@@ -134,19 +135,31 @@ async def lifespan(app: FastAPI):
 
 async def build_async_engine_client(
         args: Namespace) -> AsyncIterator[EngineClient]:
-    # Context manager to handle engine_client lifecycle
-    # Ensures everything is shutdown and cleaned up on error/exit
-    engine_args = AsyncEngineArgs.from_cli_args(args)
+    # Case 1: We are running a P/D Connector.
+    # The Engines may be running on another node.
+    if hasattr(args, "connector_addr"):
+        async with PDEngine(
+                prefill_addr=args.prefill_addr,
+                decode_addr=args.decode_addr,
+                connector_addr=args.connector_addr) as engine:
+            yield engine
+            engine.shutdown()
 
-    async with build_async_engine_client_from_engine_args(
+    # Case 2: We are running an actual Engine from this process.
+    else:
+        # Context manager to handle engine_client lifecycle
+        # Ensures everything is shutdown and cleaned up on error/exit
+        engine_args = AsyncEngineArgs.from_cli_args(args)
+        async with build_async_engine_client_from_engine_args(
             engine_args, args.disable_frontend_multiprocessing) as engine:
-        yield engine
+            yield engine
 
 
 @asynccontextmanager
 async def build_async_engine_client_from_engine_args(
     engine_args: AsyncEngineArgs,
     disable_frontend_multiprocessing: bool = False,
+    deploy_disagg_connector: bool = False,
 ) -> AsyncIterator[EngineClient]:
     """
     Create EngineClient, either:
@@ -160,7 +173,6 @@ async def build_async_engine_client_from_engine_args(
     usage_context = UsageContext.OPENAI_API_SERVER
     vllm_config = engine_args.create_engine_config(usage_context=usage_context)
 
-    # V1 AsyncLLM.
     if envs.VLLM_USE_V1:
         if disable_frontend_multiprocessing:
             logger.warning(