Introduction
Complex tasks often exceed what a single LLM call can handle well. Breaking problems into smaller steps, where each step's output feeds into the next, produces better results than trying to do everything at once. Prompt chaining decomposes complex workflows into sequential LLM calls, each focused on a specific subtask. This guide covers practical chaining patterns: sequential chains for multi-step reasoning, parallel chains for independent subtasks, conditional chains that branch based on intermediate results, and building robust chain systems that handle failures gracefully.
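The abstractions below generalize this idea, but the core mechanic fits in a dozen lines: make one call, then feed its output into the next prompt. A minimal sketch, assuming the official openai Python package (AsyncOpenAI) and an OPENAI_API_KEY in the environment; the prompts and function name are illustrative:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def outline_then_draft(topic: str) -> str:
    # Step 1: generate an outline
    outline_resp = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Write a brief outline about: {topic}"}]
    )
    outline = outline_resp.choices[0].message.content
    # Step 2: feed the outline into the next prompt
    draft_resp = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Expand this outline into a short article:\n{outline}"}]
    )
    return draft_resp.choices[0].message.content

# asyncio.run(outline_then_draft("prompt chaining"))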

Sequential Chains
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
import asyncio
@dataclass
class ChainStep:
"""A step in a prompt chain."""
name: str
prompt_template: str
model: str = "gpt-4o-mini"
    output_key: Optional[str] = None
    parser: Optional[Callable] = None
def format_prompt(self, context: dict) -> str:
"""Format prompt with context variables."""
return self.prompt_template.format(**context)
@dataclass
class ChainResult:
"""Result of chain execution."""
success: bool
output: Any
steps_completed: int
step_outputs: dict = field(default_factory=dict)
    error: Optional[str] = None
total_tokens: int = 0
class SequentialChain:
"""Execute steps sequentially, passing outputs forward."""
def __init__(self, client: Any, steps: list[ChainStep]):
self.client = client
self.steps = steps
async def run(self, initial_input: dict) -> ChainResult:
"""Run the chain with initial input."""
context = dict(initial_input)
step_outputs = {}
total_tokens = 0
for i, step in enumerate(self.steps):
try:
# Format prompt
prompt = step.format_prompt(context)
# Call LLM
response = await self.client.chat.completions.create(
model=step.model,
messages=[{"role": "user", "content": prompt}]
)
output = response.choices[0].message.content
total_tokens += response.usage.total_tokens
# Parse if parser provided
if step.parser:
output = step.parser(output)
# Store output
output_key = step.output_key or step.name
context[output_key] = output
step_outputs[step.name] = output
except Exception as e:
return ChainResult(
success=False,
output=None,
steps_completed=i,
step_outputs=step_outputs,
error=str(e),
total_tokens=total_tokens
)
# Return final output
final_key = self.steps[-1].output_key or self.steps[-1].name
return ChainResult(
success=True,
output=context.get(final_key),
steps_completed=len(self.steps),
step_outputs=step_outputs,
total_tokens=total_tokens
)
# Example: Research and summarize chain
research_chain = SequentialChain(
client=None, # Initialize with client
steps=[
ChainStep(
name="extract_topics",
prompt_template="Extract the main topics from this text:\n{input}\n\nTopics:",
output_key="topics"
),
ChainStep(
name="research_each",
prompt_template="For each topic, provide key facts:\n{topics}\n\nFacts:",
output_key="facts"
),
ChainStep(
name="synthesize",
prompt_template="Synthesize these facts into a coherent summary:\n{facts}\n\nSummary:",
output_key="summary"
)
]
)
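A usage sketch for the chain above. It assumes the official openai AsyncOpenAI client; parse_topic_list is a hypothetical parser included to show where the parser hook fits:

import asyncio
from openai import AsyncOpenAI

def parse_topic_list(text: str) -> str:
    # Hypothetical parser: strip bullet/number markers and blank lines
    lines = [line.lstrip("-*0123456789. ").strip() for line in text.splitlines()]
    return "\n".join(line for line in lines if line)

async def main():
    research_chain.client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set
    research_chain.steps[0].parser = parse_topic_list
    result = await research_chain.run({"input": "Some source text to analyze..."})
    if result.success:
        print(result.output)        # the final summary
        print(result.total_tokens)  # useful for cost tracking
    else:
        print(f"Failed at step {result.steps_completed}: {result.error}")

# asyncio.run(main())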
Parallel Chains
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
@dataclass
class ParallelStep:
"""A step that can run in parallel."""
name: str
prompt_template: str
model: str = "gpt-4o-mini"
    depends_on: Optional[list[str]] = None  # names of steps this one depends on
class ParallelChain:
"""Execute independent steps in parallel."""
def __init__(self, client: Any, steps: list[ParallelStep]):
self.client = client
self.steps = {s.name: s for s in steps}
async def run(self, initial_input: dict) -> ChainResult:
"""Run chain with parallel execution where possible."""
context = dict(initial_input)
completed = set()
step_outputs = {}
total_tokens = 0
while len(completed) < len(self.steps):
# Find steps ready to run
ready = []
for name, step in self.steps.items():
if name in completed:
continue
# Check dependencies
deps = step.depends_on or []
if all(d in completed for d in deps):
ready.append(step)
if not ready:
                return ChainResult(
                    success=False,
                    output=None,
                    steps_completed=len(completed),
                    step_outputs=step_outputs,
                    error="Circular or unsatisfiable dependency detected",
                    total_tokens=total_tokens
                )
# Run ready steps in parallel
tasks = [
self._run_step(step, context)
for step in ready
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for step, result in zip(ready, results):
if isinstance(result, Exception):
                    return ChainResult(
                        success=False,
                        output=None,
                        steps_completed=len(completed),
                        step_outputs=step_outputs,
                        error=f"{step.name}: {result}",
                        total_tokens=total_tokens
                    )
output, tokens = result
context[step.name] = output
step_outputs[step.name] = output
total_tokens += tokens
completed.add(step.name)
return ChainResult(
success=True,
output=step_outputs,
steps_completed=len(self.steps),
step_outputs=step_outputs,
total_tokens=total_tokens
)
async def _run_step(
self,
step: ParallelStep,
context: dict
) -> tuple[str, int]:
"""Run a single step."""
prompt = step.prompt_template.format(**context)
response = await self.client.chat.completions.create(
model=step.model,
messages=[{"role": "user", "content": prompt}]
)
return (
response.choices[0].message.content,
response.usage.total_tokens
)
# Example: Analyze document from multiple angles
analysis_chain = ParallelChain(
client=None,
steps=[
ParallelStep(
name="sentiment",
prompt_template="Analyze the sentiment of this text:\n{input}\n\nSentiment:"
),
ParallelStep(
name="entities",
prompt_template="Extract named entities from this text:\n{input}\n\nEntities:"
),
ParallelStep(
name="summary",
prompt_template="Summarize this text in 2-3 sentences:\n{input}\n\nSummary:"
),
ParallelStep(
name="combined",
prompt_template="Combine these analyses:\nSentiment: {sentiment}\nEntities: {entities}\nSummary: {summary}\n\nCombined analysis:",
depends_on=["sentiment", "entities", "summary"]
)
]
)
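Running the chain might look like the sketch below (again assuming the openai AsyncOpenAI client). The scheduler executes the three independent analyses concurrently in one asyncio.gather wave, then runs combined in a second wave once its dependencies are in the context:

import asyncio
from openai import AsyncOpenAI

async def main():
    analysis_chain.client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set
    result = await analysis_chain.run({"input": "Your document text here."})
    # Wave 1: sentiment, entities, summary run concurrently.
    # Wave 2: combined runs once all three dependencies complete.
    print(result.step_outputs["combined"])

# asyncio.run(main())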
Conditional Chains
from dataclasses import dataclass
from typing import Any, Optional, Callable
from enum import Enum
class BranchCondition(Enum):
"""Types of branch conditions."""
CONTAINS = "contains"
EQUALS = "equals"
CUSTOM = "custom"
@dataclass
class ConditionalBranch:
"""A conditional branch in the chain."""
condition_type: BranchCondition
condition_value: Any
next_step: str
    custom_check: Optional[Callable] = None
@dataclass
class ConditionalStep:
"""A step with conditional branching."""
name: str
prompt_template: str
    branches: Optional[list[ConditionalBranch]] = None
    default_next: Optional[str] = None
model: str = "gpt-4o-mini"
class ConditionalChain:
"""Chain with conditional branching."""
def __init__(
self,
client: Any,
steps: dict[str, ConditionalStep],
start_step: str
):
self.client = client
self.steps = steps
self.start_step = start_step
async def run(
self,
initial_input: dict,
max_steps: int = 20
) -> ChainResult:
"""Run chain following conditional branches."""
context = dict(initial_input)
step_outputs = {}
total_tokens = 0
current_step = self.start_step
steps_executed = 0
while current_step and steps_executed < max_steps:
step = self.steps.get(current_step)
if not step:
break
# Execute step
prompt = step.prompt_template.format(**context)
response = await self.client.chat.completions.create(
model=step.model,
messages=[{"role": "user", "content": prompt}]
)
output = response.choices[0].message.content
total_tokens += response.usage.total_tokens
context[step.name] = output
step_outputs[step.name] = output
steps_executed += 1
# Determine next step
current_step = self._get_next_step(step, output)
return ChainResult(
success=True,
            output=list(step_outputs.values())[-1] if step_outputs else None,
steps_completed=steps_executed,
step_outputs=step_outputs,
total_tokens=total_tokens
)
def _get_next_step(self, step: ConditionalStep, output: str) -> Optional[str]:
"""Determine next step based on output."""
if not step.branches:
return step.default_next
for branch in step.branches:
if self._check_condition(branch, output):
return branch.next_step
return step.default_next
def _check_condition(self, branch: ConditionalBranch, output: str) -> bool:
"""Check if branch condition is met."""
if branch.condition_type == BranchCondition.CONTAINS:
return branch.condition_value.lower() in output.lower()
elif branch.condition_type == BranchCondition.EQUALS:
return output.strip().lower() == branch.condition_value.lower()
elif branch.condition_type == BranchCondition.CUSTOM:
return branch.custom_check(output) if branch.custom_check else False
return False
# Example: Customer support routing
support_chain = ConditionalChain(
client=None,
steps={
"classify": ConditionalStep(
name="classify",
prompt_template="Classify this customer inquiry:\n{input}\n\nCategory (billing/technical/general):",
branches=[
ConditionalBranch(
condition_type=BranchCondition.CONTAINS,
condition_value="billing",
next_step="billing_response"
),
ConditionalBranch(
condition_type=BranchCondition.CONTAINS,
condition_value="technical",
next_step="technical_response"
)
],
default_next="general_response"
),
"billing_response": ConditionalStep(
name="billing_response",
prompt_template="Generate a billing support response for:\n{input}\n\nResponse:"
),
"technical_response": ConditionalStep(
name="technical_response",
prompt_template="Generate a technical support response for:\n{input}\n\nResponse:"
),
"general_response": ConditionalStep(
name="general_response",
prompt_template="Generate a general support response for:\n{input}\n\nResponse:"
)
},
start_step="classify"
)
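A quick way to sanity-check the routing, assuming the openai AsyncOpenAI client; the sample inquiry should take the billing branch because the classifier's output is expected to contain the word "billing":

import asyncio
from openai import AsyncOpenAI

async def main():
    support_chain.client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set
    result = await support_chain.run(
        {"input": "I was charged twice for my subscription last month."}
    )
    # Expected path: classify -> billing_response via the CONTAINS branch
    print(result.step_outputs)

# asyncio.run(main())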
Chain Composition
from typing import Any, Callable, Optional
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod
class ChainComponent(ABC):
"""Base class for chain components."""
@abstractmethod
async def execute(self, context: dict) -> tuple[Any, int]:
"""Execute component and return (output, tokens)."""
pass
class LLMStep(ChainComponent):
"""Single LLM call step."""
def __init__(
self,
client: Any,
prompt_template: str,
model: str = "gpt-4o-mini",
        parser: Optional[Callable] = None
):
self.client = client
self.prompt_template = prompt_template
self.model = model
self.parser = parser
async def execute(self, context: dict) -> tuple[Any, int]:
prompt = self.prompt_template.format(**context)
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
output = response.choices[0].message.content
if self.parser:
output = self.parser(output)
return output, response.usage.total_tokens
class TransformStep(ChainComponent):
"""Transform data without LLM call."""
def __init__(self, transform: Callable):
self.transform = transform
async def execute(self, context: dict) -> tuple[Any, int]:
return self.transform(context), 0
class CompositeChain(ChainComponent):
"""Compose multiple chains together."""
def __init__(self, chains: list[tuple[str, ChainComponent]]):
self.chains = chains
async def execute(self, context: dict) -> tuple[dict, int]:
total_tokens = 0
for name, chain in self.chains:
output, tokens = await chain.execute(context)
context[name] = output
total_tokens += tokens
return context, total_tokens
class MapChain(ChainComponent):
"""Apply chain to each item in a list."""
    def __init__(self, chain: ChainComponent, input_key: str):
        self.chain = chain
        self.input_key = input_key
async def execute(self, context: dict) -> tuple[list, int]:
items = context.get(self.input_key, [])
total_tokens = 0
results = []
for item in items:
item_context = {**context, "item": item}
output, tokens = await self.chain.execute(item_context)
results.append(output)
total_tokens += tokens
return results, total_tokens
class ReduceChain(ChainComponent):
"""Reduce list of results to single output."""
def __init__(
self,
client: Any,
reduce_template: str,
input_key: str,
model: str = "gpt-4o-mini"
):
self.client = client
self.reduce_template = reduce_template
self.input_key = input_key
self.model = model
async def execute(self, context: dict) -> tuple[str, int]:
items = context.get(self.input_key, [])
items_text = "\n".join(f"- {item}" for item in items)
prompt = self.reduce_template.format(items=items_text, **context)
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content, response.usage.total_tokens
# Example: Map-reduce summarization
def build_map_reduce_chain(client: Any) -> CompositeChain:
return CompositeChain([
("chunks", TransformStep(
lambda ctx: ctx["document"].split("\n\n")
)),
("summaries", MapChain(
chain=LLMStep(
client=client,
prompt_template="Summarize this section:\n{item}\n\nSummary:"
),
input_key="chunks",
output_key="summaries"
)),
("final_summary", ReduceChain(
client=client,
reduce_template="Combine these summaries into one:\n{items}\n\nFinal summary:",
input_key="summaries"
))
])
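A usage sketch for the composed chain, assuming the openai AsyncOpenAI client; the document string and its blank-line section breaks are illustrative:

import asyncio
from openai import AsyncOpenAI

async def main():
    chain = build_map_reduce_chain(AsyncOpenAI())  # assumes OPENAI_API_KEY is set
    document = "First section...\n\nSecond section...\n\nThird section..."
    context, tokens = await chain.execute({"document": document})
    print(context["final_summary"])
    print(f"Total tokens: {tokens}")

# asyncio.run(main())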
Error Handling and Retries
from dataclasses import dataclass
from typing import Any, Optional, Callable
import asyncio
@dataclass
class RetryConfig:
"""Configuration for retries."""
    max_retries: int = 3  # total number of attempts, including the first call
base_delay: float = 1.0
max_delay: float = 30.0
exponential_base: float = 2.0
class RobustChain:
"""Chain with error handling and retries."""
def __init__(
self,
client: Any,
steps: list[ChainStep],
        retry_config: Optional[RetryConfig] = None,
        fallback_handler: Optional[Callable] = None
):
self.client = client
self.steps = steps
self.retry_config = retry_config or RetryConfig()
self.fallback_handler = fallback_handler
async def run(self, initial_input: dict) -> ChainResult:
"""Run chain with error handling."""
context = dict(initial_input)
step_outputs = {}
total_tokens = 0
for i, step in enumerate(self.steps):
try:
output, tokens = await self._execute_with_retry(step, context)
output_key = step.output_key or step.name
context[output_key] = output
step_outputs[step.name] = output
total_tokens += tokens
except Exception as e:
# Try fallback if available
if self.fallback_handler:
try:
output = await self.fallback_handler(step, context, e)
output_key = step.output_key or step.name
context[output_key] = output
step_outputs[step.name] = output
continue
except Exception:
pass
return ChainResult(
success=False,
output=None,
steps_completed=i,
step_outputs=step_outputs,
error=f"Step {step.name} failed: {e}",
total_tokens=total_tokens
)
final_key = self.steps[-1].output_key or self.steps[-1].name
return ChainResult(
success=True,
output=context.get(final_key),
steps_completed=len(self.steps),
step_outputs=step_outputs,
total_tokens=total_tokens
)
async def _execute_with_retry(
self,
step: ChainStep,
context: dict
) -> tuple[Any, int]:
"""Execute step with retries."""
last_error = None
for attempt in range(self.retry_config.max_retries):
try:
prompt = step.format_prompt(context)
response = await self.client.chat.completions.create(
model=step.model,
messages=[{"role": "user", "content": prompt}]
)
output = response.choices[0].message.content
if step.parser:
output = step.parser(output)
return output, response.usage.total_tokens
except Exception as e:
last_error = e
if attempt < self.retry_config.max_retries - 1:
delay = min(
self.retry_config.base_delay * (
self.retry_config.exponential_base ** attempt
),
self.retry_config.max_delay
)
await asyncio.sleep(delay)
raise last_error
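RobustChain awaits fallback_handler(step, context, error), so any async callable with that shape works. A sketch of one reasonable policy, retrying the failed step once on a heavier model (the model choice is an assumption, not a recommendation from this guide):

from openai import AsyncOpenAI

fallback_client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set

async def model_fallback(step: ChainStep, context: dict, error: Exception) -> str:
    # Last resort: rerun the failed step once on a different model
    prompt = step.format_prompt(context)
    response = await fallback_client.chat.completions.create(
        model="gpt-4o",  # assumption: a heavier model than the step default
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# robust_chain = RobustChain(
#     client=fallback_client,
#     steps=[...],
#     retry_config=RetryConfig(max_retries=3),
#     fallback_handler=model_fallback
# )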
class CheckpointedChain:
"""Chain with checkpointing for long-running tasks."""
def __init__(
self,
client: Any,
steps: list[ChainStep],
checkpoint_store: Any = None
):
self.client = client
self.steps = steps
self.checkpoint_store = checkpoint_store or {}
async def run(
self,
initial_input: dict,
run_id: str
) -> ChainResult:
"""Run chain with checkpointing."""
# Load checkpoint if exists
checkpoint = self.checkpoint_store.get(run_id, {
"context": dict(initial_input),
"completed_steps": [],
"step_outputs": {},
"total_tokens": 0
})
context = checkpoint["context"]
completed = set(checkpoint["completed_steps"])
step_outputs = checkpoint["step_outputs"]
total_tokens = checkpoint["total_tokens"]
for step in self.steps:
if step.name in completed:
continue
try:
prompt = step.format_prompt(context)
response = await self.client.chat.completions.create(
model=step.model,
messages=[{"role": "user", "content": prompt}]
)
output = response.choices[0].message.content
if step.parser:
output = step.parser(output)
output_key = step.output_key or step.name
context[output_key] = output
step_outputs[step.name] = output
total_tokens += response.usage.total_tokens
completed.add(step.name)
# Save checkpoint
self.checkpoint_store[run_id] = {
"context": context,
"completed_steps": list(completed),
"step_outputs": step_outputs,
"total_tokens": total_tokens
}
except Exception as e:
return ChainResult(
success=False,
output=None,
steps_completed=len(completed),
step_outputs=step_outputs,
error=str(e),
total_tokens=total_tokens
)
# Clean up checkpoint on success
if run_id in self.checkpoint_store:
del self.checkpoint_store[run_id]
final_key = self.steps[-1].output_key or self.steps[-1].name
return ChainResult(
success=True,
output=context.get(final_key),
steps_completed=len(self.steps),
step_outputs=step_outputs,
total_tokens=total_tokens
)
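CheckpointedChain only needs dict-style access (get, item assignment, membership tests, deletion), so swapping the in-memory store for something durable is a small adapter. A sketch backed by JSON files, which assumes every step output and context value is JSON-serializable:

import json
from pathlib import Path

class FileCheckpointStore:
    """Dict-like store that persists checkpoints as JSON files (sketch)."""
    def __init__(self, directory: str = "checkpoints"):
        self.dir = Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)

    def _path(self, run_id: str) -> Path:
        return self.dir / f"{run_id}.json"

    def get(self, run_id: str, default=None):
        path = self._path(run_id)
        return json.loads(path.read_text()) if path.exists() else default

    def __setitem__(self, run_id: str, checkpoint: dict):
        self._path(run_id).write_text(json.dumps(checkpoint))

    def __contains__(self, run_id: str) -> bool:
        return self._path(run_id).exists()

    def __delitem__(self, run_id: str):
        self._path(run_id).unlink()

# chain = CheckpointedChain(client=..., steps=[...], checkpoint_store=FileCheckpointStore())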
Production Chain Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Initialize chains
chains = {} # Register chains here
class RunChainRequest(BaseModel):
chain_name: str
input: dict
run_id: Optional[str] = None
class ChainStepDefinition(BaseModel):
name: str
prompt_template: str
model: str = "gpt-4o-mini"
output_key: Optional[str] = None
class CreateChainRequest(BaseModel):
name: str
steps: list[ChainStepDefinition]
chain_type: str = "sequential"
@app.post("/v1/chains")
async def create_chain(request: CreateChainRequest):
"""Create a new chain."""
steps = [
ChainStep(
name=s.name,
prompt_template=s.prompt_template,
model=s.model,
output_key=s.output_key
)
for s in request.steps
]
if request.chain_type == "sequential":
chains[request.name] = SequentialChain(client=None, steps=steps)
elif request.chain_type == "robust":
chains[request.name] = RobustChain(client=None, steps=steps)
else:
raise HTTPException(status_code=400, detail=f"Unknown chain type: {request.chain_type}")
return {"name": request.name, "steps": len(steps)}
@app.post("/v1/chains/run")
async def run_chain(request: RunChainRequest):
"""Run a chain."""
chain = chains.get(request.chain_name)
if not chain:
raise HTTPException(status_code=404, detail=f"Chain not found: {request.chain_name}")
    # Only CheckpointedChain accepts a run_id; every chain type has a run method
    if isinstance(chain, CheckpointedChain) and request.run_id:
        result = await chain.run(request.input, request.run_id)
    else:
        result = await chain.run(request.input)
return {
"success": result.success,
"output": result.output,
"steps_completed": result.steps_completed,
"step_outputs": result.step_outputs,
"error": result.error,
"total_tokens": result.total_tokens
}
@app.get("/v1/chains")
async def list_chains():
"""List available chains."""
return {
"chains": [
{"name": name, "type": type(chain).__name__}
for name, chain in chains.items()
]
}
@app.delete("/v1/chains/{chain_name}")
async def delete_chain(chain_name: str):
"""Delete a chain."""
if chain_name in chains:
del chains[chain_name]
return {"deleted": True}
raise HTTPException(status_code=404, detail=f"Chain not found: {chain_name}")
@app.get("/health")
async def health():
return {"status": "healthy"}
Conclusion
Prompt chaining transforms complex tasks into manageable sequences of focused LLM calls. Sequential chains work well for multi-step reasoning where each step builds on the previous. Parallel chains maximize throughput for independent subtasks that can run concurrently. Conditional chains enable dynamic workflows that branch based on intermediate results. Use composition patterns like map-reduce to handle variable-length inputs elegantly. Always implement robust error handling with retries and fallbacks—chains are only as reliable as their weakest step. For long-running chains, add checkpointing so you can resume from failures without starting over. The key insight is that breaking problems into smaller steps often produces better results than trying to solve everything in one prompt, even if it uses more tokens overall. Design your chains to match the natural structure of your problem, and you’ll get more reliable, higher-quality outputs.
