[gpt-oss] Support streaming in response API (#22431)

Signed-off-by: Chen Zhang <zhangch99@outlook.com>
Chen Zhang 2025-08-11 17:46:59 -07:00 committed by GitHub
parent 458e74eb90
commit 95a935fc48

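A minimal sketch of consuming the new streaming mode from the client side, assuming a vLLM OpenAI-compatible server on localhost:8000 and a hypothetical gpt-oss model name; the call shape follows the standard openai Python SDK:

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

# With this commit, stream=True yields Responses API events instead of
# raising NotImplementedError.
stream = client.responses.create(
    model="openai/gpt-oss-120b",  # assumption: any gpt-oss model served by vLLM
    input="What is the capital of France?",
    stream=True,
)
for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        print()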

@@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
+import json
import time
from collections.abc import AsyncGenerator, AsyncIterator
from contextlib import AsyncExitStack
@@ -10,10 +11,22 @@ from http import HTTPStatus
from typing import Any, Callable, Final, Optional, Union

import jinja2
+import openai.types.responses as openai_responses_types
from fastapi import Request
-from openai.types.responses import (ResponseFunctionToolCall,
-                                    ResponseOutputItem, ResponseOutputMessage,
-                                    ResponseOutputText, ResponseReasoningItem)
+from openai import BaseModel
+# yapf conflicts with isort for this block
+# yapf: disable
+from openai.types.responses import (ResponseContentPartDoneEvent,
+                                    ResponseCreatedEvent,
+                                    ResponseFunctionToolCall,
+                                    ResponseInProgressEvent,
+                                    ResponseOutputItem,
+                                    ResponseOutputItemDoneEvent,
+                                    ResponseOutputMessage, ResponseOutputText,
+                                    ResponseReasoningItem,
+                                    ResponseReasoningTextDeltaEvent,
+                                    ResponseReasoningTextDoneEvent)
+# yapf: enable
from openai.types.responses.response_reasoning_item import (
    Content as ResponseReasoningTextContent)
from openai_harmony import Message as OpenAIHarmonyMessage
@@ -330,8 +343,15 @@ class OpenAIServingResponses(OpenAIServing):
            return response

        if request.stream:
-            raise NotImplementedError(
-                "Streaming responses are not supported")
+            return self.responses_stream_generator(
+                request,
+                sampling_params,
+                result_generator,
+                context,
+                model_name,
+                tokenizer,
+                request_metadata,
+            )

        try:
            return await self.responses_full_generator(
@@ -744,3 +764,423 @@ class OpenAIServingResponses(OpenAIServing):
                    "starting the vLLM server."),
                status_code=HTTPStatus.BAD_REQUEST,
            )

async def responses_stream_generator(
self,
request: ResponsesRequest,
sampling_params: SamplingParams,
result_generator: AsyncIterator[Optional[ConversationContext]],
context: ConversationContext,
model_name: str,
tokenizer: AnyTokenizer,
request_metadata: RequestResponseMetadata,
created_time: Optional[int] = None,
) -> AsyncGenerator[str, None]:
# TODO:
# 1. Handle disconnect
if not isinstance(context, StreamingHarmonyContext):
raise NotImplementedError(
"Streaming is not supported for responses API without Harmony."
)
created_time = created_time or int(time.time())
sequence_number = 0
def _send_event(event: BaseModel):
nonlocal sequence_number
# Set sequence_number if the event has this attribute
if hasattr(event, 'sequence_number'):
event.sequence_number = sequence_number
sequence_number += 1
# Get event type from the event's type field if it exists
event_type = getattr(event, 'type', 'unknown')
return (f"event: {event_type}\n"
f"data: {event.model_dump_json(indent=None)}\n\n")
current_content_index = 0 # FIXME: this number is never changed
current_output_index = 0
current_item_id = ""  # FIXME: this id is never changed
sent_output_item_added = False
initial_response = ResponsesResponse.from_request(
request,
sampling_params,
model_name=model_name,
created_time=created_time,
output=[],
status="in_progress",
usage=None,
).model_dump()
yield _send_event(
ResponseCreatedEvent(
type="response.created",
sequence_number=-1,
response=initial_response,
))
yield _send_event(
ResponseInProgressEvent(
type="response.in_progress",
sequence_number=-1,
response=initial_response,
))
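        # The loop below mirrors the Responses API streaming lifecycle:
        # response.created -> response.in_progress -> (per output item)
        # response.output_item.added -> response.content_part.added ->
        # *.delta events -> *.done events -> response.content_part.done ->
        # response.output_item.done -> response.completed at the very end.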
async for ctx in result_generator:
assert isinstance(ctx, StreamingHarmonyContext)
if ctx.is_expecting_start():
current_output_index += 1
sent_output_item_added = False
if len(ctx.parser.messages) > 0:
previous_item = ctx.parser.messages[-1]
if previous_item.recipient is not None:
# Deal with tool call here
pass
elif previous_item.channel == "analysis":
reasoning_item = ResponseReasoningItem(
type="reasoning",
content=[
ResponseReasoningTextContent(
text=previous_item.content[0].text),
],
status="completed",
)
yield _send_event(
ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
item_id=current_item_id,
sequence_number=-1,
output_index=current_output_index,
content_index=current_content_index,
text=previous_item.content[0].text,
))
yield _send_event(
ResponseContentPartDoneEvent(
type="response.content_part.done",
item_id=current_item_id,
sequence_number=-1,
output_index=current_output_index,
content_index=current_content_index,
part=reasoning_item,
))
yield _send_event(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=reasoning_item,
))
elif previous_item.channel == "final":
text_content = ResponseOutputText(
type="output_text",
text=previous_item.content[0].text,
annotations=[],
)
yield _send_event(
openai_responses_types.ResponseTextDoneEvent(
type="response.output_text.done",
sequence_number=-1,
output_index=current_output_index,
content_index=current_content_index,
text=previous_item.content[0].text,
logprobs=[],
item_id=current_item_id,
))
yield _send_event(
openai_responses_types.
ResponseContentPartDoneEvent(
type="response.content_part.done",
sequence_number=-1,
item_id=current_item_id,
output_index=current_output_index,
content_index=current_content_index,
part=text_content,
))
yield _send_event(
openai_responses_types.ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=ResponseOutputMessage(
id=current_item_id,
type="message",
role="assistant",
content=[text_content],
status="completed",
),
))
if ctx.parser.last_content_delta:
if (ctx.parser.current_channel == "final"
and ctx.parser.current_recipient is None):
if not sent_output_item_added:
sent_output_item_added = True
yield _send_event(
openai_responses_types.
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseOutputMessage(
id=current_item_id,
type="message",
role="assistant",
content=[],
status="in_progress",
),
))
yield _send_event(
openai_responses_types.
ResponseContentPartAddedEvent(
type="response.content_part.added",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
content_index=current_content_index,
part=openai_responses_types.ResponseOutputText(
type="output_text",
text="",
annotations=[],
logprobs=[],
),
))
yield _send_event(
openai_responses_types.ResponseTextDeltaEvent(
type="response.output_text.delta",
sequence_number=-1,
content_index=current_content_index,
output_index=current_output_index,
item_id=current_item_id,
delta=ctx.parser.last_content_delta,
# TODO, use logprobs from ctx.last_request_output
logprobs=[],
))
elif (ctx.parser.current_channel == "analysis"
and ctx.parser.current_recipient is None):
if not sent_output_item_added:
sent_output_item_added = True
yield _send_event(
openai_responses_types.
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseReasoningItem(
type="reasoning",
id=current_item_id,
summary=[],
status="in_progress",
),
))
yield _send_event(
openai_responses_types.
ResponseContentPartAddedEvent(
type="response.content_part.added",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
content_index=current_content_index,
part=openai_responses_types.ResponseOutputText(
type="output_text",
text="",
annotations=[],
logprobs=[],
),
))
yield _send_event(
ResponseReasoningTextDeltaEvent(
type="response.reasoning_text.delta",
item_id=current_item_id,
output_index=current_output_index,
content_index=current_content_index,
delta=ctx.parser.last_content_delta,
sequence_number=-1,
))
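            # Built-in tool calls are bracketed the same way as messages,
            # with tool-specific progress events in between (e.g.
            # response.web_search_call.in_progress / .searching / .completed
            # for the browser tool below).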
if ctx.is_assistant_action_turn() and len(ctx.parser.messages) > 0:
previous_item = ctx.parser.messages[-1]
if (self.tool_server is not None
and self.tool_server.has_tool("browser")
and previous_item.recipient is not None
and previous_item.recipient.startswith("browser.")):
function_name = previous_item.recipient[len("browser."):]
action = None
parsed_args = json.loads(previous_item.content[0].text)
if function_name == "search":
action = (openai_responses_types.
response_function_web_search.ActionSearch(
type="search",
query=parsed_args["query"],
))
elif function_name == "open":
action = (
openai_responses_types.
response_function_web_search.ActionOpenPage(
type="open_page",
# TODO: translate to url
url=f"cursor:{parsed_args.get('cursor', '')}",
))
elif function_name == "find":
action = (
openai_responses_types.
response_function_web_search.ActionFind(
type="find",
pattern=parsed_args["pattern"],
# TODO: translate to url
url=f"cursor:{parsed_args.get('cursor', '')}",
))
else:
raise ValueError(
f"Unknown function name: {function_name}")
yield _send_event(
openai_responses_types.ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
response_function_web_search.
ResponseFunctionWebSearch(
# TODO: generate a unique id for web search call
type="web_search_call",
id=current_item_id,
action=action,
status="in_progress",
),
))
yield _send_event(
openai_responses_types.
ResponseWebSearchCallInProgressEvent(
type="response.web_search_call.in_progress",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
))
yield _send_event(
openai_responses_types.
ResponseWebSearchCallSearchingEvent(
type="response.web_search_call.searching",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
))
# enqueue
yield _send_event(
openai_responses_types.
ResponseWebSearchCallCompletedEvent(
type="response.web_search_call.completed",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
))
yield _send_event(
openai_responses_types.ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseFunctionWebSearch(
type="web_search_call",
id=current_item_id,
action=action,
status="completed",
),
))
if (self.tool_server is not None
and self.tool_server.has_tool("python")
and previous_item.recipient is not None
and previous_item.recipient.startswith("python")):
yield _send_event(
openai_responses_types.ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseCodeInterpreterToolCallParam(
type="code_interpreter_call",
id=current_item_id,
code="",
container_id="auto",
outputs=[],
status="in_progress",
),
))
yield _send_event(
openai_responses_types.
ResponseCodeInterpreterCallInProgressEvent(
type="response.code_interpreter_call.in_progress",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
))
# TODO: do we need to add delta event here?
yield _send_event(
openai_responses_types.
ResponseCodeInterpreterCallCodeDoneEvent(
type="response.code_interpreter_call_code.done",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
code=previous_item.content[0].text))
yield _send_event(
openai_responses_types.
ResponseCodeInterpreterCallInterpretingEvent(
type="response.code_interpreter_call.interpreting",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
))
yield _send_event(
openai_responses_types.
ResponseCodeInterpreterCallCompletedEvent(
type="response.code_interpreter_call.completed",
sequence_number=-1,
output_index=current_output_index,
item_id=current_item_id,
))
yield _send_event(
openai_responses_types.ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=current_output_index,
item=openai_responses_types.
ResponseCodeInterpreterToolCallParam(
type="code_interpreter_call",
id=current_item_id,
code=previous_item.content[0].text,
container_id="auto",
# TODO: add outputs here
outputs=[],
status="completed",
),
))
async def empty_async_generator():
            # A hack: the unreachable `yield` makes Python treat this
            # function as an async generator, though it yields nothing.
if False:
yield
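        # Passing an already-exhausted stream lets responses_full_generator
        # skip generation and assemble the final Response (output items,
        # usage) from the context accumulated above.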
final_response = await self.responses_full_generator(
request,
sampling_params,
empty_async_generator(),
context,
model_name,
tokenizer,
request_metadata,
created_time=created_time,
)
yield _send_event(
openai_responses_types.ResponseCompletedEvent(
type="response.completed",
sequence_number=-1,
response=final_response.model_dump(),
))
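
For inspecting the raw wire format, a minimal sketch using httpx (the server address and model name are assumptions):

import json

import httpx

payload = {"model": "openai/gpt-oss-120b", "input": "Hello", "stream": True}
with httpx.stream("POST", "http://localhost:8000/v1/responses",
                  json=payload, timeout=None) as resp:
    for line in resp.iter_lines():
        # Frames arrive as "event: <type>" / "data: <json>" / blank line,
        # the format produced by _send_event above.
        if line.startswith("data: "):
            event = json.loads(line[len("data: "):])
            print(event["sequence_number"], event["type"])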