diff --git a/docs/features/tool_calling.md b/docs/features/tool_calling.md index 2a48596571d1d..291c313cd57af 100644 --- a/docs/features/tool_calling.md +++ b/docs/features/tool_calling.md @@ -319,6 +319,15 @@ Supported models: Flags: `--tool-call-parser glm45` +### Qwen3-Coder Models (`qwen3_xml`) + +Supported models: + +* `Qwen/Qwen3-480B-A35B-Instruct` +* `Qwen/Qwen3-Coder-30B-A3B-Instruct` + +Flags: `--tool-call-parser qwen3_xml` + ### Models with Pythonic Tool Calls (`pythonic`) A growing number of models output a python list to represent tool calls instead of using JSON. This has the advantage of inherently supporting parallel tool calls and removing ambiguity around the JSON schema required for tool calls. The `pythonic` tool parser can support such models. diff --git a/tests/tool_use/test_qwen3coder_tool_parser.py b/tests/tool_use/test_qwen3coder_tool_parser.py index f06fb2b9f2f04..57eaf84d36f23 100644 --- a/tests/tool_use/test_qwen3coder_tool_parser.py +++ b/tests/tool_use/test_qwen3coder_tool_parser.py @@ -13,6 +13,8 @@ from vllm.entrypoints.openai.protocol import (ChatCompletionRequest, ToolCall) from vllm.entrypoints.openai.tool_parsers.qwen3coder_tool_parser import ( Qwen3CoderToolParser) +from vllm.entrypoints.openai.tool_parsers.qwen3xml_tool_parser import ( + Qwen3XMLToolParser) from vllm.transformers_utils.detokenizer_utils import detokenize_incrementally from vllm.transformers_utils.tokenizer import AnyTokenizer, get_tokenizer @@ -29,6 +31,21 @@ def qwen3_tool_parser(qwen3_tokenizer): return Qwen3CoderToolParser(qwen3_tokenizer) +@pytest.fixture +def qwen3_xml_tool_parser(qwen3_tokenizer): + return Qwen3XMLToolParser(qwen3_tokenizer) + + +@pytest.fixture(params=["original", "xml"]) +def qwen3_tool_parser_parametrized(qwen3_tool_parser, qwen3_xml_tool_parser, + request): + """Parameterized fixture that provides both parser types for testing""" + if request.param == "original": + return qwen3_tool_parser + else: + return qwen3_xml_tool_parser + + @pytest.fixture def sample_tools(): return [ @@ -95,7 +112,7 @@ def assert_tool_calls(actual_tool_calls: list[ToolCall], def stream_delta_message_generator( - qwen3_tool_parser: Qwen3CoderToolParser, + qwen3_tool_parser, qwen3_tokenizer: AnyTokenizer, model_output: str, request: Optional[ChatCompletionRequest] = None @@ -144,9 +161,9 @@ def stream_delta_message_generator( read_offset = new_read_offset -def test_extract_tool_calls_no_tools(qwen3_tool_parser): +def test_extract_tool_calls_no_tools(qwen3_tool_parser_parametrized): model_output = "This is a test response without any tool calls" - extracted_tool_calls = qwen3_tool_parser.extract_tool_calls( + extracted_tool_calls = qwen3_tool_parser_parametrized.extract_tool_calls( model_output, request=None) # type: ignore[arg-type] assert not extracted_tool_calls.tools_called assert extracted_tool_calls.tool_calls == [] @@ -294,12 +311,13 @@ circle ], "Let me calculate that area for you."), ], ) -def test_extract_tool_calls(qwen3_tool_parser, sample_tools, model_output, - expected_tool_calls, expected_content): +def test_extract_tool_calls(qwen3_tool_parser_parametrized, sample_tools, + model_output, expected_tool_calls, + expected_content): request = ChatCompletionRequest(model=MODEL, messages=[], tools=sample_tools) - extracted_tool_calls = qwen3_tool_parser.extract_tool_calls( + extracted_tool_calls = qwen3_tool_parser_parametrized.extract_tool_calls( model_output, request=request) assert extracted_tool_calls.tools_called @@ -308,7 +326,8 @@ def test_extract_tool_calls(qwen3_tool_parser, sample_tools, model_output, assert extracted_tool_calls.content == expected_content -def test_extract_tool_calls_fallback_no_tags(qwen3_tool_parser, sample_tools): +def test_extract_tool_calls_fallback_no_tags(qwen3_tool_parser_parametrized, + sample_tools): """Test fallback parsing when XML tags are missing""" model_output = ''' @@ -322,7 +341,7 @@ TX request = ChatCompletionRequest(model=MODEL, messages=[], tools=sample_tools) - extracted_tool_calls = qwen3_tool_parser.extract_tool_calls( + extracted_tool_calls = qwen3_tool_parser_parametrized.extract_tool_calls( model_output, request=request) assert extracted_tool_calls.tools_called @@ -331,7 +350,7 @@ TX "get_current_weather") -def test_extract_tool_calls_type_conversion(qwen3_tool_parser): +def test_extract_tool_calls_type_conversion(qwen3_tool_parser_parametrized): """Test parameter type conversion based on tool schema""" tools = [ ChatCompletionToolsParam(type="function", @@ -381,7 +400,7 @@ hello world ''' request = ChatCompletionRequest(model=MODEL, messages=[], tools=tools) - extracted_tool_calls = qwen3_tool_parser.extract_tool_calls( + extracted_tool_calls = qwen3_tool_parser_parametrized.extract_tool_calls( model_output, request=request) args = json.loads(extracted_tool_calls.tool_calls[0].function.arguments) @@ -536,9 +555,10 @@ circle ], "Let me calculate that area for you."), ], ) -def test_extract_tool_calls_streaming(qwen3_tool_parser, qwen3_tokenizer, - sample_tools, model_output, - expected_tool_calls, expected_content): +def test_extract_tool_calls_streaming(qwen3_tool_parser_parametrized, + qwen3_tokenizer, sample_tools, + model_output, expected_tool_calls, + expected_content): """Test incremental streaming behavior including typed parameters""" request = ChatCompletionRequest(model=MODEL, messages=[], @@ -548,7 +568,8 @@ def test_extract_tool_calls_streaming(qwen3_tool_parser, qwen3_tokenizer, tool_states = {} # Track state per tool index for delta_message in stream_delta_message_generator( - qwen3_tool_parser, qwen3_tokenizer, model_output, request): + qwen3_tool_parser_parametrized, qwen3_tokenizer, model_output, + request): # role should never be streamed from tool parser assert not delta_message.role @@ -609,7 +630,7 @@ def test_extract_tool_calls_streaming(qwen3_tool_parser, qwen3_tokenizer, def test_extract_tool_calls_missing_closing_parameter_tag( - qwen3_tool_parser, sample_tools): + qwen3_tool_parser_parametrized, sample_tools): """Test handling of missing closing tag""" # Using get_current_weather from sample_tools but with malformed XML model_output = '''Let me check the weather for you: @@ -629,7 +650,7 @@ fahrenheit request = ChatCompletionRequest(model=MODEL, messages=[], tools=sample_tools) - extracted_tool_calls = qwen3_tool_parser.extract_tool_calls( + extracted_tool_calls = qwen3_tool_parser_parametrized.extract_tool_calls( model_output, request=request) # The parser should handle the malformed XML gracefully @@ -652,7 +673,7 @@ fahrenheit def test_extract_tool_calls_streaming_missing_closing_tag( - qwen3_tool_parser, qwen3_tokenizer, sample_tools): + qwen3_tool_parser_parametrized, qwen3_tokenizer, sample_tools): """Test streaming with missing closing tag""" # Using get_current_weather from sample_tools but with malformed XML model_output = '''Let me check the weather for you: @@ -677,7 +698,8 @@ fahrenheit tool_states = {} for delta_message in stream_delta_message_generator( - qwen3_tool_parser, qwen3_tokenizer, model_output, request): + qwen3_tool_parser_parametrized, qwen3_tokenizer, model_output, + request): if delta_message.content: other_content += delta_message.content @@ -727,9 +749,8 @@ fahrenheit assert args["unit"] == "fahrenheit" -def test_extract_tool_calls_streaming_incremental(qwen3_tool_parser, - qwen3_tokenizer, - sample_tools): +def test_extract_tool_calls_streaming_incremental( + qwen3_tool_parser_parametrized, qwen3_tokenizer, sample_tools): """Test that streaming is truly incremental""" model_output = '''I'll check the weather. @@ -748,7 +769,8 @@ TX chunks = [] for delta_message in stream_delta_message_generator( - qwen3_tool_parser, qwen3_tokenizer, model_output, request): + qwen3_tool_parser_parametrized, qwen3_tokenizer, model_output, + request): chunks.append(delta_message) # Should have multiple chunks @@ -784,3 +806,49 @@ TX parsed_args = json.loads(full_args) assert parsed_args["city"] == "Dallas" assert parsed_args["state"] == "TX" + + +def test_extract_tool_calls_complex_type_with_single_quote( + qwen3_tool_parser_parametrized): + """Test parameter type conversion based on tool schema""" + tools = [ + ChatCompletionToolsParam(type="function", + function={ + "name": "test_types", + "parameters": { + "type": "object", + "properties": { + "int_param": { + "type": "integer" + }, + "float_param": { + "type": "float" + }, + "bool_param": { + "type": "boolean" + }, + "str_param": { + "type": "string" + }, + "obj_param": { + "type": "object" + } + } + } + }) + ] + + model_output = ''' + + +{'key': 'value'} + + +''' + + request = ChatCompletionRequest(model=MODEL, messages=[], tools=tools) + extracted_tool_calls = qwen3_tool_parser_parametrized.extract_tool_calls( + model_output, request=request) + + args = json.loads(extracted_tool_calls.tool_calls[0].function.arguments) + assert args["obj_param"] == {"key": "value"} diff --git a/vllm/entrypoints/openai/tool_parsers/__init__.py b/vllm/entrypoints/openai/tool_parsers/__init__.py index 35096b0461361..5e77c406b8d92 100644 --- a/vllm/entrypoints/openai/tool_parsers/__init__.py +++ b/vllm/entrypoints/openai/tool_parsers/__init__.py @@ -20,6 +20,7 @@ from .openai_tool_parser import OpenAIToolParser from .phi4mini_tool_parser import Phi4MiniJsonToolParser from .pythonic_tool_parser import PythonicToolParser from .qwen3coder_tool_parser import Qwen3CoderToolParser +from .qwen3xml_tool_parser import Qwen3XMLToolParser from .seed_oss_tool_parser import SeedOssToolParser from .step3_tool_parser import Step3ToolParser from .xlam_tool_parser import xLAMToolParser @@ -45,6 +46,7 @@ __all__ = [ "HunyuanA13BToolParser", "Glm4MoeModelToolParser", "Qwen3CoderToolParser", + "Qwen3XMLToolParser", "SeedOssToolParser", "Step3ToolParser", "OpenAIToolParser", diff --git a/vllm/entrypoints/openai/tool_parsers/qwen3xml_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/qwen3xml_tool_parser.py new file mode 100644 index 0000000000000..4ab67dfea104c --- /dev/null +++ b/vllm/entrypoints/openai/tool_parsers/qwen3xml_tool_parser.py @@ -0,0 +1,1137 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import ast +import json +import uuid +from collections.abc import Sequence +from typing import Any, Optional, Union +from xml.parsers.expat import ParserCreate + +import regex as re + +from vllm.entrypoints.openai.protocol import (ChatCompletionRequest, + ChatCompletionToolsParam, + DeltaFunctionCall, DeltaMessage, + DeltaToolCall, + ExtractedToolCallInformation, + FunctionCall, ToolCall) +from vllm.entrypoints.openai.tool_parsers.abstract_tool_parser import ( + ToolParser, ToolParserManager) +from vllm.logger import init_logger +from vllm.transformers_utils.tokenizer import AnyTokenizer + +logger = init_logger(__name__) + + +class StreamingXMLToolCallParser: + """ + Simplified streaming XML tool call parser + Supports streaming input, parsing, and output + """ + + def __init__(self): + self.reset_streaming_state() + + # Tool configuration information + self.tools: Union[list[ChatCompletionToolsParam], None] = None + self.tool_call_start_token: str = '' + self.tool_call_end_token: str = '' + self.function_start_token: str = ' DeltaMessage: + """ + Parse single streaming XML chunk and return Delta response + This is the actual streaming interface that receives chunks + one by one and maintains internal state + + Args: + xml_chunk: Single XML chunk string + Returns: + DeltaMessage: Contains delta information generated by this chunk, + returns empty response if no complete elements + """ + # Record delta count before processing + initial_delta_count = len(self.deltas) + + self.streaming_buffer += xml_chunk + + found_elements = self._process_complete_xml_elements() + + if found_elements: + # If complete elements found, check if end events were missed + # some tags may not have been triggered + try: + new_deltas = self.deltas[initial_delta_count:] + # If this chunk contains + # but didn't generate '}', then complete it + if (self.current_call_id is not None + and self.function_end_token in xml_chunk): + + # - Added '}' (non-empty parameter ending) + # - Added '{}' (empty parameter function) + has_function_close = any((td.tool_calls and any( + (tc.function and tc.id == self.current_call_id + and isinstance(tc.function.arguments, str) and + (tc.function.arguments in ('}', '{}'))) + for tc in td.tool_calls)) for td in new_deltas) + if not has_function_close: + # Close potentially unclosed element + if self.current_param_name: + self._end_element('parameter') + if self.current_function_name: + self._end_element('function') + # If this chunk contains + # but didn't generate final empty delta, then complete it + if (self.current_call_id is not None + and self.tool_call_end_token in xml_chunk): + has_toolcall_close = any((td.tool_calls and any( + (tc.type == 'function' and tc.function and tc.function. + arguments == '' and tc.id == self.current_call_id) + for tc in td.tool_calls)) for td in new_deltas) + if not has_toolcall_close: + # Close potentially unclosed element + if self.current_param_name: + self._end_element('parameter') + if self.current_function_name: + self._end_element('function') + self._end_element('tool_call') + except Exception as e: + logger.warning("Error with fallback parsing: %s", e) + # Merge newly generated deltas into single response + result_delta = self._merge_new_deltas_to_single_response( + initial_delta_count) + return result_delta + else: + # No complete elements, check if there's unoutput text content + if self.text_content_buffer and self.tool_call_index == 0: + # Has text content but no tool_call yet, output text content + text_delta = DeltaMessage(content=self.text_content_buffer) + self._emit_delta(text_delta) + # Clear buffer to avoid duplicate output + self.text_content_buffer = '' + return text_delta + + # If this chunk contains end tags but wasn't triggered by parser, + # manually complete end events + # Only execute when still on the same call as when entered, + # to prevent accidentally closing new calls + # in multi scenarios + if (self.current_call_id is not None + and (self.function_end_token in xml_chunk + or self.tool_call_end_token in xml_chunk)): + # Close potentially unclosed element + if self.current_param_name: + self._end_element('parameter') + if self.function_end_token in xml_chunk and \ + self.current_function_name: + self._end_element('function') + if self.tool_call_end_token in xml_chunk: + self._end_element('tool_call') + # Return the merged delta result generated by this fallback + result_delta = self._merge_new_deltas_to_single_response( + initial_delta_count) + return result_delta + + # No complete elements, return empty response + return DeltaMessage(content=None) + + def _escape_xml_special_chars(self, text: str) -> str: + """ + Escape XML special characters + Args: + text: Original text + Returns: + Escaped text + """ + xml_escapes = { + '&': '&', + '<': '<', + '>': '>', + '"': '"', + "'": ''' + } + + for char, escape in xml_escapes.items(): + text = text.replace(char, escape) + + return text + + def _process_complete_xml_elements(self) -> bool: + """ + Process complete XML elements in buffer + + Returns: + bool: Whether complete elements were found and processed + """ + found_any = False + + while self.last_processed_pos < len(self.streaming_buffer): + # Find next complete xml element + element, end_pos = self._find_next_complete_element( + self.last_processed_pos) + if element is None: + # No complete element found, wait for more data + break + + # Check if this element should be skipped + if self._should_skip_element(element): + self.last_processed_pos = end_pos + continue + + # Found complete XML element, process it + try: + preprocessed_element = self._preprocess_xml_chunk(element) + # Check if this is the first tool_call start + if ((preprocessed_element.strip().startswith('') or + preprocessed_element.strip().startswith('') + and self.tool_call_index > 0 and self.current_call_id): + # Reset parser state but preserve generated deltas + if self.current_param_name: + self._end_element('parameter') + if self.current_function_open or self.current_function_name: + self._end_element('function') + # Output final tool_call tail delta + final_delta = DeltaMessage( + role=None, + content=None, + reasoning_content=None, + tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=None, arguments='')) + ]) + self._emit_delta(final_delta) + # Reset XML parser and current call state + self._reset_xml_parser_after_tool_call() + # Parse preprocessed element + self.parser.Parse(preprocessed_element, False) + found_any = True + + except Exception as e: + logger.warning("Error when parsing XML elements: %s", e) + + # Update processed position + self.last_processed_pos = end_pos + + return found_any + + def _should_skip_element(self, element: str) -> bool: + """ + Determine whether an element should be skipped + + Args: + element: Element to evaluate + + Returns: + bool: True means should skip, False means should process + """ + + # If it's a tool_call XML tag, don't skip + if element.startswith( + self.tool_call_start_token) or element.startswith( + self.function_start_token) or element.startswith( + self.parameter_start_token): + return False + + # If currently not parsing tool calls and not blank, + # collect this text instead of skipping + # Only process other XML elements after tool_call appears, + # otherwise treat as plain text + if self.current_call_id is None and element: + # Collect text content to buffer + self.text_content_buffer += element + return True # Still skip, but content has been collected + + # If currently parsing tool calls, + # this might be parameter value, don't skip + if self.current_call_id is not None: + return False + + # Skip blank content + return not element + + def _find_next_complete_element( + self, start_pos: int) -> tuple[Optional[str], int]: + """ + Find next complete XML element from specified position + + Args: + start_pos: Position to start searching + + Returns: + (Complete element string, element end position), + returns (None, start_pos) if no complete element found + """ + buffer = self.streaming_buffer[start_pos:] + + if not buffer: + return None, start_pos + + if buffer.startswith('<'): + # Need to ensure no new < appears, + # find the nearest one between < and > + tag_end = buffer.find('<', 1) + tag_end2 = buffer.find('>', 1) + if tag_end != -1 and tag_end2 != -1: + # Next nearest is < + if tag_end < tag_end2: + return buffer[:tag_end], start_pos + tag_end + # Next nearest is >, means found XML element + else: + return buffer[:tag_end2 + 1], start_pos + tag_end2 + 1 + elif tag_end != -1: + return buffer[:tag_end], start_pos + tag_end + elif tag_end2 != -1: + return buffer[:tag_end2 + 1], start_pos + tag_end2 + 1 + else: + # If currently not parsing tool calls (entering a tool_call), + # check if starts with + if self.current_call_id is None: + # Check if might be start of + if buffer == ''[:len(buffer)]: + # Might be start of , wait for more data + return None, start_pos + else: + # Not start of , treat as text + return buffer, start_pos + len(buffer) + else: + # When parsing tool calls, + # wait for more data to get complete tag + return None, start_pos + else: + # Find text content (until next < or buffer end) + next_tag_pos = buffer.find('<') + if next_tag_pos != -1: + # Found text content + text_content = buffer[:next_tag_pos] + return text_content, start_pos + next_tag_pos + else: + # Buffer end is all text, process + # (no longer wait for more data) + remaining = buffer + return remaining, start_pos + len(remaining) + + def _merge_new_deltas_to_single_response( + self, initial_count: int) -> DeltaMessage: + """ + Merge newly generated deltas from this processing + into a single DeltaMessage + + Args: + initial_count: Delta count before processing + + Returns: + Merged DeltaMessage containing all newly generated delta information + """ + if len(self.deltas) <= initial_count: + return DeltaMessage(content=None) + + # Get newly generated deltas + new_deltas = self.deltas[initial_count:] + + if len(new_deltas) == 1: + # Only one new delta, return directly + return new_deltas[0] + + # Merge multiple new deltas + merged_tool_calls: list[DeltaToolCall] = [] + merged_content: str = '' + + for delta in new_deltas: + if delta.content: + merged_content += delta.content + if delta.tool_calls: + # For tool_calls, we need to intelligently merge arguments + for tool_call in delta.tool_calls: + # Find if there's already a tool_call with the same call_id + existing_call = None + for existing in merged_tool_calls: + if existing.id == tool_call.id: + existing_call = existing + break + + if existing_call and existing_call.function: + # Merge to existing tool_call + if tool_call.function and tool_call.function.name: + existing_call.function.name = \ + tool_call.function.name + if tool_call.function \ + and tool_call.function.arguments is not None: + if existing_call.function.arguments is None: + existing_call.function.arguments = '' + + # For streaming JSON parameters, + # simply concatenate in order + new_args = tool_call.function.arguments + existing_call.function.arguments += new_args + if tool_call.type: + existing_call.type = tool_call.type + else: + # Add new tool_call + merged_tool_calls.append(tool_call) + + return DeltaMessage(content=merged_content if merged_content else None, + tool_calls=merged_tool_calls) + + def _preprocess_xml_chunk(self, chunk: str) -> str: + """ + Preprocess XML chunk, handle non-standard formats, + and escape special characters + + Args: + chunk: Original XML chunk + + Returns: + Processed XML chunk + """ + + # Check if this is a tool_call related element + is_tool_call = False + if chunk.startswith(self.tool_call_start_token) or chunk.startswith( + self.tool_call_end_token): + is_tool_call = True + if chunk.startswith(self.function_start_token) or chunk.startswith( + self.function_end_token): + is_tool_call = True + if chunk.startswith(self.parameter_start_token) or chunk.startswith( + self.parameter_end_token): + is_tool_call = True + # Handle format -> + processed = re.sub(r']+)>', r'', + chunk) + # Handle format -> + processed = re.sub(r']+)>', r'', + processed) + + original_chunk = chunk + # If in parameter value accumulation mode + if self._pre_inside_parameter: + # Parameter end: output accumulated raw text + # safely then return + if processed.startswith(''): + body_text = self._pre_param_buffer + # Trigger deferred parsing mode + # literal_eval+json output in end_element + self.defer_current_parameter = True + self.deferred_param_raw_value = body_text + # Clean up state + self._pre_inside_parameter = False + self._pre_param_buffer = "" + self._pre_current_param_name = None + safe_text = self._escape_xml_special_chars(body_text) + return f"{safe_text}" + else: + # If this is the first block of content after entering parameter + # evaluate if deferred parsing is needed; + # If not needed, exit accumulation mode + # and pass through directly + if self._pre_param_buffer == "": + # Get current parameter type + param_type = self._get_param_type( + self._pre_current_param_name + ) if self._pre_current_param_name else 'string' + # Only these types need deferred parsing to + # handle Python literals containing single quotes + is_object_type = param_type in ["object"] + is_complex_type = (param_type + in ["array", "arr", "sequence"] + or param_type.startswith("dict") + or param_type.startswith("list")) + + # Only delay when contains container symbols + # and has single quotes and is complex type + has_container_hint = ('[' in original_chunk) or ( + '{' in original_chunk) or ('(' in original_chunk) + + # Determine if deferred parsing is needed + need_defer = False + if is_complex_type: + # Complex type, always need deferred parsing + need_defer = True + elif is_object_type and has_container_hint and ( + "'" in original_chunk): + # Object type with container symbols + # and single quotes, need deferred parsing + need_defer = True + + if not need_defer: + # No need for deferred parsing, + # exit parameter mode directly + self._pre_inside_parameter = False + return self._escape_xml_special_chars(original_chunk) + self._pre_param_buffer += original_chunk + return "" + + # Parameter start: enable accumulation + if processed.startswith('', processed) + if m: + self._pre_current_param_name = m.group(1) + self._pre_inside_parameter = True + self._pre_param_buffer = "" + return processed + + # If processed doesn't contain special_token, escape processed + # This is because XML parsing encounters special characters + # and reports errors, so escaping is needed + if not is_tool_call: + processed = self._escape_xml_special_chars(processed) + return processed + + def _emit_delta(self, delta: DeltaMessage): + """Emit Delta response (streaming output)""" + self.deltas.append(delta) + + def _auto_close_open_parameter_if_needed(self, + incoming_tag: Optional[str] = None + ): + """Before starting to process new elements, + if there are unclosed tags from before, + automatically complete their endings to the parser. + - If there are unclosed parameters, + it's equivalent to feeding `` + - When about to start a new function or tool_call, + if there are unclosed functions, complete ``. + - When about to start a new tool_call, + if there are unclosed tool_calls, complete ``. + """ + # First close unclosed parameters + if self.current_param_name: + self._end_element('parameter') + + # If about to start new function or tool_call, + # and there are unclosed functions, close function first + if incoming_tag in ('function', + 'tool_call') and self.current_function_name: + self._end_element('function') + + # If about to start new tool_call, + # and there are unclosed tool_calls, close tool_call first + if incoming_tag == 'tool_call' and self.current_call_id: + self._end_element('tool_call') + + def _start_element(self, name: str, attrs: dict[str, str]): + """Handle XML start element events""" + + if name == 'root': + return + + if name == 'tool_call': + # Before opening new tool_call, + # automatically complete previous unclosed tags + self._auto_close_open_parameter_if_needed('tool_call') + + self.parameters = {} + self.current_call_id = self._get_next_call_id() + self.current_param_is_first = True + self.tool_call_index += 1 + elif name.startswith('function') or (name == 'function'): + # If missing tool_call, manually complete + if not self.current_call_id: + self._start_element('tool_call', {}) + # Before opening new function, + # automatically complete previous unclosed tags (parameter/function) + self._auto_close_open_parameter_if_needed('function') + function_name = self._extract_function_name(name, attrs) + self.current_function_name = function_name + self.current_function_open = True + if function_name: + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=function_name, arguments='')) + ]) + self._emit_delta(delta) + elif name.startswith('parameter') or (name == 'parameter'): + # If previous parameter hasn't ended normally, + # complete its end first, then start new parameter + self._auto_close_open_parameter_if_needed('parameter') + param_name = self._extract_parameter_name(name, attrs) + self.current_param_name = param_name + self.current_param_value = '' + self.current_param_value_converted = '' + self.start_quote_emitted = False # Reset start quote flag + + # Only output parameter name and colon, + # don't output quotes + # decide after parameter value type is determined + if param_name: + if not self.parameters: + # First parameter + # start JSON, only output parameter name and colon + json_start = f'{{"{param_name}": ' + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=None, arguments=json_start)) + ]) + self._emit_delta(delta) + self.current_param_is_first = True + else: + # Subsequent parameters + # add comma and parameter name, no quotes + json_continue = f', "{param_name}": ' + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=None, arguments=json_continue)) + ]) + self._emit_delta(delta) + self.current_param_is_first = False + + def _char_data(self, data: str): + """Handle XML character data events""" + if data and self.current_param_name: + # If preprocessing stage determines deferred parsing is needed, + # only cache character data, no streaming output + if self.defer_current_parameter: + original_data = data + if self.should_emit_end_newline: + original_data = '\n' + original_data + self.should_emit_end_newline = False + if original_data.endswith('\n'): + self.should_emit_end_newline = True + original_data = original_data[:-1] + self.current_param_value += original_data + return + + param_type = self._get_param_type(self.current_param_name) + + # Check if this is the first time receiving data for this parameter + # If this is the first packet of data and starts with \n, remove \n + if not self.current_param_value and data.startswith('\n'): + data = data[1:] + + # Output start quote for string type (if not already output) + if (param_type + in ['string', 'str', 'text', 'varchar', 'char', 'enum'] + and not self.start_quote_emitted): + quote_delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall(name=None, + arguments='"')) + ]) + self._emit_delta(quote_delta) + self.start_quote_emitted = True + + if not data: + return + + original_data = data + # Delay output of trailing newline + if self.should_emit_end_newline: + original_data = '\n' + original_data + self.should_emit_end_newline = False + if original_data.endswith('\n'): + self.should_emit_end_newline = True + original_data = original_data[:-1] + self.current_param_value += original_data + + # convert parameter value by param_type + converted_value = self._convert_param_value( + self.current_param_value, param_type) + output_data = self._convert_for_json_streaming( + converted_value, param_type) + + delta_data = output_data[len(self.current_param_value_converted):] + self.current_param_value_converted = output_data + + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall(name=None, + arguments=delta_data)) + ]) + self._emit_delta(delta) + + def _end_element(self, name: str): + """Handle XML end element events""" + + if name == 'root': + return + + # If function or tool_call ends and there are still unclosed parameters, + # complete parameter end first + if (name.startswith('function') or name == 'function' + or name == 'tool_call') and self.current_param_name: + self._auto_close_open_parameter_if_needed() + + if (name.startswith('parameter') + or name == 'parameter') and self.current_param_name: + # End current parameter + param_name = self.current_param_name + param_value = self.current_param_value + + # If in deferred parsing mode, + # perform overall parsing on raw content + # accumulated in preprocessing stage and output once + if self.defer_current_parameter: + raw_text = self.deferred_param_raw_value \ + if self.deferred_param_raw_value else param_value + parsed_value = None + output_arguments = None + try: + # If previously delayed trailing newline, + # add it back before parsing + if self.should_emit_end_newline: + raw_for_parse = raw_text + '\n' + else: + raw_for_parse = raw_text + parsed_value = ast.literal_eval(raw_for_parse) + output_arguments = json.dumps(parsed_value, + ensure_ascii=False) + except Exception: + # Fallback: output as string as-is + output_arguments = json.dumps(raw_text, ensure_ascii=False) + parsed_value = raw_text + + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=None, arguments=output_arguments)) + ]) + self._emit_delta(delta) + + # Clean up and store + self.should_emit_end_newline = False + self.parameters[param_name] = parsed_value + self.current_param_name = None + self.current_param_value = "" + self.current_param_value_converted = "" + self.start_quote_emitted = False + self.defer_current_parameter = False + self.deferred_param_raw_value = "" + return + + param_type = self._get_param_type(param_name) + + # convert complete parameter value by param_type + converted_value = self._convert_param_value( + param_value, param_type) + + # Decide whether to add end quote based on parameter type + if param_type in [ + 'string', 'str', 'text', 'varchar', 'char', 'enum' + ]: + # For empty string parameters, need special handling + if not param_value and not self.start_quote_emitted: + # No start quote output, + # directly output complete empty string + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=None, arguments='""')) + ]) + self._emit_delta(delta) + else: + # Non-empty parameter value, output end quote + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall( + name=None, arguments='"')) + ]) + self._emit_delta(delta) + + self.should_emit_end_newline = False + # Store converted value + self.parameters[param_name] = converted_value + self.current_param_name = None + self.current_param_value = '' + self.current_param_value_converted = '' + self.start_quote_emitted = False + + elif name.startswith('function') or name == 'function': + # if there are parameters, close JSON object + if self.parameters: + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall(name=None, + arguments='}')) + ]) + self._emit_delta(delta) + # return empty object + else: + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall(name=None, + arguments='{}')) + ]) + self._emit_delta(delta) + self.current_function_open = False + + elif name == 'tool_call': + # Before ending tool_call, + # ensure function is closed to complete missing right brace + if self.current_function_open: + # If there are still unclosed parameters, close them first + if self.current_param_name: + self._end_element('parameter') + # Close function, ensure output '}' or '{}' + self._end_element('function') + # Final Delta + delta = DeltaMessage(tool_calls=[ + DeltaToolCall(index=self.tool_call_index - 1, + id=self.current_call_id, + type='function', + function=DeltaFunctionCall(name=None, + arguments='')) + ]) + self._emit_delta(delta) + + # Check if there's text content to output (between tool_calls) + if self.text_content_buffer.strip(): + text_delta = DeltaMessage(content=self.text_content_buffer) + self._emit_delta(text_delta) + + self._reset_xml_parser_after_tool_call() + + def setup_parser(self): + """Set up XML parser event handlers""" + self.parser.buffer_text = True + self.parser.StartElementHandler = self._start_element + self.parser.EndElementHandler = self._end_element + self.parser.CharacterDataHandler = self._char_data + + def set_tools(self, tools: Union[list[ChatCompletionToolsParam], None]): + """Set tool configuration information""" + self.tools = tools + + def _get_next_call_id(self): + """Generate unique call ID""" + return f'call_{uuid.uuid4().hex[:24]}' + + def _extract_function_name(self, name: str, + attrs: dict[str, str]) -> Optional[str]: + """Extract function name from various formats""" + if attrs and 'name' in attrs: + return attrs['name'] + + if '=' in name: + parts = name.split('=', 1) + if len(parts) == 2 and parts[0] == 'function': + return parts[1] + + return None + + def _extract_parameter_name(self, name: str, + attrs: dict[str, str]) -> Optional[str]: + """Extract parameter name from various formats""" + if attrs and 'name' in attrs: + return attrs['name'] + + if '=' in name: + parts = name.split('=', 1) + if len(parts) == 2 and parts[0] == 'parameter': + return parts[1] + + return None + + def _get_param_type(self, param_name: str) -> str: + """Get parameter type based on tool configuration, defaults to string + Args: + param_name: Parameter name + + Returns: + Parameter type + """ + if not self.tools or not self.current_function_name: + return 'string' + + for tool in self.tools: + if not hasattr(tool, 'type') or not (hasattr( + tool, 'function') and hasattr(tool.function, 'name')): + continue + if tool.type == 'function' and \ + tool.function.name == self.current_function_name: + if not hasattr(tool.function, 'parameters'): + return 'string' + params = tool.function.parameters + if isinstance(params, dict) and 'properties' in params: + properties = params['properties'] + if param_name in properties and isinstance( + properties[param_name], dict): + return self.repair_param_type( + str(properties[param_name].get('type', 'string'))) + elif isinstance(params, dict) and param_name in params: + param_config = params[param_name] + if isinstance(param_config, dict): + return self.repair_param_type( + str(param_config.get('type', 'string'))) + break + return 'string' + + def repair_param_type(self, param_type: str) -> str: + """Repair unknown parameter types by treating them as string + Args: + param_type: Parameter type + + Returns: + Repaired parameter type + """ + if param_type in [ + 'string', 'str', 'text', 'varchar', 'char', 'enum' + ] or param_type.startswith('int') or param_type.startswith( + 'uint' + ) or param_type.startswith('long') or param_type.startswith( + 'short' + ) or param_type.startswith('unsigned') or param_type.startswith( + 'num') or param_type.startswith('float') or param_type in [ + 'boolean', 'bool', 'binary' + ] or (param_type in ["object", "array", "arr", "sequence"] + or param_type.startswith("dict") + or param_type.startswith("list")): + return param_type + else: + return 'string' + + def _convert_param_value(self, param_value: str, param_type: str) -> Any: + """Convert value based on parameter type + Args: + param_value: Parameter value + param_type: Parameter type + + Returns: + Converted value + """ + if param_value.lower() == 'null': + return None + + param_type = param_type.strip().lower() + if param_type in ['string', 'str', 'text', 'varchar', 'char', 'enum']: + return param_value + elif (param_type.startswith('int') or param_type.startswith('uint') + or param_type.startswith('long') + or param_type.startswith('short') + or param_type.startswith('unsigned')): + try: + return int(param_value) + except (ValueError, TypeError): + logger.warning( + "Parsed value '%s' of parameter '%s' is not an integer " + "in tool '%s', degenerating to string.", param_value) + return param_value + elif param_type.startswith('num') or param_type.startswith('float'): + try: + float_param_value: float = float(param_value) + return float_param_value if float_param_value - int( + float_param_value) != 0 else int(float_param_value) + except (ValueError, TypeError): + logger.warning( + "Parsed value '%s' of parameter '%s' is not a float " + "in tool '%s', degenerating to string.", param_value) + return param_value + elif param_type in ['boolean', 'bool', 'binary']: + param_value = param_value.lower() + return param_value == 'true' + else: + return param_value + + def _convert_for_json_streaming(self, converted_value: Any, + param_type: str) -> str: + """Convert converted_value based on + whether it's empty and if type is string + Args: + converted_value: Converted value + param_type: Parameter type + + Returns: + Converted string for streaming output + """ + # Check if value is empty, but exclude numeric 0 + if converted_value is None or converted_value == '': + return '' + + if param_type in ['string', 'str', 'text', 'varchar', 'char', 'enum']: + # String type, remove double quotes + return json.dumps(converted_value, ensure_ascii=False)[1:-1] + else: + # Non-string type, return complete JSON string + if not isinstance(converted_value, str): + return json.dumps(converted_value, ensure_ascii=False) + else: + return converted_value + + def _reset_xml_parser_after_tool_call(self): + """ + Each tool_call is treated as a separate XML document, + so we need to reset the parser after each tool_call. + """ + + # recreate XML parser + self.parser = ParserCreate() + self.setup_parser() + + # Reset current tool_call state + if self.current_call_id: + self.last_completed_call_id = self.current_call_id + self.current_call_id = None + self.current_function_name = None + self.current_function_open = False + self.parameters = {} + self.current_param_name = None + self.current_param_value = '' + self.current_param_value_converted = '' + self.current_param_is_first = False + self.should_emit_end_newline = False + self.start_quote_emitted = False + self.text_content_buffer = '' + + # Reset preprocessing and deferred parsing state + self._pre_inside_parameter = False + self._pre_param_buffer = "" + self._pre_current_param_name = None + self.defer_current_parameter = False + self.deferred_param_raw_value = "" + + +@ToolParserManager.register_module("qwen3_xml") +class Qwen3XMLToolParser(ToolParser): + + def __init__(self, tokenizer: AnyTokenizer): + super().__init__(tokenizer) + self.parser = StreamingXMLToolCallParser() + + logger.info("vLLM Successfully import tool parser %s !", + self.__class__.__name__) + + def extract_tool_calls( + self, + model_output: str, + request: ChatCompletionRequest, + ) -> ExtractedToolCallInformation: + self.parser.reset_streaming_state() + if request: + self.parser.set_tools(request.tools) + result = self.parser.parse_single_streaming_chunks(model_output) + if not result.tool_calls: + return ExtractedToolCallInformation( + tool_calls=[], + tools_called=False, + content=result.content, + ) + else: + tool_calls = [] + for tool_call in result.tool_calls: + if tool_call.function and tool_call.function.name: + tool_calls.append( + ToolCall( + id=tool_call.id, + type=tool_call.type, + function=FunctionCall( + name=tool_call.function.name, + arguments=tool_call.function.arguments, + ), + )) + return ExtractedToolCallInformation( + tool_calls=tool_calls, + tools_called=len(tool_calls) > 0, + content=result.content, + ) + + def extract_tool_calls_streaming( + self, + previous_text: str, + current_text: str, + delta_text: str, + previous_token_ids: Sequence[int], + current_token_ids: Sequence[int], + delta_token_ids: Sequence[int], + request: ChatCompletionRequest, + ) -> Union[DeltaMessage, None]: + if not previous_text: + self.parser.reset_streaming_state() + if request: + self.parser.set_tools(request.tools) + + # Model sometimes outputs separately causing delta_text to be empty. + # If there were tool_calls before and all current tool_calls have ended, + # return an empty tool_call for outer streaming output + # to correctly output tool_call field + if not delta_text and delta_token_ids: + open_calls = current_text.count( + self.parser.tool_call_start_token) - current_text.count( + self.parser.tool_call_end_token) + if open_calls == 0 and self.parser.tool_call_index > 0: + # If current_call_id is None, use last_completed_call_id + call_id = self.parser.current_call_id or \ + self.parser.last_completed_call_id + return DeltaMessage(tool_calls=[ + DeltaToolCall( + index=self.parser.tool_call_index - 1, + id=call_id, + function=DeltaFunctionCall(arguments=''), + type='function', + ) + ]) + + return self.parser.parse_single_streaming_chunks(delta_text)