class MiniCPM5XMLToolParser(ToolParser):
    """Tool parser for MiniCPM5 XML-style ``<function ...>...</function>`` calls.

    Offers one-shot extraction (``extract_tool_calls``) and incremental
    extraction (``extract_tool_calls_streaming``).

    Streaming state kept on the instance:

    * ``_processed_len`` -- number of characters of the normalized output
      that have already been consumed (emitted as content or as a finished
      tool call).
    * ``current_tool_id`` -- index of the most recently started tool call,
      ``-1`` before the first one.
    * ``current_tool_name_sent`` -- ``True`` while a tool call whose name has
      been streamed is still waiting for its closing tag.
    * ``prev_tool_call_arr`` / ``streamed_args_for_tool`` -- per-tool-index
      bookkeeping of the latest parsed call and of the argument JSON text
      streamed so far.
    """

    def __init__(
        self,
        tokenizer: TokenizerLike,
        tools: list[Tool] | None = None,
    ):
        """Initialise the parser and the tag markers it searches for."""
        super().__init__(tokenizer, tools)
        # The opening marker is deliberately left open ("<function" without
        # ">") so it matches regardless of what follows the tag name.
        self.tool_call_start_token = "<function"
        self.tool_call_end_token = "</function>"
        self._processed_len = 0

    def _reset_stream_state(self) -> None:
        """Reset every piece of per-stream bookkeeping to its initial value."""
        self._processed_len = 0
        self.current_tool_id = -1
        self.current_tool_name_sent = False
        self.prev_tool_call_arr = []
        self.streamed_args_for_tool = []

    def _ensure_tool_slots(self, tool_index: int) -> None:
        """Pad both per-tool lists so that ``tool_index`` is a valid index."""
        while len(self.streamed_args_for_tool) <= tool_index:
            self.streamed_args_for_tool.append("")
        while len(self.prev_tool_call_arr) <= tool_index:
            self.prev_tool_call_arr.append({})

    def adjust_request(
        self, request: ChatCompletionRequest | ResponsesRequest
    ) -> ChatCompletionRequest | ResponsesRequest:
        """Keep special tokens in the decoded text when tools may be called.

        Args:
            request: The incoming request; it is first passed through the
                parent implementation.

        Returns:
            The (possibly modified) request.
        """
        request = super().adjust_request(request)
        if request.tools and request.tool_choice != "none":
            # Tool XML tags are special tokens in MiniCPM5; must not strip them
            # before tool parsing (see internlm2/mistral vLLM tool parsers).
            request.skip_special_tokens = False
        return request

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """Extract all tool calls from a complete model output.

        Args:
            model_output: Full decoded model output.
            request: The originating request; ``request.tools`` is used to
                build the lookup maps handed to the block parser.

        Returns:
            Extraction result. When at least one block parses into a tool
            call, ``tools_called`` is ``True`` and ``content`` is ``None``
            (text around the calls is discarded). Otherwise ``content`` holds
            the text, including any blocks that failed to parse, after
            ``_strip_thinking_content``. If parsing raises, the normalized
            output is returned unchanged as content.
        """
        model_output = _normalize_model_output(model_output)

        if self.tool_call_start_token not in model_output:
            logger.debug("[MiniCPM5XMLToolParser] no <function token in output")
            return ExtractedToolCallInformation(
                tools_called=False,
                tool_calls=[],
                content=_strip_thinking_content(model_output),
            )

        tool_names, name_to_allowed_props, name_to_required, name_to_tool = (
            _build_tool_maps(request.tools)
        )

        tool_calls: list[ToolCall] = []
        normal_parts: list[str] = []
        last_end = 0
        try:
            for match in _FUNC_BLOCK_REGEX.finditer(model_output):
                # Text between the previous block and this one is plain content.
                if match.start() > last_end:
                    normal_parts.append(model_output[last_end : match.start()])

                block = match.group(0)
                parsed = _parse_function_block(
                    block,
                    tool_names,
                    name_to_allowed_props,
                    name_to_required,
                    name_to_tool,
                )
                if parsed is not None:
                    tool_calls.append(
                        ToolCall(
                            id=f"call_{random_uuid()}",
                            type="function",
                            function=FunctionCall(
                                name=parsed["name"],
                                arguments=json.dumps(
                                    parsed["parameters"],
                                    ensure_ascii=False,
                                ),
                            ),
                        )
                    )
                else:
                    # A block that does not parse is passed through verbatim.
                    normal_parts.append(block)
                last_end = match.end()

            if last_end < len(model_output):
                normal_parts.append(model_output[last_end:])

            content = _strip_thinking_content("".join(normal_parts).strip())
            logger.debug(
                "[MiniCPM5XMLToolParser] extracted %d tool calls",
                len(tool_calls),
            )
            tools_called = bool(tool_calls)
            return ExtractedToolCallInformation(
                tools_called=tools_called,
                tool_calls=tool_calls,
                content=None if tools_called else content,
            )
        except Exception:
            # Best effort: never fail the request because of a parsing error.
            # logger.exception keeps the traceback, matching the streaming path.
            logger.exception("Error in MiniCPM5XMLToolParser.extract_tool_calls")
            return ExtractedToolCallInformation(
                tools_called=False,
                tool_calls=[],
                content=model_output,
            )

    def _emit_tool_args_delta(
        self,
        tool_index: int,
        args_json: str,
        *,
        is_complete: bool = False,
    ) -> DeltaMessage | None:
        """Emit the not-yet-streamed part of a tool call's argument JSON.

        Args:
            tool_index: Index of the tool call being streamed.
            args_json: Argument JSON as currently known.
            is_complete: Forwarded to ``_streaming_args_diff``; callers pass
                ``True`` once the block's closing tag has been seen.

        Returns:
            A delta carrying the new argument text, or ``None`` when
            ``_streaming_args_diff`` yields nothing to send.
        """
        prev_args = (
            self.streamed_args_for_tool[tool_index]
            if tool_index < len(self.streamed_args_for_tool)
            else ""
        )
        arg_diff = _streaming_args_diff(
            prev_args,
            args_json,
            is_complete=is_complete,
        )
        if not arg_diff:
            return None

        while len(self.streamed_args_for_tool) <= tool_index:
            self.streamed_args_for_tool.append("")
        # Record exactly what has been sent so the next diff starts from it.
        self.streamed_args_for_tool[tool_index] = prev_args + arg_diff

        return DeltaMessage(
            tool_calls=[
                DeltaToolCall(
                    index=tool_index,
                    function=DeltaFunctionCall(
                        arguments=arg_diff,
                    ).model_dump(exclude_none=True),
                )
            ],
        )

    def _start_tool_call(self, func_name: str) -> DeltaMessage:
        """Open a new tool call and return the delta announcing its name."""
        self.current_tool_id += 1
        self.current_tool_name_sent = True
        self._ensure_tool_slots(self.current_tool_id)
        self.prev_tool_call_arr[self.current_tool_id] = {"name": func_name}

        return DeltaMessage(
            tool_calls=[
                DeltaToolCall(
                    index=self.current_tool_id,
                    id=make_tool_call_id(),
                    type="function",
                    function=DeltaFunctionCall(
                        name=func_name,
                    ).model_dump(exclude_none=True),
                )
            ],
        )

    def _process_complete_block_streaming(
        self,
        block: str,
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Handle a function block whose closing tag has arrived.

        Args:
            block: Text of the complete block.
            request: The originating request (source of the tool schema).

        Returns:
            * a content delta with the raw block when it does not parse;
            * a single delta with id, name and full arguments when nothing
              was streamed for this call yet;
            * otherwise the remaining argument text, or an empty content
              delta when there is nothing left to send.
        """
        tool_names, name_to_allowed_props, name_to_required, name_to_tool = (
            _build_tool_maps(request.tools)
        )
        parsed = _parse_function_block(
            block,
            tool_names,
            name_to_allowed_props,
            name_to_required,
            name_to_tool,
        )
        if parsed is None:
            # The name may already have been streamed from the partial block.
            # Clear the flag so the next call opens its own index instead of
            # being merged into this abandoned one.
            self.current_tool_name_sent = False
            return DeltaMessage(content=block)

        args_json = json.dumps(parsed["parameters"], ensure_ascii=False)
        func_name = parsed["name"]

        if not self.current_tool_name_sent:
            # Nothing was streamed for this block: send the call in one piece.
            self.current_tool_id += 1
            tool_index = self.current_tool_id
            self._ensure_tool_slots(tool_index)
            self.streamed_args_for_tool[tool_index] = args_json
            self.prev_tool_call_arr[tool_index] = {
                "name": func_name,
                "arguments": parsed["parameters"],
            }
            return DeltaMessage(
                tool_calls=[
                    DeltaToolCall(
                        index=tool_index,
                        id=make_tool_call_id(),
                        type="function",
                        function=DeltaFunctionCall(
                            name=func_name,
                            arguments=args_json,
                        ).model_dump(exclude_none=True),
                    )
                ],
            )

        # The name was already streamed: finish the arguments of that call.
        tool_index = self.current_tool_id
        self.prev_tool_call_arr[tool_index]["arguments"] = parsed["parameters"]
        delta = self._emit_tool_args_delta(
            tool_index,
            args_json,
            is_complete=True,
        )
        # The call is closed; the next block must start a new one.
        self.current_tool_name_sent = False
        if delta:
            return delta
        return DeltaMessage(content="")

    def _process_partial_block_streaming(
        self,
        block: str,
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Handle a function block whose closing tag has not arrived yet.

        Args:
            block: Text from the opening marker to the end of the output.
            request: The originating request (source of the tool schema).

        Returns:
            The name delta on the first usable call for this block, argument
            deltas on later calls, or ``None`` while there is nothing safe to
            emit (name not matched yet, name not among the request's tools,
            no arguments parsed, or no new argument text).
        """
        tool_names, name_to_allowed_props, _, name_to_tool = _build_tool_maps(
            request.tools
        )

        if _PARAM_MISSING_NAME_REGEX.search(block):
            return None

        match = _FUNC_NAME_V1_REGEX.search(block)
        if not match:
            return None

        func_name = (match.group(1) or "").strip()
        if func_name not in tool_names:
            return None

        if not self.current_tool_name_sent:
            return self._start_tool_call(func_name)

        arguments = _parse_partial_params(
            block,
            func_name,
            name_to_allowed_props,
            name_to_tool,
        )
        if not arguments:
            return None

        args_json = json.dumps(arguments, ensure_ascii=False)
        self.prev_tool_call_arr[self.current_tool_id]["arguments"] = arguments
        return self._emit_tool_args_delta(
            self.current_tool_id,
            args_json,
            is_complete=False,
        )

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Produce the next streaming delta for the accumulated output.

        Each call emits at most one delta and advances ``_processed_len``
        over what it consumed; text that is not consumed is looked at again
        on the next call. Token-id arguments are unused.

        Args:
            previous_text: Output before this step; an empty value marks the
                start of a stream and resets the parser state.
            current_text: Output accumulated so far.
            delta_text: Text added in this step.
            previous_token_ids: Unused.
            current_token_ids: Unused.
            delta_token_ids: Unused.
            request: The originating request (source of the tool schema).

        Returns:
            A content or tool-call delta, or ``None`` when nothing can be
            emitted yet or an error occurred (errors are logged).
        """
        del previous_token_ids, current_token_ids, delta_token_ids
        try:
            current_text = _normalize_model_output(current_text)
            if not previous_text:
                self._reset_stream_state()

            # No tool marker anywhere: everything unseen is plain content.
            if self.tool_call_start_token not in current_text:
                if self._processed_len < len(current_text):
                    content = current_text[self._processed_len :]
                    self._processed_len = len(current_text)
                    return DeltaMessage(content=content) if content else None
                return None

            # Complete blocks first, in order, one delta per call.
            for match in _FUNC_BLOCK_REGEX.finditer(current_text):
                if match.end() <= self._processed_len:
                    continue
                if match.start() > self._processed_len:
                    # Flush the text in front of the block before the block.
                    gap = current_text[self._processed_len : match.start()]
                    self._processed_len = match.start()
                    return DeltaMessage(content=gap) if gap else None
                block = match.group(0)
                delta = self._process_complete_block_streaming(block, request)
                self._processed_len = match.end()
                if delta is not None:
                    return delta

            remainder = current_text[self._processed_len :]
            if not remainder:
                # Call without new text after a finished tool call: return an
                # empty content delta instead of None.
                if (
                    not delta_text
                    and self.current_tool_id >= 0
                    and not self.current_tool_name_sent
                ):
                    return DeltaMessage(content="")
                return None

            func_idx = remainder.find(self.tool_call_start_token)
            if func_idx > 0:
                # Plain text precedes the next opening marker.
                gap = remainder[:func_idx]
                self._processed_len += func_idx
                return DeltaMessage(content=gap)
            if func_idx == -1:
                # Hold the text back while its tail could still grow into
                # the opening marker.
                overlap = partial_tag_overlap(remainder, self.tool_call_start_token)
                if overlap:
                    return None
                self._processed_len = len(current_text)
                return DeltaMessage(content=remainder) if remainder else None

            # From here on func_idx == 0: the remainder starts with a marker.
            partial_block = remainder[func_idx:]
            if self.tool_call_end_token in partial_block:
                # A closing tag is present although the block regex did not
                # match; treat everything up to the last closing tag as one
                # complete block.
                end_idx = partial_block.rfind(self.tool_call_end_token)
                complete_block = partial_block[
                    : end_idx + len(self.tool_call_end_token)
                ]
                delta = self._process_complete_block_streaming(complete_block, request)
                self._processed_len += func_idx + len(complete_block)
                if delta is not None:
                    return delta
                partial_block = partial_block[end_idx + len(self.tool_call_end_token) :]
                if not partial_block.strip():
                    return None
                func_idx = partial_block.find(self.tool_call_start_token)
                if func_idx == -1:
                    self._processed_len = len(current_text)
                    if partial_block:
                        return DeltaMessage(content=partial_block)
                    return None
                partial_block = partial_block[func_idx:]

            # An open block: stream its name, then its arguments. This does
            # not advance _processed_len, so the block is re-read next call.
            return self._process_partial_block_streaming(partial_block, request)
        except Exception:
            logger.exception(
                "Error in MiniCPM5XMLToolParser.extract_tool_calls_streaming"
            )
            return None