Introduction
LangChain has emerged as the dominant framework for building production Retrieval-Augmented Generation (RAG) applications, providing abstractions for document loading, text splitting, embedding, vector storage, and retrieval chains. By late 2023, LangChain had reached production maturity with improved stability, better documentation, and enterprise-ready features. After deploying LangChain-based RAG systems across multiple organizations, I’ve found that its modular architecture enables rapid prototyping while supporting production requirements like observability, caching, and error handling. Organizations should adopt LangChain for knowledge-grounded AI applications requiring flexibility in model selection and retrieval strategies.
RAG Architecture Fundamentals
Retrieval-Augmented Generation combines the knowledge retrieval capabilities of search systems with the generative power of large language models. Rather than relying solely on the LLM’s training data, RAG systems retrieve relevant context from external knowledge bases before generating responses. This approach grounds responses in factual, up-to-date information while reducing hallucinations.
The RAG pipeline consists of two phases: indexing and retrieval. During indexing, documents are loaded, split into chunks, embedded into vector representations, and stored in a vector database. During retrieval, user queries are embedded, similar chunks are retrieved, and the LLM generates responses using the retrieved context. LangChain provides abstractions for each step, enabling customization while maintaining clean architecture.
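Before the full implementation later in this post, a condensed sketch helps fix the shape of those two phases. This is a minimal example, assuming an OpenAI API key is configured; the file path and question are placeholders:
# Minimal two-phase RAG sketch: index once, then retrieve and generate.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# Indexing phase: load, split, embed, store.
docs = TextLoader("notes.txt").load()  # placeholder file
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
store = FAISS.from_documents(splitter.split_documents(docs), OpenAIEmbeddings(model="text-embedding-3-small"))

# Retrieval phase: embed the query, fetch similar chunks, generate a grounded answer.
retriever = store.as_retriever(search_kwargs={"k": 4})
prompt = ChatPromptTemplate.from_template(
    "Answer using only this context:\n{context}\n\nQuestion: {question}"
)
llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)

question = "What are the main points?"  # placeholder query
context = "\n\n".join(d.page_content for d in retriever.invoke(question))
answer = (prompt | llm | StrOutputParser()).invoke({"context": context, "question": question})
print(answer)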
Chunking strategy significantly impacts retrieval quality. Chunks too small lose context; chunks too large dilute relevance. LangChain offers multiple text splitters including recursive character splitting, semantic splitting, and document-aware splitting that respects headers and sections. Choosing the right splitter depends on document structure and query patterns.
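The trade-off is easy to inspect directly. The short sketch below (placeholder text; sizes chosen only for illustration) splits the same Markdown snippet two ways, size-based versus header-aware:
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

sample = "# Guide\n\n## Setup\nInstall the package and configure keys.\n\n## Usage\nCall the API with a query."

# Size-based splitting: predictable chunk lengths, but section boundaries are ignored.
recursive = RecursiveCharacterTextSplitter(chunk_size=60, chunk_overlap=10)
for chunk in recursive.split_text(sample):
    print(repr(chunk))

# Header-aware splitting: each chunk carries its section headers as metadata,
# which later helps with filtering and citation.
md = MarkdownHeaderTextSplitter(headers_to_split_on=[("#", "h1"), ("##", "h2")])
for doc in md.split_text(sample):
    print(doc.metadata, "->", doc.page_content)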
LangChain Expression Language (LCEL)
LangChain Expression Language provides a declarative way to compose chains using the pipe operator. LCEL chains are streaming-first, support async execution, and enable parallel processing. This composability allows building complex pipelines from simple, reusable components while maintaining readability.
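A minimal LCEL chain illustrates the pattern (the model name is just an example): each component is a Runnable, the pipe operator composes them, and the result supports invoke, stream, and batch along with their async counterparts:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
chain = prompt | ChatOpenAI(model="gpt-4-turbo-preview", temperature=0) | StrOutputParser()

# Same chain, three execution styles: synchronous, streaming, and batched.
print(chain.invoke({"text": "LCEL composes Runnables with the pipe operator."}))
for token in chain.stream({"text": "Streaming yields output token by token."}):
    print(token, end="", flush=True)
print()
print(chain.batch([{"text": "First input."}, {"text": "Second input."}]))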
LCEL’s RunnablePassthrough and RunnableParallel enable sophisticated data flow patterns. Pass context through chains unchanged, execute multiple retrievers in parallel, or combine results from different sources. These primitives support advanced RAG patterns like multi-query retrieval and ensemble retrieval without custom orchestration code.
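As a self-contained sketch of those primitives (the two retrievers here are stand-ins built with RunnableLambda; in practice they would be vector-store and BM25 retrievers like those constructed later in this post), RunnableParallel fans one question out to both retrievers at once while RunnablePassthrough carries the raw query along unchanged:
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough

# Stand-ins for real retrievers: any Runnable mapping a query string to documents works.
dense_retriever = RunnableLambda(lambda q: [Document(page_content=f"dense hit for: {q}")])
sparse_retriever = RunnableLambda(lambda q: [Document(page_content=f"sparse hit for: {q}")])

# Fan the same question out to both retrievers in parallel and keep the raw query.
fan_out = RunnableParallel(
    dense=dense_retriever,
    sparse=sparse_retriever,
    question=RunnablePassthrough(),
)

def merge(results: dict) -> dict:
    """Deduplicate documents from both retrievers by page content."""
    seen, merged = set(), []
    for doc in results["dense"] + results["sparse"]:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            merged.append(doc)
    return {"question": results["question"], "docs": merged}

ensemble = fan_out | RunnableLambda(merge)
print(ensemble.invoke("what does the setup guide cover?"))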
Python Implementation: Production RAG with LangChain
Here’s a comprehensive implementation demonstrating production RAG patterns with LangChain:
"""Production RAG Implementation with LangChain"""
import asyncio
import logging
from typing import Dict, Any, List, Optional, AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
import hashlib
from operator import itemgetter
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
UnstructuredMarkdownLoader,
DirectoryLoader
)
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter
)
from langchain.retrievers import (
ContextualCompressionRetriever,
MultiQueryRetriever,
EnsembleRetriever
)
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_community.retrievers import BM25Retriever
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ==================== Configuration ====================
@dataclass
class RAGConfig:
"""Configuration for RAG pipeline."""
# Model settings
llm_model: str = "gpt-4-turbo-preview"
embedding_model: str = "text-embedding-3-small"
temperature: float = 0.0
# Chunking settings
chunk_size: int = 1000
chunk_overlap: int = 200
# Retrieval settings
retrieval_k: int = 4
use_reranking: bool = True
use_multi_query: bool = True
# Vector store settings
vector_store_type: str = "chroma" # "chroma" or "faiss"
persist_directory: str = "./vector_store"
# Cache settings
enable_cache: bool = True
cache_ttl_seconds: int = 3600
@dataclass
class RetrievalResult:
"""Result from retrieval operation."""
documents: List[Document]
query: str
retrieval_time_ms: float
source_count: int
metadata: Dict[str, Any] = field(default_factory=dict)
# ==================== Document Processing ====================
class DocumentProcessor:
"""Handles document loading and chunking."""
def __init__(self, config: RAGConfig):
self.config = config
# Initialize text splitter
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
# Markdown-aware splitter for structured documents
self.markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "header_1"),
("##", "header_2"),
("###", "header_3"),
]
)
def load_documents(self, path: str) -> List[Document]:
"""Load documents from file or directory."""
path_obj = Path(path)
if path_obj.is_file():
return self._load_single_file(path_obj)
elif path_obj.is_dir():
return self._load_directory(path_obj)
else:
raise ValueError(f"Invalid path: {path}")
def _load_single_file(self, path: Path) -> List[Document]:
"""Load a single file based on extension."""
suffix = path.suffix.lower()
loaders = {
".pdf": PyPDFLoader,
".txt": TextLoader,
".md": UnstructuredMarkdownLoader,
}
loader_class = loaders.get(suffix)
if not loader_class:
raise ValueError(f"Unsupported file type: {suffix}")
loader = loader_class(str(path))
documents = loader.load()
# Add source metadata
for doc in documents:
doc.metadata["source_file"] = path.name
doc.metadata["file_type"] = suffix
doc.metadata["loaded_at"] = datetime.utcnow().isoformat()
return documents
def _load_directory(self, path: Path) -> List[Document]:
"""Load all supported files from directory."""
all_documents = []
for pattern in ["**/*.pdf", "**/*.txt", "**/*.md"]:
for file_path in path.glob(pattern):
try:
docs = self._load_single_file(file_path)
all_documents.extend(docs)
logger.info(f"Loaded {len(docs)} documents from {file_path}")
except Exception as e:
logger.error(f"Failed to load {file_path}: {e}")
return all_documents
def split_documents(
self,
documents: List[Document],
use_markdown_aware: bool = False
) -> List[Document]:
"""Split documents into chunks."""
if use_markdown_aware:
# First split by markdown headers, then by size
md_splits = []
for doc in documents:
if doc.metadata.get("file_type") == ".md":
header_splits = self.markdown_splitter.split_text(doc.page_content)
for split in header_splits:
new_doc = Document(
page_content=split.page_content,
metadata={**doc.metadata, **split.metadata}
)
md_splits.append(new_doc)
else:
md_splits.append(doc)
documents = md_splits
# Apply recursive character splitting
chunks = self.text_splitter.split_documents(documents)
# Add chunk metadata
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_index"] = i
chunk.metadata["chunk_hash"] = hashlib.md5(
chunk.page_content.encode()
).hexdigest()[:8]
logger.info(f"Split {len(documents)} documents into {len(chunks)} chunks")
return chunks
# ==================== Vector Store Management ====================
class VectorStoreManager:
"""Manages vector store operations."""
def __init__(self, config: RAGConfig):
self.config = config
self.embeddings = OpenAIEmbeddings(model=config.embedding_model)
self.vector_store = None
def create_vector_store(self, documents: List[Document]) -> None:
"""Create vector store from documents."""
if self.config.vector_store_type == "chroma":
self.vector_store = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
persist_directory=self.config.persist_directory
)
elif self.config.vector_store_type == "faiss":
self.vector_store = FAISS.from_documents(
documents=documents,
embedding=self.embeddings
)
else:
raise ValueError(f"Unknown vector store: {self.config.vector_store_type}")
logger.info(f"Created {self.config.vector_store_type} vector store with {len(documents)} documents")
def load_vector_store(self) -> None:
"""Load existing vector store."""
if self.config.vector_store_type == "chroma":
self.vector_store = Chroma(
persist_directory=self.config.persist_directory,
embedding_function=self.embeddings
)
elif self.config.vector_store_type == "faiss":
self.vector_store = FAISS.load_local(
self.config.persist_directory,
self.embeddings
)
logger.info(f"Loaded {self.config.vector_store_type} vector store")
def add_documents(self, documents: List[Document]) -> None:
"""Add documents to existing vector store."""
if self.vector_store is None:
raise ValueError("Vector store not initialized")
self.vector_store.add_documents(documents)
logger.info(f"Added {len(documents)} documents to vector store")
def get_retriever(self, k: int = 4):
"""Get base retriever from vector store."""
if self.vector_store is None:
raise ValueError("Vector store not initialized")
return self.vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": k}
)
# ==================== Advanced Retrievers ====================
class AdvancedRetrieverFactory:
"""Factory for creating advanced retrievers."""
def __init__(self, config: RAGConfig, vector_store_manager: VectorStoreManager):
self.config = config
self.vector_store_manager = vector_store_manager
self.llm = ChatOpenAI(
model=config.llm_model,
temperature=0
)
def create_multi_query_retriever(self):
"""Create retriever that generates multiple query variations."""
base_retriever = self.vector_store_manager.get_retriever(self.config.retrieval_k)
return MultiQueryRetriever.from_llm(
retriever=base_retriever,
llm=self.llm
)
def create_contextual_compression_retriever(self):
"""Create retriever with LLM-based compression."""
base_retriever = self.vector_store_manager.get_retriever(self.config.retrieval_k * 2)
compressor = LLMChainExtractor.from_llm(self.llm)
return ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
def create_ensemble_retriever(self, documents: List[Document]):
"""Create ensemble retriever combining dense and sparse retrieval."""
# Dense retriever (vector similarity)
dense_retriever = self.vector_store_manager.get_retriever(self.config.retrieval_k)
# Sparse retriever (BM25)
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = self.config.retrieval_k
# Combine with weights
return EnsembleRetriever(
retrievers=[dense_retriever, bm25_retriever],
weights=[0.6, 0.4]
)
# ==================== RAG Chain ====================
class RAGChain:
"""Production RAG chain with LCEL."""
def __init__(
self,
config: RAGConfig,
retriever,
system_prompt: Optional[str] = None
):
self.config = config
self.retriever = retriever
# Initialize LLM
self.llm = ChatOpenAI(
model=config.llm_model,
temperature=config.temperature,
streaming=True
)
# Build prompts
self.system_prompt = system_prompt or self._default_system_prompt()
self.qa_prompt = self._build_qa_prompt()
# Build chain
self.chain = self._build_chain()
def _default_system_prompt(self) -> str:
"""Default system prompt for RAG."""
return """You are a helpful assistant that answers questions based on the provided context.
Guidelines:
- Answer based ONLY on the provided context
- If the context doesn't contain enough information, say so
- Cite specific parts of the context when relevant
- Be concise but thorough
- If asked about something not in the context, acknowledge the limitation"""
def _build_qa_prompt(self) -> ChatPromptTemplate:
"""Build the QA prompt template."""
return ChatPromptTemplate.from_messages([
("system", self.system_prompt),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", """Context:
{context}
Question: {question}
Answer based on the context above:""")
])
def _format_docs(self, docs: List[Document]) -> str:
"""Format retrieved documents for context."""
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source_file", "Unknown")
formatted.append(f"[{i}] Source: {source}\n{doc.page_content}")
return "\n\n---\n\n".join(formatted)
    def _build_chain(self):
        """Build the LCEL chain."""
        # The chain is invoked with a dict ({"question": ..., "chat_history": ...}),
        # so the question must be extracted before it reaches the retriever, and the
        # rest of the input (including the optional chat history) must flow through
        # to the prompt. RunnablePassthrough.assign keeps the input dict intact and
        # adds the formatted retrieval context alongside it.
        retrieval_chain = RunnablePassthrough.assign(
            context=itemgetter("question") | self.retriever | self._format_docs
        )
        # Full chain
        return (
            retrieval_chain
            | self.qa_prompt
            | self.llm
            | StrOutputParser()
        )
def invoke(self, question: str, chat_history: List = None) -> str:
"""Invoke the RAG chain."""
inputs = {"question": question}
if chat_history:
inputs["chat_history"] = chat_history
return self.chain.invoke(inputs)
async def ainvoke(self, question: str, chat_history: List = None) -> str:
"""Async invoke the RAG chain."""
inputs = {"question": question}
if chat_history:
inputs["chat_history"] = chat_history
return await self.chain.ainvoke(inputs)
async def astream(
self,
question: str,
chat_history: List = None
) -> AsyncIterator[str]:
"""Stream responses from the RAG chain."""
inputs = {"question": question}
if chat_history:
inputs["chat_history"] = chat_history
async for chunk in self.chain.astream(inputs):
yield chunk
# ==================== Conversational RAG ====================
class ConversationalRAG:
"""RAG with conversation history support."""
def __init__(self, rag_chain: RAGChain):
self.rag_chain = rag_chain
self.conversations: Dict[str, List] = {}
def get_chat_history(self, session_id: str) -> List:
"""Get chat history for a session."""
return self.conversations.get(session_id, [])
def add_to_history(
self,
session_id: str,
human_message: str,
ai_message: str
) -> None:
"""Add exchange to chat history."""
if session_id not in self.conversations:
self.conversations[session_id] = []
self.conversations[session_id].extend([
HumanMessage(content=human_message),
AIMessage(content=ai_message)
])
# Keep only last 10 exchanges
if len(self.conversations[session_id]) > 20:
self.conversations[session_id] = self.conversations[session_id][-20:]
def chat(self, session_id: str, question: str) -> str:
"""Process a chat message with history."""
history = self.get_chat_history(session_id)
response = self.rag_chain.invoke(question, chat_history=history)
self.add_to_history(session_id, question, response)
return response
async def achat(self, session_id: str, question: str) -> str:
"""Async chat with history."""
history = self.get_chat_history(session_id)
response = await self.rag_chain.ainvoke(question, chat_history=history)
self.add_to_history(session_id, question, response)
return response
def clear_history(self, session_id: str) -> None:
"""Clear chat history for a session."""
self.conversations.pop(session_id, None)
# ==================== RAG Application ====================
class RAGApplication:
"""High-level RAG application."""
def __init__(self, config: RAGConfig):
self.config = config
self.document_processor = DocumentProcessor(config)
self.vector_store_manager = VectorStoreManager(config)
self.rag_chain: Optional[RAGChain] = None
self.conversational_rag: Optional[ConversationalRAG] = None
def index_documents(self, path: str) -> int:
"""Index documents from path."""
# Load documents
documents = self.document_processor.load_documents(path)
# Split into chunks
chunks = self.document_processor.split_documents(documents)
# Create vector store
self.vector_store_manager.create_vector_store(chunks)
return len(chunks)
def initialize_chain(
self,
use_advanced_retrieval: bool = True,
system_prompt: Optional[str] = None
) -> None:
"""Initialize the RAG chain."""
if use_advanced_retrieval and self.config.use_multi_query:
retriever_factory = AdvancedRetrieverFactory(
self.config,
self.vector_store_manager
)
retriever = retriever_factory.create_multi_query_retriever()
else:
retriever = self.vector_store_manager.get_retriever(
self.config.retrieval_k
)
self.rag_chain = RAGChain(
self.config,
retriever,
system_prompt
)
self.conversational_rag = ConversationalRAG(self.rag_chain)
def query(self, question: str) -> str:
"""Query the RAG system."""
if self.rag_chain is None:
raise ValueError("RAG chain not initialized")
return self.rag_chain.invoke(question)
def chat(self, session_id: str, question: str) -> str:
"""Chat with conversation history."""
if self.conversational_rag is None:
raise ValueError("Conversational RAG not initialized")
return self.conversational_rag.chat(session_id, question)
async def astream_response(
self,
question: str
) -> AsyncIterator[str]:
"""Stream response for a question."""
if self.rag_chain is None:
raise ValueError("RAG chain not initialized")
async for chunk in self.rag_chain.astream(question):
yield chunk
# ==================== Example Usage ====================
async def main():
"""Demonstrate RAG application."""
# Configuration
config = RAGConfig(
llm_model="gpt-4-turbo-preview",
chunk_size=1000,
chunk_overlap=200,
retrieval_k=4,
use_multi_query=True
)
# Initialize application
app = RAGApplication(config)
# Index documents (example path)
# num_chunks = app.index_documents("./documents")
# print(f"Indexed {num_chunks} chunks")
# Initialize chain
# app.initialize_chain(use_advanced_retrieval=True)
# Query
# response = app.query("What are the main features?")
# print(f"Response: {response}")
# Conversational chat
# response1 = app.chat("session_1", "What is this document about?")
# response2 = app.chat("session_1", "Can you elaborate on that?")
print("RAG Application initialized successfully")
if __name__ == "__main__":
asyncio.run(main())
Production Considerations
Production RAG systems require attention to observability, error handling, and performance optimization. Implement logging at each pipeline stage to diagnose retrieval quality issues. Use LangSmith or similar tools for tracing chain execution and identifying bottlenecks. Cache embeddings and retrieval results to reduce latency and API costs.
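For embedding caching specifically, LangChain provides a cache-backed wrapper around any embeddings model; a minimal sketch (the cache directory and namespace are arbitrary choices here) looks like this:
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_openai import OpenAIEmbeddings

underlying = OpenAIEmbeddings(model="text-embedding-3-small")
store = LocalFileStore("./embedding_cache")  # arbitrary cache location

# Document embeddings are computed once, then served from the byte store on re-indexing.
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    underlying, store, namespace=underlying.model
)

# Drop-in replacement for the plain embeddings object used earlier, e.g.:
# FAISS.from_documents(chunks, cached_embeddings)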
Evaluation is critical for RAG quality. Measure retrieval precision and recall using labeled datasets. Track answer relevance and faithfulness to retrieved context. Implement feedback loops to identify and address common failure modes. Regular evaluation ensures the system maintains quality as documents and queries evolve.
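Before adopting a full evaluation framework, a lightweight starting point is a small labeled set of question-to-source pairs and a retrieval hit-rate check. The sketch below assumes a retriever and the source_file metadata produced by the loader above; the labeled pairs are placeholders:
from typing import List, Tuple

def retrieval_hit_rate(retriever, labeled: List[Tuple[str, str]], k: int = 4) -> float:
    """Fraction of labeled questions whose expected source file appears in the top-k chunks."""
    hits = 0
    for question, expected_source in labeled:
        docs = retriever.invoke(question)[:k]
        if any(d.metadata.get("source_file") == expected_source for d in docs):
            hits += 1
    return hits / len(labeled) if labeled else 0.0

# Placeholder labels: (question, file expected to contain the answer)
labeled_set = [
    ("How do I configure chunk overlap?", "configuration_guide.md"),
    ("What models are supported?", "models.md"),
]
# score = retrieval_hit_rate(app.vector_store_manager.get_retriever(4), labeled_set)
# logger.info(f"Retrieval hit rate: {score:.2%}")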

Key Takeaways and Implementation Strategy
LangChain provides the building blocks for production RAG systems, from document loading to advanced retrieval strategies. LCEL enables composable, streaming-first chains that scale from prototypes to production. The framework’s flexibility supports experimentation while maintaining clean architecture.
For implementation, start with basic retrieval and progressively add complexity. Begin with simple similarity search, then evaluate multi-query and ensemble retrieval for improved recall. Add contextual compression when precision matters more than latency. Invest in evaluation infrastructure early to guide optimization decisions.
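Expressed with the classes defined above, that progression is mostly configuration; one possible staging (values are examples, not recommendations):
# Stage 1: plain similarity search, no query rewriting.
baseline = RAGConfig(use_multi_query=False, use_reranking=False, retrieval_k=4)

# Stage 2: multi-query retrieval for better recall on ambiguous questions.
recall_focused = RAGConfig(use_multi_query=True, retrieval_k=4)

app = RAGApplication(recall_focused)
# app.index_documents("./documents")
# app.initialize_chain(use_advanced_retrieval=True)

# Stage 3: when precision matters more than latency, build a compression retriever via
# AdvancedRetrieverFactory.create_contextual_compression_retriever() and pass it to
# RAGChain directly.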
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.