
Prompt Chaining Patterns: Breaking Complex Tasks into Manageable Steps

Introduction

Complex tasks often exceed what a single LLM call can handle well. Breaking a problem into smaller steps, where each step's output feeds into the next, usually produces better results than attempting everything at once. Prompt chaining decomposes complex workflows into sequential LLM calls, each focused on a specific subtask. This guide covers practical chaining patterns: sequential chains for multi-step reasoning, parallel chains for independent subtasks, conditional chains that branch on intermediate results, and techniques for building robust chains that handle failures gracefully.

[Figure: Prompt chain diagram: Step 1 Analyze → Step 2 Process → Step 3 Synthesize]

Sequential Chains

from dataclasses import dataclass, field
from typing import Any, Optional, Callable

@dataclass
class ChainStep:
    """A step in a prompt chain."""
    
    name: str
    prompt_template: str
    model: str = "gpt-4o-mini"
    output_key: Optional[str] = None
    parser: Optional[Callable] = None
    
    def format_prompt(self, context: dict) -> str:
        """Format prompt with context variables."""
        return self.prompt_template.format(**context)

@dataclass
class ChainResult:
    """Result of chain execution."""
    
    success: bool
    output: Any
    steps_completed: int
    step_outputs: dict = field(default_factory=dict)
    error: Optional[str] = None
    total_tokens: int = 0

class SequentialChain:
    """Execute steps sequentially, passing outputs forward."""
    
    def __init__(self, client: Any, steps: list[ChainStep]):
        self.client = client
        self.steps = steps
    
    async def run(self, initial_input: dict) -> ChainResult:
        """Run the chain with initial input."""
        
        context = dict(initial_input)
        step_outputs = {}
        total_tokens = 0
        
        for i, step in enumerate(self.steps):
            try:
                # Format prompt
                prompt = step.format_prompt(context)
                
                # Call LLM
                response = await self.client.chat.completions.create(
                    model=step.model,
                    messages=[{"role": "user", "content": prompt}]
                )
                
                output = response.choices[0].message.content
                total_tokens += response.usage.total_tokens
                
                # Parse if parser provided
                if step.parser:
                    output = step.parser(output)
                
                # Store output
                output_key = step.output_key or step.name
                context[output_key] = output
                step_outputs[step.name] = output
                
            except Exception as e:
                return ChainResult(
                    success=False,
                    output=None,
                    steps_completed=i,
                    step_outputs=step_outputs,
                    error=str(e),
                    total_tokens=total_tokens
                )
        
        # Return final output
        final_key = self.steps[-1].output_key or self.steps[-1].name
        
        return ChainResult(
            success=True,
            output=context.get(final_key),
            steps_completed=len(self.steps),
            step_outputs=step_outputs,
            total_tokens=total_tokens
        )
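
The optional parser hook turns a step's raw text into structured data before it enters the context. Below is a minimal sketch of a list parser that could back the extract_topics step in the next example; it is a hypothetical helper, not a library function. Bear in mind that a parsed list is interpolated into the next prompt via str(), so only parse where downstream code needs the structure.

import re

def parse_bullet_list(output: str) -> list[str]:
    """Naive parser: strip '-', '*', or '1.' / '1)' markers from each line."""
    items = []
    for line in output.splitlines():
        line = re.sub(r"^\s*(?:[-*]|\d+[.)])\s*", "", line).strip()
        if line:
            items.append(line)
    return items

# e.g. ChainStep(name="extract_topics", ..., parser=parse_bullet_list)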

# Example: Research and summarize chain
research_chain = SequentialChain(
    client=None,  # Initialize with client
    steps=[
        ChainStep(
            name="extract_topics",
            prompt_template="Extract the main topics from this text:\n{input}\n\nTopics:",
            output_key="topics"
        ),
        ChainStep(
            name="research_each",
            prompt_template="For each topic, provide key facts:\n{topics}\n\nFacts:",
            output_key="facts"
        ),
        ChainStep(
            name="synthesize",
            prompt_template="Synthesize these facts into a coherent summary:\n{facts}\n\nSummary:",
            output_key="summary"
        )
    ]
)
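
Running the chain requires an async client whose chat.completions.create matches the calls above. A minimal usage sketch, assuming the openai package's AsyncOpenAI client (any compatible client works):

import asyncio
from openai import AsyncOpenAI

async def main():
    research_chain.client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    result = await research_chain.run({"input": "Large language models are changing how software is built..."})
    if result.success:
        print(result.output)                          # final summary
        print(f"tokens used: {result.total_tokens}")  # cost accounting
    else:
        print(f"failed at step {result.steps_completed}: {result.error}")

asyncio.run(main())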

Parallel Chains

from dataclasses import dataclass
from typing import Any, Optional
import asyncio

@dataclass
class ParallelStep:
    """A step that can run in parallel."""
    
    name: str
    prompt_template: str
    model: str = "gpt-4o-mini"
    depends_on: Optional[list[str]] = None  # steps this depends on

class ParallelChain:
    """Execute independent steps in parallel."""
    
    def __init__(self, client: Any, steps: list[ParallelStep]):
        self.client = client
        self.steps = {s.name: s for s in steps}
    
    async def run(self, initial_input: dict) -> ChainResult:
        """Run chain with parallel execution where possible."""
        
        context = dict(initial_input)
        completed = set()
        step_outputs = {}
        total_tokens = 0
        
        while len(completed) < len(self.steps):
            # Find steps ready to run
            ready = []
            for name, step in self.steps.items():
                if name in completed:
                    continue
                
                # Check dependencies
                deps = step.depends_on or []
                if all(d in completed for d in deps):
                    ready.append(step)
            
            if not ready:
                return ChainResult(
                    success=False,
                    output=None,
                    steps_completed=len(completed),
                    step_outputs=step_outputs,
                    error="Unresolvable dependencies (circular or unknown step name)",
                    total_tokens=total_tokens
                )
            
            # Run ready steps in parallel
            tasks = [
                self._run_step(step, context)
                for step in ready
            ]
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Process results
            for step, result in zip(ready, results):
                if isinstance(result, Exception):
                    return ChainResult(
                        success=False,
                        output=None,
                        steps_completed=len(completed),
                        step_outputs=step_outputs,
                        error=f"{step.name}: {result}",
                        total_tokens=total_tokens
                    )
                
                output, tokens = result
                context[step.name] = output
                step_outputs[step.name] = output
                total_tokens += tokens
                completed.add(step.name)
        
        return ChainResult(
            success=True,
            output=step_outputs,
            steps_completed=len(self.steps),
            step_outputs=step_outputs,
            total_tokens=total_tokens
        )
    
    async def _run_step(
        self,
        step: ParallelStep,
        context: dict
    ) -> tuple[str, int]:
        """Run a single step."""
        
        prompt = step.prompt_template.format(**context)
        
        response = await self.client.chat.completions.create(
            model=step.model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        return (
            response.choices[0].message.content,
            response.usage.total_tokens
        )

# Example: Analyze document from multiple angles
analysis_chain = ParallelChain(
    client=None,  # Initialize with client
    steps=[
        ParallelStep(
            name="sentiment",
            prompt_template="Analyze the sentiment of this text:\n{input}\n\nSentiment:"
        ),
        ParallelStep(
            name="entities",
            prompt_template="Extract named entities from this text:\n{input}\n\nEntities:"
        ),
        ParallelStep(
            name="summary",
            prompt_template="Summarize this text in 2-3 sentences:\n{input}\n\nSummary:"
        ),
        ParallelStep(
            name="combined",
            prompt_template="Combine these analyses:\nSentiment: {sentiment}\nEntities: {entities}\nSummary: {summary}\n\nCombined analysis:",
            depends_on=["sentiment", "entities", "summary"]
        )
    ]
)
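
Dependency resolution proceeds in waves: each pass through the while loop collects every incomplete step whose dependencies are satisfied and launches them together with asyncio.gather. Above, the three analyses run concurrently in the first wave and combined runs alone in the second. Usage mirrors the sequential case:

import asyncio
from openai import AsyncOpenAI

async def main():
    analysis_chain.client = AsyncOpenAI()
    result = await analysis_chain.run({"input": "Acme Corp reported record earnings this quarter..."})
    print(result.step_outputs["combined"])  # merged view produced in the second wave

asyncio.run(main())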

Conditional Chains

from dataclasses import dataclass
from typing import Any, Optional, Callable
from enum import Enum

class BranchCondition(Enum):
    """Types of branch conditions."""
    
    CONTAINS = "contains"
    EQUALS = "equals"
    CUSTOM = "custom"

@dataclass
class ConditionalBranch:
    """A conditional branch in the chain."""
    
    condition_type: BranchCondition
    condition_value: Any
    next_step: str
    custom_check: Optional[Callable] = None

@dataclass
class ConditionalStep:
    """A step with conditional branching."""
    
    name: str
    prompt_template: str
    branches: Optional[list[ConditionalBranch]] = None
    default_next: Optional[str] = None
    model: str = "gpt-4o-mini"

class ConditionalChain:
    """Chain with conditional branching."""
    
    def __init__(
        self,
        client: Any,
        steps: dict[str, ConditionalStep],
        start_step: str
    ):
        self.client = client
        self.steps = steps
        self.start_step = start_step
    
    async def run(
        self,
        initial_input: dict,
        max_steps: int = 20
    ) -> ChainResult:
        """Run chain following conditional branches."""
        
        context = dict(initial_input)
        step_outputs = {}
        total_tokens = 0
        current_step = self.start_step
        steps_executed = 0
        
        while current_step and steps_executed < max_steps:
            step = self.steps.get(current_step)
            if not step:
                break
            
            # Execute step
            prompt = step.prompt_template.format(**context)
            
            response = await self.client.chat.completions.create(
                model=step.model,
                messages=[{"role": "user", "content": prompt}]
            )
            
            output = response.choices[0].message.content
            total_tokens += response.usage.total_tokens
            
            context[step.name] = output
            step_outputs[step.name] = output
            steps_executed += 1
            
            # Determine next step
            current_step = self._get_next_step(step, output)
        
        return ChainResult(
            success=True,
            output=list(step_outputs.values())[-1] if step_outputs else None,
            steps_completed=steps_executed,
            step_outputs=step_outputs,
            total_tokens=total_tokens
        )
    
    def _get_next_step(self, step: ConditionalStep, output: str) -> Optional[str]:
        """Determine next step based on output."""
        
        if not step.branches:
            return step.default_next
        
        for branch in step.branches:
            if self._check_condition(branch, output):
                return branch.next_step
        
        return step.default_next
    
    def _check_condition(self, branch: ConditionalBranch, output: str) -> bool:
        """Check if branch condition is met."""
        
        if branch.condition_type == BranchCondition.CONTAINS:
            return branch.condition_value.lower() in output.lower()
        
        elif branch.condition_type == BranchCondition.EQUALS:
            return output.strip().lower() == branch.condition_value.lower()
        
        elif branch.condition_type == BranchCondition.CUSTOM:
            return branch.custom_check(output) if branch.custom_check else False
        
        return False

# Example: Customer support routing
support_chain = ConditionalChain(
    client=None,  # Initialize with client
    steps={
        "classify": ConditionalStep(
            name="classify",
            prompt_template="Classify this customer inquiry:\n{input}\n\nCategory (billing/technical/general):",
            branches=[
                ConditionalBranch(
                    condition_type=BranchCondition.CONTAINS,
                    condition_value="billing",
                    next_step="billing_response"
                ),
                ConditionalBranch(
                    condition_type=BranchCondition.CONTAINS,
                    condition_value="technical",
                    next_step="technical_response"
                )
            ],
            default_next="general_response"
        ),
        "billing_response": ConditionalStep(
            name="billing_response",
            prompt_template="Generate a billing support response for:\n{input}\n\nResponse:"
        ),
        "technical_response": ConditionalStep(
            name="technical_response",
            prompt_template="Generate a technical support response for:\n{input}\n\nResponse:"
        ),
        "general_response": ConditionalStep(
            name="general_response",
            prompt_template="Generate a general support response for:\n{input}\n\nResponse:"
        )
    },
    start_step="classify"
)
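
The CUSTOM branch type goes unused in the example; it accepts any predicate over the step's raw output. Here is a hedged sketch that routes to a human when a confidence score is low. Both the score format and the human_escalation step are assumptions, and the classify prompt would need to be rewritten to ask for the score:

import re

def low_confidence(output: str) -> bool:
    """Assumed format: the model ends its answer with e.g. 'confidence: 0.73'."""
    match = re.search(r"confidence:\s*([0-9.]+)", output.lower())
    return match is None or float(match.group(1)) < 0.5

escalation_branch = ConditionalBranch(
    condition_type=BranchCondition.CUSTOM,
    condition_value=None,           # ignored for CUSTOM branches
    next_step="human_escalation",   # hypothetical step name
    custom_check=low_confidence
)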

Chain Composition

from dataclasses import dataclass
from typing import Any, Optional, Callable
from abc import ABC, abstractmethod

class ChainComponent(ABC):
    """Base class for chain components."""
    
    @abstractmethod
    async def execute(self, context: dict) -> tuple[Any, int]:
        """Execute component and return (output, tokens)."""
        pass

class LLMStep(ChainComponent):
    """Single LLM call step."""
    
    def __init__(
        self,
        client: Any,
        prompt_template: str,
        model: str = "gpt-4o-mini",
        parser: Optional[Callable] = None
    ):
        self.client = client
        self.prompt_template = prompt_template
        self.model = model
        self.parser = parser
    
    async def execute(self, context: dict) -> tuple[Any, int]:
        prompt = self.prompt_template.format(**context)
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        output = response.choices[0].message.content
        
        if self.parser:
            output = self.parser(output)
        
        return output, response.usage.total_tokens

class TransformStep(ChainComponent):
    """Transform data without LLM call."""
    
    def __init__(self, transform: Callable):
        self.transform = transform
    
    async def execute(self, context: dict) -> tuple[Any, int]:
        return self.transform(context), 0

class CompositeChain(ChainComponent):
    """Compose multiple chains together."""
    
    def __init__(self, chains: list[tuple[str, ChainComponent]]):
        self.chains = chains
    
    async def execute(self, context: dict) -> tuple[dict, int]:
        total_tokens = 0
        
        for name, chain in self.chains:
            output, tokens = await chain.execute(context)
            context[name] = output
            total_tokens += tokens
        
        return context, total_tokens

class MapChain(ChainComponent):
    """Apply chain to each item in a list."""
    
    def __init__(self, chain: ChainComponent, input_key: str):
        self.chain = chain
        self.input_key = input_key
    
    async def execute(self, context: dict) -> tuple[list, int]:
        items = context.get(self.input_key, [])
        total_tokens = 0
        results = []
        
        for item in items:
            item_context = {**context, "item": item}
            output, tokens = await self.chain.execute(item_context)
            results.append(output)
            total_tokens += tokens
        
        return results, total_tokens

class ReduceChain(ChainComponent):
    """Reduce list of results to single output."""
    
    def __init__(
        self,
        client: Any,
        reduce_template: str,
        input_key: str,
        model: str = "gpt-4o-mini"
    ):
        self.client = client
        self.reduce_template = reduce_template
        self.input_key = input_key
        self.model = model
    
    async def execute(self, context: dict) -> tuple[str, int]:
        items = context.get(self.input_key, [])
        items_text = "\n".join(f"- {item}" for item in items)
        
        prompt = self.reduce_template.format(items=items_text, **context)
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        return response.choices[0].message.content, response.usage.total_tokens

# Example: Map-reduce summarization
def build_map_reduce_chain(client: Any) -> CompositeChain:
    return CompositeChain([
        ("chunks", TransformStep(
            lambda ctx: ctx["document"].split("\n\n")
        )),
        ("summaries", MapChain(
            chain=LLMStep(
                client=client,
                prompt_template="Summarize this section:\n{item}\n\nSummary:"
            ),
            input_key="chunks",
            output_key="summaries"
        )),
        ("final_summary", ReduceChain(
            client=client,
            reduce_template="Combine these summaries into one:\n{items}\n\nFinal summary:",
            input_key="summaries"
        ))
    ])
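
CompositeChain.execute returns the full context dict, so the caller reads the final summary back by key. A usage sketch (the input file is a stand-in for whatever document you load):

import asyncio
from openai import AsyncOpenAI

async def main():
    chain = build_map_reduce_chain(AsyncOpenAI())
    document = open("report.txt").read()  # hypothetical input
    context, tokens = await chain.execute({"document": document})
    print(context["final_summary"])
    print(f"tokens used: {tokens}")

asyncio.run(main())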

Error Handling and Retries

from dataclasses import dataclass
from typing import Any, Optional, Callable
import asyncio

@dataclass
class RetryConfig:
    """Configuration for retries."""
    
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    exponential_base: float = 2.0

class RobustChain:
    """Chain with error handling and retries."""
    
    def __init__(
        self,
        client: Any,
        steps: list[ChainStep],
        retry_config: Optional[RetryConfig] = None,
        fallback_handler: Optional[Callable] = None
    ):
        self.client = client
        self.steps = steps
        self.retry_config = retry_config or RetryConfig()
        self.fallback_handler = fallback_handler
    
    async def run(self, initial_input: dict) -> ChainResult:
        """Run chain with error handling."""
        
        context = dict(initial_input)
        step_outputs = {}
        total_tokens = 0
        
        for i, step in enumerate(self.steps):
            try:
                output, tokens = await self._execute_with_retry(step, context)
                
                output_key = step.output_key or step.name
                context[output_key] = output
                step_outputs[step.name] = output
                total_tokens += tokens
                
            except Exception as e:
                # Try fallback if available
                if self.fallback_handler:
                    try:
                        output = await self.fallback_handler(step, context, e)
                        output_key = step.output_key or step.name
                        context[output_key] = output
                        step_outputs[step.name] = output
                        continue
                    except Exception:
                        pass
                
                return ChainResult(
                    success=False,
                    output=None,
                    steps_completed=i,
                    step_outputs=step_outputs,
                    error=f"Step {step.name} failed: {e}",
                    total_tokens=total_tokens
                )
        
        final_key = self.steps[-1].output_key or self.steps[-1].name
        
        return ChainResult(
            success=True,
            output=context.get(final_key),
            steps_completed=len(self.steps),
            step_outputs=step_outputs,
            total_tokens=total_tokens
        )
    
    async def _execute_with_retry(
        self,
        step: ChainStep,
        context: dict
    ) -> tuple[Any, int]:
        """Execute step with retries."""
        
        last_error = None
        
        for attempt in range(self.retry_config.max_retries):
            try:
                prompt = step.format_prompt(context)
                
                response = await self.client.chat.completions.create(
                    model=step.model,
                    messages=[{"role": "user", "content": prompt}]
                )
                
                output = response.choices[0].message.content
                
                if step.parser:
                    output = step.parser(output)
                
                return output, response.usage.total_tokens
                
            except Exception as e:
                last_error = e
                
                if attempt < self.retry_config.max_retries - 1:
                    delay = min(
                        self.retry_config.base_delay * (
                            self.retry_config.exponential_base ** attempt
                        ),
                        self.retry_config.max_delay
                    )
                    await asyncio.sleep(delay)
        
        raise last_error
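
Once retries are exhausted, the fallback_handler is awaited with (step, context, error) before the chain gives up. One plausible fallback, sketched here, retries the step once on a stronger model; the model name and the client variable are assumptions:

async def model_fallback(step: ChainStep, context: dict, error: Exception) -> str:
    """Hypothetical fallback: re-run the failed step on a larger model."""
    response = await client.chat.completions.create(  # `client` assumed in scope
        model="gpt-4o",                               # assumed fallback model
        messages=[{"role": "user", "content": step.format_prompt(context)}]
    )
    return response.choices[0].message.content

robust_chain = RobustChain(
    client=client,
    steps=research_chain.steps,
    retry_config=RetryConfig(max_retries=3),
    fallback_handler=model_fallback
)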

class CheckpointedChain:
    """Chain with checkpointing for long-running tasks."""
    
    def __init__(
        self,
        client: Any,
        steps: list[ChainStep],
        checkpoint_store: Any = None  # any dict-like store; defaults to in-memory
    ):
        self.client = client
        self.steps = steps
        self.checkpoint_store = checkpoint_store or {}
    
    async def run(
        self,
        initial_input: dict,
        run_id: str
    ) -> ChainResult:
        """Run chain with checkpointing."""
        
        # Load checkpoint if exists
        checkpoint = self.checkpoint_store.get(run_id, {
            "context": dict(initial_input),
            "completed_steps": [],
            "step_outputs": {},
            "total_tokens": 0
        })
        
        context = checkpoint["context"]
        completed = set(checkpoint["completed_steps"])
        step_outputs = checkpoint["step_outputs"]
        total_tokens = checkpoint["total_tokens"]
        
        for step in self.steps:
            if step.name in completed:
                continue
            
            try:
                prompt = step.format_prompt(context)
                
                response = await self.client.chat.completions.create(
                    model=step.model,
                    messages=[{"role": "user", "content": prompt}]
                )
                
                output = response.choices[0].message.content
                
                if step.parser:
                    output = step.parser(output)
                
                output_key = step.output_key or step.name
                context[output_key] = output
                step_outputs[step.name] = output
                total_tokens += response.usage.total_tokens
                completed.add(step.name)
                
                # Save checkpoint
                self.checkpoint_store[run_id] = {
                    "context": context,
                    "completed_steps": list(completed),
                    "step_outputs": step_outputs,
                    "total_tokens": total_tokens
                }
                
            except Exception as e:
                return ChainResult(
                    success=False,
                    output=None,
                    steps_completed=len(completed),
                    step_outputs=step_outputs,
                    error=str(e),
                    total_tokens=total_tokens
                )
        
        # Clean up checkpoint on success
        if run_id in self.checkpoint_store:
            del self.checkpoint_store[run_id]
        
        final_key = self.steps[-1].output_key or self.steps[-1].name
        
        return ChainResult(
            success=True,
            output=context.get(final_key),
            steps_completed=len(self.steps),
            step_outputs=step_outputs,
            total_tokens=total_tokens
        )
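
A checkpoint is written after every completed step and keyed by run_id, so calling run again with the same run_id skips finished steps and resumes at the failure point without re-billing earlier calls. A resume sketch using the default in-memory store (a persistent store only needs the same get/setitem/delete interface):

async def run_with_resume(chain: CheckpointedChain, payload: dict, run_id: str, attempts: int = 2) -> ChainResult:
    result = None
    for _ in range(attempts):
        result = await chain.run(payload, run_id)
        if result.success:
            break  # the checkpoint is deleted on success
    return result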

Production Chain Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize chains
chains = {}  # Register chains here

class RunChainRequest(BaseModel):
    chain_name: str
    input: dict
    run_id: Optional[str] = None

class ChainStepDefinition(BaseModel):
    name: str
    prompt_template: str
    model: str = "gpt-4o-mini"
    output_key: Optional[str] = None

class CreateChainRequest(BaseModel):
    name: str
    steps: list[ChainStepDefinition]
    chain_type: str = "sequential"

@app.post("/v1/chains")
async def create_chain(request: CreateChainRequest):
    """Create a new chain."""
    
    steps = [
        ChainStep(
            name=s.name,
            prompt_template=s.prompt_template,
            model=s.model,
            output_key=s.output_key
        )
        for s in request.steps
    ]
    
    if request.chain_type == "sequential":
        chains[request.name] = SequentialChain(client=None, steps=steps)
    elif request.chain_type == "robust":
        chains[request.name] = RobustChain(client=None, steps=steps)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown chain type: {request.chain_type}")
    
    return {"name": request.name, "steps": len(steps)}

@app.post("/v1/chains/run")
async def run_chain(request: RunChainRequest):
    """Run a chain."""
    
    chain = chains.get(request.chain_name)
    if not chain:
        raise HTTPException(status_code=404, detail=f"Chain not found: {request.chain_name}")
    
    # CheckpointedChain.run requires a run_id; the other chains take only the input
    if isinstance(chain, CheckpointedChain):
        if not request.run_id:
            raise HTTPException(status_code=400, detail="run_id required for checkpointed chains")
        result = await chain.run(request.input, request.run_id)
    else:
        result = await chain.run(request.input)
    
    return {
        "success": result.success,
        "output": result.output,
        "steps_completed": result.steps_completed,
        "step_outputs": result.step_outputs,
        "error": result.error,
        "total_tokens": result.total_tokens
    }

@app.get("/v1/chains")
async def list_chains():
    """List available chains."""
    
    return {
        "chains": [
            {"name": name, "type": type(chain).__name__}
            for name, chain in chains.items()
        ]
    }

@app.delete("/v1/chains/{chain_name}")
async def delete_chain(chain_name: str):
    """Delete a chain."""
    
    if chain_name in chains:
        del chains[chain_name]
        return {"deleted": True}
    
    raise HTTPException(status_code=404, detail=f"Chain not found: {chain_name}")

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Prompt chaining transforms complex tasks into manageable sequences of focused LLM calls. Sequential chains work well for multi-step reasoning where each step builds on the previous. Parallel chains maximize throughput for independent subtasks that can run concurrently. Conditional chains enable dynamic workflows that branch based on intermediate results. Use composition patterns like map-reduce to handle variable-length inputs elegantly. Always implement robust error handling with retries and fallbacks—chains are only as reliable as their weakest step. For long-running chains, add checkpointing so you can resume from failures without starting over. The key insight is that breaking problems into smaller steps often produces better results than trying to solve everything in one prompt, even if it uses more tokens overall. Design your chains to match the natural structure of your problem, and you’ll get more reliable, higher-quality outputs.