Workflows: Graph-Based Agent Orchestration in Microsoft Agent Framework – Part 6

Part 6 of the Microsoft Agent Framework Series

So far we’ve focused on individual agents. But enterprise applications often require coordinated multi-step processes — document processing pipelines, approval workflows, complex research tasks. This is where Workflows come in.

Workflows are graph-based orchestration systems that connect multiple agents and functions to perform complex, deterministic processes with checkpointing, error recovery, and human-in-the-loop support.

Agents vs Workflows

AspectAgentsWorkflows
ControlLLM decides next actionDeveloper defines execution path
DeterminismVariable (creative)Predictable (structured)
ComplexitySingle agent, multiple toolsMultiple agents + functions
Use caseOpen-ended tasksMulti-step business processes
StateThread-basedCheckpointed graph state
💡
WHEN TO USE WORKFLOWS

Use workflows when you need explicit control over execution order, checkpointing for long-running processes, or when combining multiple specialized agents in a predictable sequence.

Workflow Components

Executors

Executors are the nodes in your workflow graph. They can be:

  • AI Agents: LLM-powered decision makers
  • Functions: Deterministic code execution
  • Custom logic: Business rules, validations

Edges

Edges define how messages flow between executors:

  • Direct edges: Simple A → B connections
  • Conditional edges: Route based on conditions
  • Switch-case edges: Multiple routing options
  • Fan-out edges: One to many (parallel)
  • Fan-in edges: Many to one (aggregation)

Building Your First Workflow

Python Example: Document Processing Pipeline

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
import asyncio

async def create_document_processing_workflow():
    # Create the client
    client = AzureOpenAIResponsesClient(credential=AzureCliCredential())
    
    # Create specialized agents
    classifier_agent = client.create_agent(
        name="DocumentClassifier",
        instructions="""
            You are a document classification expert.
            Analyze documents and classify them into categories:
            - invoice: Bills, payment requests, receipts
            - contract: Legal agreements, terms of service
            - report: Analysis documents, summaries, metrics
            - correspondence: Letters, emails, memos
            - other: Anything that doesn't fit above categories
            
            Return ONLY the category name in lowercase.
        """
    )
    
    extractor_agent = client.create_agent(
        name="DataExtractor",
        instructions="""
            You are a data extraction specialist.
            Based on the document type, extract key fields:
            
            For invoices: vendor, amount, date, invoice_number
            For contracts: parties, effective_date, terms, signatures
            For reports: title, date, key_findings, recommendations
            For correspondence: sender, recipient, date, subject, action_items
            
            Return data as structured JSON.
        """
    )
    
    validator_agent = client.create_agent(
        name="DataValidator",
        instructions="""
            You are a data validation expert.
            Check extracted data for:
            - Completeness: All required fields present
            - Format: Dates, numbers, emails properly formatted
            - Consistency: Values make logical sense
            
            Return validation result with any issues found.
        """
    )
    
    # Build the workflow
    builder = WorkflowBuilder()
    
    # Add executors (agents)
    builder.add_executor(classifier_agent)
    builder.add_executor(extractor_agent)
    builder.add_executor(validator_agent)
    
    # Define edges (execution flow)
    builder.add_edge(classifier_agent, extractor_agent)
    builder.add_edge(extractor_agent, validator_agent)
    
    # Set entry point
    builder.set_start_executor(classifier_agent)
    
    # Build the workflow
    workflow = builder.build()
    
    return workflow

async def process_document(document_text: str):
    workflow = await create_document_processing_workflow()
    
    print("Processing document through workflow...")
    print("=" * 50)
    
    # Run the workflow
    result = await workflow.run(document_text)
    
    print(f"\nFinal Result:\n{result}")
    return result

# Example usage
if __name__ == "__main__":
    sample_document = """
    INVOICE #INV-2025-001
    
    From: TechCorp Solutions
    To: Acme Industries
    Date: January 15, 2025
    
    Services Rendered:
    - Cloud Infrastructure Setup: $5,000
    - Security Audit: $2,500
    - Training (3 days): $1,500
    
    Total: $9,000
    Due Date: February 15, 2025
    """
    
    asyncio.run(process_document(sample_document))

.NET / C# Implementation

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Azure.Identity;
using OpenAI;

namespace MAF.Part06.Workflows;

/// 
/// Part 6: Document Processing Workflow in .NET
/// 
public class DocumentWorkflow
{
    public static async Task Main(string[] args)
    {
        var client = new AzureOpenAIClient(
            new Uri(Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")!),
            new DefaultAzureCredential());

        var responseClient = client.GetOpenAIResponseClient("gpt-4o");

        // Create specialized agents
        var classifier = responseClient.CreateAIAgent(
            name: "DocumentClassifier",
            instructions: @"
                Classify documents into categories:
                - invoice: Bills, payment requests
                - contract: Legal agreements
                - report: Analysis documents
                - correspondence: Letters, emails
                Return ONLY the category name.");

        var extractor = responseClient.CreateAIAgent(
            name: "DataExtractor",
            instructions: @"
                Extract key fields based on document type:
                - invoice: vendor, amount, date, invoice_number
                - contract: parties, effective_date, terms
                - report: title, date, key_findings
                Return data as structured JSON.");

        var validator = responseClient.CreateAIAgent(
            name: "DataValidator",
            instructions: @"
                Validate extracted data for:
                - Completeness
                - Format correctness
                - Logical consistency
                Return validation result.");

        // Build workflow
        var builder = new WorkflowBuilder();

        builder.AddExecutor(classifier);
        builder.AddExecutor(extractor);
        builder.AddExecutor(validator);

        builder.AddEdge(classifier, extractor);
        builder.AddEdge(extractor, validator);

        builder.SetStartExecutor(classifier);

        var workflow = builder.Build();

        // Run workflow
        Console.WriteLine("Processing document...");
        var result = await workflow.RunAsync(@"
            INVOICE #INV-2025-001

            From: TechCorp Solutions
            To: Acme Industries
            Date: January 15, 2025

            Services: Cloud Setup $5,000
            Total: $5,000
            Due: February 15, 2025
        ");

        Console.WriteLine($"\nResult:\n{result}");
    }
}

.NET Example: Approval Workflow

Conditional Routing

from agent_framework import WorkflowBuilder

def create_conditional_workflow(client):
    # Create type-specific processors
    invoice_processor = client.create_agent(
        name="InvoiceProcessor",
        instructions="Process invoices: validate amounts, check vendor, verify terms."
    )
    
    contract_processor = client.create_agent(
        name="ContractProcessor", 
        instructions="Process contracts: extract terms, identify obligations, flag risks."
    )
    
    general_processor = client.create_agent(
        name="GeneralProcessor",
        instructions="Process general documents: summarize content, extract key points."
    )
    
    classifier = client.create_agent(
        name="Classifier",
        instructions="Classify document as: invoice, contract, or other. Return only the type."
    )
    
    # Router function based on classifier output
    def route_by_type(classification_result: str):
        result_lower = classification_result.lower().strip()
        
        if "invoice" in result_lower:
            return invoice_processor
        elif "contract" in result_lower:
            return contract_processor
        else:
            return general_processor
    
    # Build workflow with conditional routing
    builder = WorkflowBuilder()
    
    builder.add_executor(classifier)
    builder.add_executor(invoice_processor)
    builder.add_executor(contract_processor)
    builder.add_executor(general_processor)
    
    # Add conditional edge - routes based on classification
    builder.add_conditional_edge(
        source=classifier,
        router=route_by_type
    )
    
    builder.set_start_executor(classifier)
    
    return builder.build()

# Usage
async def process_with_routing(document: str):
    workflow = create_conditional_workflow(client)
    result = await workflow.run(document)
    return result

.NET / C# Implementation

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Azure.Identity;
using OpenAI;

namespace MAF.Part06.Workflows;

/// 
/// Part 6: Document Processing Workflow in .NET
/// 
public class DocumentWorkflow
{
    public static async Task Main(string[] args)
    {
        var client = new AzureOpenAIClient(
            new Uri(Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")!),
            new DefaultAzureCredential());

        var responseClient = client.GetOpenAIResponseClient("gpt-4o");

        // Create specialized agents
        var classifier = responseClient.CreateAIAgent(
            name: "DocumentClassifier",
            instructions: @"
                Classify documents into categories:
                - invoice: Bills, payment requests
                - contract: Legal agreements
                - report: Analysis documents
                - correspondence: Letters, emails
                Return ONLY the category name.");

        var extractor = responseClient.CreateAIAgent(
            name: "DataExtractor",
            instructions: @"
                Extract key fields based on document type:
                - invoice: vendor, amount, date, invoice_number
                - contract: parties, effective_date, terms
                - report: title, date, key_findings
                Return data as structured JSON.");

        var validator = responseClient.CreateAIAgent(
            name: "DataValidator",
            instructions: @"
                Validate extracted data for:
                - Completeness
                - Format correctness
                - Logical consistency
                Return validation result.");

        // Build workflow
        var builder = new WorkflowBuilder();

        builder.AddExecutor(classifier);
        builder.AddExecutor(extractor);
        builder.AddExecutor(validator);

        builder.AddEdge(classifier, extractor);
        builder.AddEdge(extractor, validator);

        builder.SetStartExecutor(classifier);

        var workflow = builder.Build();

        // Run workflow
        Console.WriteLine("Processing document...");
        var result = await workflow.RunAsync(@"
            INVOICE #INV-2025-001

            From: TechCorp Solutions
            To: Acme Industries
            Date: January 15, 2025

            Services: Cloud Setup $5,000
            Total: $5,000
            Due: February 15, 2025
        ");

        Console.WriteLine($"\nResult:\n{result}");
    }
}

Checkpointing & Recovery

Workflows can save state at any point for recovery:

from agent_framework import WorkflowBuilder
from agent_framework.checkpoints import RedisCheckpointStore
import asyncio

class CheckpointedWorkflow:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.checkpoint_store = RedisCheckpointStore(redis_url)
    
    def create_workflow(self, client):
        # Create agents
        step1 = client.create_agent(name="DataFetcher", instructions="Fetch and validate input data")
        step2 = client.create_agent(name="Analyzer", instructions="Analyze the data thoroughly")
        step3 = client.create_agent(name="Reporter", instructions="Generate final report")
        
        builder = WorkflowBuilder()
        
        builder.add_executor(step1)
        builder.add_executor(step2)
        builder.add_executor(step3)
        
        builder.add_edge(step1, step2)
        builder.add_edge(step2, step3)
        
        builder.set_start_executor(step1)
        
        # Enable checkpointing with Redis store
        return builder.build(checkpoint_store=self.checkpoint_store)
    
    async def run_with_checkpoints(self, workflow, input_data: str):
        """Run workflow with automatic checkpointing."""
        try:
            result = await workflow.run(
                input_data,
                checkpoint_after=["DataFetcher", "Analyzer"]  # Save after these steps
            )
            return result
        except Exception as e:
            print(f"Workflow failed: {e}")
            # Can resume from last checkpoint
            return None
    
    async def resume_workflow(self, workflow, checkpoint_id: str):
        """Resume a failed workflow from its last checkpoint."""
        print(f"Resuming workflow from checkpoint: {checkpoint_id}")
        
        result = await workflow.resume(checkpoint_id=checkpoint_id)
        
        print(f"Workflow resumed and completed: {result}")
        return result
    
    async def list_checkpoints(self):
        """List all available checkpoints."""
        checkpoints = await self.checkpoint_store.list_checkpoints()
        for cp in checkpoints:
            print(f"  - {cp['id']}: {cp['step']} at {cp['timestamp']}")
        return checkpoints

# Usage example
async def main():
    cw = CheckpointedWorkflow()
    workflow = cw.create_workflow(client)
    
    # Run with checkpointing
    result = await cw.run_with_checkpoints(
        workflow,
        "Analyze Q4 2024 sales data for North America region"
    )
    
    if result is None:
        # If failed, list checkpoints and resume
        checkpoints = await cw.list_checkpoints()
        if checkpoints:
            latest = checkpoints[-1]
            result = await cw.resume_workflow(workflow, latest['id'])
    
    print(f"Final result: {result}")

asyncio.run(main())

.NET / C# Implementation

using Microsoft.Agents.AI.Workflows;

// Initialize Checkpoint Store (File-based or DB)
var checkpointStore = new FileCheckpointStore("workflow_checkpoints");

// Build Workflow with Checkpointing
var workflow = new WorkflowBuilder(checkpointStore)
    .AddExecutor(classifier)
    .AddExecutor(extractor)
    .AddEdge(classifier, extractor)
    .Build();

// Run with a specific Thread/Session ID
// If the workflow was interrupted, it resumes from the last checkpoint
var result = await workflow.RunAsync(inputDocument, sessionId: "doc-process-123");

Workflow Events

Monitor workflow execution with built-in events:

EventDescription
WorkflowStartedEventWorkflow execution begins
ExecutorInvokeEventExecutor starts processing
ExecutorCompleteEventExecutor finishes
WorkflowOutputEventWorkflow produces output
WorkflowErrorEventError occurred
RequestInfoEventHuman input requested

📦 Source Code

All code examples from this article series are available on GitHub:

👉 https://github.com/nithinmohantk/microsoft-agent-framework-series-examples

Clone the repository to follow along:

git clone https://github.com/nithinmohantk/microsoft-agent-framework-series-examples.git
cd microsoft-agent-framework-series-examples

Workflow Checkpointing (C#)


Series Navigation

References


Discover more from C4: Container, Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a comment

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.