
LLM Chain Composition: Building Complex AI Workflows with Sequential, Parallel, and Conditional Patterns

Introduction

Complex LLM applications rarely consist of a single prompt. They chain multiple steps together, each building on the previous output. Chain composition enables sophisticated workflows: retrieval-augmented generation, multi-step reasoning, iterative refinement, and conditional branching. Understanding how to compose chains effectively is essential for building production LLM systems. This guide covers the practical patterns: sequential chains, parallel execution, conditional routing, error handling within chains, and a production service that exposes composed chains behind an HTTP API.

Chain Composition
Figure: an LLM chain runs sequential steps from input to output.

Basic Chain Structure

from dataclasses import dataclass, field
from typing import Any, Callable, Optional, TypeVar, Generic
from abc import ABC, abstractmethod
import asyncio

T = TypeVar('T')
U = TypeVar('U')

@dataclass
class ChainContext:
    """Context passed through chain steps."""
    
    input: Any
    intermediate: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    
    def set(self, key: str, value: Any):
        self.intermediate[key] = value
    
    def get(self, key: str, default: Any = None) -> Any:
        return self.intermediate.get(key, default)

class ChainStep(ABC, Generic[T, U]):
    """Base class for chain steps."""
    
    name: str = "step"
    
    @abstractmethod
    async def run(self, input: T, context: ChainContext) -> U:
        """Execute the step."""
        pass

class LLMStep(ChainStep[str, str]):
    """A step that calls an LLM."""
    
    def __init__(
        self,
        client: Any,
        model: str,
        system_prompt: str = "",
        name: str = "llm"
    ):
        self.client = client
        self.model = model
        self.system_prompt = system_prompt
        self.name = name
    
    async def run(self, input: str, context: ChainContext) -> str:
        """Call LLM with input."""
        
        messages = []
        
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        
        messages.append({"role": "user", "content": input})
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        
        return response.choices[0].message.content

class TransformStep(ChainStep[T, U]):
    """A step that transforms data."""
    
    def __init__(self, transform_fn: Callable[[T], U], name: str = "transform"):
        self.transform_fn = transform_fn
        self.name = name
    
    async def run(self, input: T, context: ChainContext) -> U:
        """Apply transformation."""
        
        if asyncio.iscoroutinefunction(self.transform_fn):
            return await self.transform_fn(input)
        return self.transform_fn(input)

class Chain:
    """A chain of steps executed sequentially."""
    
    def __init__(self, steps: Optional[list[ChainStep]] = None, name: str = "chain"):
        self.steps = steps or []
        self.name = name
    
    def add(self, step: ChainStep) -> 'Chain':
        """Add a step to the chain."""
        self.steps.append(step)
        return self
    
    async def run(self, input: Any, context: Optional[ChainContext] = None) -> Any:
        """Execute all steps in sequence."""
        
        context = context or ChainContext(input=input)
        current = input
        
        for step in self.steps:
            current = await step.run(current, context)
            context.set(step.name, current)
        
        return current

# Example: Simple summarization chain
def create_summarization_chain(client: Any) -> Chain:
    return Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Extract the key points from the following text.",
            name="extract"
        ),
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Write a concise summary based on these key points.",
            name="summarize"
        )
    ], name="summarization")

Parallel Execution

from typing import Any, Callable, Optional
import asyncio

class ParallelChain(ChainStep):
    """Execute multiple chains in parallel."""
    
    def __init__(
        self,
        chains: list[Chain],
        merge_fn: Optional[Callable[[list[Any]], Any]] = None,
        name: str = "parallel"
    ):
        self.chains = chains
        self.merge_fn = merge_fn or (lambda x: x)
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Execute all chains in parallel."""
        
        tasks = [
            chain.run(input, context)
            for chain in self.chains
        ]
        
        results = await asyncio.gather(*tasks)
        
        return self.merge_fn(results)

class MapChain(ChainStep):
    """Apply a chain to each item in a list."""
    
    def __init__(
        self,
        chain: Chain,
        max_concurrency: int = 10,
        name: str = "map"
    ):
        self.chain = chain
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.name = name
    
    async def run(self, inputs: list[Any], context: ChainContext) -> list[Any]:
        """Apply chain to each input."""
        
        async def process_one(item: Any) -> Any:
            async with self.semaphore:
                return await self.chain.run(item, context)
        
        tasks = [process_one(item) for item in inputs]
        return await asyncio.gather(*tasks)

class FanOutFanIn(ChainStep):
    """Fan out to multiple processors, then fan in results."""
    
    def __init__(
        self,
        split_fn: Callable[[Any], list[Any]],
        process_chain: Chain,
        merge_fn: Callable[[list[Any]], Any],
        name: str = "fan_out_fan_in"
    ):
        self.split_fn = split_fn
        self.process_chain = process_chain
        self.merge_fn = merge_fn
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Split, process in parallel, merge."""
        
        # Fan out
        parts = self.split_fn(input)
        
        # Process each part
        tasks = [
            self.process_chain.run(part, context)
            for part in parts
        ]
        results = await asyncio.gather(*tasks)
        
        # Fan in
        return self.merge_fn(results)
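
As a sketch of how FanOutFanIn composes with the pieces above, here is a hypothetical map-reduce summarizer: split a long document into fixed-size chunks, summarize each chunk concurrently, then join the partial summaries. The chunking and joining functions are illustrative assumptions, not library code.

def create_map_reduce_summarizer(client: Any) -> FanOutFanIn:
    # Naive fixed-size chunking; a real splitter would respect sentence boundaries
    def split_into_chunks(text: str, size: int = 2000) -> list[str]:
        return [text[i:i + size] for i in range(0, len(text), size)]
    
    chunk_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Summarize this passage in 2-3 sentences.",
            name="chunk_summary"
        )
    ])
    
    # Note: parallel parts share one context, so "chunk_summary" holds whichever
    # chunk finished last; rely on the merged return value instead.
    return FanOutFanIn(
        split_fn=split_into_chunks,
        process_chain=chunk_chain,
        merge_fn=lambda summaries: "\n".join(summaries),
        name="map_reduce_summary"
    )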

# Example: Parallel analysis chain
def create_parallel_analysis_chain(client: Any) -> Chain:
    sentiment_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Analyze the sentiment of this text. Respond with: positive, negative, or neutral.",
            name="sentiment"
        )
    ])
    
    topic_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Extract the main topics from this text as a comma-separated list.",
            name="topics"
        )
    ])
    
    entity_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Extract named entities (people, places, organizations) from this text.",
            name="entities"
        )
    ])
    
    def merge_analyses(results: list[str]) -> dict:
        return {
            "sentiment": results[0],
            "topics": results[1],
            "entities": results[2]
        }
    
    return Chain([
        ParallelChain(
            chains=[sentiment_chain, topic_chain, entity_chain],
            merge_fn=merge_analyses,
            name="analysis"
        )
    ])
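
Because asyncio.gather preserves the order of its inputs, merge_analyses can safely index results positionally. Running the composed chain works like any other; a brief sketch with an illustrative input:

async def analyze(client: Any, text: str) -> dict:
    chain = create_parallel_analysis_chain(client)
    return await chain.run(text)

# e.g. await analyze(client, "The Dublin launch exceeded expectations.")
# -> {"sentiment": "...", "topics": "...", "entities": "..."}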

Conditional Routing

from typing import Any, Callable, Optional

class RouterStep(ChainStep):
    """Route to different chains based on condition."""
    
    def __init__(
        self,
        router_fn: Callable[[Any], str],
        routes: dict[str, Chain],
        default: Optional[Chain] = None,
        name: str = "router"
    ):
        self.router_fn = router_fn
        self.routes = routes
        self.default = default
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Route to appropriate chain."""
        
        route_key = self.router_fn(input)
        context.set("route", route_key)
        
        chain = self.routes.get(route_key, self.default)
        
        if chain is None:
            raise ValueError(f"No route found for: {route_key}")
        
        return await chain.run(input, context)

class LLMRouter(ChainStep):
    """Use LLM to determine routing."""
    
    ROUTER_PROMPT = """Classify the following input into one of these categories: {categories}

Input: {input}

Respond with only the category name."""
    
    def __init__(
        self,
        client: Any,
        model: str,
        routes: dict[str, Chain],
        default: Optional[Chain] = None,
        name: str = "llm_router"
    ):
        self.client = client
        self.model = model
        self.routes = routes
        self.default = default
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Use LLM to route."""
        
        categories = ", ".join(self.routes.keys())
        
        prompt = self.ROUTER_PROMPT.format(
            categories=categories,
            input=str(input)
        )
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        route_key = response.choices[0].message.content.strip().lower()
        context.set("route", route_key)
        
        chain = self.routes.get(route_key, self.default)
        
        if chain is None:
            raise ValueError(f"No route found for: {route_key}")
        
        return await chain.run(input, context)

class ConditionalStep(ChainStep):
    """Execute step only if condition is met."""
    
    def __init__(
        self,
        condition: Callable[[Any, ChainContext], bool],
        if_true: ChainStep,
        if_false: Optional[ChainStep] = None,
        name: str = "conditional"
    ):
        self.condition = condition
        self.if_true = if_true
        self.if_false = if_false
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Execute based on condition."""
        
        if self.condition(input, context):
            return await self.if_true.run(input, context)
        elif self.if_false:
            return await self.if_false.run(input, context)
        else:
            return input
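
ConditionalStep is handy for guards. A hypothetical example: condense inputs above an arbitrary length threshold before further processing, and pass short ones through untouched.

def create_compress_if_long(client: Any) -> ConditionalStep:
    compress = LLMStep(
        client=client,
        model="gpt-4o-mini",
        system_prompt="Condense this text, preserving all key facts.",
        name="compress"
    )
    # With no if_false branch, short inputs are returned unchanged
    return ConditionalStep(
        condition=lambda text, ctx: len(text) > 4000,  # threshold is an assumption
        if_true=compress,
        name="compress_if_long"
    )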

# Example: Intent-based routing
def create_intent_router(client: Any) -> Chain:
    question_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o",
            system_prompt="Answer the following question thoroughly and accurately.",
            name="answer"
        )
    ])
    
    task_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o",
            system_prompt="Help complete the following task. Provide step-by-step guidance.",
            name="task"
        )
    ])
    
    chat_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Engage in friendly conversation.",
            name="chat"
        )
    ])
    
    return Chain([
        LLMRouter(
            client=client,
            model="gpt-4o-mini",
            routes={
                "question": question_chain,
                "task": task_chain,
                "chat": chat_chain
            },
            default=chat_chain,
            name="intent_router"
        )
    ])
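
Note that LLMRouter lowercases the model's reply before the lookup, so route keys must be lowercase, and any label the model invents falls through to the default chain. A short usage sketch, with illustrative inputs:

async def demo_router(client: Any) -> None:
    router = create_intent_router(client)
    # Likely classified as "question"
    print(await router.run("What causes inflation?"))
    # Casual input routes to the chat chain (also the default)
    print(await router.run("hey there!"))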

Error Handling

from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio

@dataclass
class ChainError:
    """Error information from chain execution."""
    
    step_name: str
    error: Exception
    input: Any
    context: ChainContext

class RetryStep(ChainStep):
    """Wrap a step with retry logic."""
    
    def __init__(
        self,
        step: ChainStep,
        max_retries: int = 3,
        delay: float = 1.0,
        name: Optional[str] = None
    ):
        self.step = step
        self.max_retries = max_retries
        self.delay = delay
        self.name = name or f"retry_{step.name}"
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Execute with retries."""
        
        last_error = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return await self.step.run(input, context)
            except Exception as e:
                last_error = e
                
                if attempt < self.max_retries:
                    await asyncio.sleep(self.delay * (2 ** attempt))
        
        raise last_error

class FallbackStep(ChainStep):
    """Try primary step, fall back to secondary on failure."""
    
    def __init__(
        self,
        primary: ChainStep,
        fallback: ChainStep,
        name: str = "fallback"
    ):
        self.primary = primary
        self.fallback = fallback
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Try primary, use fallback on failure."""
        
        try:
            result = await self.primary.run(input, context)
            context.set("used_fallback", False)
            return result
        except Exception as e:
            context.set("primary_error", str(e))
            context.set("used_fallback", True)
            return await self.fallback.run(input, context)

class ErrorHandlingChain(Chain):
    """Chain with comprehensive error handling."""
    
    def __init__(
        self,
        steps: Optional[list[ChainStep]] = None,
        on_error: Optional[Callable[[ChainError], Any]] = None,
        name: str = "error_handling_chain"
    ):
        super().__init__(steps, name)
        self.on_error = on_error
    
    async def run(self, input: Any, context: Optional[ChainContext] = None) -> Any:
        """Execute with error handling."""
        
        context = context or ChainContext(input=input)
        current = input
        
        for step in self.steps:
            try:
                current = await step.run(current, context)
                context.set(step.name, current)
            except Exception as e:
                error = ChainError(
                    step_name=step.name,
                    error=e,
                    input=current,
                    context=context
                )
                
                if self.on_error:
                    return self.on_error(error)
                raise
        
        return current

class ValidationStep(ChainStep):
    """Validate output before continuing."""
    
    def __init__(
        self,
        validator: Callable[[Any], bool],
        error_message: str = "Validation failed",
        name: str = "validate"
    ):
        self.validator = validator
        self.error_message = error_message
        self.name = name
    
    async def run(self, input: Any, context: ChainContext) -> Any:
        """Validate input."""
        
        if not self.validator(input):
            raise ValueError(self.error_message)
        
        return input

# Example: Robust chain with error handling
def create_robust_chain(client: Any) -> Chain:
    primary_llm = LLMStep(
        client=client,
        model="gpt-4o",
        system_prompt="Process the following request.",
        name="primary"
    )
    
    fallback_llm = LLMStep(
        client=client,
        model="gpt-4o-mini",
        system_prompt="Process the following request.",
        name="fallback"
    )
    
    return ErrorHandlingChain([
        RetryStep(
            FallbackStep(primary_llm, fallback_llm),
            max_retries=2
        ),
        ValidationStep(
            validator=lambda x: len(x) > 0,
            error_message="Empty response"
        )
    ])
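
Since create_robust_chain returns an ErrorHandlingChain, an on_error handler can be attached after construction to let the chain degrade gracefully instead of raising. This sketch (the payload shape is an assumption) converts any unhandled failure into structured output:

def create_robust_chain_with_handler(client: Any) -> Chain:
    chain = create_robust_chain(client)
    # Return a structured failure payload instead of propagating the exception
    chain.on_error = lambda err: {
        "ok": False,
        "failed_step": err.step_name,
        "detail": str(err.error),
    }
    return chain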

Production Chain Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any

app = FastAPI()

# Chain registry
chains: dict[str, Chain] = {}

class ChainRequest(BaseModel):
    chain_name: str
    input: Any
    metadata: Optional[dict] = None

class ChainResponse(BaseModel):
    output: Any
    intermediate: dict
    metadata: dict

@app.post("/v1/chains/execute")
async def execute_chain(request: ChainRequest) -> ChainResponse:
    """Execute a registered chain."""
    
    chain = chains.get(request.chain_name)
    
    if not chain:
        raise HTTPException(404, f"Chain not found: {request.chain_name}")
    
    context = ChainContext(
        input=request.input,
        metadata=request.metadata or {}
    )
    
    try:
        output = await chain.run(request.input, context)
        
        return ChainResponse(
            output=output,
            intermediate=context.intermediate,
            metadata=context.metadata
        )
    
    except Exception as e:
        raise HTTPException(500, f"Chain execution failed: {str(e)}")

@app.get("/v1/chains")
async def list_chains():
    """List all registered chains."""
    
    return {
        "chains": [
            {
                "name": name,
                "steps": [step.name for step in chain.steps]
            }
            for name, chain in chains.items()
        ]
    }

@app.post("/v1/chains/register")
async def register_chain(name: str, chain_config: dict):
    """Register a new chain from configuration."""
    
    # Build chain from config (simplified)
    # In production, use a proper chain builder
    
    return {"registered": name}

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Chain composition is the foundation of sophisticated LLM applications. Sequential chains enable multi-step processing where each step builds on the previous output; use them for workflows like summarization, analysis, and content generation. Parallel chains maximize throughput by executing independent operations concurrently, which is ideal for multi-faceted analysis or processing many items at once. Conditional routing enables dynamic workflows that adapt to input characteristics, such as routing questions to different models or processing paths based on intent. Robust error handling with retries, fallbacks, and validation ensures chains fail gracefully. Design chains to be composable and reusable: small, focused steps that can be combined into larger workflows. The goal is building flexible, maintainable AI systems that can handle complex real-world requirements.