OpenAI Chat Completion Client With Tools: xLAM Streaming
Source: examples/online_serving/openai_chat_completion_client_with_tools_xlam_streaming.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa: E501
"""
Set up this example by starting a vLLM OpenAI-compatible server with tool call
options enabled for xLAM-2 models:
vllm serve --model Salesforce/Llama-xLAM-2-8b-fc-r --enable-auto-tool-choice --tool-call-parser xlam
OR
vllm serve --model Salesforce/xLAM-2-3b-fc-r --enable-auto-tool-choice --tool-call-parser xlam
This example demonstrates streaming tool calls with xLAM models.
"""
import json
import time

from openai import OpenAI

# Modify OpenAI's API key and API base to use vLLM's API server.
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:8000/v1"
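
# vLLM ignores the API key unless the server was started with --api-key, so
# any placeholder value works here.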


# Define tool functions
def get_weather(location: str, unit: str):
    return f"Weather in {location} is 22 degrees {unit}."
def calculate_expression(expression: str):
    try:
        result = eval(expression)
        return f"The result of {expression} is {result}"
    except Exception as e:
        return f"Could not calculate {expression}: {e}"


def translate_text(text: str, target_language: str):
    return f"Translation of '{text}' to {target_language}: [translated content]"


# Define tools
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City and state, e.g., 'San Francisco, CA'",
                    },
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["location", "unit"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "calculate_expression",
            "description": "Calculate a mathematical expression",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "Mathematical expression to evaluate, needs to be a valid Python expression",
                    }
                },
                "required": ["expression"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "translate_text",
            "description": "Translate text to another language",
            "parameters": {
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "Text to translate"},
                    "target_language": {
                        "type": "string",
                        "description": "Target language for translation",
                    },
                },
                "required": ["text", "target_language"],
            },
        },
    },
]
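
# Each entry above follows the OpenAI function-calling schema: `parameters` is
# a JSON Schema object describing the arguments the model should generate.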
# Map of function names to implementations
tool_functions = {
    "get_weather": get_weather,
    "calculate_expression": calculate_expression,
    "translate_text": translate_text,
}
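

# In the OpenAI streaming format a tool call arrives as a sequence of deltas:
# the first delta for a call carries its `id` and the function `name`, and
# subsequent deltas carry the `arguments` JSON string in fragments that must
# be concatenated before they can be parsed.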
def process_stream(response, tool_functions, original_query):
    """Process a streaming response with possible tool calls"""
    # Track multiple tool calls
    tool_calls = {}  # Dictionary to store tool calls by ID
    current_id = None

    print("\n--- Stream Output ---")
    for chunk in response:
        # Handle tool calls in the stream
        if chunk.choices[0].delta.tool_calls:
            for tool_call_chunk in chunk.choices[0].delta.tool_calls:
                # Get the tool call ID
                if hasattr(tool_call_chunk, "id") and tool_call_chunk.id:
                    current_id = tool_call_chunk.id
                    if current_id not in tool_calls:
                        tool_calls[current_id] = {
                            "function_name": None,
                            "function_args": "",
                            "function_id": current_id,
                        }

                # Extract function information as it comes in chunks
                if (
                    hasattr(tool_call_chunk, "function")
                    and current_id
                    and current_id in tool_calls
                ):
                    if (
                        hasattr(tool_call_chunk.function, "name")
                        and tool_call_chunk.function.name
                    ):
                        tool_calls[current_id]["function_name"] = (
                            tool_call_chunk.function.name
                        )
                        print(f"Function called: {tool_call_chunk.function.name}")

                    if (
                        hasattr(tool_call_chunk.function, "arguments")
                        and tool_call_chunk.function.arguments
                    ):
                        tool_calls[current_id]["function_args"] += (
                            tool_call_chunk.function.arguments
                        )
                        print(f"Arguments chunk: {tool_call_chunk.function.arguments}")

        # Handle regular content in the stream
        elif chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

    print("\n--- End Stream ---\n")
    # Execute each function call and build messages for follow-up
    follow_up_messages = [{"role": "user", "content": original_query}]

    for tool_id, tool_data in tool_calls.items():
        function_name = tool_data["function_name"]
        function_args = tool_data["function_args"]
        function_id = tool_data["function_id"]

        if function_name and function_args:
            try:
                # Parse the JSON arguments
                args = json.loads(function_args)

                # Call the function with the arguments
                function_result = tool_functions[function_name](**args)
                print(
                    f"\n--- Function Result ({function_name}) ---\n{function_result}\n"
                )

                # Add the assistant message with tool call
                follow_up_messages.append(
                    {
                        "role": "assistant",
                        "tool_calls": [
                            {
                                "id": function_id,
                                "type": "function",
                                "function": {
                                    "name": function_name,
                                    "arguments": function_args,
                                },
                            }
                        ],
                    }
                )

                # Add the tool message with function result
                follow_up_messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": function_id,
                        "content": function_result,
                    }
                )
            except Exception as e:
                print(f"Error executing function: {e}")

    # Only send follow-up if we have results to process
    if len(follow_up_messages) > 1:
        # Create a follow-up message with all the function results
        follow_up_response = client.chat.completions.create(
            model=client.models.list().data[0].id,
            messages=follow_up_messages,
            stream=True,
        )

        print("\n--- Follow-up Response ---")
        for chunk in follow_up_response:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="")
        print("\n--- End Follow-up ---\n")
def run_test_case(query, test_name):
    """Run a single test case with the given query"""
    print(f"\n{'=' * 50}\nTEST CASE: {test_name}\n{'=' * 50}")
    print(f"Query: '{query}'")

    start_time = time.time()

    # Create streaming chat completion request
    response = client.chat.completions.create(
        model=client.models.list().data[0].id,
        messages=[{"role": "user", "content": query}],
        tools=tools,
        tool_choice="auto",
        stream=True,
    )

    # Process the streaming response
    process_stream(response, tool_functions, query)

    end_time = time.time()
    print(f"Test completed in {end_time - start_time:.2f} seconds")
def main():
    # Initialize OpenAI client
    global client
    client = OpenAI(
        api_key=openai_api_key,
        base_url=openai_api_base,
    )

    # Run test cases
    test_cases = [
        ("I want to know the weather in San Francisco", "Weather Information"),
        ("Calculate 25 * 17 + 31", "Math Calculation"),
        ("Translate 'Hello world' to Spanish", "Text Translation"),
        ("What is the weather in Tokyo and New York in celsius", "Multiple Tool Usage"),
    ]

    # Execute all test cases
    for query, test_name in test_cases:
        run_test_case(query, test_name)
        time.sleep(1)  # Small delay between tests

    print("\nAll tests completed.")


if __name__ == "__main__":
    main()