vllm/vllm/entrypoints/renderer.py
2025-09-07 08:26:48 +00:00

225 lines
8.8 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
from abc import ABC, abstractmethod
from typing import Annotated, Optional, Union
from pydantic import Field
from vllm.config import ModelConfig
from vllm.inputs.data import TokensPrompt as EngineTokensPrompt
from vllm.inputs.parse import parse_and_batch_prompt
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.utils import AsyncMicrobatchTokenizer
class BaseRenderer(ABC):
"""
Base class for unified input processing and rendering.
The Renderer serves as a unified input processor that consolidates
tokenization, chat template formatting, and multimodal input handling
into a single component.
It converts high-level API requests (OpenAI-style JSON) into token IDs and
multimodal features ready for engine consumption.
Key responsibilities:
- Convert text prompts to token sequences with proper special tokens
- Apply chat templates and format conversations
- Handle multimodal inputs (images, audio, etc.) when applicable
- Manage prompt truncation and length validation
- Provide clean separation between API layer and engine core
"""
def __init__(
self,
model_config: ModelConfig,
tokenizer: Optional[AnyTokenizer] = None,
):
super().__init__()
self.model_config = model_config
self.tokenizer = tokenizer
@abstractmethod
async def render_prompt(
self,
prompt_or_prompts: Union[str, list[str], list[int], list[list[int]]],
max_length: Optional[int] = None,
truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None,
add_special_tokens: Optional[bool] = True,
cache_salt: Optional[str] = None,
) -> list[EngineTokensPrompt]:
"""
Convert input prompts into tokenized format for engine processing.
This is the core method that transforms various input formats into
standardized TokensPrompt objects. Implementations should handle
tokenization, special token insertion, truncation, and validation
according to model requirements.
Args:
prompt_or_prompts: Input data in various formats:
- str: Single text prompt
- list[str]: Batch of text prompts
- list[int]: Pre-tokenized sequence
- list[list[int]]: Batch of pre-tokenized sequences
max_length: Maximum sequence length (endpoint-specific behavior)
truncate_prompt_tokens: Truncate to last N tokens
(None=no truncation, 0=empty)
add_special_tokens: Add model-specific tokens (e.g., [CLS], [SEP])
to text inputs
cache_salt: Optional string to disambiguate cached prompts
Returns:
list[EngineTokensPrompt]: Tokenized prompts ready for engine
consumption
Raises:
ValueError: If input format is invalid or length limits exceeded
"""
raise NotImplementedError
class CompletionRenderer(BaseRenderer):
def __init__(
self,
model_config: ModelConfig,
tokenizer: Optional[AnyTokenizer] = None,
async_tokenizer_pool: Optional[dict[AnyTokenizer,
AsyncMicrobatchTokenizer]] = None,
):
super().__init__(model_config, tokenizer)
self.async_tokenizer_pool = async_tokenizer_pool or {}
self.async_tokenizer: Optional[AsyncMicrobatchTokenizer] = None
async def render_prompt(
self,
prompt_or_prompts: Union[str, list[str], list[int], list[list[int]]],
max_length: Optional[int] = None,
truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None,
add_special_tokens: Optional[bool] = True,
cache_salt: Optional[str] = None,
) -> list[EngineTokensPrompt]:
"""Implementation of prompt rendering for completion-style requests.
Uses async tokenizer pooling for improved performance. See base class
for detailed parameter documentation.
"""
if truncate_prompt_tokens is not None:
if truncate_prompt_tokens == 0:
return []
if truncate_prompt_tokens < 0:
truncate_prompt_tokens = self.model_config.max_model_len
if max_length is not None and truncate_prompt_tokens > max_length:
raise ValueError(
f"truncate_prompt_tokens ({truncate_prompt_tokens}) "
f"cannot be greater than max_length ({max_length}). "
f"Please select a smaller truncation size.")
# Parse and batch the input prompts
batch_inputs = parse_and_batch_prompt(prompt_or_prompts)
rendered_prompts: list[EngineTokensPrompt] = []
tokenize_tasks = []
for prompt_input in batch_inputs:
if prompt_input["is_tokens"] is True:
# Token input
token_ids = self._maybe_apply_truncation(
prompt_input["content"], truncate_prompt_tokens)
rendered_prompts.append(
self._create_tokens_prompt(token_ids, max_length,
cache_salt))
else:
# Text input
tokenize_task = asyncio.create_task(
self._tokenize(prompt_input["content"], max_length,
truncate_prompt_tokens, add_special_tokens,
cache_salt))
tokenize_tasks.append(tokenize_task)
# Wait for all text tokenization to finish
if tokenize_tasks:
tokenized_text_prompts = await asyncio.gather(*tokenize_tasks)
rendered_prompts.extend(tokenized_text_prompts)
return rendered_prompts
def _maybe_apply_truncation(
self, token_ids: list[int],
truncate_prompt_tokens: Optional[int]) -> list[int]:
"""Apply truncation to token sequence."""
if truncate_prompt_tokens is None:
return token_ids
if truncate_prompt_tokens >= len(token_ids):
return token_ids
return token_ids[-truncate_prompt_tokens:]
async def _tokenize(
self,
text: str,
max_length: Optional[int],
truncate_prompt_tokens: Optional[int],
add_special_tokens: Optional[bool],
cache_salt: Optional[str],
) -> EngineTokensPrompt:
"""Tokenize text input asynchronously."""
async_tokenizer = self._get_async_tokenizer()
# Handle encoder-specific preprocessing
if (self.model_config.encoder_config is not None
and self.model_config.encoder_config.get(
"do_lower_case", False)):
text = text.lower()
# Tokenize texts
if truncate_prompt_tokens is None:
encoded = await async_tokenizer(
text, add_special_tokens=add_special_tokens)
else:
encoded = await async_tokenizer(
text,
add_special_tokens=add_special_tokens,
truncation=True,
max_length=truncate_prompt_tokens)
return self._create_tokens_prompt(encoded.input_ids, max_length,
cache_salt)
def _get_async_tokenizer(self) -> AsyncMicrobatchTokenizer:
"""Get or create async tokenizer using shared pool."""
if self.async_tokenizer is not None:
return self.async_tokenizer
if self.tokenizer is None:
raise ValueError(
"No tokenizer available for text input processing")
# Check shared pool first
if self.tokenizer in self.async_tokenizer_pool:
return self.async_tokenizer_pool[self.tokenizer]
# Create new async tokenizer and add to pool
self.async_tokenizer = AsyncMicrobatchTokenizer(self.tokenizer)
self.async_tokenizer_pool[self.tokenizer] = self.async_tokenizer
return self.async_tokenizer
def _create_tokens_prompt(
self,
token_ids: list[int],
max_length: Optional[int] = None,
cache_salt: Optional[str] = None,
) -> EngineTokensPrompt:
"""Create validated EngineTokensPrompt."""
if max_length is not None and len(token_ids) > max_length:
raise ValueError(
f"This maximum context length is {max_length} tokens. "
f"However, your request has {len(token_ids)} input tokens. "
"Please reduce the length of the input messages.")
tokens_prompt = EngineTokensPrompt(prompt_token_ids=token_ids)
if cache_salt is not None:
tokens_prompt["cache_salt"] = cache_salt
return tokens_prompt