Function Calling Patterns: Tool Schemas, Execution Pipelines, and Agent Loops

Introduction: Function calling transforms LLMs from text generators into capable agents that can interact with external systems. By defining tools with clear schemas, models can decide when to call functions, extract parameters from natural language, and incorporate results into responses. This guide covers practical function calling patterns: defining tool schemas, handling multiple tool calls, implementing tool execution pipelines, error handling, and building robust agent loops that can accomplish complex multi-step tasks.

Function Calling
Function Calling: Query to LLM Reasoning to Tool Execution

Defining Tool Schemas

from typing import Callable, Any
from dataclasses import dataclass
from pydantic import BaseModel, Field
import json

# Method 1: Dictionary-based schema
def get_weather_schema() -> dict:
    """Define weather tool schema."""
    
    return {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name, e.g., 'San Francisco, CA'"
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "Temperature unit"
                    }
                },
                "required": ["location"]
            }
        }
    }

# Method 2: Pydantic-based schema
class WeatherParams(BaseModel):
    """Parameters for weather lookup."""
    
    location: str = Field(description="City name, e.g., 'San Francisco, CA'")
    unit: str = Field(
        default="celsius",
        description="Temperature unit",
        json_schema_extra={"enum": ["celsius", "fahrenheit"]}
    )

def pydantic_to_tool_schema(
    name: str,
    description: str,
    params_model: type[BaseModel]
) -> dict:
    """Convert Pydantic model to tool schema."""
    
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": params_model.model_json_schema()
        }
    }

# Method 3: Decorator-based tool definition
@dataclass
class Tool:
    """A callable tool with schema."""
    
    name: str
    description: str
    parameters: dict
    function: Callable

def tool(name: str, description: str):
    """Decorator to create a tool from a function."""
    
    def decorator(func: Callable) -> Tool:
        # Extract parameters from type hints
        import inspect
        sig = inspect.signature(func)
        
        properties = {}
        required = []
        
        for param_name, param in sig.parameters.items():
            if param_name == "self":
                continue
            
            param_type = "string"
            if param.annotation == int:
                param_type = "integer"
            elif param.annotation == float:
                param_type = "number"
            elif param.annotation == bool:
                param_type = "boolean"
            
            properties[param_name] = {
                "type": param_type,
                "description": f"Parameter: {param_name}"
            }
            
            if param.default == inspect.Parameter.empty:
                required.append(param_name)
        
        return Tool(
            name=name,
            description=description,
            parameters={
                "type": "object",
                "properties": properties,
                "required": required
            },
            function=func
        )
    
    return decorator

# Usage
@tool("search_database", "Search the database for records")
def search_database(query: str, limit: int = 10) -> list:
    """Search database."""
    return [{"id": 1, "name": "Result"}]

# Convert Tool to OpenAI format
def tool_to_openai_schema(t: Tool) -> dict:
    return {
        "type": "function",
        "function": {
            "name": t.name,
            "description": t.description,
            "parameters": t.parameters
        }
    }

Basic Function Calling

from openai import OpenAI
import json

client = OpenAI()

# Define tools
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name"
                    }
                },
                "required": ["location"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_web",
            "description": "Search the web for information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    }
                },
                "required": ["query"]
            }
        }
    }
]

# Tool implementations
def get_weather(location: str) -> dict:
    """Simulated weather lookup."""
    return {
        "location": location,
        "temperature": 72,
        "unit": "fahrenheit",
        "condition": "sunny"
    }

def search_web(query: str) -> list:
    """Simulated web search."""
    return [
        {"title": f"Result for: {query}", "url": "https://example.com"}
    ]

TOOL_FUNCTIONS = {
    "get_weather": get_weather,
    "search_web": search_web
}

def call_with_tools(user_message: str) -> str:
    """Call LLM with tools and handle function calls."""
    
    messages = [{"role": "user", "content": user_message}]
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )
    
    message = response.choices[0].message
    
    # Check if model wants to call tools
    if message.tool_calls:
        # Add assistant message with tool calls
        messages.append(message)
        
        # Execute each tool call
        for tool_call in message.tool_calls:
            function_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)
            
            # Execute function
            function = TOOL_FUNCTIONS.get(function_name)
            if function:
                result = function(**arguments)
            else:
                result = {"error": f"Unknown function: {function_name}"}
            
            # Add tool result
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result)
            })
        
        # Get final response
        final_response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        
        return final_response.choices[0].message.content
    
    return message.content

# Usage
result = call_with_tools("What's the weather in San Francisco?")
print(result)

Multi-Turn Tool Execution

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ToolCall:
    """A tool call from the model."""
    
    id: str
    name: str
    arguments: dict

@dataclass
class ToolResult:
    """Result of a tool execution."""
    
    tool_call_id: str
    result: Any
    error: Optional[str] = None

class ToolExecutor:
    """Execute tools and manage multi-turn conversations."""
    
    def __init__(self, tools: dict[str, Callable]):
        self.tools = tools
        self.tool_schemas = []
        
        for name, func in tools.items():
            if isinstance(func, Tool):
                self.tool_schemas.append(tool_to_openai_schema(func))
    
    def execute(self, tool_call: ToolCall) -> ToolResult:
        """Execute a single tool call."""
        
        func = self.tools.get(tool_call.name)
        
        if not func:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=f"Unknown tool: {tool_call.name}"
            )
        
        try:
            if isinstance(func, Tool):
                result = func.function(**tool_call.arguments)
            else:
                result = func(**tool_call.arguments)
            
            return ToolResult(
                tool_call_id=tool_call.id,
                result=result
            )
            
        except Exception as e:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=str(e)
            )
    
    def execute_all(self, tool_calls: list[ToolCall]) -> list[ToolResult]:
        """Execute multiple tool calls."""
        return [self.execute(tc) for tc in tool_calls]

class AgentLoop:
    """Multi-turn agent loop with tool execution."""
    
    def __init__(
        self,
        client,
        executor: ToolExecutor,
        model: str = "gpt-4o-mini",
        max_iterations: int = 10
    ):
        self.client = client
        self.executor = executor
        self.model = model
        self.max_iterations = max_iterations
    
    def run(
        self,
        user_message: str,
        system_prompt: str = None
    ) -> str:
        """Run agent loop until completion."""
        
        messages = []
        
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        
        messages.append({"role": "user", "content": user_message})
        
        for iteration in range(self.max_iterations):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.executor.tool_schemas,
                tool_choice="auto"
            )
            
            message = response.choices[0].message
            
            # No tool calls - we're done
            if not message.tool_calls:
                return message.content
            
            # Add assistant message
            messages.append(message)
            
            # Execute tools
            tool_calls = [
                ToolCall(
                    id=tc.id,
                    name=tc.function.name,
                    arguments=json.loads(tc.function.arguments)
                )
                for tc in message.tool_calls
            ]
            
            results = self.executor.execute_all(tool_calls)
            
            # Add tool results
            for result in results:
                content = json.dumps(result.result) if result.result else result.error
                messages.append({
                    "role": "tool",
                    "tool_call_id": result.tool_call_id,
                    "content": content
                })
        
        return "Max iterations reached"

# Usage
executor = ToolExecutor({
    "get_weather": get_weather,
    "search_web": search_web
})

agent = AgentLoop(client, executor)
result = agent.run("What's the weather in NYC and search for restaurants there?")

Parallel Tool Execution

import asyncio
from typing import Callable, Awaitable

class AsyncToolExecutor:
    """Execute tools asynchronously in parallel."""
    
    def __init__(self, tools: dict[str, Callable]):
        self.tools = tools
    
    async def execute(self, tool_call: ToolCall) -> ToolResult:
        """Execute a single tool call."""
        
        func = self.tools.get(tool_call.name)
        
        if not func:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=f"Unknown tool: {tool_call.name}"
            )
        
        try:
            # Check if function is async
            if asyncio.iscoroutinefunction(func):
                result = await func(**tool_call.arguments)
            else:
                # Run sync function in thread pool
                result = await asyncio.to_thread(func, **tool_call.arguments)
            
            return ToolResult(
                tool_call_id=tool_call.id,
                result=result
            )
            
        except Exception as e:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=str(e)
            )
    
    async def execute_parallel(
        self,
        tool_calls: list[ToolCall]
    ) -> list[ToolResult]:
        """Execute multiple tool calls in parallel."""
        
        tasks = [self.execute(tc) for tc in tool_calls]
        return await asyncio.gather(*tasks)

class AsyncAgentLoop:
    """Async agent loop with parallel tool execution."""
    
    def __init__(
        self,
        client,
        executor: AsyncToolExecutor,
        tool_schemas: list[dict],
        model: str = "gpt-4o-mini",
        max_iterations: int = 10
    ):
        self.client = client
        self.executor = executor
        self.tool_schemas = tool_schemas
        self.model = model
        self.max_iterations = max_iterations
    
    async def run(
        self,
        user_message: str,
        system_prompt: str = None
    ) -> str:
        """Run async agent loop."""
        
        messages = []
        
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        
        messages.append({"role": "user", "content": user_message})
        
        for iteration in range(self.max_iterations):
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tool_schemas,
                tool_choice="auto"
            )
            
            message = response.choices[0].message
            
            if not message.tool_calls:
                return message.content
            
            messages.append(message)
            
            # Parse tool calls
            tool_calls = [
                ToolCall(
                    id=tc.id,
                    name=tc.function.name,
                    arguments=json.loads(tc.function.arguments)
                )
                for tc in message.tool_calls
            ]
            
            # Execute in parallel
            results = await self.executor.execute_parallel(tool_calls)
            
            # Add results
            for result in results:
                content = json.dumps(result.result) if result.result else result.error
                messages.append({
                    "role": "tool",
                    "tool_call_id": result.tool_call_id,
                    "content": content
                })
        
        return "Max iterations reached"

# Async tool examples
async def async_search(query: str) -> list:
    """Async search implementation."""
    await asyncio.sleep(0.5)  # Simulate API call
    return [{"title": f"Result: {query}"}]

async def async_fetch_data(url: str) -> dict:
    """Async data fetch."""
    await asyncio.sleep(0.3)
    return {"url": url, "data": "content"}

Tool Error Handling

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class ToolErrorType(str, Enum):
    VALIDATION = "validation"
    EXECUTION = "execution"
    TIMEOUT = "timeout"
    PERMISSION = "permission"
    NOT_FOUND = "not_found"

@dataclass
class ToolError:
    """Structured tool error."""
    
    type: ToolErrorType
    message: str
    recoverable: bool = True
    retry_after: Optional[float] = None

class RobustToolExecutor:
    """Tool executor with comprehensive error handling."""
    
    def __init__(
        self,
        tools: dict[str, Callable],
        timeout: float = 30.0,
        max_retries: int = 2
    ):
        self.tools = tools
        self.timeout = timeout
        self.max_retries = max_retries
    
    async def execute_with_retry(
        self,
        tool_call: ToolCall
    ) -> ToolResult:
        """Execute with retry logic."""
        
        last_error = None
        
        for attempt in range(self.max_retries + 1):
            try:
                result = await self._execute_single(tool_call)
                
                if result.error is None:
                    return result
                
                # Check if error is recoverable
                error = self._parse_error(result.error)
                if not error.recoverable:
                    return result
                
                last_error = error
                
                # Wait before retry
                if error.retry_after:
                    await asyncio.sleep(error.retry_after)
                else:
                    await asyncio.sleep(2 ** attempt)
                    
            except asyncio.TimeoutError:
                last_error = ToolError(
                    type=ToolErrorType.TIMEOUT,
                    message=f"Tool execution timed out after {self.timeout}s",
                    recoverable=True
                )
        
        return ToolResult(
            tool_call_id=tool_call.id,
            result=None,
            error=f"Failed after {self.max_retries + 1} attempts: {last_error.message}"
        )
    
    async def _execute_single(self, tool_call: ToolCall) -> ToolResult:
        """Execute single tool call with timeout."""
        
        func = self.tools.get(tool_call.name)
        
        if not func:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=f"Tool not found: {tool_call.name}"
            )
        
        try:
            # Validate arguments
            self._validate_arguments(tool_call)
            
            # Execute with timeout
            if asyncio.iscoroutinefunction(func):
                result = await asyncio.wait_for(
                    func(**tool_call.arguments),
                    timeout=self.timeout
                )
            else:
                result = await asyncio.wait_for(
                    asyncio.to_thread(func, **tool_call.arguments),
                    timeout=self.timeout
                )
            
            return ToolResult(
                tool_call_id=tool_call.id,
                result=result
            )
            
        except asyncio.TimeoutError:
            raise
        except ValueError as e:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=f"Validation error: {e}"
            )
        except Exception as e:
            return ToolResult(
                tool_call_id=tool_call.id,
                result=None,
                error=f"Execution error: {e}"
            )
    
    def _validate_arguments(self, tool_call: ToolCall):
        """Validate tool arguments."""
        
        # Add validation logic based on tool schema
        pass
    
    def _parse_error(self, error_str: str) -> ToolError:
        """Parse error string into structured error."""
        
        if "timeout" in error_str.lower():
            return ToolError(
                type=ToolErrorType.TIMEOUT,
                message=error_str,
                recoverable=True,
                retry_after=5.0
            )
        
        if "permission" in error_str.lower():
            return ToolError(
                type=ToolErrorType.PERMISSION,
                message=error_str,
                recoverable=False
            )
        
        return ToolError(
            type=ToolErrorType.EXECUTION,
            message=error_str,
            recoverable=True
        )

# Error-aware agent
class ErrorAwareAgent:
    """Agent that handles tool errors gracefully."""
    
    def __init__(self, client, executor: RobustToolExecutor, tool_schemas: list):
        self.client = client
        self.executor = executor
        self.tool_schemas = tool_schemas
    
    async def run(self, user_message: str) -> str:
        """Run with error handling."""
        
        messages = [{"role": "user", "content": user_message}]
        
        for _ in range(10):
            response = await self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                tools=self.tool_schemas
            )
            
            message = response.choices[0].message
            
            if not message.tool_calls:
                return message.content
            
            messages.append(message)
            
            # Execute tools
            for tc in message.tool_calls:
                tool_call = ToolCall(
                    id=tc.id,
                    name=tc.function.name,
                    arguments=json.loads(tc.function.arguments)
                )
                
                result = await self.executor.execute_with_retry(tool_call)
                
                # Format result for model
                if result.error:
                    content = json.dumps({
                        "error": result.error,
                        "suggestion": "Try a different approach or ask user for help"
                    })
                else:
                    content = json.dumps(result.result)
                
                messages.append({
                    "role": "tool",
                    "tool_call_id": result.tool_call_id,
                    "content": content
                })
        
        return "Max iterations reached"

Production Function Calling Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import logging

app = FastAPI()
logger = logging.getLogger("tools")

# Tool registry
class ToolRegistry:
    """Registry for available tools."""
    
    def __init__(self):
        self.tools: dict[str, Callable] = {}
        self.schemas: list[dict] = []
    
    def register(
        self,
        name: str,
        description: str,
        parameters: dict,
        func: Callable
    ):
        """Register a tool."""
        
        self.tools[name] = func
        self.schemas.append({
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters
            }
        })
    
    def get_function(self, name: str) -> Optional[Callable]:
        return self.tools.get(name)
    
    def get_schemas(self) -> list[dict]:
        return self.schemas

# Initialize registry
registry = ToolRegistry()

# Register tools
registry.register(
    name="get_weather",
    description="Get current weather for a location",
    parameters={
        "type": "object",
        "properties": {
            "location": {"type": "string", "description": "City name"}
        },
        "required": ["location"]
    },
    func=get_weather
)

registry.register(
    name="search_web",
    description="Search the web",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"}
        },
        "required": ["query"]
    },
    func=search_web
)

# Initialize components
from openai import AsyncOpenAI
async_client = AsyncOpenAI()

executor = AsyncToolExecutor(registry.tools)
agent = AsyncAgentLoop(
    async_client,
    executor,
    registry.get_schemas()
)

class ChatRequest(BaseModel):
    message: str
    system_prompt: Optional[str] = None

class ToolCallRequest(BaseModel):
    name: str
    arguments: dict

@app.post("/v1/chat")
async def chat_with_tools(request: ChatRequest):
    """Chat endpoint with function calling."""
    
    try:
        result = await agent.run(
            request.message,
            system_prompt=request.system_prompt
        )
        
        return {"response": result}
        
    except Exception as e:
        logger.error(f"Chat error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/v1/tools/execute")
async def execute_tool(request: ToolCallRequest):
    """Direct tool execution endpoint."""
    
    func = registry.get_function(request.name)
    
    if not func:
        raise HTTPException(status_code=404, detail=f"Tool not found: {request.name}")
    
    try:
        if asyncio.iscoroutinefunction(func):
            result = await func(**request.arguments)
        else:
            result = await asyncio.to_thread(func, **request.arguments)
        
        return {"result": result}
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/v1/tools")
async def list_tools():
    """List available tools."""
    return {"tools": registry.get_schemas()}

@app.get("/health")
async def health():
    return {"status": "healthy", "tools_count": len(registry.tools)}

References

Conclusion

Function calling is the foundation of capable LLM agents. Define clear, well-documented tool schemas that help the model understand when and how to use each tool. Implement robust execution pipelines that handle errors gracefully and retry transient failures. Use parallel execution when tools are independent to reduce latency. Build agent loops that can handle multi-step tasks by iterating until completion. For production, implement comprehensive logging, timeout handling, and rate limiting. The key is making tools reliable and predictable—the model will learn to use them effectively when they behave consistently.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.