[Feature][Response API] Add streaming support for non-harmony (#23741)

Signed-off-by: Kebe <mail@kebe7jun.com>
Authored by Kebe on 2025-09-04 18:49:06 +09:00, committed by GitHub
parent 369a079568
commit 8f423e5f43
3 changed files with 406 additions and 76 deletions
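
At a high level, a non-harmony request with `stream=True` now walks through the standard Responses API lifecycle. A rough orientation sketch of the event-type order produced by the code added below (illustrative, not an exhaustive trace; `response.reasoning_text.*` events take the place of the `response.output_text.*` events while a configured reasoning parser is still emitting reasoning content):

# Illustrative only: approximate per-response event order for the new
# non-harmony streaming path; the constant name is ours, not vLLM's.
EXPECTED_EVENT_ORDER = [
    "response.created",
    "response.in_progress",
    "response.output_item.added",   # reasoning item or assistant message
    "response.content_part.added",
    "response.output_text.delta",   # repeated once per generated delta
    "response.output_text.done",
    "response.content_part.done",
    "response.output_item.done",
    "response.completed",
]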

View File

@@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import openai  # use the official client for correctness check
import openai.types.responses as openai_responses_types
import pytest
@@ -86,3 +87,18 @@ async def test_logprobs(client: openai.AsyncOpenAI):
    outputs = response.output
    assert outputs[-1].content[-1].logprobs
    assert len(outputs[-1].content[-1].logprobs[0].top_logprobs) == 5


@pytest.mark.asyncio
async def test_streaming(client: openai.AsyncOpenAI):
    stream = await client.responses.create(
        input="What is 13 * 24?",
        stream=True,
    )
    events = [event async for event in stream]
    assert isinstance(events[0], openai_responses_types.ResponseCreatedEvent)
    assert any(
        isinstance(event, openai_responses_types.ResponseTextDeltaEvent)
        for event in events)
    assert isinstance(events[-1],
                      openai_responses_types.ResponseCompletedEvent)
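
Outside the test harness, the same stream can be consumed with the official client pointed at a running vLLM server. A minimal sketch, assuming a local server; the base URL, API key, and model name are placeholders rather than part of this change:

import asyncio

import openai
import openai.types.responses as openai_responses_types


async def main() -> None:
    # Placeholder endpoint and model; point these at your own deployment.
    client = openai.AsyncOpenAI(base_url="http://localhost:8000/v1",
                                api_key="EMPTY")
    stream = await client.responses.create(
        model="your-served-model",
        input="What is 13 * 24?",
        stream=True,
    )
    chunks: list[str] = []
    async for event in stream:
        # Text deltas carry the incremental answer; the other events mark
        # the response lifecycle (created, in_progress, ..., completed).
        if isinstance(event, openai_responses_types.ResponseTextDeltaEvent):
            chunks.append(event.delta)
        elif isinstance(event, openai_responses_types.ResponseCompletedEvent):
            print("usage:", event.response.usage)
    print("".join(chunks))


asyncio.run(main())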

View File

@@ -49,9 +49,19 @@ class SimpleContext(ConversationContext):
    def __init__(self):
        self.last_output = None
        self.num_prompt_tokens = 0
        self.num_output_tokens = 0
        self.num_cached_tokens = 0
        # TODO: num_reasoning_tokens is not implemented yet.
        self.num_reasoning_tokens = 0

    def append_output(self, output) -> None:
        self.last_output = output
        if not isinstance(output, RequestOutput):
            raise ValueError("SimpleContext only supports RequestOutput.")
        self.num_prompt_tokens = len(output.prompt_token_ids or [])
        self.num_cached_tokens = output.num_cached_tokens or 0
        self.num_output_tokens += len(output.outputs[0].token_ids or [])

    def need_builtin_tool_call(self) -> bool:
        return False
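
These counters are what `responses_full_generator` now reads when it builds `ResponseUsage`: `num_prompt_tokens` and `num_cached_tokens` are overwritten from the latest `RequestOutput`, while `num_output_tokens` accumulates across streamed chunks. A toy illustration of that accumulation; the dataclasses below are stand-ins for vLLM's `RequestOutput`/`CompletionOutput` and only mirror the fields used by `append_output`:

from dataclasses import dataclass, field


@dataclass
class FakeCompletion:
    token_ids: list[int] = field(default_factory=list)


@dataclass
class FakeRequestOutput:
    prompt_token_ids: list[int]
    num_cached_tokens: int
    outputs: list[FakeCompletion]


prompt = [1, 2, 3, 4]
# With delta streaming, each chunk carries only the newly generated tokens,
# which is why num_output_tokens uses += above.
chunks = [
    FakeRequestOutput(prompt, 0, [FakeCompletion([10, 11])]),
    FakeRequestOutput(prompt, 0, [FakeCompletion([12])]),
]

num_prompt_tokens = num_output_tokens = 0
for out in chunks:
    num_prompt_tokens = len(out.prompt_token_ids or [])       # overwritten
    num_output_tokens += len(out.outputs[0].token_ids or [])  # accumulated

assert (num_prompt_tokens, num_output_tokens) == (4, 3)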

View File

@@ -4,6 +4,7 @@
import asyncio
import json
import time
import uuid
from collections import deque
from collections.abc import AsyncGenerator, AsyncIterator, Sequence
from contextlib import AsyncExitStack
@@ -25,7 +26,8 @@ from openai.types.responses import (ResponseCreatedEvent,
ResponseOutputMessage, ResponseOutputText,
ResponseReasoningItem,
ResponseReasoningTextDeltaEvent,
-ResponseReasoningTextDoneEvent)
ResponseReasoningTextDoneEvent,
response_text_delta_event)
from openai.types.responses.response_output_text import (Logprob,
LogprobTopLogprob)
# yapf: enable
@@ -47,7 +49,7 @@ from vllm.entrypoints.harmony_utils import (
from vllm.entrypoints.logger import RequestLogger
# yapf conflicts with isort for this block
# yapf: disable
-from vllm.entrypoints.openai.protocol import (ErrorResponse,
from vllm.entrypoints.openai.protocol import (DeltaMessage, ErrorResponse,
InputTokensDetails,
OutputTokensDetails,
RequestResponseMetadata,
@@ -459,10 +461,6 @@ class OpenAIServingResponses(OpenAIServing):
assert isinstance(context, HarmonyContext)
output = self._make_response_output_items_with_harmony(context)
# TODO: these are all 0 for now!
-num_prompt_tokens = context.num_prompt_tokens
-num_generated_tokens = context.num_output_tokens
-num_cached_tokens = context.num_cached_tokens
-num_reasoning_tokens = context.num_reasoning_tokens
else:
assert isinstance(context, SimpleContext)
final_res = context.last_output
@@ -475,10 +473,11 @@ class OpenAIServingResponses(OpenAIServing):
# Calculate usage.
assert final_res.prompt_token_ids is not None
-num_prompt_tokens = len(final_res.prompt_token_ids)
-num_generated_tokens = len(final_output.token_ids)
-num_cached_tokens = final_res.num_cached_tokens
-num_reasoning_tokens = 0
assert isinstance(context, (SimpleContext, HarmonyContext))
num_prompt_tokens = context.num_prompt_tokens
num_generated_tokens = context.num_output_tokens
num_cached_tokens = context.num_cached_tokens
num_reasoning_tokens = context.num_reasoning_tokens
usage = ResponseUsage(
input_tokens=num_prompt_tokens,
@@ -553,6 +552,28 @@ class OpenAIServingResponses(OpenAIServing):
))
return out
def _create_stream_response_logprobs(
self,
token_ids: Sequence[int],
logprobs: Optional[SampleLogprobs],
tokenizer: AnyTokenizer,
top_logprobs: Optional[int] = None
) -> list[response_text_delta_event.Logprob]:
lgs = self._create_response_logprobs(token_ids=token_ids,
logprobs=logprobs,
tokenizer=tokenizer,
top_logprobs=top_logprobs)
return [
response_text_delta_event.Logprob(
token=lg.token,
logprob=lg.logprob,
top_logprobs=[
response_text_delta_event.LogprobTopLogprob(
token=tl.token, logprob=tl.logprob)
for tl in lg.top_logprobs
]) for lg in lgs
]
def _make_response_output_items(
self,
request: ResponsesRequest,
@@ -912,7 +933,7 @@ class OpenAIServingResponses(OpenAIServing):
status_code=HTTPStatus.BAD_REQUEST,
)
-async def _process_streaming_events(
async def _process_simple_streaming_events(
self,
request: ResponsesRequest,
sampling_params: SamplingParams,
@@ -922,47 +943,292 @@ class OpenAIServingResponses(OpenAIServing):
tokenizer: AnyTokenizer,
request_metadata: RequestResponseMetadata,
created_time: int,
_send_event: Callable[[BaseModel], str],
) -> AsyncGenerator[str, None]:
-sequence_number = 0
current_content_index = 0
current_output_index = 0
current_item_id = ""
reasoning_parser = None
if self.reasoning_parser:
reasoning_parser = self.reasoning_parser(tokenizer)
previous_text = ""
previous_token_ids: list[int] = []
first_delta_sent = False
previous_delta_messages: list[DeltaMessage] = []
async for ctx in result_generator:
assert isinstance(ctx, SimpleContext)
if ctx.last_output is None:
continue
if ctx.last_output.outputs:
output = ctx.last_output.outputs[0]
if reasoning_parser:
delta_message = \
reasoning_parser.extract_reasoning_content_streaming(
previous_text=previous_text,
current_text=previous_text + output.text,
delta_text=output.text,
previous_token_ids=previous_token_ids,
current_token_ids=previous_token_ids +
output.token_ids,
delta_token_ids=output.token_ids,
)
else:
delta_message = DeltaMessage(content=output.text, )
previous_text += output.text
previous_token_ids += output.token_ids
if not delta_message:
continue
if not first_delta_sent:
current_item_id = str(uuid.uuid4())
if delta_message.reasoning_content:
yield _send_event(
openai_responses_types.
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseReasoningItem(
type="reasoning",
id=current_item_id,
summary=[],
status="in_progress",
),
))
else:
yield _send_event(
openai_responses_types.
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseOutputMessage(
id=current_item_id,
type="message",
role="assistant",
content=[],
status="in_progress",
),
))
yield _send_event(
openai_responses_types.ResponseContentPartAddedEvent(
type="response.content_part.added",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
content_index=current_content_index,
part=openai_responses_types.ResponseOutputText(
type="output_text",
text="",
annotations=[],
logprobs=[],
),
))
current_content_index += 1
first_delta_sent = True
# todo(kebe7jun) tool call support
-def _send_event(event: BaseModel):
-nonlocal sequence_number
-# Set sequence_number if the event has this attribute
-if hasattr(event, 'sequence_number'):
-event.sequence_number = sequence_number
-sequence_number += 1
-# Get event type from the event's type field if it exists
-event_type = getattr(event, 'type', 'unknown')
-return (f"event: {event_type}\n"
-f"data: {event.model_dump_json(indent=None)}\n\n")
# check delta message and previous delta message are
# same as content or reasoning content
if (previous_delta_messages
and previous_delta_messages[-1].reasoning_content
is not None and delta_message.content is not None):
# from reasoning to normal content, send done
# event for reasoning
reason_content = ''.join(
pm.reasoning_content for pm in previous_delta_messages
if pm.reasoning_content is not None)
yield _send_event(
ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
item_id=current_item_id,
sequence_number=-1,
output_index=current_output_index,
content_index=current_content_index,
text=reason_content,
))
current_content_index = 0
reasoning_item = ResponseReasoningItem(
type="reasoning",
content=[
ResponseReasoningTextContent(
text=reason_content,
type="reasoning_text",
),
],
status="completed",
id=current_item_id,
summary=[],
)
yield _send_event(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=reasoning_item,
))
yield _send_event(
openai_responses_types.ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.ResponseOutputMessage(
id=current_item_id,
type="message",
role="assistant",
content=[],
status="in_progress",
),
))
current_output_index += 1
current_item_id = str(uuid.uuid4())
yield _send_event(
openai_responses_types.ResponseContentPartAddedEvent(
type="response.content_part.added",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
content_index=current_content_index,
part=openai_responses_types.ResponseOutputText(
type="output_text",
text="",
annotations=[],
logprobs=[],
),
))
current_content_index += 1
# reset previous delta messages
previous_delta_messages = []
if delta_message.reasoning_content is not None:
yield _send_event(
ResponseReasoningTextDeltaEvent(
type="response.reasoning_text.delta",
sequence_number=-1,
content_index=current_content_index,
output_index=current_output_index,
item_id=current_item_id,
delta=delta_message.reasoning_content,
))
elif delta_message.content is not None:
yield _send_event(
openai_responses_types.ResponseTextDeltaEvent(
type="response.output_text.delta",
sequence_number=-1,
content_index=current_content_index,
output_index=current_output_index,
item_id=current_item_id,
delta=delta_message.content,
logprobs=self._create_stream_response_logprobs(
token_ids=output.token_ids,
logprobs=output.logprobs,
tokenizer=tokenizer,
top_logprobs=request.top_logprobs,
) if request.is_include_output_logprobs() else [],
))
current_content_index += 1
previous_delta_messages.append(delta_message)
if previous_delta_messages:
if previous_delta_messages[-1].reasoning_content is not None:
reason_content = ''.join(pm.reasoning_content
for pm in previous_delta_messages
if pm.reasoning_content is not None)
yield _send_event(
ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
item_id=current_item_id,
sequence_number=-1,
output_index=current_output_index,
content_index=current_content_index,
text=reason_content,
))
current_content_index += 1
reasoning_item = ResponseReasoningItem(
type="reasoning",
content=[
ResponseReasoningTextContent(
text=reason_content,
type="reasoning_text",
),
],
status="completed",
id=current_item_id,
summary=[],
)
yield _send_event(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=reasoning_item,
))
elif previous_delta_messages[-1].content is not None:
final_content = ''.join(pm.content
for pm in previous_delta_messages
if pm.content is not None)
yield _send_event(
openai_responses_types.ResponseTextDoneEvent(
type="response.output_text.done",
sequence_number=-1,
output_index=current_output_index,
content_index=current_content_index,
text=final_content,
logprobs=[],
item_id=current_item_id,
))
current_content_index += 1
part = ResponseOutputText(
text=final_content,
type="output_text",
annotations=[],
)
yield _send_event(
openai_responses_types.ResponseContentPartDoneEvent(
type="response.content_part.done",
sequence_number=-1,
item_id=current_item_id,
output_index=current_output_index,
content_index=current_content_index,
part=part,
))
current_content_index += 1
item = ResponseOutputMessage(
type="message",
role="assistant",
content=[
part,
],
status="completed",
id=current_item_id,
summary=[],
)
yield _send_event(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=item,
))
async def _process_harmony_streaming_events(
self,
request: ResponsesRequest,
sampling_params: SamplingParams,
result_generator: AsyncIterator[Optional[ConversationContext]],
context: ConversationContext,
model_name: str,
tokenizer: AnyTokenizer,
request_metadata: RequestResponseMetadata,
created_time: int,
_send_event: Callable[[BaseModel], str],
) -> AsyncGenerator[str, None]:
current_content_index = 0  # FIXME: this number is never changed
current_output_index = 0
current_item_id = ""  # FIXME: this number is never changed
sent_output_item_added = False
-initial_response = ResponsesResponse.from_request(
-request,
-sampling_params,
-model_name=model_name,
-created_time=created_time,
-output=[],
-status="in_progress",
-usage=None,
-).model_dump()
-yield _send_event(
-ResponseCreatedEvent(
-type="response.created",
-sequence_number=-1,
-response=initial_response,
-))
-yield _send_event(
-ResponseInProgressEvent(
-type="response.in_progress",
-sequence_number=-1,
-response=initial_response,
-))
async for ctx in result_generator:
assert isinstance(ctx, StreamingHarmonyContext)
@@ -1312,29 +1578,6 @@ class OpenAIServingResponses(OpenAIServing):
),
))
-async def empty_async_generator():
-# A hack to trick Python to think this is a generator but in fact
-# it immediately returns.
-if False:
-yield
-final_response = await self.responses_full_generator(
-request,
-sampling_params,
-empty_async_generator(),
-context,
-model_name,
-tokenizer,
-request_metadata,
-created_time=created_time,
-)
-yield _send_event(
-openai_responses_types.ResponseCompletedEvent(
-type="response.completed",
-sequence_number=-1,
-response=final_response.model_dump(),
-))
async def responses_stream_generator(
self,
request: ResponsesRequest,
@@ -1349,16 +1592,77 @@ class OpenAIServingResponses(OpenAIServing):
# TODO:
# 1. Handle disconnect
-if not isinstance(context, StreamingHarmonyContext):
-raise NotImplementedError(
-"Streaming is not supported for responses API without Harmony."
-)
created_time = created_time or int(time.time())
sequence_number = 0
def _send_event(event: BaseModel):
nonlocal sequence_number
# Set sequence_number if the event has this attribute
if hasattr(event, 'sequence_number'):
event.sequence_number = sequence_number
sequence_number += 1
# Get event type from the event's type field if it exists
event_type = getattr(event, 'type', 'unknown')
return (f"event: {event_type}\n"
f"data: {event.model_dump_json(indent=None)}\n\n")
async with AsyncExitStack() as exit_stack:
-await context.init_tool_sessions(self.tool_server, exit_stack)
-async for event_data in self._process_streaming_events(
-request, sampling_params, result_generator, context,
-model_name, tokenizer, request_metadata, created_time):
processer = None
if self.use_harmony:
await context.init_tool_sessions(self.tool_server, exit_stack)
processer = self._process_harmony_streaming_events
else:
processer = self._process_simple_streaming_events
initial_response = ResponsesResponse.from_request(
request,
sampling_params,
model_name=model_name,
created_time=created_time,
output=[],
status="in_progress",
usage=None,
).model_dump()
yield _send_event(
ResponseCreatedEvent(
type="response.created",
sequence_number=-1,
response=initial_response,
))
yield _send_event(
ResponseInProgressEvent(
type="response.in_progress",
sequence_number=-1,
response=initial_response,
))
async for event_data in processer(request, sampling_params,
result_generator, context,
model_name, tokenizer,
request_metadata, created_time,
_send_event):
yield event_data
async def empty_async_generator():
# A hack to trick Python to think this is a generator but
# in fact it immediately returns.
if False:
yield
final_response = await self.responses_full_generator(
request,
sampling_params,
empty_async_generator(),
context,
model_name,
tokenizer,
request_metadata,
created_time=created_time,
)
yield _send_event(
openai_responses_types.ResponseCompletedEvent(
type="response.completed",
sequence_number=-1,
response=final_response.model_dump(),
))
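
For reference, the `_send_event` helper that now lives in `responses_stream_generator` frames every Pydantic event as a Server-Sent Event and stamps an incrementing `sequence_number`, overwriting the `-1` placeholders used in the yields above. A standalone sketch of the same framing with a toy event model (the real events come from `openai.types.responses`):

from pydantic import BaseModel


class ToyDeltaEvent(BaseModel):
    type: str = "response.output_text.delta"
    sequence_number: int = -1
    delta: str = ""


sequence_number = 0


def send_event(event: BaseModel) -> str:
    # Mirrors the framing above: "event: <type>\ndata: <json>\n\n".
    global sequence_number
    if hasattr(event, "sequence_number"):
        event.sequence_number = sequence_number
        sequence_number += 1
    event_type = getattr(event, "type", "unknown")
    return (f"event: {event_type}\n"
            f"data: {event.model_dump_json(indent=None)}\n\n")


print(send_event(ToyDeltaEvent(delta="31")), end="")
print(send_event(ToyDeltaEvent(delta="2")), end="")
# event: response.output_text.delta
# data: {"type":"response.output_text.delta","sequence_number":0,"delta":"31"}
# event: response.output_text.delta
# data: {"type":"response.output_text.delta","sequence_number":1,"delta":"2"}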