# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa: E501

import json
from typing import Any

import pytest

from vllm.entrypoints.openai.protocol import (ChatCompletionToolsParam,
                                              FunctionCall, ToolCall)
from vllm.entrypoints.openai.tool_parsers import MinimaxToolParser
from vllm.transformers_utils.tokenizer import get_tokenizer

# Use a common model that is likely to be available
MODEL = "MiniMaxAi/MiniMax-M1-40k"


@pytest.fixture(scope="module")
def minimax_tokenizer():
    return get_tokenizer(tokenizer_name=MODEL)


@pytest.fixture
def minimax_tool_parser(minimax_tokenizer):
    return MinimaxToolParser(minimax_tokenizer)


@pytest.fixture
def sample_tools():
    return [
        ChatCompletionToolsParam(type="function",
                                 function={
                                     "name": "get_current_weather",
                                     "description": "Get the current weather",
                                     "parameters": {
                                         "type": "object",
                                         "properties": {
                                             "city": {
                                                 "type": "string",
                                                 "description": "The city name"
                                             },
                                             "state": {
                                                 "type": "string",
                                                 "description": "The state code"
                                             },
                                             "unit": {
                                                 "type": "string",
                                                 "enum":
                                                 ["fahrenheit", "celsius"]
                                             }
                                         },
                                         "required": ["city", "state"]
                                     }
                                 }),
        ChatCompletionToolsParam(type="function",
                                 function={
                                     "name": "calculate_area",
                                     "description":
                                     "Calculate area of a shape",
                                     "parameters": {
                                         "type": "object",
                                         "properties": {
                                             "shape": {
                                                 "type": "string"
                                             },
                                             "dimensions": {
                                                 "type": "object"
                                             },
                                             "precision": {
                                                 "type": "integer"
                                             }
                                         }
                                     }
                                 })
    ]


def assert_tool_calls(actual_tool_calls: list[ToolCall],
                      expected_tool_calls: list[ToolCall]):
    assert len(actual_tool_calls) == len(expected_tool_calls)

    for actual_tool_call, expected_tool_call in zip(actual_tool_calls,
                                                    expected_tool_calls):
        assert isinstance(actual_tool_call.id, str)
        assert len(actual_tool_call.id) > 16

        assert actual_tool_call.type == "function"
        assert actual_tool_call.function == expected_tool_call.function


def test_extract_tool_calls_no_tools(minimax_tool_parser):
    model_output = "This is a test"
    extracted_tool_calls = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]
    assert not extracted_tool_calls.tools_called
    assert extracted_tool_calls.tool_calls == []
    assert extracted_tool_calls.content == model_output


@pytest.mark.parametrize(
    ids=[
        "single_tool_call",
        "multiple_tool_calls",
        "tool_call_with_content_before",
        "tool_call_with_single_line_json",
        "tool_call_incomplete_tag",
    ],
    argnames=["model_output", "expected_tool_calls", "expected_content"],
    argvalues=[
        (
            """<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}}
</tool_calls>""",
            [
                ToolCall(function=FunctionCall(
                    name="get_current_weather",
                    arguments=json.dumps({
                        "city": "Dallas",
                        "state": "TX",
                        "unit": "fahrenheit",
                    }),
                ))
            ],
            None,
        ),
        (
            """<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}}
{"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}
</tool_calls>""",
            [
                ToolCall(function=FunctionCall(
                    name="get_current_weather",
                    arguments=json.dumps({
                        "city": "Dallas",
                        "state": "TX",
                        "unit": "fahrenheit",
                    }),
                )),
                ToolCall(function=FunctionCall(
                    name="get_current_weather",
                    arguments=json.dumps({
                        "city": "Orlando",
                        "state": "FL",
                        "unit": "fahrenheit",
                    }),
                )),
            ],
            None,
        ),
        (
            """I'll help you check the weather.
<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}
</tool_calls>""",
            [
                ToolCall(function=FunctionCall(
                    name="get_current_weather",
                    arguments=json.dumps({
                        "city": "Seattle",
                        "state": "WA",
                        "unit": "celsius",
                    }),
                ))
            ],
            "I'll help you check the weather.",
        ),
        (
            """<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "New York", "state": "NY", "unit": "celsius"}}
</tool_calls>""",
            [
                ToolCall(function=FunctionCall(
                    name="get_current_weather",
                    arguments=json.dumps({
                        "city": "New York",
                        "state": "NY",
                        "unit": "celsius",
                    }),
                ))
            ],
            None,
        ),
        (
            """<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Boston", "state": "MA"}}""",
            [
                ToolCall(function=FunctionCall(
                    name="get_current_weather",
                    arguments=json.dumps({
                        "city": "Boston",
                        "state": "MA",
                    }),
                ))
            ],
            None,
        ),
    ],
)
def test_extract_tool_calls(minimax_tool_parser, model_output,
                            expected_tool_calls, expected_content):
    extracted_tool_calls = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]
    assert extracted_tool_calls.tools_called

    assert_tool_calls(extracted_tool_calls.tool_calls, expected_tool_calls)

    assert extracted_tool_calls.content == expected_content


def test_preprocess_model_output_with_thinking_tags(minimax_tool_parser):
    """Test that tool calls within thinking tags are removed during preprocessing."""
    model_output = """<think>Let me think about this.
<tool_calls>
{"name": "fake_tool", "arguments": {"param": "value"}}
</tool_calls>
This should be removed.</think>

I'll help you with that.
<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA"}}
</tool_calls>"""

    processed_output = minimax_tool_parser.preprocess_model_output(
        model_output)

    # The tool call within thinking tags should be removed
    assert "fake_tool" not in processed_output
    # But the thinking tag itself should remain
    assert "<think>" in processed_output
    assert "</think>" in processed_output
    # The actual tool call outside thinking tags should remain
    assert "get_current_weather" in processed_output


def test_extract_tool_calls_with_thinking_tags(minimax_tool_parser):
    """Test tool extraction when thinking tags contain tool calls that should be ignored."""
    model_output = """<think>I should use a tool.
<tool_calls>
{"name": "ignored_tool", "arguments": {"should": "ignore"}}
</tool_calls>
</think>

Let me help you with the weather.
<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Miami", "state": "FL", "unit": "fahrenheit"}}
</tool_calls>"""

    extracted_tool_calls = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]

    assert extracted_tool_calls.tools_called
    assert len(extracted_tool_calls.tool_calls) == 1
    assert extracted_tool_calls.tool_calls[
        0].function.name == "get_current_weather"

    # Content extraction is based on the position of the first <tool_calls>
    # in the original model_output. Since preprocessing removes tool calls
    # within thinking tags, the actual first <tool_calls> is the external one.
    expected_content = """<think>I should use a tool.
<tool_calls>
{"name": "ignored_tool", "arguments": {"should": "ignore"}}
</tool_calls>
</think>

Let me help you with the weather."""
    assert extracted_tool_calls.content == expected_content


def test_extract_tool_calls_invalid_json(minimax_tool_parser):
    """Test that invalid JSON in tool calls is handled gracefully."""
    model_output = """<tool_calls>
{"name": "valid_tool", "arguments": {"city": "Seattle"}}
{invalid json here}
{"name": "another_valid_tool", "arguments": {"param": "value"}}
</tool_calls>"""

    extracted_tool_calls = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]

    assert extracted_tool_calls.tools_called
    # Should extract only the valid JSON tool calls
    assert len(extracted_tool_calls.tool_calls) == 2
    assert extracted_tool_calls.tool_calls[0].function.name == "valid_tool"
    assert extracted_tool_calls.tool_calls[
        1].function.name == "another_valid_tool"


def test_extract_tool_calls_missing_name_or_arguments(minimax_tool_parser):
    """Test that tool calls missing name or arguments are filtered out."""
    model_output = """<tool_calls>
{"name": "valid_tool", "arguments": {"city": "Seattle"}}
{"name": "missing_args"}
{"arguments": {"city": "Portland"}}
{"name": "another_valid_tool", "arguments": {"param": "value"}}
</tool_calls>"""

    extracted_tool_calls = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]

    assert extracted_tool_calls.tools_called
    # Should extract only the valid tool calls with both name and arguments
    assert len(extracted_tool_calls.tool_calls) == 2
    assert extracted_tool_calls.tool_calls[0].function.name == "valid_tool"
    assert extracted_tool_calls.tool_calls[
        1].function.name == "another_valid_tool"


def test_streaming_basic_functionality(minimax_tool_parser):
    """Test basic streaming functionality."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    # Test with a simple tool call
    current_text = """<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Seattle"}}
</tool_calls>"""

    # First call should handle the initial setup
    result = minimax_tool_parser.extract_tool_calls_streaming(
        previous_text="",
        current_text=current_text,
        delta_text="",
        previous_token_ids=[],
        current_token_ids=[],
        delta_token_ids=[],
        request=None,
    )

    # The result might be None or contain tool call information.
    # This depends on the internal state management.
    if result is not None and hasattr(result,
                                      'tool_calls') and result.tool_calls:
        assert len(result.tool_calls) >= 0


def test_streaming_with_content_before_tool_calls(minimax_tool_parser):
    """Test streaming when there's content before tool calls."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    current_text = "I'll help you with that. "

    # When there's content before tool calls, it should be returned as content
    result = minimax_tool_parser.extract_tool_calls_streaming(
        previous_text="I'll help you",
        current_text=current_text,
        delta_text=" with that. ",
        previous_token_ids=[],
        current_token_ids=[],
        delta_token_ids=[],
        request=None,
    )

    if result is not None and hasattr(result, 'content'):
        # Should contain some content
        assert result.content is not None


def test_streaming_no_tool_calls(minimax_tool_parser):
    """Test streaming when there are no tool calls."""
    current_text = "This is just regular text without any tool calls."

    result = minimax_tool_parser.extract_tool_calls_streaming(
        previous_text="This is just regular text",
        current_text=current_text,
        delta_text=" without any tool calls.",
        previous_token_ids=[],
        current_token_ids=[],
        delta_token_ids=[],
        request=None,
    )

    # Should return the delta text as content
    assert result is not None
    assert hasattr(result, 'content')
    assert result.content == " without any tool calls."


def test_streaming_with_thinking_tags(minimax_tool_parser):
    """Test streaming with thinking tags that contain tool calls."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    current_text = """<think><tool_calls>{"name": "ignored", "arguments": {}}</tool_calls></think><tool_calls>{"name": "real_tool", "arguments": {"param": "value"}}</tool_calls>"""

    result = minimax_tool_parser.extract_tool_calls_streaming(
        previous_text="",
        current_text=current_text,
        delta_text=current_text,
        previous_token_ids=[],
        current_token_ids=[],
        delta_token_ids=[],
        request=None,
    )

    # The preprocessing should remove tool calls from thinking tags
    # and only process the real tool call
    if result is not None and hasattr(result,
                                      'tool_calls') and result.tool_calls:
        for tool_call in result.tool_calls:
            assert tool_call.function.name != "ignored"


def test_extract_tool_calls_multiline_json_not_supported(minimax_tool_parser):
    """Test that multiline JSON in tool calls is not currently supported."""
    model_output = """<tool_calls>
{
  "name": "get_current_weather",
  "arguments": {
    "city": "New York",
    "state": "NY",
    "unit": "celsius"
  }
}
</tool_calls>"""

    extracted_tool_calls = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]

    # Multiline JSON is currently not supported, so no tools should be called
    assert not extracted_tool_calls.tools_called
    assert extracted_tool_calls.tool_calls == []
    assert extracted_tool_calls.content is None


def test_streaming_arguments_incremental_output(minimax_tool_parser):
    """Test that streaming arguments are returned incrementally, not cumulatively."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    # Simulate progressive tool call building
    stages = [
        # Stage 1: Function name complete
        '<tool_calls>\n{"name": "get_current_weather", "arguments": ',
        # Stage 2: Arguments object starts with first key
        '<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": ',
        # Stage 3: First parameter value added
        '<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle"',
        # Stage 4: Second parameter added
        '<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA"',
        # Stage 5: Third parameter added, arguments complete
        '<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}',
        # Stage 6: Tool calls closed
        '<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}\n</tool_calls>\n<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}\n</tool_calls>'
    ]

    function_name_sent = False
    previous_args_content = ""

    for i, current_text in enumerate(stages):
        previous_text = stages[i - 1] if i > 0 else ""
        delta_text = current_text[len(previous_text):] if i > 0 else current_text

        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=previous_text,
            current_text=current_text,
            delta_text=delta_text,
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        print(f"Stage {i}: Current text: {repr(current_text)}")
        print(f"Stage {i}: Delta text: {repr(delta_text)}")

        if result is not None and hasattr(result,
                                          'tool_calls') and result.tool_calls:
            tool_call = result.tool_calls[0]

            # Check if function name is sent (should happen only once)
            if tool_call.function and tool_call.function.name:
                assert tool_call.function.name == "get_current_weather"
                function_name_sent = True
                print(
                    f"Stage {i}: Function name sent: {tool_call.function.name}"
                )

            # Check if arguments are sent incrementally
            if tool_call.function and tool_call.function.arguments:
                args_fragment = tool_call.function.arguments
                print(
                    f"Stage {i}: Got arguments fragment: {repr(args_fragment)}"
                )

                # For incremental output, each fragment should be new content
                # only; it should not contain all previous content.
                if i >= 2 and previous_args_content:
                    # After we start getting arguments, the new fragment should
                    # not be identical to the previously streamed content.
                    assert args_fragment != previous_args_content, f"Fragment should be incremental, not cumulative: {args_fragment}"

                    # If this is truly incremental, the fragment should be relatively small
                    # compared to the complete arguments so far
                    if len(args_fragment) > len(previous_args_content):
                        print(
                            "Warning: Fragment seems cumulative rather than incremental"
                        )

                previous_args_content = args_fragment

    # Verify function name was sent at least once
    assert function_name_sent, "Function name should have been sent"


def test_streaming_arguments_delta_only(minimax_tool_parser):
    """Test that each streaming call returns only the delta (new part) of arguments."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    # Simulate two consecutive calls with growing arguments
    call1_text = '<tool_calls>\n{"name": "test_tool", "arguments": {"param1": "value1"}}'
    call2_text = '<tool_calls>\n{"name": "test_tool", "arguments": {"param1": "value1", "param2": "value2"}}'

    print(f"Call 1 text: {repr(call1_text)}")
    print(f"Call 2 text: {repr(call2_text)}")

    # First call - should get the function name and initial arguments
    result1 = minimax_tool_parser.extract_tool_calls_streaming(
        previous_text="",
        current_text=call1_text,
        delta_text=call1_text,
        previous_token_ids=[],
        current_token_ids=[],
        delta_token_ids=[],
        request=None,
    )

    print(f"Result 1: {result1}")
    if result1 and hasattr(result1, 'tool_calls') and result1.tool_calls:
        for i, tc in enumerate(result1.tool_calls):
            print(f"  Tool call {i}: {tc}")

    # Second call - should only get the delta (new part) of arguments
    result2 = minimax_tool_parser.extract_tool_calls_streaming(
        previous_text=call1_text,
        current_text=call2_text,
        delta_text=', "param2": "value2"}',
        previous_token_ids=[],
        current_token_ids=[],
        delta_token_ids=[],
        request=None,
    )

    print(f"Result 2: {result2}")
    if result2 and hasattr(result2, 'tool_calls') and result2.tool_calls:
        for i, tc in enumerate(result2.tool_calls):
            print(f"  Tool call {i}: {tc}")

    # Verify the second call only returns the delta
    if result2 is not None and hasattr(result2,
                                       'tool_calls') and result2.tool_calls:
        tool_call = result2.tool_calls[0]
        if tool_call.function and tool_call.function.arguments:
            args_delta = tool_call.function.arguments
            print(f"Arguments delta from second call: {repr(args_delta)}")

            # Should only contain the new part, not the full arguments.
            # The delta should be something like ', "param2": "value2"}' or just '"param2": "value2"'
            assert ', "param2": "value2"}' in args_delta or '"param2": "value2"' in args_delta, f"Expected delta containing param2, got: {args_delta}"

            # Should NOT contain the previous parameter data
            assert '"param1": "value1"' not in args_delta, f"Arguments delta should not contain previous data: {args_delta}"

            # The delta should be relatively short (incremental, not cumulative)
            expected_max_length = len(', "param2": "value2"}') + 10  # Some tolerance
            assert len(
                args_delta
            ) <= expected_max_length, f"Delta seems too long (possibly cumulative): {args_delta}"

            print("✓ Delta validation passed")
        else:
            print("No arguments in result2 tool call")
    else:
        print("No tool calls in result2 or result2 is None")
        # This might be acceptable if no incremental update is needed,
        # but let's at least verify that result1 had some content.
        assert result1 is not None, "At least the first call should return something"


def test_streaming_openai_compatibility(minimax_tool_parser):
    """Test that streaming behavior with buffering works correctly."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    # Reset buffering state
    minimax_tool_parser.pending_buffer = ""
    minimax_tool_parser.in_thinking_tag = False
    minimax_tool_parser.thinking_depth = 0

    # Test scenario: simple buffering without complex tool call context
    test_cases: list[dict[str, Any]] = [
        {
            'stage': 'Token: <',
            'previous': '',
            'current': '<',
            'delta': '<',
            'expected_content': None,  # Should be buffered
        },
        {
            'stage': 'Token: tool_calls>',
            'previous': '<',
            'current': '<tool_calls>',
            'delta': 'tool_calls>',
            'expected_content': None,  # Complete <tool_calls> tag, should not output
        },
        {
            'stage': 'Regular content',
            'previous': 'Hello',
            'current': 'Hello world',
            'delta': ' world',
            'expected_content': ' world',  # Normal content should pass through
        },
        {
            'stage': 'Content with end tag start',
            'previous': 'Text content</tool_',
            'current': 'Text content</tool_calls>',
            'delta': 'calls>',
            'expected_content': None,  # Complete close tag, should not output
        },
    ]

    for i, test_case in enumerate(test_cases):
        print(f"\n--- Stage {i}: {test_case['stage']} ---")
        print(f"Previous: {repr(test_case['previous'])}")
        print(f"Current: {repr(test_case['current'])}")
        print(f"Delta: {repr(test_case['delta'])}")

        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=test_case['previous'],
            current_text=test_case['current'],
            delta_text=test_case['delta'],
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        print(f"Result: {result}")

        # Check expected content
        if test_case['expected_content'] is None:
            assert result is None or not getattr(result, 'content', None), \
                f"Stage {i}: Expected no content, got {result}"
            print("✓ No content output as expected")
        else:
            assert result is not None and hasattr(result, 'content'), \
                f"Stage {i}: Expected content, got {result}"
            assert result.content == test_case['expected_content'], \
                f"Stage {i}: Expected content {test_case['expected_content']}, got {result.content}"
            print(f"✓ Content matches: {repr(result.content)}")

    print("✓ Streaming test with buffering completed successfully")


def test_streaming_thinking_tag_buffering(minimax_tool_parser):
    """Test that tool calls within thinking tags are properly handled during streaming."""
    # Reset streaming state
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.streamed_args_for_tool = []

    # Reset buffering state
    minimax_tool_parser.pending_buffer = ""
    minimax_tool_parser.in_thinking_tag = False
    minimax_tool_parser.thinking_depth = 0

    # Test scenario: tool calls within thinking tags should be ignored
    test_cases: list[dict[str, Any]] = [
        {
            'stage': 'Start thinking',
            'previous': '',
            'current': '<think>I need to use a tool. ',
            'delta': '<think>I need to use a tool. ',
            'expected_content': '<think>I need to use a tool. ',  # Should pass through as content
        },
        {
            'stage': 'Tool call in thinking',
            'previous': '<think>I need to use a tool. ',
            'current': '<think>I need to use a tool. <tool_calls>\n{"name": "ignored_tool", "arguments": {"param": "value"}}\n</tool_calls>',
            'delta': '<tool_calls>\n{"name": "ignored_tool", "arguments": {"param": "value"}}\n</tool_calls>',
            'expected_content': '<tool_calls>\n{"name": "ignored_tool", "arguments": {"param": "value"}}\n</tool_calls>',  # <tool_calls> should be preserved in thinking tags
        },
        {
            'stage': 'Real tool call after thinking',
            'previous': '<think>I need to use a tool. <tool_calls>\n{"name": "ignored_tool", "arguments": {"param": "value"}}\n</tool_calls>',
            'current': '<think>I need to use a tool. <tool_calls>\n{"name": "ignored_tool", "arguments": {"param": "value"}}\n</tool_calls></think>\n<tool_calls>',
            'delta': '</think>\n<tool_calls>',
            'expected_content': '</think>\n',  # Should output '</think>\n' and suppress <tool_calls>
        }
    ]

    for i, test_case in enumerate(test_cases):
        print(f"\n--- Stage {i}: {test_case['stage']} ---")
        print(f"Previous: {repr(test_case['previous'])}")
        print(f"Current: {repr(test_case['current'])}")
        print(f"Delta: {repr(test_case['delta'])}")

        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=test_case['previous'],
            current_text=test_case['current'],
            delta_text=test_case['delta'],
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        print(f"Result: {result}")

        # Check expected content
        if 'expected_content' in test_case:
            if test_case['expected_content'] is None:
                assert result is None or not getattr(result, 'content', None), \
                    f"Stage {i}: Expected no content, got {result}"
            else:
                assert result is not None and hasattr(result, 'content'), \
                    f"Stage {i}: Expected content, got {result}"
                assert result.content == test_case['expected_content'], \
                    f"Stage {i}: Expected content {test_case['expected_content']}, got {result.content}"
                print(f"✓ Content matches: {repr(result.content)}")

        # Check tool calls
        if test_case.get('expected_tool_call'):
            assert result is not None and hasattr(result, 'tool_calls') and result.tool_calls, \
                f"Stage {i}: Expected tool call, got {result}"
            tool_call = result.tool_calls[0]
            assert tool_call.function.name == "real_tool", \
                f"Expected real_tool, got {tool_call.function.name}"
            print(f"✓ Real tool call detected: {tool_call.function.name}")

    print("✓ Thinking tag buffering test completed successfully")


def reset_streaming_state(minimax_tool_parser):
    """Helper function to properly reset the streaming state for MinimaxToolParser."""
    # Reset minimax-specific state
    minimax_tool_parser._reset_streaming_state()
    # Reset base class state (these should still be reset for compatibility)
    minimax_tool_parser.prev_tool_call_arr = []
    minimax_tool_parser.current_tool_id = -1
    minimax_tool_parser.current_tool_name_sent = False
    minimax_tool_parser.streamed_args_for_tool = []


def test_streaming_complex_scenario_with_multiple_tools(minimax_tool_parser):
    """Test complex streaming scenario: tools inside <think> tags and multiple tool calls in one group."""
    # Reset streaming state
    reset_streaming_state(minimax_tool_parser)

    # Complex scenario: tools inside thinking tags and multiple tools in one group
    test_stages: list[dict[str, Any]] = [
        {
            'stage': 'Initial content',
            'previous': '',
            'current':
            'Let me help you with this task.',
            'delta': 'Let me help you with this task.',
            'expected_content': 'Let me help you with this task.',
            'expected_tool_calls': 0,
        },
        {
            'stage': 'Start thinking tag',
            'previous': 'Let me help you with this task.',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.',
            'delta': '<think>I need to analyze this situation first.',
            'expected_content': '<think>I need to analyze this situation first.',
            'expected_tool_calls': 0,
        },
        {
            'stage': 'Tool call inside thinking tag starts',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>',
            'delta': '<tool_calls>',
            'expected_content': '<tool_calls>',  # Inside thinking tags, tool tags should be preserved as content
            'expected_tool_calls': 0,
        },
        {
            'stage': 'Complete tool call inside thinking tag',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls>',
            'delta': '\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls>',
            'expected_content': '\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls>',
            'expected_tool_calls': 0,  # Tools inside thinking tags should be ignored
        },
        {
            'stage': 'End thinking tag',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls>',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>',
            'delta': '</think>',
            'expected_content': '</think>',
            'expected_tool_calls': 0,
        },
        {
            'stage': 'Multiple tools group starts',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>',
            'delta': '\nNow I need to get weather information and calculate area.<tool_calls>',
            'expected_content': '\nNow I need to get weather information and calculate area.',  # <tool_calls> should be filtered
            'expected_tool_calls': 0,
        },
        {
            'stage': 'First tool in group',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}',
            'delta': '\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}',
            'expected_content': None,  # No content should be output when tool call is in progress
            'expected_tool_calls': 1,
            'expected_tool_name': 'get_current_weather',
        },
        {
            'stage': 'Second tool in group',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}\n{"name": "calculate_area", "arguments": {"shape": "rectangle", "dimensions": {"width": 10, "height": 5}}}',
            'delta': '\n{"name": "calculate_area", "arguments": {"shape": "rectangle", "dimensions": {"width": 10, "height": 5}}}',
            'expected_content': None,
            'expected_tool_calls': 1,
            'expected_tool_name': 'calculate_area',
        },
        {
            'stage': 'Complete tool calls group',
            'previous': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}\n{"name": "calculate_area", "arguments": {"shape": "rectangle", "dimensions": {"width": 10, "height": 5}}}',
            'current': 'Let me help you with this task.<think>I need to analyze this situation first.<tool_calls>\n{"name": "internal_analysis", "arguments": {"query": "analyze situation"}}\n</tool_calls></think>\nNow I need to get weather information and calculate area.<tool_calls>\n{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}\n{"name": "calculate_area", "arguments": {"shape": "rectangle", "dimensions": {"width": 10, "height": 5}}}</tool_calls>',
            'delta': '</tool_calls>',
            'expected_content': None,
            'expected_tool_calls': 0,
        }
    ]

    tool_calls_count = 0

    for i, test_case in enumerate(test_stages):
        print(f"\n--- Stage {i}: {test_case['stage']} ---")
        print(
            f"Previous: {repr(test_case['previous'][:100])}{'...' if len(test_case['previous']) > 100 else ''}"
        )
        print(f"Current: {repr(test_case['current'][-100:])}")
        print(f"Delta: {repr(test_case['delta'])}")

        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=test_case['previous'],
            current_text=test_case['current'],
            delta_text=test_case['delta'],
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        print(f"Result: {result}")

        # Check expected content
        if test_case['expected_content'] is None:
            assert result is None or not getattr(result, 'content', None), \
                f"Stage {i}: Expected no content output, got {result}"
            print("✓ No content output as expected")
        else:
            assert result is not None and hasattr(result, 'content'), \
                f"Stage {i}: Expected content output, got {result}"
            assert result.content == test_case['expected_content'], \
                f"Stage {i}: Expected content {repr(test_case['expected_content'])}, got {repr(result.content)}"
            print(f"✓ Content matches: {repr(result.content)}")

        # Check tool calls
        expected_tool_calls = test_case['expected_tool_calls']
        actual_tool_calls = len(result.tool_calls) if result and hasattr(
            result, 'tool_calls') and result.tool_calls else 0

        if expected_tool_calls > 0:
            assert actual_tool_calls >= expected_tool_calls, \
                f"Stage {i}: Expected at least {expected_tool_calls} tool calls, got {actual_tool_calls}"

            if 'expected_tool_name' in test_case:
                # Find the tool call with the expected name
                found_tool_call = None
                for tool_call in result.tool_calls:
                    if tool_call.function.name == test_case[
                            'expected_tool_name']:
                        found_tool_call = tool_call
                        break

                assert found_tool_call is not None, \
                    f"Stage {i}: Expected tool name {test_case['expected_tool_name']} not found in tool calls: {[tc.function.name for tc in result.tool_calls]}"
                print(f"✓ Tool call correct: {found_tool_call.function.name}")

                # Ensure tools inside thinking tags are not called
                assert found_tool_call.function.name != "internal_analysis", \
                    f"Stage {i}: Tool 'internal_analysis' inside thinking tags should not be called"

            tool_calls_count += actual_tool_calls
            print(f"✓ Detected {actual_tool_calls} tool calls")
        else:
            assert actual_tool_calls == 0, \
                f"Stage {i}: Expected no tool calls, got {actual_tool_calls}"

    # Verify overall results
    print("\n=== Test Summary ===")
    print(f"Total tool calls count: {tool_calls_count}")

    assert tool_calls_count >= 2, f"Expected at least 2 valid tool calls (outside thinking tags), but got {tool_calls_count}"

    print("✓ Complex streaming test completed:")
    print("  - ✓ Tools inside thinking tags correctly ignored")
    print("  - ✓ Two tool groups outside thinking tags correctly parsed")
    print("  - ✓ Content and tool call streaming correctly handled")
    print("  - ✓ Buffering mechanism works correctly")


def test_streaming_character_by_character_output(minimax_tool_parser):
    """Test character-by-character streaming output to simulate real streaming scenarios."""
    # Reset streaming state
    reset_streaming_state(minimax_tool_parser)

    # Complete text that will be streamed character by character
    complete_text = """I'll help you with the weather analysis.
<think>Let me think about this.
<tool_calls>
{"name": "internal_analysis", "arguments": {"type": "thinking"}}
</tool_calls>
This tool should be ignored.</think>
Now I'll get the weather information for you.
<tool_calls>
{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "celsius"}}
{"name": "calculate_area", "arguments": {"shape": "rectangle", "dimensions": {"width": 10, "height": 5}}}
</tool_calls>
Here are the results."""

    print("\n=== Starting character-by-character streaming test ===")
    print(f"Complete text length: {len(complete_text)} characters")

    # Track the streaming results
    content_fragments = []
    tool_calls_detected = []

    # Stream character by character
    for i in range(1, len(complete_text) + 1):
        current_text = complete_text[:i]
        previous_text = complete_text[:i - 1] if i > 1 else ""
        delta_text = complete_text[i - 1:i]

        # Show progress every 50 characters
        if i % 50 == 0 or i == len(complete_text):
            print(f"Progress: {i}/{len(complete_text)} characters")

        # Call the streaming parser
        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=previous_text,
            current_text=current_text,
            delta_text=delta_text,
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        # Collect results
        if result is not None:
            if hasattr(result, 'content') and result.content:
                content_fragments.append(result.content)
                # Log important content fragments
                if any(keyword in result.content for keyword in
                       ['<think>', '</think>', '<tool_calls>', '</tool_calls>']):
                    print(
                        f"  Char {i}: Content fragment: {repr(result.content)}"
                    )

            if hasattr(result, 'tool_calls') and result.tool_calls:
                for tool_call in result.tool_calls:
                    tool_info = {
                        'character_position': i,
                        'function_name':
                        tool_call.function.name if tool_call.function else None,
                        'arguments':
                        tool_call.function.arguments
                        if tool_call.function else None,
                    }
                    tool_calls_detected.append(tool_info)
                    print(
                        f"  Char {i}: Tool call detected: {tool_call.function.name}"
                    )
                    if tool_call.function.arguments:
                        print(
                            f"    Arguments: {repr(tool_call.function.arguments)}"
                        )

    # Verify results
    print("\n=== Streaming Test Results ===")
    print(f"Total content fragments: {len(content_fragments)}")
    print(f"Total tool calls detected: {len(tool_calls_detected)}")

    # Reconstruct content from fragments
    reconstructed_content = ''.join(content_fragments)
    print(f"Reconstructed content length: {len(reconstructed_content)}")

    # Verify thinking tags content is preserved
    assert '<think>' in reconstructed_content, "Opening thinking tag should be preserved in content"
    assert '</think>' in reconstructed_content, "Closing thinking tag should be preserved in content"

    # Verify that tool calls inside thinking tags are NOT extracted as actual tool calls
    thinking_tool_calls = [
        tc for tc in tool_calls_detected
        if tc['function_name'] == 'internal_analysis'
    ]
    assert len(
        thinking_tool_calls
    ) == 0, f"Tool calls inside thinking tags should be ignored, but found: {thinking_tool_calls}"

    # Verify that real tool calls outside thinking tags ARE extracted
    weather_tool_calls = [
        tc for tc in tool_calls_detected
        if tc['function_name'] == 'get_current_weather'
    ]
    area_tool_calls = [
        tc for tc in tool_calls_detected
        if tc['function_name'] == 'calculate_area'
    ]
    print(tool_calls_detected)
    assert len(weather_tool_calls
               ) > 0, "get_current_weather tool call should be detected"
    assert len(
        area_tool_calls) > 0, "calculate_area tool call should be detected"

    # Verify tool call arguments are properly streamed
    weather_args_found = any(tc['arguments'] for tc in weather_tool_calls
                             if tc['arguments'])
    area_args_found = any(tc['arguments'] for tc in area_tool_calls
                          if tc['arguments'])

    print(f"Weather tool call with arguments: {weather_args_found}")
    print(f"Area tool call with arguments: {area_args_found}")

    # Verify content before and after tool calls
    assert 'I\'ll help you with the weather analysis.' in reconstructed_content, "Initial content should be preserved"
    assert 'Here are the results.' in reconstructed_content, "Final content should be preserved"

    # Verify that <tool_calls> and </tool_calls> tags are not included in the
    # final content (they should be filtered out when not inside thinking tags)
    content_outside_thinking = reconstructed_content
    # Remove thinking tag content to check content outside
    if '<think>' in content_outside_thinking and '</think>' in content_outside_thinking:
        start_think = content_outside_thinking.find('<think>')
        end_think = content_outside_thinking.find('</think>') + len('</think>')
        content_outside_thinking = content_outside_thinking[:start_think] + content_outside_thinking[end_think:]

    # Outside thinking tags, tool_calls tags should be filtered
    tool_calls_in_content = content_outside_thinking.count('<tool_calls>')
    assert tool_calls_in_content == 0, f"<tool_calls> tags should be filtered from content outside thinking tags, but found {tool_calls_in_content}"

    print(
        "\n=== Character-by-character streaming test completed successfully ==="
    )
    print("✓ Tool calls inside thinking tags correctly ignored")
    print("✓ Tool calls outside thinking tags correctly detected")
    print("✓ Content properly streamed and reconstructed")
    print("✓ Tool call tags properly filtered from content")
    print("✓ Character-level streaming works correctly")


def test_streaming_character_by_character_simple_tool_call(
        minimax_tool_parser):
    """Test character-by-character streaming for a simple tool call scenario."""
    # Reset streaming state
    reset_streaming_state(minimax_tool_parser)

    # Simple tool call text
    simple_text = 'Let me check the weather. <tool_calls>\n{"name": "get_weather", "arguments": {"city": "NYC"}}\n</tool_calls>'

    print("\n=== Simple character-by-character test ===")
    print(f"Text: {repr(simple_text)}")

    content_parts = []
    tool_name_sent = False
    tool_args_sent = False

    for i in range(1, len(simple_text) + 1):
        current_text = simple_text[:i]
        previous_text = simple_text[:i - 1] if i > 1 else ""
        delta_text = simple_text[i - 1:i]

        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=previous_text,
            current_text=current_text,
            delta_text=delta_text,
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        if result:
            if hasattr(result, 'content') and result.content:
                content_parts.append(result.content)
                print(
                    f"  Char {i} ({repr(delta_text)}): Content: {repr(result.content)}"
                )

            if hasattr(result, 'tool_calls') and result.tool_calls:
                for tool_call in result.tool_calls:
                    if tool_call.function and tool_call.function.name:
                        tool_name_sent = True
                        print(
                            f"  Char {i}: Tool name: {tool_call.function.name}"
                        )
                    if tool_call.function and tool_call.function.arguments:
                        tool_args_sent = True
                        print(
                            f"  Char {i}: Tool args: {repr(tool_call.function.arguments)}"
                        )

    # Verify basic expectations
    reconstructed_content = ''.join(content_parts)
    print(f"Final reconstructed content: {repr(reconstructed_content)}")

    assert tool_name_sent, "Tool name should be sent during streaming"
    assert tool_args_sent, "Tool arguments should be sent during streaming"
    assert "Let me check the weather." in reconstructed_content, "Initial content should be preserved"

    print("✓ Simple character-by-character test passed")


def test_streaming_character_by_character_with_buffering(minimax_tool_parser):
    """Test character-by-character streaming with edge cases that trigger buffering."""
    # Reset streaming state
    reset_streaming_state(minimax_tool_parser)

    # Text that includes potential buffering scenarios
    buffering_text = 'Hello</tool_calls> world<tool_calls>\n{"name": "test"}\n</tool_calls>done'

    print("\n=== Buffering character-by-character test ===")
    print(f"Text: {repr(buffering_text)}")

    all_content = []

    for i in range(1, len(buffering_text) + 1):
        current_text = buffering_text[:i]
        previous_text = buffering_text[:i - 1] if i > 1 else ""
        delta_text = buffering_text[i - 1:i]

        result = minimax_tool_parser.extract_tool_calls_streaming(
            previous_text=previous_text,
            current_text=current_text,
            delta_text=delta_text,
            previous_token_ids=[],
            current_token_ids=[],
            delta_token_ids=[],
            request=None,
        )

        if result and hasattr(result, 'content') and result.content:
            all_content.append(result.content)
            print(f"  Char {i} ({repr(delta_text)}): {repr(result.content)}")

    final_content = ''.join(all_content)
    print(f"Final content: {repr(final_content)}")

    # The parser should handle the edge case where </tool_calls> appears
    # before any <tool_calls> has been opened
    assert "Hello" in final_content, "Initial 'Hello' should be preserved"
    assert "world" in final_content, "Content after false closing tag should be preserved"
    assert "done" in final_content, "Final content should be preserved"

    print("✓ Buffering character-by-character test passed")
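

# Illustrative sketch, not part of the original suite: a minimal end-to-end
# check of the non-streaming path, assuming the <tool_calls> wrapper format
# exercised throughout this file. The test name and the "Austin" payload are
# hypothetical and only meant to show typical usage of extract_tool_calls.
def test_extract_tool_calls_single_call_sketch(minimax_tool_parser):
    model_output = ('<tool_calls>\n'
                    '{"name": "get_current_weather", "arguments": {"city": "Austin", "state": "TX"}}\n'
                    '</tool_calls>')

    extracted = minimax_tool_parser.extract_tool_calls(
        model_output, request=None)  # type: ignore[arg-type]

    # Arguments come back as a JSON-encoded string, mirroring the json.dumps
    # comparisons in the parametrized cases above.
    assert extracted.tools_called
    assert extracted.tool_calls[0].function.name == "get_current_weather"
    assert json.loads(extracted.tool_calls[0].function.arguments) == {
        "city": "Austin",
        "state": "TX",
    }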