Introduction
Complex LLM applications rarely consist of a single prompt—they chain multiple steps together, each building on the previous output. Chain composition enables sophisticated workflows: retrieval-augmented generation, multi-step reasoning, iterative refinement, and conditional branching. Understanding how to compose chains effectively is essential for building production LLM systems. This guide covers practical chain patterns: sequential chains, parallel execution, conditional routing, error handling in chains, and a production-ready chain service for complex AI workflows.

Basic Chain Structure
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, TypeVar, Generic
from abc import ABC, abstractmethod
import asyncio

T = TypeVar('T')
U = TypeVar('U')

@dataclass
class ChainContext:
    """Context passed through chain steps."""
    input: Any
    intermediate: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)

    def set(self, key: str, value: Any):
        self.intermediate[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        return self.intermediate.get(key, default)

class ChainStep(ABC, Generic[T, U]):
    """Base class for chain steps."""
    name: str = "step"

    @abstractmethod
    async def run(self, input: T, context: ChainContext) -> U:
        """Execute the step."""
        pass

class LLMStep(ChainStep[str, str]):
    """A step that calls an LLM."""

    def __init__(
        self,
        client: Any,
        model: str,
        system_prompt: str = "",
        name: str = "llm"
    ):
        self.client = client
        self.model = model
        self.system_prompt = system_prompt
        self.name = name

    async def run(self, input: str, context: ChainContext) -> str:
        """Call the LLM with the current input."""
        messages = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        messages.append({"role": "user", "content": input})
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        return response.choices[0].message.content

class TransformStep(ChainStep[T, U]):
    """A step that transforms data."""

    def __init__(self, transform_fn: Callable[[T], U], name: str = "transform"):
        self.transform_fn = transform_fn
        self.name = name

    async def run(self, input: T, context: ChainContext) -> U:
        """Apply the transformation (sync or async)."""
        if asyncio.iscoroutinefunction(self.transform_fn):
            return await self.transform_fn(input)
        return self.transform_fn(input)

class Chain:
    """A chain of steps executed sequentially."""

    def __init__(self, steps: Optional[list[ChainStep]] = None, name: str = "chain"):
        self.steps = steps or []
        self.name = name

    def add(self, step: ChainStep) -> 'Chain':
        """Add a step to the chain."""
        self.steps.append(step)
        return self

    async def run(self, input: Any, context: Optional[ChainContext] = None) -> Any:
        """Execute all steps in sequence, recording each step's output."""
        context = context or ChainContext(input=input)
        current = input
        for step in self.steps:
            current = await step.run(current, context)
            context.set(step.name, current)
        return current
# Example: Simple summarization chain
def create_summarization_chain(client: Any) -> Chain:
    return Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Extract the key points from the following text.",
            name="extract"
        ),
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Write a concise summary based on these key points.",
            name="summarize"
        )
    ], name="summarization")
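Running a chain is just awaiting its run method. Here is a minimal usage sketch, assuming the openai package's AsyncOpenAI client (any object exposing an async chat.completions.create works the same way):

import asyncio
from openai import AsyncOpenAI  # assumed client; swap in any compatible async client

async def main():
    client = AsyncOpenAI()
    chain = create_summarization_chain(client)
    context = ChainContext(input="<long article text>")
    summary = await chain.run("<long article text>", context)
    print(context.get("extract"))  # intermediate key points, keyed by step name
    print(summary)                 # final output of the last step

asyncio.run(main())

Because each step's output is stored in the context under the step's name, intermediate results stay inspectable after the chain finishes.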
Parallel Execution
from typing import Any, Callable, Optional
import asyncio

class ParallelChain(ChainStep):
    """Execute multiple chains in parallel on the same input."""

    def __init__(
        self,
        chains: list[Chain],
        merge_fn: Optional[Callable[[list[Any]], Any]] = None,
        name: str = "parallel"
    ):
        self.chains = chains
        self.merge_fn = merge_fn or (lambda x: x)
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Execute all chains concurrently, then merge their results."""
        tasks = [
            chain.run(input, context)
            for chain in self.chains
        ]
        results = await asyncio.gather(*tasks)
        return self.merge_fn(results)

class MapChain(ChainStep):
    """Apply a chain to each item in a list."""

    def __init__(
        self,
        chain: Chain,
        max_concurrency: int = 10,
        name: str = "map"
    ):
        self.chain = chain
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.name = name

    async def run(self, inputs: list[Any], context: ChainContext) -> list[Any]:
        """Apply the chain to each input, bounded by the semaphore."""
        async def process_one(item: Any) -> Any:
            async with self.semaphore:
                return await self.chain.run(item, context)

        tasks = [process_one(item) for item in inputs]
        return await asyncio.gather(*tasks)

class FanOutFanIn(ChainStep):
    """Fan out to multiple processors, then fan in results."""

    def __init__(
        self,
        split_fn: Callable[[Any], list[Any]],
        process_chain: Chain,
        merge_fn: Callable[[list[Any]], Any],
        name: str = "fan_out_fan_in"
    ):
        self.split_fn = split_fn
        self.process_chain = process_chain
        self.merge_fn = merge_fn
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Split, process parts in parallel, merge."""
        # Fan out
        parts = self.split_fn(input)
        # Process each part concurrently
        tasks = [
            self.process_chain.run(part, context)
            for part in parts
        ]
        results = await asyncio.gather(*tasks)
        # Fan in
        return self.merge_fn(results)
# Example: Parallel analysis chain
def create_parallel_analysis_chain(client: Any) -> Chain:
    sentiment_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Analyze the sentiment of this text. Respond with: positive, negative, or neutral.",
            name="sentiment"
        )
    ])
    topic_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Extract the main topics from this text as a comma-separated list.",
            name="topics"
        )
    ])
    entity_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Extract named entities (people, places, organizations) from this text.",
            name="entities"
        )
    ])

    def merge_analyses(results: list[str]) -> dict:
        return {
            "sentiment": results[0],
            "topics": results[1],
            "entities": results[2]
        }

    return Chain([
        ParallelChain(
            chains=[sentiment_chain, topic_chain, entity_chain],
            merge_fn=merge_analyses,
            name="analysis"
        )
    ])
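FanOutFanIn pairs naturally with long documents: split into chunks, summarize each chunk in parallel, then stitch the partial summaries back together. A sketch of that map-reduce pattern, built from the pieces above (the chunk size, prompts, and function name are illustrative):

# Sketch: map-reduce summarization with FanOutFanIn (illustrative parameters)
def create_map_reduce_summary(client: Any) -> Chain:
    summarize_chunk = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Summarize this text chunk in 2-3 sentences.",
            name="chunk_summary"
        )
    ])
    return Chain([
        FanOutFanIn(
            # Naive fixed-size character chunking; real code would split on structure
            split_fn=lambda text: [text[i:i + 4000] for i in range(0, len(text), 4000)],
            process_chain=summarize_chunk,
            merge_fn=lambda parts: "\n".join(parts),
            name="map_reduce"
        ),
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Combine these partial summaries into one coherent summary.",
            name="reduce"
        )
    ])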
Conditional Routing
from typing import Any, Callable, Optional

class RouterStep(ChainStep):
    """Route to different chains based on a condition."""

    def __init__(
        self,
        router_fn: Callable[[Any], str],
        routes: dict[str, Chain],
        default: Optional[Chain] = None,
        name: str = "router"
    ):
        self.router_fn = router_fn
        self.routes = routes
        self.default = default
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Route to the appropriate chain."""
        route_key = self.router_fn(input)
        context.set("route", route_key)
        chain = self.routes.get(route_key, self.default)
        if chain is None:
            raise ValueError(f"No route found for: {route_key}")
        return await chain.run(input, context)

class LLMRouter(ChainStep):
    """Use an LLM to determine routing."""

    ROUTER_PROMPT = """Classify the following input into one of these categories: {categories}
Input: {input}
Respond with only the category name."""

    def __init__(
        self,
        client: Any,
        model: str,
        routes: dict[str, Chain],
        default: Optional[Chain] = None,
        name: str = "llm_router"
    ):
        self.client = client
        self.model = model
        self.routes = routes
        self.default = default
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Ask the LLM for a category, then dispatch to the matching chain."""
        categories = ", ".join(self.routes.keys())
        prompt = self.ROUTER_PROMPT.format(
            categories=categories,
            input=str(input)
        )
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        route_key = response.choices[0].message.content.strip().lower()
        context.set("route", route_key)
        chain = self.routes.get(route_key, self.default)
        if chain is None:
            raise ValueError(f"No route found for: {route_key}")
        return await chain.run(input, context)

class ConditionalStep(ChainStep):
    """Execute a step only if a condition is met."""

    def __init__(
        self,
        condition: Callable[[Any, ChainContext], bool],
        if_true: ChainStep,
        if_false: Optional[ChainStep] = None,
        name: str = "conditional"
    ):
        self.condition = condition
        self.if_true = if_true
        self.if_false = if_false
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Execute one branch based on the condition; pass through otherwise."""
        if self.condition(input, context):
            return await self.if_true.run(input, context)
        elif self.if_false:
            return await self.if_false.run(input, context)
        else:
            return input
# Example: Intent-based routing
def create_intent_router(client: Any) -> Chain:
    question_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o",
            system_prompt="Answer the following question thoroughly and accurately.",
            name="answer"
        )
    ])
    task_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o",
            system_prompt="Help complete the following task. Provide step-by-step guidance.",
            name="task"
        )
    ])
    chat_chain = Chain([
        LLMStep(
            client=client,
            model="gpt-4o-mini",
            system_prompt="Engage in friendly conversation.",
            name="chat"
        )
    ])
    return Chain([
        LLMRouter(
            client=client,
            model="gpt-4o-mini",
            routes={
                "question": question_chain,
                "task": task_chain,
                "chat": chat_chain
            },
            default=chat_chain,
            name="intent_router"
        )
    ])
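When the routing signal is cheap to compute, a plain RouterStep avoids the extra LLM call entirely. A sketch that routes by input length (the threshold, function name, and parameter chains are illustrative):

# Sketch: deterministic routing without an LLM call (illustrative threshold)
def create_length_router(long_doc_chain: Chain, short_doc_chain: Chain) -> RouterStep:
    return RouterStep(
        router_fn=lambda text: "long" if len(text) > 2000 else "short",
        routes={"long": long_doc_chain, "short": short_doc_chain},
        default=short_doc_chain,
        name="length_router"
    )

Reaching for LLMRouter only when the decision genuinely needs language understanding keeps latency and cost down.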
Error Handling
from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio

@dataclass
class ChainError:
    """Error information from chain execution."""
    step_name: str
    error: Exception
    input: Any
    context: ChainContext

class RetryStep(ChainStep):
    """Wrap a step with retry logic."""

    def __init__(
        self,
        step: ChainStep,
        max_retries: int = 3,
        delay: float = 1.0,
        name: Optional[str] = None
    ):
        self.step = step
        self.max_retries = max_retries
        self.delay = delay
        self.name = name or f"retry_{step.name}"

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Execute with retries and exponential backoff."""
        last_error = None
        for attempt in range(self.max_retries + 1):
            try:
                return await self.step.run(input, context)
            except Exception as e:
                last_error = e
                if attempt < self.max_retries:
                    await asyncio.sleep(self.delay * (2 ** attempt))
        raise last_error
class FallbackStep(ChainStep):
    """Try a primary step, fall back to a secondary on failure."""

    def __init__(
        self,
        primary: ChainStep,
        fallback: ChainStep,
        name: str = "fallback"
    ):
        self.primary = primary
        self.fallback = fallback
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Try primary; use fallback on failure."""
        try:
            result = await self.primary.run(input, context)
            context.set("used_fallback", False)
            return result
        except Exception as e:
            context.set("primary_error", str(e))
            context.set("used_fallback", True)
            return await self.fallback.run(input, context)

class ErrorHandlingChain(Chain):
    """Chain with comprehensive error handling."""

    def __init__(
        self,
        steps: Optional[list[ChainStep]] = None,
        on_error: Optional[Callable[[ChainError], Any]] = None,
        name: str = "error_handling_chain"
    ):
        super().__init__(steps, name)
        self.on_error = on_error

    async def run(self, input: Any, context: Optional[ChainContext] = None) -> Any:
        """Execute steps, delegating failures to the on_error handler."""
        context = context or ChainContext(input=input)
        current = input
        for step in self.steps:
            try:
                current = await step.run(current, context)
                context.set(step.name, current)
            except Exception as e:
                error = ChainError(
                    step_name=step.name,
                    error=e,
                    input=current,
                    context=context
                )
                if self.on_error:
                    return self.on_error(error)
                raise
        return current

class ValidationStep(ChainStep):
    """Validate output before continuing."""

    def __init__(
        self,
        validator: Callable[[Any], bool],
        error_message: str = "Validation failed",
        name: str = "validate"
    ):
        self.validator = validator
        self.error_message = error_message
        self.name = name

    async def run(self, input: Any, context: ChainContext) -> Any:
        """Raise if validation fails; otherwise pass the input through."""
        if not self.validator(input):
            raise ValueError(self.error_message)
        return input
# Example: Robust chain with error handling
def create_robust_chain(client: Any) -> Chain:
    primary_llm = LLMStep(
        client=client,
        model="gpt-4o",
        system_prompt="Process the following request.",
        name="primary"
    )
    fallback_llm = LLMStep(
        client=client,
        model="gpt-4o-mini",
        system_prompt="Process the following request.",
        name="fallback"
    )
    return ErrorHandlingChain([
        RetryStep(
            FallbackStep(primary_llm, fallback_llm),
            max_retries=2
        ),
        ValidationStep(
            validator=lambda x: len(x) > 0,
            error_message="Empty response"
        )
    ])
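The on_error hook turns an exception into a controlled response instead of a stack trace. A sketch of a handler that logs the failing step and returns a safe default (the handler name and message text are illustrative):

# Sketch: degrade gracefully instead of raising (illustrative handler)
def handle_chain_error(err: ChainError) -> str:
    # Swap the print for real structured logging in production
    print(f"step '{err.step_name}' failed: {err.error!r}")
    return "Sorry, I couldn't process that request. Please try again."

def create_robust_chain_with_handler(client: Any) -> Chain:
    chain = create_robust_chain(client)
    return ErrorHandlingChain(chain.steps, on_error=handle_chain_error)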
Production Chain Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any

app = FastAPI()

# Chain registry
chains: dict[str, Chain] = {}

class ChainRequest(BaseModel):
    chain_name: str
    input: Any
    metadata: Optional[dict] = None

class ChainResponse(BaseModel):
    output: Any
    intermediate: dict
    metadata: dict

@app.post("/v1/chains/execute")
async def execute_chain(request: ChainRequest) -> ChainResponse:
    """Execute a registered chain."""
    chain = chains.get(request.chain_name)
    if not chain:
        raise HTTPException(404, f"Chain not found: {request.chain_name}")
    context = ChainContext(
        input=request.input,
        metadata=request.metadata or {}
    )
    try:
        output = await chain.run(request.input, context)
        return ChainResponse(
            output=output,
            intermediate=context.intermediate,
            metadata=context.metadata
        )
    except Exception as e:
        raise HTTPException(500, f"Chain execution failed: {str(e)}")

@app.get("/v1/chains")
async def list_chains():
    """List all registered chains."""
    return {
        "chains": [
            {
                "name": name,
                "steps": [step.name for step in chain.steps]
            }
            for name, chain in chains.items()
        ]
    }

@app.post("/v1/chains/register")
async def register_chain(name: str, chain_config: dict):
    """Register a new chain from configuration."""
    # Build chain from config (simplified)
    # In production, use a proper chain builder
    return {"registered": name}

@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- LangChain LCEL: https://python.langchain.com/docs/expression_language/
- LlamaIndex Pipelines: https://docs.llamaindex.ai/en/stable/module_guides/querying/pipeline/
- Haystack Pipelines: https://docs.haystack.deepset.ai/docs/pipelines
- Prefect Workflows: https://docs.prefect.io/
Conclusion
Chain composition is the foundation of sophisticated LLM applications. Sequential chains enable multi-step processing where each step builds on the previous output; use them for workflows like summarization, analysis, and content generation. Parallel chains maximize throughput by executing independent operations concurrently, which is ideal for multi-faceted analysis or processing many items at once. Conditional routing enables dynamic workflows that adapt to input characteristics, such as routing questions to different models or processing paths based on intent. Robust error handling with retries, fallbacks, and validation ensures chains fail gracefully. Design chains to be composable and reusable: small, focused steps that can be combined into larger workflows. The goal is building flexible, maintainable AI systems that can handle complex real-world requirements.