Production-Ready Agents: Observability, Security & Deployment – Part 8

Part 8 of the Microsoft Agent Framework Series

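Building an agent is one thing. Deploying it to production with enterprise-grade reliability, security, and observability is another challenge entirely. This article covers the pieces needed to move from prototype to production: OpenTelemetry tracing, authentication and content safety, retry and circuit-breaker wrappers, and deployment to Azure AI Foundry or containers.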

Production Readiness Checklist

Figure 1: Production AI Agent Architecture with Observability (client layer, agent service layer, observability stack, and data layer)

| Category      | Requirement                    | Status |
|---------------|--------------------------------|--------|
| Security      | Authentication & Authorization |        |
| Security      | Content Safety Filters         |        |
| Security      | Input Validation               |        |
| Observability | OpenTelemetry Integration      |        |
| Observability | Logging & Metrics              |        |
| Reliability   | Error Handling & Retry         |        |
| Reliability   | Circuit Breakers               |        |
| Performance   | Response Caching               |        |
| Performance   | Rate Limiting                  |        |
| Deployment    | Container/Cloud Ready          |        |

OpenTelemetry Integration

Microsoft Agent Framework has built-in OpenTelemetry support for comprehensive observability:

Python Setup

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
import logging
import os

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def configure_telemetry(service_name: str = "agent-service"):
    """Configure OpenTelemetry for the agent service."""
    
    # Create resource with service metadata
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": os.getenv("ENVIRONMENT", "development")
    })
    
    # Create tracer provider
    provider = TracerProvider(resource=resource)
    
    # Configure OTLP exporter (sends to collector)
    otlp_exporter = OTLPSpanExporter(
        endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
        insecure=True  # no TLS: acceptable for a local collector, enable TLS in production
    )
    
    # Add batch processor for efficient export
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)
    
    # Set as global tracer provider
    trace.set_tracer_provider(provider)
    
    # Instrument HTTP client
    AioHttpClientInstrumentor().instrument()
    
    logger.info(f"Telemetry configured for {service_name}")
    return trace.get_tracer(service_name)

import functools

# Custom span decorator for agent operations
def traced(operation_name: str):
    """Decorator to trace agent operations."""
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped coroutine's name and docstring
        async def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(operation_name) as span:
                span.set_attribute("agent.operation", operation_name)
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("agent.success", True)
                    return result
                except Exception as e:
                    span.set_attribute("agent.success", False)
                    span.set_attribute("agent.error", str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage example
tracer = configure_telemetry("my-agent-service")

@traced("process_user_request")
async def handle_request(message: str, thread):
    # `agent` is assumed to be an agent instance created elsewhere
    result = await agent.run(message, thread)
    return result

.NET / C# Implementation

using OpenTelemetry;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using Microsoft.Extensions.DependencyInjection;

namespace MAF.Part08.Telemetry;

/// <summary>
/// Part 8: OpenTelemetry Configuration for .NET
/// </summary>
public static class TelemetryConfiguration
{
    public static IServiceCollection AddAgentTelemetry(
        this IServiceCollection services,
        string serviceName = "agent-service")
    {
        services.AddOpenTelemetry()
            .ConfigureResource(resource => resource
                .AddService(
                    serviceName: serviceName,
                    serviceVersion: "1.0.0")
                .AddAttributes(new Dictionary<string, object>
                {
                    ["deployment.environment"] = 
                        Environment.GetEnvironmentVariable("ENVIRONMENT") ?? "development"
                }))
            .WithTracing(tracing => tracing
                .AddSource("Microsoft.Agents.AI")
                .AddAspNetCoreInstrumentation()
                .AddHttpClientInstrumentation()
                .AddOtlpExporter(options =>
                {
                    options.Endpoint = new Uri(
                        Environment.GetEnvironmentVariable("OTEL_EXPORTER_OTLP_ENDPOINT") 
                        ?? "http://localhost:4317");
                }))
            .WithMetrics(metrics => metrics
                .AddMeter("Microsoft.Agents.AI")
                .AddAspNetCoreInstrumentation()
                .AddHttpClientInstrumentation()
                .AddOtlpExporter(options =>
                {
                    options.Endpoint = new Uri(
                        Environment.GetEnvironmentVariable("OTEL_EXPORTER_OTLP_ENDPOINT") 
                        ?? "http://localhost:4317");
                }));
        
        return services;
    }
}

// Program.cs usage example
public class Program
{
    public static void Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);

        // Add telemetry
        builder.Services.AddAgentTelemetry("customer-support-agent");

        var app = builder.Build();

        app.MapPost("/api/agent/chat", async (ChatRequest request) =>
        {
            // Agent endpoints are automatically traced
            return Results.Ok(new { response = "Hello!" });
        });

        app.Run();
    }
}

public record ChatRequest(string Message);

Security Best Practices

Authentication with Microsoft Entra ID (formerly Azure AD)

from azure.identity import (
    DefaultAzureCredential,
    ManagedIdentityCredential,
    ChainedTokenCredential,
    AzureCliCredential
)
import os
import logging

logger = logging.getLogger(__name__)

def get_azure_credential():
    """
    Get appropriate Azure credential based on environment.
    
    Priority:
    1. Managed Identity (when running in Azure)
    2. Azure CLI (for local development)
    3. Default credential chain (fallback)
    """
    
    # Check if running in Azure
    azure_client_id = os.getenv("AZURE_CLIENT_ID")
    
    if azure_client_id:
        # Production: Use Managed Identity
        logger.info("Using Managed Identity authentication")
        return ManagedIdentityCredential(client_id=azure_client_id)
    
    # Development: Use Azure CLI first, then fallback
    logger.info("Using development credential chain")
    return ChainedTokenCredential(
        AzureCliCredential(),
        DefaultAzureCredential()
    )

# Secure agent initialization
def create_secure_agent():
    """Create agent with secure credential management."""
    from agent_framework.azure import AzureOpenAIResponsesClient
    
    credential = get_azure_credential()
    
    # Get endpoint from environment (never hardcode)
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    if not endpoint:
        raise ValueError("AZURE_OPENAI_ENDPOINT environment variable is required")
    
    client = AzureOpenAIResponsesClient(
        credential=credential,
        endpoint=endpoint
    )
    
    return client.create_agent(
        name="SecureAgent",
        instructions="You are a helpful assistant."
    )

# Key Vault integration for secrets
from azure.keyvault.secrets import SecretClient

class SecureConfigProvider:
    """Fetch configuration from Azure Key Vault."""
    
    def __init__(self, vault_url: str):
        credential = get_azure_credential()
        self.client = SecretClient(vault_url=vault_url, credential=credential)
    
    def get_secret(self, name: str) -> str:
        """Retrieve a secret from Key Vault."""
        secret = self.client.get_secret(name)
        return secret.value
    
    def get_api_key(self, service: str) -> str:
        """Get API key for external service."""
        return self.get_secret(f"{service}-api-key")

# Usage (os.environ[...] raises KeyError early if the vault URL is not configured)
config = SecureConfigProvider(os.environ["AZURE_KEY_VAULT_URL"])
crm_api_key = config.get_api_key("crm")
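
Calling Key Vault on every lookup adds a network round trip per secret. One common mitigation is a small time-based cache in front of the provider. The sketch below is a hypothetical helper, not part of the Azure SDK: `CachedSecretProvider`, its `ttl_seconds` parameter, and the injectable `clock` are all names assumed for illustration.

```python
import time
from typing import Callable, Dict, Tuple


class CachedSecretProvider:
    """Cache secret lookups for a fixed TTL (hypothetical helper, not an SDK class)."""

    def __init__(
        self,
        fetch: Callable[[str], str],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch          # e.g. SecureConfigProvider.get_secret
        self._ttl = ttl_seconds
        self._clock = clock          # injectable so tests can control time
        self._cache: Dict[str, Tuple[float, str]] = {}

    def get_secret(self, name: str) -> str:
        now = self._clock()
        cached = self._cache.get(name)
        # Serve from the cache while the entry is younger than the TTL
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]
        value = self._fetch(name)
        self._cache[name] = (now, value)
        return value
```

With the provider above, this could be wired up as `cached = CachedSecretProvider(config.get_secret)`. A short TTL keeps rotated secrets from being served stale for long.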

Input Validation & Content Safety

from dataclasses import dataclass
from typing import List, Optional, Set
from enum import Enum
import re
import logging

logger = logging.getLogger(__name__)

class SafetyCategory(Enum):
    HARMFUL = "harmful"
    PII = "pii"
    JAILBREAK = "jailbreak"
    PROFANITY = "profanity"
    BLOCKED_TERM = "blocked_term"

@dataclass
class SafetyResult:
    is_safe: bool
    violations: List[SafetyCategory]
    details: str

class ContentSafetyFilter:
    """
    Enterprise-grade content safety filter for agent inputs/outputs.
    """
    
    def __init__(
        self,
        block_harmful: bool = True,
        block_pii: bool = True,
        block_jailbreaks: bool = True,
        custom_blocklist: Optional[List[str]] = None,
        max_input_length: int = 4000
    ):
        self.block_harmful = block_harmful
        self.block_pii = block_pii
        self.block_jailbreaks = block_jailbreaks
        self.blocklist: Set[str] = set(custom_blocklist or [])
        self.max_input_length = max_input_length
        
        # PII patterns (word boundaries keep the phone pattern from matching inside longer digit runs)
        self.pii_patterns = [
            (r'\b\d{3}-\d{2}-\d{4}\b', 'SSN'),  # Social Security Number
            (r'\b\d{16}\b', 'Credit Card'),      # 16 consecutive digits, no separators
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', 'Email'),
            (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'Phone'),
        ]
        
        # Jailbreak patterns
        self.jailbreak_patterns = [
            r'ignore (previous|all|your) instructions',
            r'pretend (you are|to be)',
            r'act as (if you are|a)',
            r'disregard (safety|guidelines)',
            r'bypass (filters|safety)',
        ]
    
    def check_input(self, text: str) -> SafetyResult:
        """Check user input for safety violations."""
        violations = []
        details = []
        
        # Length check
        if len(text) > self.max_input_length:
            violations.append(SafetyCategory.HARMFUL)
            details.append(f"Input exceeds max length ({len(text)} > {self.max_input_length})")
        
        # PII check
        if self.block_pii:
            for pattern, pii_type in self.pii_patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    violations.append(SafetyCategory.PII)
                    details.append(f"Potential {pii_type} detected")
        
        # Jailbreak check
        if self.block_jailbreaks:
            for pattern in self.jailbreak_patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    violations.append(SafetyCategory.JAILBREAK)
                    details.append("Potential jailbreak attempt detected")
                    break
        
        # Blocklist check
        text_lower = text.lower()
        for term in self.blocklist:
            if term.lower() in text_lower:
                violations.append(SafetyCategory.BLOCKED_TERM)
                details.append("Blocked term detected")
        
        is_safe = len(violations) == 0
        
        if not is_safe:
            logger.warning(f"Content safety violation: {details}")
        
        return SafetyResult(
            is_safe=is_safe,
            violations=list(set(violations)),
            details="; ".join(details) if details else "No issues detected"
        )
    
    def sanitize_output(self, text: str) -> str:
        """Sanitize agent output by redacting PII."""
        result = text
        
        for pattern, pii_type in self.pii_patterns:
            result = re.sub(pattern, f"[{pii_type} REDACTED]", result)
        
        return result

# Usage with agent
class SafeAgent:
    def __init__(self, agent, filter_config: Optional[dict] = None):
        self.agent = agent
        self.filter = ContentSafetyFilter(**(filter_config or {}))
    
    async def run(self, message: str, thread=None):
        # Check input safety
        safety_check = self.filter.check_input(message)
        
        if not safety_check.is_safe:
            return f"I cannot process this request: {safety_check.details}"
        
        # Run agent
        result = await self.agent.run(message, thread)
        
        # Sanitize output
        sanitized = self.filter.sanitize_output(result.text)
        
        return sanitized
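
The 16-digit credit card pattern above flags any 16-digit run (order IDs, tracking numbers) and misses cards written with spaces or hyphens. A common refinement, not used in the original filter, is to accept separators and then confirm candidates with the Luhn checksum. The following is a minimal sketch under that assumption; `CARD_CANDIDATE`, `luhn_valid`, and `find_card_numbers` are hypothetical names.

```python
import re
from typing import List

# 13 to 19 digits, each optionally followed by a single space or hyphen
CARD_CANDIDATE = re.compile(r"\b(?:\d[ -]?){12,18}\d\b")


def luhn_valid(digits: str) -> bool:
    """Return True if the digit string passes the Luhn checksum."""
    total = 0
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:       # double every second digit from the right
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def find_card_numbers(text: str) -> List[str]:
    """Return digit strings that look like card numbers and pass Luhn."""
    found = []
    for match in CARD_CANDIDATE.finditer(text):
        digits = re.sub(r"\D", "", match.group())
        if 13 <= len(digits) <= 19 and luhn_valid(digits):
            found.append(digits)
    return found
```

A check like this could replace the plain regex test for the "Credit Card" entry. It reduces false positives but does not eliminate them, since roughly one in ten random digit runs passes Luhn by chance.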

.NET / C# Implementation

using System.Text.RegularExpressions;

namespace MAF.Part08.Security;

/// <summary>
/// Part 8: Content Safety Filter for .NET
/// </summary>
public enum SafetyCategory
{
    Harmful,
    PII,
    Jailbreak,
    Profanity,
    BlockedTerm
}

public record SafetyResult(bool IsSafe, List<SafetyCategory> Violations, string Details);

public class ContentSafetyFilter
{
    private readonly bool _blockPii;
    private readonly bool _blockJailbreaks;
    private readonly HashSet<string> _blocklist;
    private readonly int _maxInputLength;

    private readonly List<(Regex Pattern, string PiiType)> _piiPatterns = new()
    {
        (new Regex(@"\b\d{3}-\d{2}-\d{4}\b"), "SSN"),
        (new Regex(@"\b\d{16}\b"), "Credit Card"),
        (new Regex(@"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", RegexOptions.IgnoreCase), "Email"),
        (new Regex(@"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"), "Phone"),
    };

    private readonly List<Regex> _jailbreakPatterns = new()
    {
        new Regex(@"ignore (previous|all|your) instructions", RegexOptions.IgnoreCase),
        new Regex(@"pretend (you are|to be)", RegexOptions.IgnoreCase),
        new Regex(@"act as (if you are|a)", RegexOptions.IgnoreCase),
        new Regex(@"disregard (safety|guidelines)", RegexOptions.IgnoreCase),
        new Regex(@"bypass (filters|safety)", RegexOptions.IgnoreCase),
    };

    public ContentSafetyFilter(
        bool blockPii = true,
        bool blockJailbreaks = true,
        IEnumerable<string>? customBlocklist = null,
        int maxInputLength = 4000)
    {
        _blockPii = blockPii;
        _blockJailbreaks = blockJailbreaks;
        _blocklist = new HashSet<string>(customBlocklist ?? Enumerable.Empty<string>(),
            StringComparer.OrdinalIgnoreCase);
        _maxInputLength = maxInputLength;
    }

    public SafetyResult CheckInput(string text)
    {
        var violations = new List<SafetyCategory>();
        var details = new List<string>();

        // Length check
        if (text.Length > _maxInputLength)
        {
            violations.Add(SafetyCategory.Harmful);
            details.Add($"Input exceeds max length ({text.Length} > {_maxInputLength})");
        }

        // PII check
        if (_blockPii)
        {
            foreach (var (pattern, piiType) in _piiPatterns)
            {
                if (pattern.IsMatch(text))
                {
                    violations.Add(SafetyCategory.PII);
                    details.Add($"Potential {piiType} detected");
                }
            }
        }

        // Jailbreak check
        if (_blockJailbreaks)
        {
            foreach (var pattern in _jailbreakPatterns)
            {
                if (pattern.IsMatch(text))
                {
                    violations.Add(SafetyCategory.Jailbreak);
                    details.Add("Potential jailbreak attempt detected");
                    break;
                }
            }
        }

        // Blocklist check
        foreach (var term in _blocklist)
        {
            if (text.Contains(term, StringComparison.OrdinalIgnoreCase))
            {
                violations.Add(SafetyCategory.BlockedTerm);
                details.Add("Blocked term detected");
            }
        }

        var isSafe = violations.Count == 0;
        return new SafetyResult(
            isSafe,
            violations.Distinct().ToList(),
            details.Any() ? string.Join("; ", details) : "No issues detected"
        );
    }

    public string SanitizeOutput(string text)
    {
        var result = text;
        foreach (var (pattern, piiType) in _piiPatterns)
        {
            result = pattern.Replace(result, $"[{piiType} REDACTED]");
        }
        return result;
    }
}

Error Handling & Resilience

import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional, Callable
import logging

logger = logging.getLogger(__name__)

@dataclass
class CircuitBreakerState:
    failures: int = 0
    last_failure: Optional[datetime] = None
    is_open: bool = False
    
class ResilientAgent:
    """
    Production-ready agent wrapper with:
    - Automatic retry with exponential backoff
    - Circuit breaker for failure protection
    - Timeout handling
    - Fallback responses
    """
    
    def __init__(
        self,
        agent,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        timeout: float = 60.0,
        circuit_threshold: int = 5,
        circuit_reset_time: int = 60,
        fallback_response: Optional[str] = None
    ):
        self.agent = agent
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.timeout = timeout
        self.circuit_threshold = circuit_threshold
        self.circuit_reset_time = circuit_reset_time
        self.fallback_response = fallback_response or "I'm experiencing difficulties. Please try again later."
        
        self.circuit = CircuitBreakerState()
    
    def _check_circuit(self) -> bool:
        """Check if circuit breaker allows requests."""
        if not self.circuit.is_open:
            return True
        
        # Check if reset time has passed
        if self.circuit.last_failure:
            elapsed = datetime.now() - self.circuit.last_failure
            if elapsed > timedelta(seconds=self.circuit_reset_time):
                logger.info("Circuit breaker reset - allowing requests")
                self.circuit.is_open = False
                self.circuit.failures = 0
                return True
        
        logger.warning("Circuit breaker is OPEN - rejecting request")
        return False
    
    def _record_failure(self):
        """Record a failure and potentially open the circuit."""
        self.circuit.failures += 1
        self.circuit.last_failure = datetime.now()
        
        if self.circuit.failures >= self.circuit_threshold:
            self.circuit.is_open = True
            logger.error(f"Circuit breaker OPENED after {self.circuit.failures} failures")
    
    def _record_success(self):
        """Record a success and reset failure count."""
        self.circuit.failures = 0
    
    async def run(
        self,
        message: str,
        thread=None,
        on_retry: Optional[Callable] = None
    ) -> str:
        """
        Run agent with resilience patterns.
        """
        # Check circuit breaker
        if not self._check_circuit():
            return self.fallback_response
        
        last_error = None
        
        for attempt in range(self.max_retries + 1):
            try:
                # Apply timeout
                result = await asyncio.wait_for(
                    self.agent.run(message, thread),
                    timeout=self.timeout
                )
                
                self._record_success()
                return result.text
                
            except asyncio.TimeoutError:
                last_error = "Request timed out"
                logger.warning(f"Attempt {attempt + 1}: Timeout after {self.timeout}s")
                
            except Exception as e:
                last_error = str(e)
                logger.warning(f"Attempt {attempt + 1}: {last_error}")
            
            # Record failure
            self._record_failure()
            
            # If not last attempt, wait with exponential backoff
            if attempt < self.max_retries:
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                logger.info(f"Retrying in {delay:.1f} seconds...")
                
                if on_retry:
                    on_retry(attempt + 1, delay)
                
                await asyncio.sleep(delay)
        
        # All retries exhausted
        logger.error(f"All retries exhausted. Last error: {last_error}")
        return self.fallback_response

# Usage
resilient_agent = ResilientAgent(
    agent=base_agent,
    max_retries=3,
    timeout=45.0,
    circuit_threshold=5,
    fallback_response="Our AI assistant is temporarily unavailable. Please try again in a few minutes."
)

result = await resilient_agent.run("Help me with my order")
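
To make the retry timing concrete, the sketch below reproduces the delay formula from `ResilientAgent.run` (`min(base_delay * 2 ** attempt, max_delay)`) as a standalone function. It also shows a "full jitter" variant, which is an addition not present in the wrapper above: randomizing each delay is a common way to keep many clients from retrying in lockstep. `backoff_delays` and `jittered_delay` are hypothetical helper names.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0
) -> List[float]:
    """Delays slept between attempts, using the same formula as ResilientAgent.run."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]


def jittered_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full-jitter variant: a uniform random delay between 0 and the capped backoff."""
    source = rng if rng is not None else random
    cap = min(base_delay * (2 ** attempt), max_delay)
    return source.uniform(0.0, cap)


print(backoff_delays())               # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

With the defaults, a request that fails every attempt spends 7 seconds sleeping in total, on top of up to four timeouts of `timeout` seconds each.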

.NET / C# Implementation

using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly.Timeout;
using Microsoft.Extensions.Logging;

namespace MAF.Part08.Resilience;

/// <summary>
/// Part 8: Resilient Agent Wrapper with Circuit Breaker for .NET
/// </summary>
public class ResilientAgent
{
    private readonly object _agent;
    private readonly ILogger _logger;
    private readonly AsyncRetryPolicy _retryPolicy;
    private readonly AsyncCircuitBreakerPolicy _circuitBreaker;
    private readonly AsyncTimeoutPolicy _timeoutPolicy;
    private readonly string _fallbackResponse;

    public ResilientAgent(
        object agent,
        ILogger logger,
        int maxRetries = 3,
        int circuitBreakerThreshold = 5,
        int circuitBreakerDuration = 60,
        int timeoutSeconds = 60,
        string? fallbackResponse = null)
    {
        _agent = agent;
        _logger = logger;
        _fallbackResponse = fallbackResponse 
            ?? "I'm experiencing difficulties. Please try again later.";

        // Timeout policy
        _timeoutPolicy = Policy.TimeoutAsync(
            TimeSpan.FromSeconds(timeoutSeconds),
            TimeoutStrategy.Optimistic);

        // Retry policy with exponential backoff
        _retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                maxRetries,
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                onRetry: (exception, timeSpan, retryCount, context) =>
                {
                    _logger.LogWarning(
                        "Retry {RetryCount} after {Delay}s due to: {Message}",
                        retryCount, timeSpan.TotalSeconds, exception.Message);
                });

        // Circuit breaker policy
        _circuitBreaker = Policy
            .Handle<Exception>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: circuitBreakerThreshold,
                durationOfBreak: TimeSpan.FromSeconds(circuitBreakerDuration),
                onBreak: (exception, duration) =>
                {
                    _logger.LogError(
                        "Circuit OPENED for {Duration}s due to: {Message}",
                        duration.TotalSeconds, exception.Message);
                },
                onReset: () =>
                {
                    _logger.LogInformation("Circuit CLOSED - resuming normal operation");
                },
                onHalfOpen: () =>
                {
                    _logger.LogInformation("Circuit HALF-OPEN - testing...");
                });
    }

    public async Task<string> RunAsync(string message, object? thread = null)
    {
        try
        {
            // Combine policies: timeout -> retry -> circuit breaker
            var combinedPolicy = Policy.WrapAsync(_timeoutPolicy, _retryPolicy, _circuitBreaker);

            var result = await combinedPolicy.ExecuteAsync(async () =>
            {
                // Use reflection to call the agent's RunAsync method
                var runMethod = _agent.GetType().GetMethod("RunAsync");
                if (runMethod == null)
                    throw new InvalidOperationException("Agent does not have RunAsync method");

                dynamic task = runMethod.Invoke(_agent, new[] { message, thread })!;
                return await task;
            });

            return result?.ToString() ?? string.Empty;
        }
        catch (BrokenCircuitException)
        {
            _logger.LogWarning("Circuit breaker is open - returning fallback");
            return _fallbackResponse;
        }
        catch (TimeoutRejectedException)
        {
            _logger.LogWarning("Request timed out - returning fallback");
            return _fallbackResponse;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "All retries exhausted - returning fallback");
            return _fallbackResponse;
        }
    }
}
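
The readiness checklist at the top of this article also lists rate limiting, which the wrappers above do not implement. One common approach is a token bucket: tokens refill at a steady rate up to a fixed capacity, and each request spends one. The following is a minimal in-process sketch, with `TokenBucket` and its parameters being hypothetical names. It is not safe for use across threads or across multiple service replicas, where a shared store would be needed.

```python
import time
from typing import Callable


class TokenBucket:
    """In-process token bucket rate limiter (illustrative sketch)."""

    def __init__(
        self,
        rate_per_second: float,
        capacity: int,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate_per_second
        self.capacity = float(capacity)
        self._tokens = float(capacity)   # start with a full bucket
        self._clock = clock              # injectable so tests can control time
        self._last = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and spend tokens if the request fits, else False."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill in proportion to elapsed time, never beyond capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

A caller would typically keep one bucket per user or API key and return HTTP 429 when `allow()` is False, before the request reaches the agent.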

Azure AI Foundry Deployment

For managed hosting with enterprise controls:

import asyncio
import os
from azure.identity import DefaultAzureCredential
from agent_framework.azure import AzureAIAgentClient

async def deploy_to_azure_ai_foundry():
    """
    Deploy agent to Azure AI Foundry Agent Service.
    
    Benefits:
    - Fully managed infrastructure
    - Automatic scaling
    - Built-in monitoring and logging
    - Enterprise security controls
    - SLA-backed reliability
    """
    
    # Get project connection string from Azure AI Foundry
    project_connection = os.getenv("AZURE_AI_PROJECT_CONNECTION")
    
    if not project_connection:
        raise ValueError("AZURE_AI_PROJECT_CONNECTION required for Azure deployment")
    
    # Create Azure AI Agent client
    client = AzureAIAgentClient(
        project_connection_string=project_connection,
        credential=DefaultAzureCredential()
    )
    
    # Define agent configuration
    agent_config = {
        "name": "ProductionSupportAgent",
        "instructions": """
            You are an enterprise customer support agent.
            - Be professional and helpful
            - Use tools to look up information
            - Escalate complex issues appropriately
            - Always protect customer privacy
        """,
        "model": "gpt-4o",
        "tools": [
            {"type": "code_interpreter"},  # Built-in code execution
            {"type": "file_search"}         # RAG over documents
        ]
    }
    
    # Create agent in Azure AI Foundry
    agent = await client.create_agent(**agent_config)
    
    print("Agent deployed to Azure AI Foundry")
    print(f"  Agent ID: {agent.id}")
    print(f"  Endpoint: {agent.endpoint}")
    
    # Agent is now managed by Azure:
    # - Auto-scaling based on demand
    # - Automatic failover
    # - Integrated with Azure Monitor
    # - RBAC security model
    
    return agent

# Using the deployed agent
async def use_foundry_agent():
    agent = await deploy_to_azure_ai_foundry()
    
    # Create thread for conversation
    thread = await agent.create_thread()
    
    # Run conversation (fully managed by Azure)
    result = await agent.run(
        thread_id=thread.id,
        message="I need help with my subscription",
        stream=True  # Stream responses
    )
    
    async for chunk in result:
        print(chunk.text, end="", flush=True)
    
    print(f"\n\nConversation ID: {thread.id}")

# Run deployment
asyncio.run(use_foundry_agent())

.NET / C# Implementation

var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT");

Container Deployment

# Dockerfile for production agent service
FROM python:3.11-slim AS base

# Security: Run as non-root user
RUN useradd -m -u 1000 agent
WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY --chown=agent:agent . .

# Switch to non-root user
USER agent

# Expose port
EXPOSE 8000

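# Health check (python:3.11-slim does not ship curl, so probe with the Python standard library)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1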

# Run with uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

---
# docker-compose.yml for local development
version: '3.8'

services:
  agent-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
      - AZURE_CLIENT_ID=${AZURE_CLIENT_ID}
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - otel-collector
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
      - "8889:8889"   # Prometheus metrics

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # Jaeger UI
      - "14250:14250"  # gRPC

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin  # local development only; change before exposing Grafana
    volumes:
      - grafana-data:/var/lib/grafana

volumes:
  redis-data:
  grafana-data:
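
The Dockerfile's HEALTHCHECK probes /health on port 8000 and starts `main:app` with uvicorn, but the article does not show that module. As a rough sketch of what the health route could look like, here is a framework-free ASGI callable. The response shape (`{"status": "ok"}`) is an assumption; a real service would more likely add this route to its existing web framework app.

```python
import json


async def app(scope, receive, send):
    """Minimal ASGI app: answers GET /health, returns 404 for anything else."""
    if scope["type"] != "http":
        return  # this sketch ignores lifespan and websocket scopes

    if scope["method"] == "GET" and scope["path"] == "/health":
        status, payload = 200, {"status": "ok"}
    else:
        status, payload = 404, {"error": "not found"}

    body = json.dumps(payload).encode("utf-8")
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [
            (b"content-type", b"application/json"),
            (b"content-length", str(len(body)).encode("ascii")),
        ],
    })
    await send({"type": "http.response.body", "body": body})
```

Saved as main.py, this matches the `uvicorn main:app` command in the Dockerfile. A production health check would usually also verify downstream dependencies such as the model endpoint or Redis.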

Monitoring Dashboard

Key metrics to monitor:

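| Metric           | Description               | Alert Threshold |
|------------------|---------------------------|-----------------|
| Response Latency | Time to generate response | > 5 seconds     |
| Token Usage      | Tokens per request        | > 4000 avg      |
| Error Rate       | Failed requests %         | > 1%            |
| Tool Success     | Tool call success rate    | < 95%           |
| Throughput       | Requests per minute       | Capacity based  |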
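
The thresholds above can be expressed as data and evaluated against a metrics snapshot. The sketch below is illustrative only: the metric key names (`latency_seconds`, `avg_tokens_per_request`, and so on) are assumptions, and in practice these rules would normally live in the alerting system (for example Prometheus or Azure Monitor) rather than in application code. Throughput is omitted because its threshold depends on capacity.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class AlertRule:
    metric: str                        # key in the metrics snapshot (assumed names)
    description: str                   # message reported when the rule fires
    breached: Callable[[float], bool]  # True when the value crosses the threshold


RULES = [
    AlertRule("latency_seconds", "Response latency above 5 seconds", lambda v: v > 5.0),
    AlertRule("avg_tokens_per_request", "Average token usage above 4000", lambda v: v > 4000),
    AlertRule("error_rate_percent", "Error rate above 1%", lambda v: v > 1.0),
    AlertRule("tool_success_percent", "Tool success rate below 95%", lambda v: v < 95.0),
]


def evaluate_alerts(snapshot: Dict[str, float]) -> List[str]:
    """Return the description of every rule breached by the snapshot."""
    return [
        rule.description
        for rule in RULES
        if rule.metric in snapshot and rule.breached(snapshot[rule.metric])
    ]


print(evaluate_alerts({
    "latency_seconds": 6.2,
    "error_rate_percent": 0.4,
    "tool_success_percent": 91.0,
}))
# ['Response latency above 5 seconds', 'Tool success rate below 95%']
```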

📦 Source Code

All code examples from this article series are available on GitHub:

👉 https://github.com/nithinmohantk/microsoft-agent-framework-series-examples

Clone the repository to follow along:

git clone https://github.com/nithinmohantk/microsoft-agent-framework-series-examples.git
cd microsoft-agent-framework-series-examples
