Part 8 of the Microsoft Agent Framework Series
Building an agent is one thing. Deploying it to production with enterprise-grade reliability, security, and observability is another challenge entirely. This article covers the main pieces needed to move from prototype to production: observability with OpenTelemetry, security, resilience, and deployment.
Production Readiness Checklist

| Category | Requirement | Status |
|---|---|---|
| Security | Authentication & Authorization | ☐ |
| Security | Content Safety Filters | ☐ |
| Security | Input Validation | ☐ |
| Observability | OpenTelemetry Integration | ☐ |
| Observability | Logging & Metrics | ☐ |
| Reliability | Error Handling & Retry | ☐ |
| Reliability | Circuit Breakers | ☐ |
| Performance | Response Caching | ☐ |
| Performance | Rate Limiting | ☐ |
| Deployment | Container/Cloud Ready | ☐ |
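Rate limiting appears in the checklist, and one common way to implement it is a token bucket that refills at a fixed rate and allows short bursts. The following is a rough, standard-library sketch rather than anything provided by Microsoft Agent Framework; `TokenBucket`, its parameters, and the injectable `clock` are hypothetical names chosen for illustration.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token-bucket limiter: `rate` tokens/second, burst up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)  # start full so an initial burst is allowed
        self._last = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and consume `cost` tokens if the request may proceed."""
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Usage: 2 requests/second with a burst of 3, driven by a fake clock
fake_now = [0.0]
bucket = TokenBucket(rate=2.0, capacity=3, clock=lambda: fake_now[0])
burst = [bucket.allow() for _ in range(4)]  # fourth call exceeds the burst
fake_now[0] += 0.5                           # half a second refills one token
after_wait = bucket.allow()
```

In a service, one bucket per caller (for example keyed by user or API key) would typically sit in front of the agent call, with rejected requests answered by HTTP 429.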
OpenTelemetry Integration
Microsoft Agent Framework has built-in OpenTelemetry support for comprehensive observability:
Python Setup
import logging
import os
from functools import wraps

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def configure_telemetry(service_name: str = "agent-service"):
    """Configure OpenTelemetry for the agent service."""
    # Create resource with service metadata
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": os.getenv("ENVIRONMENT", "development")
    })
    # Create tracer provider
    provider = TracerProvider(resource=resource)
    # Configure OTLP exporter (sends to collector)
    otlp_exporter = OTLPSpanExporter(
        endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
        insecure=True  # plaintext gRPC; use TLS outside local development
    )
    # Add batch processor for efficient export
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)
    # Set as global tracer provider
    trace.set_tracer_provider(provider)
    # Instrument HTTP client
    AioHttpClientInstrumentor().instrument()
    logger.info(f"Telemetry configured for {service_name}")
    return trace.get_tracer(service_name)

# Custom span decorator for agent operations
def traced(operation_name: str):
    """Decorator to trace async agent operations."""
    def decorator(func):
        @wraps(func)  # keep the wrapped function's name and docstring
        async def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(operation_name) as span:
                span.set_attribute("agent.operation", operation_name)
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("agent.success", True)
                    return result
                except Exception as e:
                    span.set_attribute("agent.success", False)
                    span.set_attribute("agent.error", str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage example (`agent` is an agent instance created as in earlier parts)
tracer = configure_telemetry("my-agent-service")

@traced("process_user_request")
async def handle_request(message: str, thread):
    result = await agent.run(message, thread=thread)
    return result

.NET / C# Implementation
using OpenTelemetry;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using Microsoft.Extensions.DependencyInjection;
namespace MAF.Part08.Telemetry;
/// <summary>
/// Part 8: OpenTelemetry Configuration for .NET
/// </summary>
public static class TelemetryConfiguration
{
public static IServiceCollection AddAgentTelemetry(
this IServiceCollection services,
string serviceName = "agent-service")
{
services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService(
serviceName: serviceName,
serviceVersion: "1.0.0")
.AddAttributes(new Dictionary<string, object>
{
["deployment.environment"] =
Environment.GetEnvironmentVariable("ENVIRONMENT") ?? "development"
}))
.WithTracing(tracing => tracing
.AddSource("Microsoft.Agents.AI")
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri(
Environment.GetEnvironmentVariable("OTEL_EXPORTER_OTLP_ENDPOINT")
?? "http://localhost:4317");
}))
.WithMetrics(metrics => metrics
.AddMeter("Microsoft.Agents.AI")
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri(
Environment.GetEnvironmentVariable("OTEL_EXPORTER_OTLP_ENDPOINT")
?? "http://localhost:4317");
}));
return services;
}
}
// Program.cs usage example
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Add telemetry
builder.Services.AddAgentTelemetry("customer-support-agent");
var app = builder.Build();
app.MapPost("/api/agent/chat", async (ChatRequest request) =>
{
// Agent endpoints are automatically traced
return Results.Ok(new { response = "Hello!" });
});
app.Run();
}
}
public record ChatRequest(string Message);
Security Best Practices
Authentication with Azure AD
from azure.identity import (
    DefaultAzureCredential,
    ManagedIdentityCredential,
    ChainedTokenCredential,
    AzureCliCredential
)
from azure.keyvault.secrets import SecretClient
import os
import logging

logger = logging.getLogger(__name__)

def get_azure_credential():
    """
    Get appropriate Azure credential based on environment.
    Priority:
    1. Managed Identity (when running in Azure)
    2. Azure CLI (for local development)
    3. Default credential chain (fallback)
    """
    # Check if running in Azure
    azure_client_id = os.getenv("AZURE_CLIENT_ID")
    if azure_client_id:
        # Production: Use Managed Identity
        logger.info("Using Managed Identity authentication")
        return ManagedIdentityCredential(client_id=azure_client_id)
    # Development: Use Azure CLI first, then fallback
    logger.info("Using development credential chain")
    return ChainedTokenCredential(
        AzureCliCredential(),
        DefaultAzureCredential()
    )

# Secure agent initialization
def create_secure_agent():
    """Create agent with secure credential management."""
    from agent_framework.azure import AzureOpenAIResponsesClient
    credential = get_azure_credential()
    # Get endpoint from environment (never hardcode)
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    if not endpoint:
        raise ValueError("AZURE_OPENAI_ENDPOINT environment variable is required")
    client = AzureOpenAIResponsesClient(
        credential=credential,
        endpoint=endpoint
    )
    return client.create_agent(
        name="SecureAgent",
        instructions="You are a helpful assistant."
    )

# Key Vault integration for secrets
class SecureConfigProvider:
    """Fetch configuration from Azure Key Vault."""
    def __init__(self, vault_url: str):
        credential = get_azure_credential()
        self.client = SecretClient(vault_url=vault_url, credential=credential)

    def get_secret(self, name: str) -> str:
        """Retrieve a secret from Key Vault."""
        secret = self.client.get_secret(name)
        return secret.value

    def get_api_key(self, service: str) -> str:
        """Get API key for external service."""
        return self.get_secret(f"{service}-api-key")

# Usage
config = SecureConfigProvider(os.getenv("AZURE_KEY_VAULT_URL"))
crm_api_key = config.get_api_key("crm")
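`create_secure_agent` reads `AZURE_OPENAI_ENDPOINT` from the environment and raises if it is missing. The same fail-fast idea can be applied to every required setting at startup, so a misconfigured container stops immediately with one readable error. A minimal sketch, assuming the variable names used in this article; `load_settings` and `REQUIRED_VARS` are hypothetical helpers and the URLs in the usage lines are placeholders.

```python
import os
from typing import Dict, Mapping, Sequence

# Variable names taken from this article; adjust to your deployment
REQUIRED_VARS = ("AZURE_OPENAI_ENDPOINT", "AZURE_KEY_VAULT_URL")


def load_settings(
    required: Sequence[str] = REQUIRED_VARS,
    env: Mapping[str, str] = os.environ,
) -> Dict[str, str]:
    """Return the required settings, or raise one error naming every missing variable."""
    missing = [name for name in required if not env.get(name, "").strip()]
    if missing:
        raise ValueError("Missing required environment variables: " + ", ".join(missing))
    return {name: env[name] for name in required}


# Usage with an explicit mapping (placeholder values) instead of the real environment
settings = load_settings(env={
    "AZURE_OPENAI_ENDPOINT": "https://example.openai.azure.com",
    "AZURE_KEY_VAULT_URL": "https://example.vault.azure.net",
})
```

Passing `env` explicitly keeps the function easy to test; in the service itself it would be called once, with no arguments, before any client is created.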
Input Validation & Content Safety
from dataclasses import dataclass
from typing import List, Optional, Set
from enum import Enum
import re
import logging

logger = logging.getLogger(__name__)

class SafetyCategory(Enum):
    HARMFUL = "harmful"
    PII = "pii"
    JAILBREAK = "jailbreak"
    PROFANITY = "profanity"
    BLOCKED_TERM = "blocked_term"

@dataclass
class SafetyResult:
    is_safe: bool
    violations: List[SafetyCategory]
    details: str

class ContentSafetyFilter:
    """
    Enterprise-grade content safety filter for agent inputs/outputs.
    """
    def __init__(
        self,
        block_harmful: bool = True,
        block_pii: bool = True,
        block_jailbreaks: bool = True,
        custom_blocklist: Optional[List[str]] = None,
        max_input_length: int = 4000
    ):
        self.block_harmful = block_harmful
        self.block_pii = block_pii
        self.block_jailbreaks = block_jailbreaks
        self.blocklist: Set[str] = set(custom_blocklist or [])
        self.max_input_length = max_input_length
        # PII patterns
        self.pii_patterns = [
            (r'\d{3}-\d{2}-\d{4}', 'SSN'),  # Social Security Number
            (r'\d{16}', 'Credit Card'),  # Credit card number
            (r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}', 'Email'),
            (r'\d{3}[-.]?\d{3}[-.]?\d{4}', 'Phone'),
        ]
        # Jailbreak patterns
        self.jailbreak_patterns = [
            r'ignore (previous|all|your) instructions',
            r'pretend (you are|to be)',
            r'act as (if you are|a)',
            r'disregard (safety|guidelines)',
            r'bypass (filters|safety)',
        ]

    def check_input(self, text: str) -> SafetyResult:
        """Check user input for safety violations."""
        violations = []
        details = []
        # Length check
        if len(text) > self.max_input_length:
            violations.append(SafetyCategory.HARMFUL)
            details.append(f"Input exceeds max length ({len(text)} > {self.max_input_length})")
        # PII check
        if self.block_pii:
            for pattern, pii_type in self.pii_patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    violations.append(SafetyCategory.PII)
                    details.append(f"Potential {pii_type} detected")
        # Jailbreak check
        if self.block_jailbreaks:
            for pattern in self.jailbreak_patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    violations.append(SafetyCategory.JAILBREAK)
                    details.append("Potential jailbreak attempt detected")
                    break
        # Blocklist check
        text_lower = text.lower()
        for term in self.blocklist:
            if term.lower() in text_lower:
                violations.append(SafetyCategory.BLOCKED_TERM)
                details.append("Blocked term detected")
        is_safe = len(violations) == 0
        if not is_safe:
            logger.warning(f"Content safety violation: {details}")
        return SafetyResult(
            is_safe=is_safe,
            violations=list(set(violations)),
            details="; ".join(details) if details else "No issues detected"
        )

    def sanitize_output(self, text: str) -> str:
        """Sanitize agent output by redacting PII."""
        result = text
        for pattern, pii_type in self.pii_patterns:
            result = re.sub(pattern, f"[{pii_type} REDACTED]", result)
        return result

# Usage with agent
class SafeAgent:
    def __init__(self, agent, filter_config: Optional[dict] = None):
        self.agent = agent
        self.filter = ContentSafetyFilter(**(filter_config or {}))

    async def run(self, message: str, thread=None):
        # Check input safety
        safety_check = self.filter.check_input(message)
        if not safety_check.is_safe:
            return f"I cannot process this request: {safety_check.details}"
        # Run agent
        result = await self.agent.run(message, thread=thread)
        # Sanitize output
        sanitized = self.filter.sanitize_output(result.text)
        return sanitized

.NET / C# Implementation
using System.Text.RegularExpressions;
namespace MAF.Part08.Security;
/// <summary>
/// Part 8: Content Safety Filter for .NET
/// </summary>
public enum SafetyCategory
{
Harmful,
PII,
Jailbreak,
Profanity,
BlockedTerm
}
public record SafetyResult(bool IsSafe, List<SafetyCategory> Violations, string Details);
public class ContentSafetyFilter
{
private readonly bool _blockPii;
private readonly bool _blockJailbreaks;
private readonly HashSet<string> _blocklist;
private readonly int _maxInputLength;
private readonly List<(Regex Pattern, string PiiType)> _piiPatterns = new()
{
(new Regex(@"\b\d{3}-\d{2}-\d{4}\b"), "SSN"),
(new Regex(@"\b\d{16}\b"), "Credit Card"),
(new Regex(@"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", RegexOptions.IgnoreCase), "Email"),
(new Regex(@"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"), "Phone"),
};
private readonly List<Regex> _jailbreakPatterns = new()
{
new Regex(@"ignore (previous|all|your) instructions", RegexOptions.IgnoreCase),
new Regex(@"pretend (you are|to be)", RegexOptions.IgnoreCase),
new Regex(@"act as (if you are|a)", RegexOptions.IgnoreCase),
new Regex(@"disregard (safety|guidelines)", RegexOptions.IgnoreCase),
new Regex(@"bypass (filters|safety)", RegexOptions.IgnoreCase),
};
public ContentSafetyFilter(
bool blockPii = true,
bool blockJailbreaks = true,
IEnumerable<string>? customBlocklist = null,
int maxInputLength = 4000)
{
_blockPii = blockPii;
_blockJailbreaks = blockJailbreaks;
_blocklist = new HashSet<string>(customBlocklist ?? Enumerable.Empty<string>(),
StringComparer.OrdinalIgnoreCase);
_maxInputLength = maxInputLength;
}
public SafetyResult CheckInput(string text)
{
var violations = new List<SafetyCategory>();
var details = new List<string>();
// Length check
if (text.Length > _maxInputLength)
{
violations.Add(SafetyCategory.Harmful);
details.Add($"Input exceeds max length ({text.Length} > {_maxInputLength})");
}
// PII check
if (_blockPii)
{
foreach (var (pattern, piiType) in _piiPatterns)
{
if (pattern.IsMatch(text))
{
violations.Add(SafetyCategory.PII);
details.Add($"Potential {piiType} detected");
}
}
}
// Jailbreak check
if (_blockJailbreaks)
{
foreach (var pattern in _jailbreakPatterns)
{
if (pattern.IsMatch(text))
{
violations.Add(SafetyCategory.Jailbreak);
details.Add("Potential jailbreak attempt detected");
break;
}
}
}
// Blocklist check
foreach (var term in _blocklist)
{
if (text.Contains(term, StringComparison.OrdinalIgnoreCase))
{
violations.Add(SafetyCategory.BlockedTerm);
details.Add("Blocked term detected");
}
}
var isSafe = violations.Count == 0;
return new SafetyResult(
isSafe,
violations.Distinct().ToList(),
details.Any() ? string.Join("; ", details) : "No issues detected"
);
}
public string SanitizeOutput(string text)
{
var result = text;
foreach (var (pattern, piiType) in _piiPatterns)
{
result = pattern.Replace(result, $"[{piiType} REDACTED]");
}
return result;
}
}
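Both filters treat any run of 16 digits as a credit card number, which also flags order numbers and other long identifiers. One way to cut false positives is to confirm each candidate with the Luhn checksum that payment card numbers satisfy. A sketch under that assumption; `luhn_valid` and `find_card_numbers` are hypothetical helpers, not part of either filter above.

```python
import re
from typing import List

CARD_CANDIDATE = re.compile(r"\b\d{16}\b")


def luhn_valid(number: str) -> bool:
    """Return True if the digit string passes the Luhn checksum."""
    total = 0
    # Walk right to left, doubling every second digit
    for index, char in enumerate(reversed(number)):
        digit = int(char)
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def find_card_numbers(text: str) -> List[str]:
    """Return 16-digit runs in `text` that also pass the Luhn check."""
    return [m for m in CARD_CANDIDATE.findall(text) if luhn_valid(m)]


sample = "Card 4111111111111111, order 1234567890123456"
print(find_card_numbers(sample))  # ['4111111111111111']
```

The same check could be applied inside `check_input` before recording a credit card violation. For anything beyond a first line of defense, a dedicated PII detection service is likely a better fit than regular expressions.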
Error Handling & Resilience
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional, Callable
import logging

logger = logging.getLogger(__name__)

@dataclass
class CircuitBreakerState:
    failures: int = 0
    last_failure: Optional[datetime] = None
    is_open: bool = False

class ResilientAgent:
    """
    Production-ready agent wrapper with:
    - Automatic retry with exponential backoff
    - Circuit breaker for failure protection
    - Timeout handling
    - Fallback responses
    """
    def __init__(
        self,
        agent,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        timeout: float = 60.0,
        circuit_threshold: int = 5,
        circuit_reset_time: int = 60,
        fallback_response: Optional[str] = None
    ):
        self.agent = agent
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.timeout = timeout
        self.circuit_threshold = circuit_threshold
        self.circuit_reset_time = circuit_reset_time
        self.fallback_response = fallback_response or "I'm experiencing difficulties. Please try again later."
        self.circuit = CircuitBreakerState()

    def _check_circuit(self) -> bool:
        """Check if circuit breaker allows requests."""
        if not self.circuit.is_open:
            return True
        # Check if reset time has passed
        if self.circuit.last_failure:
            elapsed = datetime.now() - self.circuit.last_failure
            if elapsed > timedelta(seconds=self.circuit_reset_time):
                logger.info("Circuit breaker reset - allowing requests")
                self.circuit.is_open = False
                self.circuit.failures = 0
                return True
        logger.warning("Circuit breaker is OPEN - rejecting request")
        return False

    def _record_failure(self):
        """Record a failure and potentially open the circuit."""
        self.circuit.failures += 1
        self.circuit.last_failure = datetime.now()
        if self.circuit.failures >= self.circuit_threshold:
            self.circuit.is_open = True
            logger.error(f"Circuit breaker OPENED after {self.circuit.failures} failures")

    def _record_success(self):
        """Record a success and reset failure count."""
        self.circuit.failures = 0

    async def run(
        self,
        message: str,
        thread=None,
        on_retry: Optional[Callable] = None
    ) -> str:
        """
        Run agent with resilience patterns.
        """
        # Check circuit breaker
        if not self._check_circuit():
            return self.fallback_response
        last_error = None
        for attempt in range(self.max_retries + 1):
            try:
                # Apply timeout
                result = await asyncio.wait_for(
                    self.agent.run(message, thread=thread),
                    timeout=self.timeout
                )
                self._record_success()
                return result.text
            except asyncio.TimeoutError:
                last_error = "Request timed out"
                logger.warning(f"Attempt {attempt + 1}: Timeout after {self.timeout}s")
            except Exception as e:
                last_error = str(e)
                logger.warning(f"Attempt {attempt + 1}: {last_error}")
            # Record failure
            self._record_failure()
            # If not last attempt, wait with exponential backoff
            if attempt < self.max_retries:
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                logger.info(f"Retrying in {delay:.1f} seconds...")
                if on_retry:
                    on_retry(attempt + 1, delay)
                await asyncio.sleep(delay)
        # All retries exhausted
        logger.error(f"All retries exhausted. Last error: {last_error}")
        return self.fallback_response

# Usage (inside an async function; `base_agent` is an existing agent instance)
resilient_agent = ResilientAgent(
    agent=base_agent,
    max_retries=3,
    timeout=45.0,
    circuit_threshold=5,
    fallback_response="Our AI assistant is temporarily unavailable. Please try again in a few minutes."
)
result = await resilient_agent.run("Help me with my order")

.NET / C# Implementation
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly.Timeout;
using Microsoft.Extensions.Logging;
namespace MAF.Part08.Resilience;
/// <summary>
/// Part 8: Resilient Agent Wrapper with Circuit Breaker for .NET
/// </summary>
public class ResilientAgent
{
private readonly object _agent;
private readonly ILogger _logger;
private readonly AsyncRetryPolicy _retryPolicy;
private readonly AsyncCircuitBreakerPolicy _circuitBreaker;
private readonly AsyncTimeoutPolicy _timeoutPolicy;
private readonly string _fallbackResponse;
public ResilientAgent(
object agent,
ILogger logger,
int maxRetries = 3,
int circuitBreakerThreshold = 5,
int circuitBreakerDuration = 60,
int timeoutSeconds = 60,
string? fallbackResponse = null)
{
_agent = agent;
_logger = logger;
_fallbackResponse = fallbackResponse
?? "I'm experiencing difficulties. Please try again later.";
// Timeout policy
_timeoutPolicy = Policy.TimeoutAsync(
TimeSpan.FromSeconds(timeoutSeconds),
TimeoutStrategy.Optimistic);
// Retry policy with exponential backoff
_retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
maxRetries,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (exception, timeSpan, retryCount, context) =>
{
_logger.LogWarning(
"Retry {RetryCount} after {Delay}s due to: {Message}",
retryCount, timeSpan.TotalSeconds, exception.Message);
});
// Circuit breaker policy
_circuitBreaker = Policy
.Handle<Exception>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: circuitBreakerThreshold,
durationOfBreak: TimeSpan.FromSeconds(circuitBreakerDuration),
onBreak: (exception, duration) =>
{
_logger.LogError(
"Circuit OPENED for {Duration}s due to: {Message}",
duration.TotalSeconds, exception.Message);
},
onReset: () =>
{
_logger.LogInformation("Circuit CLOSED - resuming normal operation");
},
onHalfOpen: () =>
{
_logger.LogInformation("Circuit HALF-OPEN - testing...");
});
}
public async Task<string> RunAsync(string message, object? thread = null)
{
try
{
// Combine policies: timeout -> retry -> circuit breaker
var combinedPolicy = Policy.WrapAsync(_timeoutPolicy, _retryPolicy, _circuitBreaker);
var result = await combinedPolicy.ExecuteAsync(async () =>
{
// Use reflection to call the agent's RunAsync method
var runMethod = _agent.GetType().GetMethod("RunAsync");
if (runMethod == null)
throw new InvalidOperationException("Agent does not have RunAsync method");
dynamic task = runMethod.Invoke(_agent, new object?[] { message, thread })!;
return await task;
});
return result?.ToString() ?? string.Empty;
}
catch (BrokenCircuitException)
{
_logger.LogWarning("Circuit breaker is open - returning fallback");
return _fallbackResponse;
}
catch (TimeoutRejectedException)
{
_logger.LogWarning("Request timed out - returning fallback");
return _fallbackResponse;
}
catch (Exception ex)
{
_logger.LogError(ex, "All retries exhausted - returning fallback");
return _fallbackResponse;
}
}
}
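The Python wrapper computes its delay as `min(base_delay * 2 ** attempt, max_delay)`, so the defaults give waits of 1, 2, and 4 seconds; the C# version waits `2^retryAttempt` seconds with a one-based attempt, giving 2, 4, and 8. When many clients fail at once, identical schedules make them retry in lockstep, and adding random jitter is a common remedy. The sketch below reproduces the Python schedule and adds optional full jitter; `backoff_delay` and its `rng` parameter are hypothetical, not part of either `ResilientAgent`.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Exponential backoff for a zero-based attempt; with `rng`, apply full jitter."""
    ceiling = min(base_delay * (2 ** attempt), max_delay)
    if rng is None:
        return ceiling                # deterministic, as in the Python wrapper
    return rng.uniform(0.0, ceiling)  # full jitter: anywhere in [0, ceiling]


schedule = [backoff_delay(a) for a in range(6)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]

# With jitter, each delay is drawn at random below the deterministic ceiling
jittered = [backoff_delay(a, rng=random.Random(42)) for a in range(3)]
```

Note also that the Python wrapper records a circuit-breaker failure on every failed attempt, so with `max_retries=3` a single request can contribute up to four failures toward `circuit_threshold`.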
Azure AI Foundry Deployment
For managed hosting with enterprise controls:
import asyncio
import os
from azure.identity import DefaultAzureCredential
from agent_framework.azure import AzureAIAgentClient

async def deploy_to_azure_ai_foundry():
    """
    Deploy agent to Azure AI Foundry Agent Service.
    Benefits:
    - Fully managed infrastructure
    - Automatic scaling
    - Built-in monitoring and logging
    - Enterprise security controls
    - SLA-backed reliability
    """
    # Get project connection string from Azure AI Foundry
    project_connection = os.getenv("AZURE_AI_PROJECT_CONNECTION")
    if not project_connection:
        raise ValueError("AZURE_AI_PROJECT_CONNECTION required for Azure deployment")
    # Create Azure AI Agent client
    client = AzureAIAgentClient(
        project_connection_string=project_connection,
        credential=DefaultAzureCredential()
    )
    # Define agent configuration
    agent_config = {
        "name": "ProductionSupportAgent",
        "instructions": """
        You are an enterprise customer support agent.
        - Be professional and helpful
        - Use tools to look up information
        - Escalate complex issues appropriately
        - Always protect customer privacy
        """,
        "model": "gpt-4o",
        "tools": [
            {"type": "code_interpreter"},  # Built-in code execution
            {"type": "file_search"}  # RAG over documents
        ]
    }
    # Create agent in Azure AI Foundry
    agent = await client.create_agent(**agent_config)
    print("Agent deployed to Azure AI Foundry")
    print(f" Agent ID: {agent.id}")
    print(f" Endpoint: {agent.endpoint}")
    # Agent is now managed by Azure:
    # - Auto-scaling based on demand
    # - Automatic failover
    # - Integrated with Azure Monitor
    # - RBAC security model
    return agent

# Using the deployed agent
async def use_foundry_agent():
    agent = await deploy_to_azure_ai_foundry()
    # Create thread for conversation
    thread = await agent.create_thread()
    # Run conversation (fully managed by Azure)
    result = await agent.run(
        thread_id=thread.id,
        message="I need help with my subscription",
        stream=True  # Stream responses
    )
    async for chunk in result:
        print(chunk.text, end="", flush=True)
    print(f"\n\nConversation ID: {thread.id}")

# Run deployment
asyncio.run(use_foundry_agent())

.NET / C# Implementation
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT");
Container Deployment
# Dockerfile for production agent service
FROM python:3.11-slim as base
# Security: Run as non-root user
RUN useradd -m -u 1000 agent
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY --chown=agent:agent . .
# Switch to non-root user
USER agent
# Expose port
EXPOSE 8000
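# Health check (python:3.11-slim ships without curl, so probe with the interpreter)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1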
# Run with uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
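The `HEALTHCHECK` above probes `/health`, and the `CMD` expects an ASGI application named `app` in `main.py`; neither is shown in this article. A minimal, dependency-free sketch of such an endpoint follows. It is a bare ASGI callable written for illustration only; a real service would more likely define the route in its web framework and also check downstream dependencies.

```python
import json


async def app(scope, receive, send):
    """Bare ASGI app: 200 on GET /health, 404 elsewhere (illustrative only)."""
    if scope["type"] != "http":
        return  # this sketch ignores lifespan and websocket events
    healthy = scope["method"] == "GET" and scope["path"] == "/health"
    status = 200 if healthy else 404
    body = json.dumps({"status": "ok" if healthy else "not found"}).encode()
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": body})
```

Saved as `main.py`, this matches the `main:app` target in the `CMD` line.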
---
# docker-compose.yml for local development
version: '3.8'
services:
  agent-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
      - AZURE_CLIENT_ID=${AZURE_CLIENT_ID}
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - otel-collector
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"  # OTLP gRPC
      - "4318:4318"  # OTLP HTTP
      - "8889:8889"  # Prometheus metrics
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # Jaeger UI
      - "14250:14250"  # gRPC
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana
volumes:
  redis-data:
  grafana-data:

Monitoring Dashboard
Key metrics to monitor:
| Metric | Description | Alert Threshold |
|---|---|---|
| Response Latency | Time to generate response | > 5 seconds |
| Token Usage | Tokens per request | > 4000 avg |
| Error Rate | Failed requests % | > 1% |
| Tool Success | Tool call success rate | < 95% |
| Throughput | Requests per minute | Capacity based |
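
The thresholds in the table can be expressed as simple rules evaluated against a metrics snapshot. The sketch below is illustrative only: the metric keys, the `ALERT_RULES` table, and `evaluate_alerts` are hypothetical names, and in practice such rules would usually live in the monitoring system rather than in application code. Throughput is omitted because its threshold depends on capacity.

```python
import operator
from typing import Callable, Dict, List, Tuple

# (metric key, comparison, threshold) mirroring the table above
ALERT_RULES: List[Tuple[str, Callable[[float, float], bool], float]] = [
    ("response_latency_seconds", operator.gt, 5.0),
    ("avg_tokens_per_request", operator.gt, 4000.0),
    ("error_rate_percent", operator.gt, 1.0),
    ("tool_success_percent", operator.lt, 95.0),
]


def evaluate_alerts(snapshot: Dict[str, float]) -> List[str]:
    """Return the keys of metrics in `snapshot` that breach their threshold."""
    return [
        key
        for key, breached, threshold in ALERT_RULES
        if key in snapshot and breached(snapshot[key], threshold)
    ]


snapshot = {
    "response_latency_seconds": 6.2,
    "avg_tokens_per_request": 1800.0,
    "error_rate_percent": 0.4,
    "tool_success_percent": 91.0,
}
print(evaluate_alerts(snapshot))  # ['response_latency_seconds', 'tool_success_percent']
```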
📦 Source Code
All code examples from this article series are available on GitHub:
👉 https://github.com/nithinmohantk/microsoft-agent-framework-series-examples
Clone the repository to follow along:
git clone https://github.com/nithinmohantk/microsoft-agent-framework-series-examples.git
cd microsoft-agent-framework-series-examples
References
- Microsoft Agent Framework GitHub
- Azure OpenAI Content Safety
- OpenTelemetry Documentation
- Azure AI Foundry Documentation