ADK Building Blocks: Tools, Memory, and State Management
In Part 1, we built a basic search assistant that could
answer questions using Google Search. But production agents need more: the ability to interact with multiple tools,
remember conversation context, manage complex state, and gracefully handle errors. This is where ADK’s building
blocks shine.
This article dives deep into the components that transform
simple agents into intelligent, production-ready systems. You’ll learn to build custom tools, implement
sophisticated memory patterns, and optimize agent performance for real-world workloads.
Understanding ADK Tools: The Agent’s Hands
Tools are how agents interact with the world beyond the LLM. While Gemini can generate text, tools enable agents to:
- Search the web for current information
- Execute code for calculations and data processing
- Query databases for structured data
- Call APIs to integrate with external services
- Read/write files for document processing
ADK provides an elegant abstraction that makes tools declarative, testable, and reusable.

Reference: Vertex AI Function Calling Overview
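To make "declarative" concrete, here is a minimal sketch in plain Python that does not depend on ADK. The names `ToolSpec`, `REGISTRY`, `register`, and `dispatch` are hypothetical and exist only for illustration: a tool is described by data (name, description, parameters) and the runtime looks it up and calls it when the model requests it.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class ToolSpec:
    """Hypothetical declaration: what the model sees, plus the callable to run."""
    name: str
    description: str
    parameters: Dict[str, str]  # parameter name -> type name
    func: Callable[..., Any]


REGISTRY: Dict[str, ToolSpec] = {}


def register(spec: ToolSpec) -> None:
    """Add a tool to the registry, keyed by its name."""
    REGISTRY[spec.name] = spec


def dispatch(name: str, arguments: Dict[str, Any]) -> Any:
    """Run the requested tool, rejecting unknown tools and unknown arguments."""
    if name not in REGISTRY:
        raise KeyError(f"Unknown tool: {name}")
    spec = REGISTRY[name]
    unexpected = set(arguments) - set(spec.parameters)
    if unexpected:
        raise ValueError(f"Unexpected arguments: {sorted(unexpected)}")
    return spec.func(**arguments)


register(ToolSpec(
    name="add",
    description="Add two integers.",
    parameters={"a": "integer", "b": "integer"},
    func=lambda a, b: a + b,
))

result = dispatch("add", {"a": 2, "b": 3})
print(result)
```

Because the declaration is plain data, the same tool can be unit-tested by calling `dispatch` directly, without a model in the loop.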
Built-in Tools: Google Search and Code Execution
ADK ships with two powerful built-in tools optimized for Gemini:
1. Google Search Tool
The GoogleSearchTool provides real-time web search capabilities:
from adk.tools import GoogleSearchTool
# Basic configuration
search_tool = GoogleSearchTool(
max_results=5, # Number of results to return
safe_search=True, # Enable safe search filtering
region='us', # Search region
time_range='week', # Recency filter: 'day', 'week', 'month', 'year'
)
# Advanced configuration with custom search engine
search_tool = GoogleSearchTool(
max_results=10,
custom_search_engine_id='your-cse-id', # Use custom search engine
api_key='your-api-key', # For quota management
result_fields=['title', 'snippet', 'link', 'displayLink'],
safe_search=True,
)
# Usage in agent
agent = Agent(
config=agent_config,
tools=[search_tool],
)
# Agent autonomously decides when to search
response = await agent.run(
"What are the latest Gemini 2.0 features announced in December 2024?"
)
🏗️ Architecture Insight: Google Search Tool uses the Google Custom Search JSON
API. For production, set up API keys and quotas in Google Cloud Console to avoid rate limits.
Reference: Grounding with Google Search
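Even with quotas configured, a search backend can still reject requests under load, so callers often retry with exponential backoff. The sketch below is one way to do that in plain Python. `RateLimitError` and `call_with_backoff` are hypothetical names, not part of ADK or any Google client library, and the injected `sleep` parameter exists only to make the example testable.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical error standing in for a rate-limit response from a search API."""


def call_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on RateLimitError with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # The delay doubles on every attempt: base, 2 * base, 4 * base, ...
            delay = base_delay * (2 ** attempt)
            # Jitter spreads out retries from clients that failed together.
            sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("max_attempts must be at least 1")


calls = {"count": 0}


def flaky_search():
    """Fails twice with a rate-limit error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitError("quota exceeded")
    return ["result"]


delays = []
results = call_with_backoff(flaky_search, sleep=delays.append)
print(results, len(delays))
```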
2. Code Execution Tool
The CodeExecutionTool runs Python code in a secure sandbox:
from adk.tools import CodeExecutionTool
# Initialize code execution
code_tool = CodeExecutionTool(
timeout=30, # Execution timeout in seconds
allowed_libraries=[ # Whitelist safe libraries
'numpy', 'pandas', 'matplotlib', 'scipy', 'sklearn'
],
max_memory_mb=512, # Memory limit
enable_network=False, # Disable network for security
)
# Usage example
agent = Agent(
config=agent_config,
tools=[code_tool],
)
# Agent can write and execute code
response = await agent.run("""
Calculate the compound annual growth rate (CAGR) for an investment
that grew from $10,000 to $25,000 over 5 years, and plot the growth trajectory.
""")
# Gemini will:
# 1. Write Python code to calculate CAGR
# 2. Execute it via CodeExecutionTool
# 3. Generate a matplotlib plot
# 4. Return results with visualization
⚠️ Security Best Practice: Code execution introduces risk. Always:
- Run in sandboxed environments (ADK’s default)
- Whitelist allowed libraries (no os, subprocess, requests)
- Set strict timeouts and memory limits
- Disable network access unless required
- Log all executed code for audit trails
Reference: Code Execution Security Guide
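As an illustration of the library allow-list idea, the sketch below uses Python's standard `ast` module to list the modules a snippet imports before it is ever run. `find_disallowed_imports` is a hypothetical helper, not an ADK function. A static check like this is only a first filter: it does not catch dynamic imports such as `__import__("os")`, so it complements a sandbox rather than replacing one.

```python
import ast
from typing import Iterable, Set

ALLOWED_LIBRARIES = {"numpy", "pandas", "matplotlib", "scipy", "sklearn"}


def find_disallowed_imports(source: str, allowed: Iterable[str]) -> Set[str]:
    """Return top-level module names imported by source that are not allowed."""
    allowed_set = set(allowed)
    disallowed = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            # Relative imports have no module name; they are reported as "".
            modules = [node.module or ""]
        else:
            continue
        for module in modules:
            top_level = module.split(".")[0]
            if top_level not in allowed_set:
                disallowed.add(top_level)
    return disallowed


safe_code = "import numpy as np\nfrom scipy import stats\n"
unsafe_code = "import os\nimport subprocess\nimport numpy\n"

print(find_disallowed_imports(safe_code, ALLOWED_LIBRARIES))
print(sorted(find_disallowed_imports(unsafe_code, ALLOWED_LIBRARIES)))
```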
Building Custom Tools: Complete Tutorial
Most production agents need custom tools for domain-specific operations. Let’s build a comprehensive example: a
Database Query Tool that safely queries PostgreSQL.
Step 1: Define the Tool Interface
"""
tools/database_tool.py
Custom Database Query Tool for ADK - Production Ready
"""
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import asyncpg
from adk import Agent, AgentConfig  # used by the example at the end of this file
from adk.tools.base import Tool, ToolResult, ToolParameter
from adk.errors import ToolExecutionError
from datetime import datetime
import json


@dataclass
class DatabaseConfig:
    """Database connection configuration."""
    host: str
    port: int
    database: str
    user: str
    password: str
    pool_size: int = 10
    command_timeout: int = 30
class DatabaseQueryTool(Tool):
    """
    Secure database query tool for ADK agents.

    Provides read-only access to PostgreSQL databases with:
    - Connection pooling
    - Query validation
    - Parameter sanitization
    - Timeout enforcement
    - Result size limits

    Architecture: Uses asyncpg for async operations and connection pooling.
    """

    # Tool metadata for Gemini
    name = "database_query"
    description = """
    Query a PostgreSQL database to retrieve structured data.
    Use this tool when you need to:
    - Look up customer information
    - Retrieve order history
    - Check inventory levels
    - Analyze transaction data
    Constraints:
    - Read-only queries (SELECT statements only)
    - Maximum 1000 rows returned
    - 30-second query timeout
    - No sensitive data (passwords, SSNs, etc.)
    """
    parameters = [
        ToolParameter(
            name="query",
            type="string",
            description="""SQL query to execute. Must be a SELECT statement.
            Examples:
            - "SELECT * FROM customers WHERE email = $1"
            - "SELECT COUNT(*) FROM orders WHERE created_at > $1"
            - "SELECT product_name, stock_level FROM inventory WHERE stock < 10"
            Use parameterized queries for dynamic values.
            """,
            required=True,
        ),
        ToolParameter(
            name="params",
            type="array",
            description="Query parameters for parameterized queries (e.g., [$1, $2])",
            required=False,
        ),
        ToolParameter(
            name="max_rows",
            type="integer",
            description="Maximum rows to return (default: 100, max: 1000)",
            required=False,
        ),
    ]
    def __init__(self, config: DatabaseConfig):
        """Initialize the database tool with connection pool."""
        self.config = config
        self.pool: Optional[asyncpg.Pool] = None

    async def _ensure_pool(self):
        """Lazy initialization of connection pool."""
        if self.pool is None:
            self.pool = await asyncpg.create_pool(
                host=self.config.host,
                port=self.config.port,
                database=self.config.database,
                user=self.config.user,
                password=self.config.password,
                min_size=1,
                max_size=self.config.pool_size,
                command_timeout=self.config.command_timeout,
            )
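    def _validate_query(self, query: str) -> None:
        """
        Validate that query is safe (read-only).

        Raises:
            ToolExecutionError: If query contains unsafe operations
        """
        query_upper = query.upper().strip()

        # Must be SELECT
        if not query_upper.startswith('SELECT'):
            raise ToolExecutionError(
                "Only SELECT queries are allowed. "
                "This tool provides read-only access."
            )

        # Block comment markers, which can hide trailing statements
        for marker in ('--', '/*', '*/'):
            if marker in query_upper:
                raise ToolExecutionError(
                    f"Query contains forbidden pattern: {marker}"
                )

        # Block write/DDL keywords. Compare whole words rather than substrings,
        # so that column names such as created_at or updated_at are accepted.
        forbidden_keywords = {
            'DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'CREATE',
            'TRUNCATE', 'GRANT', 'REVOKE',
        }
        words = set(
            ''.join(ch if ch.isalnum() or ch == '_' else ' ' for ch in query_upper).split()
        )
        found = sorted(forbidden_keywords & words)
        if found:
            raise ToolExecutionError(
                f"Query contains forbidden keyword: {found[0]}"
            )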
    async def execute(
        self,
        query: str,
        params: Optional[List[Any]] = None,
        max_rows: int = 100,
    ) -> ToolResult:
        """
        Execute a database query.

        Args:
            query: SQL SELECT statement
            params: Query parameters
            max_rows: Maximum rows to return

        Returns:
            ToolResult with query results or error
        """
        try:
            # Validate query safety
            self._validate_query(query)

            # Enforce row limit
            max_rows = min(max_rows, 1000)

            # Ensure connection pool
            await self._ensure_pool()

            # Execute query
            start_time = datetime.utcnow()
            async with self.pool.acquire() as conn:
                if params:
                    rows = await conn.fetch(query, *params, timeout=30)
                else:
                    rows = await conn.fetch(query, timeout=30)
            execution_time = (datetime.utcnow() - start_time).total_seconds()

            # Convert rows to plain dictionaries, keeping at most max_rows
            results = []
            for row in rows[:max_rows]:
                results.append(dict(row))

            # Build result
            result_data = {
                'success': True,
                'row_count': len(results),
                'rows': results,
                'execution_time_seconds': round(execution_time, 3),
                'truncated': len(rows) > max_rows,
            }
            return ToolResult(
                success=True,
                data=result_data,
                message=f"Retrieved {len(results)} rows in {execution_time:.2f}s",
            )
        except asyncpg.QueryCanceledError:
            return ToolResult(
                success=False,
                error="Query timeout after 30 seconds",
                message="Query took too long to execute. Try adding filters or limits.",
            )
        except asyncpg.PostgresError as e:
            return ToolResult(
                success=False,
                error=f"Database error: {str(e)}",
                message="Check your SQL syntax and try again.",
            )
        except ToolExecutionError as e:
            return ToolResult(
                success=False,
                error=str(e),
                message="Query validation failed.",
            )
        except Exception as e:
            # Log unexpected errors
            print(f"Unexpected error in DatabaseQueryTool: {e}")
            return ToolResult(
                success=False,
                error="Internal tool error",
                message="An unexpected error occurred. Please try again.",
            )

    async def cleanup(self):
        """Close the connection pool."""
        if self.pool:
            await self.pool.close()
# Example usage in agent configuration
async def create_agent_with_database():
    """Create an agent with database access."""
    # Configure database connection
    db_config = DatabaseConfig(
        host='localhost',
        port=5432,
        database='ecommerce',
        user='agent_user',  # Use read-only user!
        password='secure_pass',
        pool_size=5,
    )

    # Initialize database tool
    db_tool = DatabaseQueryTool(config=db_config)

    # Create agent
    agent = Agent(
        config=AgentConfig(
            name="customer-service-agent",
            description="Customer service agent with database access",
            model_config={
                'provider': 'vertex-ai',
                'model_name': 'gemini-1.5-pro-002',
                'parameters': {'temperature': 0.3},  # Lower temp for factual queries
            },
        ),
        tools=[db_tool],
    )
    return agent, db_tool


# Usage example
async def main():
    agent, db_tool = await create_agent_with_database()
    try:
        # Agent can now query the database
        response = await agent.run(
            "Find all orders for customer with email john@example.com in the last 30 days"
        )
        print(response.content)
        # Gemini will:
        # 1. Recognize it needs database access
        # 2. Generate appropriate SQL (e.g., SELECT * FROM orders WHERE ...)
        # 3. Call DatabaseQueryTool with the query
        # 4. Synthesize results into natural language
    finally:
        await db_tool.cleanup()


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Key practices this tool illustrates:
- Input Validation: Always validate and sanitize inputs before execution
- Error Handling: Return ToolResult even on errors (don’t raise exceptions)
- Timeouts: Enforce strict timeouts to prevent hanging
- Resource Limits: Cap memory, rows, file sizes, etc.
- Audit Logging: Log all tool executions for compliance
- Least Privilege: Use read-only credentials when possible
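Several of these practices (timeouts, returning a result instead of raising, audit logging) can be applied uniformly with a small wrapper. The following is a minimal sketch using only the standard library. `run_tool_safely` and the result dictionary shape are assumptions made for illustration, not ADK's `ToolResult` API.

```python
import asyncio
import logging
import time
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("tool_audit")


async def run_tool_safely(
    tool_name: str,
    func: Callable[..., Awaitable[Any]],
    timeout_seconds: float,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Run an async tool with a timeout, log the call, and never raise to the caller."""
    started = time.perf_counter()
    try:
        data = await asyncio.wait_for(func(**kwargs), timeout=timeout_seconds)
        outcome = {"success": True, "data": data, "error": None}
    except asyncio.TimeoutError:
        outcome = {"success": False, "data": None,
                   "error": f"Timed out after {timeout_seconds}s"}
    except Exception as exc:  # report the failure instead of propagating it
        outcome = {"success": False, "data": None, "error": str(exc)}
    elapsed = time.perf_counter() - started
    # Audit trail: which tool ran, whether it succeeded, and how long it took.
    logger.info("tool=%s success=%s elapsed=%.3fs", tool_name, outcome["success"], elapsed)
    return outcome


async def quick_tool(x: int) -> int:
    return x * 2


async def slow_tool() -> str:
    await asyncio.sleep(1)
    return "done"


async def failing_tool() -> str:
    raise ValueError("bad input")


async def demo():
    return (
        await run_tool_safely("quick", quick_tool, 1.0, x=21),
        await run_tool_safely("slow", slow_tool, 0.05),
        await run_tool_safely("failing", failing_tool, 1.0),
    )


quick, slow, failing = asyncio.run(demo())
print(quick, slow, failing, sep="\n")
```

Returning a structured failure lets the model read the error text and try a different query, which it cannot do if the exception ends the agent run.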
Step 2: Testing Custom Tools
Production tools need comprehensive tests:
"""
tests/test_database_tool.py
"""
import pytest
import pytest_asyncio
import asyncpg
from unittest.mock import AsyncMock, patch
from tools.database_tool import DatabaseQueryTool, DatabaseConfig, ToolExecutionError


class TestDatabaseQueryTool:
    """Test suite for DatabaseQueryTool."""

    # Async fixtures need pytest_asyncio.fixture when pytest-asyncio runs in
    # its default strict mode.
    @pytest_asyncio.fixture
    async def db_tool(self):
        """Create tool instance with test config."""
        config = DatabaseConfig(
            host='localhost',
            port=5432,
            database='test_db',
            user='test_user',
            password='test_pass',
        )
        tool = DatabaseQueryTool(config=config)
        yield tool
        await tool.cleanup()
    @pytest.mark.asyncio
    async def test_valid_select_query(self, db_tool):
        """Test that valid SELECT queries execute successfully."""
        # Mock the database pool
        with patch.object(db_tool, 'pool') as mock_pool:
            mock_conn = AsyncMock()
            mock_conn.fetch.return_value = [
                {'id': 1, 'name': 'Test'},
                {'id': 2, 'name': 'Example'},
            ]
            mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
            result = await db_tool.execute("SELECT * FROM users")
            assert result.success is True
            assert result.data['row_count'] == 2
            assert len(result.data['rows']) == 2

    @pytest.mark.asyncio
    async def test_parameterized_query(self, db_tool):
        """Test parameterized queries work correctly."""
        with patch.object(db_tool, 'pool') as mock_pool:
            mock_conn = AsyncMock()
            mock_conn.fetch.return_value = [{'count': 42}]
            mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
            result = await db_tool.execute(
                "SELECT COUNT(*) as count FROM orders WHERE user_id = $1",
                params=[123],
            )
            assert result.success is True
            mock_conn.fetch.assert_called_once()

    @pytest.mark.asyncio
    async def test_rejects_delete_query(self, db_tool):
        """Test that DELETE queries are rejected."""
        result = await db_tool.execute("DELETE FROM users WHERE id = 1")
        assert result.success is False
        assert "Only SELECT queries are allowed" in result.error

    @pytest.mark.asyncio
    async def test_rejects_sql_injection(self, db_tool):
        """Test protection against SQL injection."""
        malicious_queries = [
            "SELECT * FROM users; DROP TABLE users;",
            "SELECT * FROM users /* comment */ DROP TABLE users",
            "SELECT * FROM users -- DROP TABLE",
        ]
        for query in malicious_queries:
            result = await db_tool.execute(query)
            assert result.success is False

    @pytest.mark.asyncio
    async def test_enforces_row_limit(self, db_tool):
        """Test that row limits are enforced."""
        with patch.object(db_tool, 'pool') as mock_pool:
            # Mock 2000 rows
            mock_rows = [{'id': i} for i in range(2000)]
            mock_conn = AsyncMock()
            mock_conn.fetch.return_value = mock_rows
            mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
            result = await db_tool.execute(
                "SELECT * FROM large_table",
                max_rows=100,
            )
            assert result.success is True
            assert result.data['row_count'] == 100  # Limited to 100
            assert result.data['truncated'] is True

    @pytest.mark.asyncio
    async def test_handles_timeout(self, db_tool):
        """Test that query timeouts are handled gracefully."""
        with patch.object(db_tool, 'pool') as mock_pool:
            mock_conn = AsyncMock()
            mock_conn.fetch.side_effect = asyncpg.QueryCanceledError()
            mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
            result = await db_tool.execute("SELECT * FROM huge_table")
            assert result.success is False
            assert "timeout" in result.error.lower()

# Run tests: pytest tests/test_database_tool.py -v
Testing Reference: pytest-asyncio Documentation
Memory and Conversation Context
Stateless agents forget everything between requests. Production agents need memory to:
- Maintain conversation continuity (“Tell me more about that”)
- Remember user preferences (language, timezone, formatting)
- Track multi-turn tasks (research, data collection, analysis)
- Learn from interactions (improve responses over time)
ADK provides three memory layers:
1. Short-Term Memory: Conversation Buffer
Stores recent messages in the conversation:
from adk.memory import ConversationMemory
from adk.memory.buffers import WindowBuffer, TokenBuffer
# Simple window buffer (last N messages)
memory = ConversationMemory(
buffer=WindowBuffer(max_messages=10), # Keep last 10 messages
)
# Token-aware buffer (fits within context window)
memory = ConversationMemory(
buffer=TokenBuffer(
max_tokens=4000, # Reserve 4K tokens for history
model='gemini-1.5-pro-002', # For accurate token counting
),
)
# Usage in agent
agent = Agent(
config=agent_config,
tools=[search_tool],
memory=memory,
)
# Multi-turn conversation
response1 = await agent.run("What's the weather in San Francisco?")
# Agent: "Currently 62°F, partly cloudy..."
response2 = await agent.run("What about tomorrow?")
# Agent remembers "San Francisco" from previous message
# Agent: "Tomorrow in San Francisco: 65°F, sunny..."
response3 = await agent.run("Should I bring an umbrella?")
# Agent: "No need for an umbrella tomorrow in SF, it'll be sunny."
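The two buffer strategies differ only in what they count: messages or tokens. The sketch below shows both in plain Python. `SimpleWindowBuffer`, `SimpleTokenBuffer`, and `estimate_tokens` are hypothetical stand-ins, not ADK classes, and the four-characters-per-token estimate is a rough assumption; a real token buffer would use the model's tokenizer.

```python
from collections import deque
from typing import Deque, List, Tuple

Message = Tuple[str, str]  # (role, text)


def estimate_tokens(text: str) -> int:
    """Crude assumption: roughly four characters per token."""
    return max(1, len(text) // 4)


class SimpleWindowBuffer:
    """Keep only the most recent max_messages messages."""

    def __init__(self, max_messages: int) -> None:
        # A deque with maxlen silently discards the oldest item when full.
        self._messages: Deque[Message] = deque(maxlen=max_messages)

    def add(self, role: str, text: str) -> None:
        self._messages.append((role, text))

    def history(self) -> List[Message]:
        return list(self._messages)


class SimpleTokenBuffer:
    """Drop the oldest messages until the history fits within max_tokens."""

    def __init__(self, max_tokens: int) -> None:
        self.max_tokens = max_tokens
        self._messages: Deque[Message] = deque()

    def add(self, role: str, text: str) -> None:
        self._messages.append((role, text))
        # Always keep the newest message, even if it alone exceeds the budget.
        while len(self._messages) > 1 and self.total_tokens() > self.max_tokens:
            self._messages.popleft()

    def total_tokens(self) -> int:
        return sum(estimate_tokens(text) for _, text in self._messages)

    def history(self) -> List[Message]:
        return list(self._messages)


window = SimpleWindowBuffer(max_messages=2)
for i in range(3):
    window.add("user", f"message {i}")

tokens = SimpleTokenBuffer(max_tokens=10)
tokens.add("user", "a" * 20)   # about 5 tokens
tokens.add("agent", "b" * 20)  # about 5 tokens, total 10
tokens.add("user", "c" * 20)   # total would be 15, so the oldest message is dropped

print(window.history())
print(tokens.total_tokens())
```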
2. Session State: User Preferences and Context
Store metadata that persists across conversation turns:
from adk.memory import SessionState
# Initialize session state
session = SessionState(
session_id="user-123-session-456",
data={
'user_id': 'user-123',
'timezone': 'America/Los_Angeles',
'language': 'en',
'preferences': {
'temperature_unit': 'fahrenheit',
'date_format': 'MM/DD/YYYY',
},
},
)
# Agent can access session state
agent = Agent(
config=agent_config,
memory=memory,
session_state=session,
)
# Agent automatically uses preferences
response = await agent.run("What time is the meeting?")
# Agent: "The meeting is at 2:00 PM PST" (uses user's timezone)
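How preferences shape output can be shown without any agent at all. This sketch formats a temperature and a date from a session-style dictionary. The helper names and the `DATE_PATTERNS` mapping are assumptions for illustration, and the article does not describe how ADK applies preferences internally.

```python
from datetime import date
from typing import Any, Dict

# Assumed mapping from preference strings to strftime patterns.
DATE_PATTERNS = {
    "MM/DD/YYYY": "%m/%d/%Y",
    "DD/MM/YYYY": "%d/%m/%Y",
    "YYYY-MM-DD": "%Y-%m-%d",
}


def format_temperature(celsius: float, preferences: Dict[str, Any]) -> str:
    """Render a Celsius reading in the user's preferred unit."""
    if preferences.get("temperature_unit") == "fahrenheit":
        return f"{celsius * 9 / 5 + 32:.0f}°F"
    return f"{celsius:.0f}°C"


def format_date(value: date, preferences: Dict[str, Any]) -> str:
    """Render a date in the user's preferred format, defaulting to ISO 8601."""
    pattern = DATE_PATTERNS.get(preferences.get("date_format"), "%Y-%m-%d")
    return value.strftime(pattern)


session_data = {
    "user_id": "user-123",
    "preferences": {
        "temperature_unit": "fahrenheit",
        "date_format": "MM/DD/YYYY",
    },
}
prefs = session_data["preferences"]
temperature_text = format_temperature(20.0, prefs)
date_text = format_date(date(2025, 1, 2), prefs)
print(temperature_text, date_text)
```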
3. Long-Term Memory: Vector Database Integration
For agents that need to remember facts across sessions:
from adk.memory import VectorMemory
from google.cloud import aiplatform
# Initialize Vertex AI Vector Search
vector_memory = VectorMemory(
index_endpoint='projects/my-project/locations/us-central1/indexEndpoints/123',
embedding_model='textembedding-gecko@003',
similarity_threshold=0.7,
)
# Store long-term facts
await vector_memory.store(
key="user_preferences",
content="User prefers concise answers without code examples",
metadata={'user_id': 'user-123', 'category': 'preference'},
)
await vector_memory.store(
key="project_context",
content="Working on an e-commerce platform using Python and PostgreSQL",
metadata={'user_id': 'user-123', 'category': 'context'},
)
# Agent retrieves relevant memories automatically
agent = Agent(
config=agent_config,
memory=memory,
long_term_memory=vector_memory,
)
# Later session (hours/days later)
response = await agent.run("How should I structure my database queries?")
# Agent retrieves "e-commerce + PostgreSQL" context from vector memory
# Agent: "For your PostgreSQL e-commerce database, I recommend..."
Memory Architecture Reference: Vertex AI Vector Search
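The retrieval step behind a vector memory is a nearest-neighbour search with a similarity cutoff. Here is a toy, dependency-free sketch of that idea. `ToyVectorMemory` and the word-count `embed` function are hypothetical; a real deployment would use an embedding model and a vector index, and a threshold such as 0.7 only makes sense for that model's similarity scale.

```python
import math
from collections import Counter
from typing import Dict, List, Tuple


def embed(text: str) -> Dict[str, float]:
    """Toy embedding: lowercase word counts. A real system would use a model."""
    return dict(Counter(text.lower().split()))


def cosine_similarity(a: Dict[str, float], b: Dict[str, float]) -> float:
    """Cosine of the angle between two sparse vectors (0.0 if either is empty)."""
    dot = sum(value * b.get(key, 0.0) for key, value in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


class ToyVectorMemory:
    """Store texts with their vectors and return those similar enough to a query."""

    def __init__(self, similarity_threshold: float) -> None:
        self.similarity_threshold = similarity_threshold
        self._items: List[Tuple[str, Dict[str, float]]] = []

    def store(self, content: str) -> None:
        self._items.append((content, embed(content)))

    def search(self, query: str, top_k: int = 3) -> List[Tuple[str, float]]:
        query_vector = embed(query)
        scored = [
            (content, cosine_similarity(query_vector, vector))
            for content, vector in self._items
        ]
        # Discard weak matches, then return the best top_k, highest score first.
        relevant = [item for item in scored if item[1] >= self.similarity_threshold]
        return sorted(relevant, key=lambda item: item[1], reverse=True)[:top_k]


memory_store = ToyVectorMemory(similarity_threshold=0.5)
memory_store.store("postgresql e-commerce platform in python")
memory_store.store("user prefers concise answers")

hits = memory_store.search("python postgresql queries")
print(hits)
```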
Complete Real-World Example: Customer Support Agent
Let’s combine everything into a production-ready customer support agent:
"""
customer_support_agent.py
Production customer support agent with tools, memory, and state management.
"""
import asyncio
from typing import Dict, Any
from adk import Agent, AgentConfig
from adk.tools import GoogleSearchTool
from adk.memory import ConversationMemory, SessionState
from adk.memory.buffers import TokenBuffer
from tools.database_tool import DatabaseQueryTool, DatabaseConfig


class CustomerSupportAgent:
    """
    Intelligent customer support agent with:
    - Order lookup (database tool)
    - Knowledge base search (Google search tool)
    - Conversation memory
    - User session management
    """

    def __init__(self, db_config: DatabaseConfig):
        # Initialize tools
        self.db_tool = DatabaseQueryTool(config=db_config)
        self.search_tool = GoogleSearchTool(
            max_results=3,
            custom_search_engine_id='support-kb-cse-id',
        )

        # Configure memory
        self.memory = ConversationMemory(
            buffer=TokenBuffer(max_tokens=4000, model='gemini-1.5-pro-002'),
        )

        # Create agent
        self.agent = Agent(
            config=AgentConfig(
                name="customer-support-agent",
                description="""Expert customer support agent for e-commerce platform.
                Capabilities:
                - Look up order status and tracking
                - Answer product questions
                - Process returns and exchanges
                - Troubleshoot issues
                - Escalate to human agents when needed
                """,
                model_config={
                    'provider': 'vertex-ai',
                    'model_name': 'gemini-1.5-pro-002',
                    'parameters': {
                        'temperature': 0.3,  # Consistent, factual responses
                        'top_p': 0.95,
                    },
                },
                safety_settings={
                    'harm_category_hate_speech': 'BLOCK_MEDIUM_AND_ABOVE',
                    'harm_category_dangerous_content': 'BLOCK_MEDIUM_AND_ABOVE',
                },
            ),
            tools=[self.db_tool, self.search_tool],
            memory=self.memory,
        )

        # System prompt
        self.system_prompt = """You are a helpful customer support agent.
        Guidelines:
        1. **Be empathetic:** Acknowledge customer frustration
        2. **Be accurate:** Use database for orders, search for policies
        3. **Be concise:** Provide clear, actionable answers
        4. **Escalate appropriately:** Transfer to human for refunds, complaints, account issues
        Available Tools:
        - database_query: Look up orders, products, customers
        - google_search: Search knowledge base for policies, FAQs
        Response Format:
        - Start with empathy
        - Provide solution with specific details
        - Offer next steps
        - Ask if they need more help
        Example:
        User: "Where is my order #12345?"
        You: "I understand you're waiting for your order. Let me check that for you right away.
        [After database lookup]
        Your order #12345 shipped on 12/28 via FedEx (tracking: 123456789).
        It's currently in transit and should arrive by 1/2.
        You can track it here: [tracking link]
        Is there anything else I can help you with?"
        """
    async def handle_request(
        self,
        user_id: str,
        session_id: str,
        message: str,
        user_context: Dict[str, Any] = None,
    ) -> str:
        """
        Handle a customer support request.

        Args:
            user_id: Customer ID
            session_id: Conversation session ID
            message: Customer's message
            user_context: Additional context (timezone, language, etc.)

        Returns:
            Agent's response
        """
        # Create session state
        session = SessionState(
            session_id=session_id,
            data={
                'user_id': user_id,
                **(user_context or {}),
            },
        )

        # Run agent with session
        response = await self.agent.run(
            user_message=message,
            system_message=self.system_prompt,
            session_state=session,
        )
        return response.content

    async def cleanup(self):
        """Clean up resources."""
        await self.db_tool.cleanup()
# Example usage
async def main():
    # Configure database
    db_config = DatabaseConfig(
        host='localhost',
        port=5432,
        database='ecommerce',
        user='support_agent_readonly',
        password='secure_password',
    )

    # Create agent
    support_agent = CustomerSupportAgent(db_config=db_config)
    try:
        # Simulate customer conversation
        user_id = "customer-789"
        session_id = "session-abc-123"

        # Turn 1
        response1 = await support_agent.handle_request(
            user_id=user_id,
            session_id=session_id,
            message="Where is my order? I ordered a week ago!",
            user_context={'timezone': 'America/New_York'},
        )
        print(f"Agent: {response1}\n")

        # Turn 2 (agent remembers context)
        response2 = await support_agent.handle_request(
            user_id=user_id,
            session_id=session_id,
            message="Can I change the delivery address?",
        )
        print(f"Agent: {response2}\n")

        # Turn 3
        response3 = await support_agent.handle_request(
            user_id=user_id,
            session_id=session_id,
            message="What's your return policy?",
        )
        print(f"Agent: {response3}\n")
    finally:
        await support_agent.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
Agent Configuration and Performance Tuning
Model Selection Strategy
| Model | Best For | Speed | Cost | Context Window |
|---|---|---|---|---|
| Gemini 1.5 Flash | Simple queries, high throughput | ⚡ Very Fast (200ms) | 💲 Low | 1M tokens |
| Gemini 1.5 Pro | Complex reasoning, production | 🔄 Medium (800ms) | 💲💲 Medium | 2M tokens |
| Gemini 2.0 Flash | Multimodal, experimental | ⚡⚡ Ultra Fast (100ms) | 💲 Low | 1M tokens |
Temperature and Sampling
# Factual/deterministic tasks (customer support, data lookup)
model_config = {
'temperature': 0.1, # Nearly deterministic
'top_p': 0.9,
'top_k': 1,
}
# Balanced (general assistance, recommendations)
model_config = {
'temperature': 0.7, # Default balance
'top_p': 0.95,
'top_k': 40,
}
# Creative tasks (content generation, brainstorming)
model_config = {
'temperature': 0.9, # More creative
'top_p': 0.98,
'top_k': 100,
}
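To see what these knobs do, the sketch below applies temperature scaling and top-k truncation to a small set of made-up logits in plain Python. It is an illustration of the general sampling math, not of how Gemini implements it, and it leaves out `top_p` for brevity. One consequence worth noting: with `top_k` set to 1 only the single most likely token survives, so the temperature value no longer changes the outcome.

```python
import math
from typing import List


def sampling_distribution(logits: List[float], temperature: float, top_k: int) -> List[float]:
    """Softmax over the top_k largest logits after dividing by temperature."""
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    # Indices of the top_k largest logits; every other token gets probability 0.
    keep = sorted(range(len(logits)), key=lambda i: logits[i], reverse=True)[:top_k]
    scaled = {i: logits[i] / temperature for i in keep}
    peak = max(scaled.values())  # subtract the max for numerical stability
    weights = {i: math.exp(value - peak) for i, value in scaled.items()}
    total = sum(weights.values())
    return [weights.get(i, 0.0) / total for i in range(len(logits))]


logits = [2.0, 1.0, 0.5, -1.0]  # made-up scores for four candidate tokens

greedy = sampling_distribution(logits, temperature=0.1, top_k=1)
focused = sampling_distribution(logits, temperature=0.1, top_k=4)
diverse = sampling_distribution(logits, temperature=0.9, top_k=4)

print([round(p, 3) for p in greedy])
print([round(p, 3) for p in focused])
print([round(p, 3) for p in diverse])
```

Lower temperatures concentrate probability on the top token, while higher temperatures spread it across the alternatives.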
Cost Optimization Strategies
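# Strategy 1: Smart model switching
class SmartAgent:
    def __init__(self):
        self.fast_agent = Agent(model='gemini-1.5-flash')  # Cheap, fast
        self.smart_agent = Agent(model='gemini-1.5-pro')   # Expensive, smart

    async def run(self, message: str):
        # Use fast model first
        fast_response = await self.fast_agent.run(message)
        # Check confidence
        if fast_response.confidence < 0.7:
            # Retry with smarter model
            return await self.smart_agent.run(message)
        return fast_response


# Strategy 2: Caching common responses
# functools.lru_cache cannot be used on an async function: it would cache the
# coroutine object, and a coroutine can only be awaited once. Cache the awaited
# response instead, keyed by a hash of the message.
import hashlib

_response_cache = {}
_CACHE_MAX_SIZE = 1000

async def cached_agent_run(message: str):
    message_hash = hashlib.sha256(message.encode('utf-8')).hexdigest()
    if message_hash not in _response_cache:
        if len(_response_cache) >= _CACHE_MAX_SIZE:
            # Evict the oldest entry (dicts preserve insertion order)
            _response_cache.pop(next(iter(_response_cache)))
        _response_cache[message_hash] = await agent.run(message)
    return _response_cache[message_hash]


# Strategy 3: Batch processing
import asyncio
from typing import List

async def process_batch(messages: List[str]):
    """Process multiple messages in parallel."""
    tasks = [agent.run(msg) for msg in messages]
    return await asyncio.gather(*tasks)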
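Unbounded parallelism can exhaust API quotas on large batches, so a common refinement is to cap the number of requests in flight. The sketch below does this with `asyncio.Semaphore`. `process_batch_bounded` and `fake_agent_run` are hypothetical names, and the fake handler stands in for a real agent call.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def process_batch_bounded(
    messages: List[str],
    handler: Callable[[str], Awaitable[str]],
    max_concurrency: int = 5,
) -> List[str]:
    """Run handler over messages with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(message: str) -> str:
        async with semaphore:
            return await handler(message)

    # gather returns results in the same order as the input messages.
    return await asyncio.gather(*(guarded(m) for m in messages))


stats: Dict[str, int] = {"in_flight": 0, "peak": 0}


async def fake_agent_run(message: str) -> str:
    """Stand-in for an agent call that records how many calls overlap."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return message.upper()


replies = asyncio.run(
    process_batch_bounded(["a", "b", "c", "d", "e", "f"], fake_agent_run, max_concurrency=2)
)
print(replies, stats["peak"])
```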
Cost Optimization Reference: Vertex AI Pricing
Key Takeaways and Next Steps
🎓 What You've Learned
1. Tool Development:
- Built-in tools (Google Search, Code Execution) for common needs
- Custom tool creation with validation, error handling, and security
- Testing patterns for production reliability
2. Memory Patterns:
- Short-term (conversation buffer) for multi-turn dialogues
- Session state for user preferences and context
- Long-term (vector memory) for cross-session knowledge
3. Production Patterns:
- Complete customer support agent with database integration
- Model selection and configuration strategies
- Cost optimization through caching and smart routing
Additional Resources
📖 Reference Links
Official Documentation:
- Function Calling in Vertex AI
- Vertex AI Vector Search
- Code Execution Security
- Vertex AI Pricing & Cost Optimization