Signed-off-by: Robert Shaw <rshaw@neuralmagic.com>
This commit is contained in:
Robert Shaw 2025-03-22 13:22:00 -04:00
parent a8a621e419
commit 9f7fb5ec84
2 changed files with 18 additions and 7 deletions

View File

@@ -298,7 +298,6 @@ if __name__ == "__main__":
required=True, required=True,
help="The zmq ipc decode address") help="The zmq ipc decode address")
parser = make_arg_parser(parser) parser = make_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
validate_parsed_serve_args(args) validate_parsed_serve_args(args)

View File

@@ -36,6 +36,7 @@ from vllm.engine.multiprocessing.client import MQLLMEngineClient
from vllm.engine.multiprocessing.engine import run_mp_engine from vllm.engine.multiprocessing.engine import run_mp_engine
from vllm.engine.protocol import EngineClient from vllm.engine.protocol import EngineClient
from vllm.entrypoints.chat_utils import load_chat_template from vllm.entrypoints.chat_utils import load_chat_template
from vllm.entrypoints.disaggregated.engine import PDEngine
from vllm.entrypoints.launcher import serve_http from vllm.entrypoints.launcher import serve_http
from vllm.entrypoints.logger import RequestLogger from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.cli_args import (make_arg_parser, from vllm.entrypoints.openai.cli_args import (make_arg_parser,
@@ -134,19 +135,31 @@ async def lifespan(app: FastAPI):
async def build_async_engine_client( async def build_async_engine_client(
args: Namespace) -> AsyncIterator[EngineClient]: args: Namespace) -> AsyncIterator[EngineClient]:
# Context manager to handle engine_client lifecycle # Case 1: We are running a P/D Connector.
# Ensures everything is shutdown and cleaned up on error/exit # The Engines may be running on another node.
engine_args = AsyncEngineArgs.from_cli_args(args) if hasattr(args, "connector_addr"):
async with PDEngine(
prefill_addr=args.prefill_addr,
decode_addr=args.decode_addr,
connector_addr=args.connector_addr) as engine:
yield engine
engine.shutdown()
async with build_async_engine_client_from_engine_args( # Case 2: We are running an actual Engine from this process.
else:
# Context manager to handle engine_client lifecycle
# Ensures everything is shutdown and cleaned up on error/exit
engine_args = AsyncEngineArgs.from_cli_args(args)
async with build_async_engine_client_from_engine_args(
engine_args, args.disable_frontend_multiprocessing) as engine: engine_args, args.disable_frontend_multiprocessing) as engine:
yield engine yield engine
@asynccontextmanager @asynccontextmanager
async def build_async_engine_client_from_engine_args( async def build_async_engine_client_from_engine_args(
engine_args: AsyncEngineArgs, engine_args: AsyncEngineArgs,
disable_frontend_multiprocessing: bool = False, disable_frontend_multiprocessing: bool = False,
deploy_disagg_connector: bool = False,
) -> AsyncIterator[EngineClient]: ) -> AsyncIterator[EngineClient]:
""" """
Create EngineClient, either: Create EngineClient, either:
@@ -160,7 +173,6 @@ async def build_async_engine_client_from_engine_args(
usage_context = UsageContext.OPENAI_API_SERVER usage_context = UsageContext.OPENAI_API_SERVER
vllm_config = engine_args.create_engine_config(usage_context=usage_context) vllm_config = engine_args.create_engine_config(usage_context=usage_context)
# V1 AsyncLLM.
if envs.VLLM_USE_V1: if envs.VLLM_USE_V1:
if disable_frontend_multiprocessing: if disable_frontend_multiprocessing:
logger.warning( logger.warning(