Introduction

Document processing is the foundation of any RAG (Retrieval-Augmented Generation) system. Before you can search and retrieve relevant information, you need to extract text from various file formats, split it into meaningful chunks, and generate embeddings for vector search. The quality of your document processing pipeline directly impacts retrieval accuracy and, ultimately, the quality of your AI application’s responses. A well-designed pipeline handles diverse formats (PDF, Word, HTML, Markdown), preserves document structure and metadata, chunks text intelligently to maintain semantic coherence, and processes documents efficiently at scale. This guide covers practical patterns for building robust document processing pipelines: from basic text extraction to sophisticated chunking strategies and production-ready ingestion systems.

Document Extraction
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from abc import ABC, abstractmethod
from pathlib import Path
from enum import Enum
import mimetypes


class DocumentType(Enum):
    """Supported document types."""
    PDF = "pdf"
    DOCX = "docx"
    HTML = "html"
    MARKDOWN = "markdown"
    TEXT = "text"
    CSV = "csv"
    JSON = "json"


@dataclass
class ExtractedDocument:
    """Extracted document content."""
    content: str
    metadata: dict = field(default_factory=dict)
    pages: list[str] = field(default_factory=list)
    tables: list[dict] = field(default_factory=list)
    images: list[dict] = field(default_factory=list)

    @property
    def page_count(self) -> int:
        return len(self.pages) if self.pages else 1


class DocumentExtractor(ABC):
    """Abstract document extractor."""

    @abstractmethod
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract content from document."""
        pass

    @abstractmethod
    def supports(self, file_path: str) -> bool:
        """Check if extractor supports file type."""
        pass
class PDFExtractor(DocumentExtractor):
    """Extract content from PDF files."""

    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith('.pdf')

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from PDF."""
        import fitz  # PyMuPDF

        doc = fitz.open(file_path)
        pages = []
        tables = []
        images = []

        for page_num, page in enumerate(doc):
            # Extract text
            text = page.get_text()
            pages.append(text)

            # Extract tables (simplified); find_tables() returns a TableFinder
            page_tables = page.find_tables()
            for table in page_tables.tables:
                tables.append({
                    "page": page_num,
                    "data": table.extract()
                })

            # Extract images
            for img_index, img in enumerate(page.get_images()):
                images.append({
                    "page": page_num,
                    "index": img_index,
                    "xref": img[0]
                })

        metadata = {
            "title": doc.metadata.get("title", ""),
            "author": doc.metadata.get("author", ""),
            "page_count": len(pages),
            "file_path": file_path
        }
        doc.close()

        return ExtractedDocument(
            content="\n\n".join(pages),
            metadata=metadata,
            pages=pages,
            tables=tables,
            images=images
        )
class DocxExtractor(DocumentExtractor):
    """Extract content from Word documents."""

    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith('.docx')

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from DOCX."""
        from docx import Document

        doc = Document(file_path)
        paragraphs = []
        tables = []

        for para in doc.paragraphs:
            if para.text.strip():
                paragraphs.append(para.text)

        for table_idx, table in enumerate(doc.tables):
            table_data = []
            for row in table.rows:
                row_data = [cell.text for cell in row.cells]
                table_data.append(row_data)
            tables.append({
                "index": table_idx,
                "data": table_data
            })

        metadata = {
            "file_path": file_path,
            "paragraph_count": len(paragraphs)
        }

        return ExtractedDocument(
            content="\n\n".join(paragraphs),
            metadata=metadata,
            tables=tables
        )
class HTMLExtractor(DocumentExtractor):
    """Extract content from HTML files."""

    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith(('.html', '.htm'))

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from HTML."""
        from bs4 import BeautifulSoup

        with open(file_path, 'r', encoding='utf-8') as f:
            html = f.read()

        soup = BeautifulSoup(html, 'html.parser')

        # Remove script and style elements
        for element in soup(['script', 'style', 'nav', 'footer']):
            element.decompose()

        # Extract text
        text = soup.get_text(separator='\n', strip=True)

        # Extract title
        title = soup.title.string if soup.title else ""

        # Extract tables
        tables = []
        for table_idx, table in enumerate(soup.find_all('table')):
            rows = []
            for row in table.find_all('tr'):
                cells = [cell.get_text(strip=True) for cell in row.find_all(['td', 'th'])]
                rows.append(cells)
            tables.append({
                "index": table_idx,
                "data": rows
            })

        metadata = {
            "title": title,
            "file_path": file_path
        }

        return ExtractedDocument(
            content=text,
            metadata=metadata,
            tables=tables
        )
class MarkdownExtractor(DocumentExtractor):
    """Extract content from Markdown files."""

    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith(('.md', '.markdown'))

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from Markdown."""
        import re

        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Extract headers for structure
        headers = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE)

        metadata = {
            "file_path": file_path,
            "headers": headers
        }

        return ExtractedDocument(
            content=content,
            metadata=metadata
        )
class UniversalExtractor:
    """Universal document extractor."""

    def __init__(self):
        self.extractors: list[DocumentExtractor] = [
            PDFExtractor(),
            DocxExtractor(),
            HTMLExtractor(),
            MarkdownExtractor()
        ]

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract content from any supported file."""
        for extractor in self.extractors:
            if extractor.supports(file_path):
                return extractor.extract(file_path)

        # Fallback to plain text
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        return ExtractedDocument(
            content=content,
            metadata={"file_path": file_path}
        )

    def add_extractor(self, extractor: DocumentExtractor):
        """Add custom extractor."""
        self.extractors.insert(0, extractor)
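
To see the extractor in action, here is a minimal usage sketch. The file name report.pdf is just a placeholder; any supported format is handled the same way, and the PDF path requires PyMuPDF to be installed.

extractor = UniversalExtractor()

# Hypothetical input file; swap in any path you actually have
document = extractor.extract("report.pdf")

print(document.metadata.get("title"))
print(f"{document.page_count} pages, {len(document.tables)} tables")
print(document.content[:500])  # First 500 characters of extracted text
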
Text Chunking Strategies
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from abc import ABC, abstractmethod
import re


@dataclass
class TextChunk:
    """A chunk of text."""
    content: str
    metadata: dict
    start_index: int
    end_index: int
    chunk_index: int


class ChunkingStrategy(ABC):
    """Abstract chunking strategy."""

    @abstractmethod
    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text into chunks."""
        pass
class FixedSizeChunker(ChunkingStrategy):
    """Fixed size chunking with overlap."""

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text into fixed-size chunks."""
        chunks = []
        start = 0
        chunk_index = 0

        while start < len(text):
            end = min(start + self.chunk_size, len(text))

            # Try to break at a sentence boundary
            if end < len(text):
                # Look for a sentence end near the chunk boundary
                search_start = max(end - 100, start)
                search_text = text[search_start:end + 100]
                sentence_ends = [m.end() for m in re.finditer(r'[.!?]\s', search_text)]
                if sentence_ends:
                    # Pick the boundary closest to the target size
                    target = end - search_start
                    closest = min(sentence_ends, key=lambda x: abs(x - target))
                    end = search_start + closest

            chunk_text = text[start:end].strip()
            if chunk_text:
                chunks.append(TextChunk(
                    content=chunk_text,
                    metadata={**(metadata or {}), "chunk_index": chunk_index},
                    start_index=start,
                    end_index=end,
                    chunk_index=chunk_index
                ))
                chunk_index += 1

            if end >= len(text):
                break
            # Step back by the overlap, but always make forward progress
            start = max(end - self.chunk_overlap, start + 1)

        return chunks
class SentenceChunker(ChunkingStrategy):
    """Chunk by sentences."""

    def __init__(
        self,
        sentences_per_chunk: int = 5,
        overlap_sentences: int = 1
    ):
        self.sentences_per_chunk = sentences_per_chunk
        self.overlap_sentences = overlap_sentences

    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text into sentence-based chunks."""
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        sentences = [s.strip() for s in sentences if s.strip()]

        chunks = []
        chunk_index = 0
        # Guard against a non-positive step if overlap >= sentences_per_chunk
        step = max(1, self.sentences_per_chunk - self.overlap_sentences)
        i = 0

        while i < len(sentences):
            chunk_sentences = sentences[i:i + self.sentences_per_chunk]
            chunk_text = ' '.join(chunk_sentences)

            # Approximate character positions from the first sentence
            start_index = text.find(chunk_sentences[0]) if chunk_sentences else 0
            end_index = start_index + len(chunk_text)

            chunks.append(TextChunk(
                content=chunk_text,
                metadata={**(metadata or {}), "chunk_index": chunk_index},
                start_index=start_index,
                end_index=end_index,
                chunk_index=chunk_index
            ))
            chunk_index += 1
            i += step

        return chunks
class SemanticChunker(ChunkingStrategy):
    """Chunk by semantic boundaries."""

    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.5,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

    async def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text at semantic boundaries.

        Note: unlike the other strategies, this method is async because it
        calls the embedding model, so it must be awaited.
        """
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        sentences = [s.strip() for s in sentences if s.strip()]

        if len(sentences) <= 1:
            return [TextChunk(
                content=text,
                metadata=metadata or {},
                start_index=0,
                end_index=len(text),
                chunk_index=0
            )]

        # Get embeddings for each sentence
        embeddings = await self.embedding_model.embed_batch(sentences)

        # Find semantic boundaries where adjacent sentences diverge
        boundaries = [0]
        for i in range(1, len(sentences)):
            similarity = self._cosine_similarity(embeddings[i - 1], embeddings[i])
            if similarity < self.similarity_threshold:
                boundaries.append(i)
        boundaries.append(len(sentences))

        # Create chunks from boundaries
        chunks = []
        chunk_index = 0
        for i in range(len(boundaries) - 1):
            start_sent = boundaries[i]
            end_sent = boundaries[i + 1]
            chunk_sentences = sentences[start_sent:end_sent]
            chunk_text = ' '.join(chunk_sentences)

            # Enforce size limits
            if len(chunk_text) > self.max_chunk_size:
                # Split large chunks
                sub_chunks = self._split_large_chunk(chunk_text, metadata, chunk_index)
                chunks.extend(sub_chunks)
                chunk_index += len(sub_chunks)
            elif len(chunk_text) >= self.min_chunk_size:
                chunks.append(TextChunk(
                    content=chunk_text,
                    metadata={**(metadata or {}), "chunk_index": chunk_index},
                    start_index=text.find(chunk_sentences[0]),
                    end_index=text.find(chunk_sentences[-1]) + len(chunk_sentences[-1]),
                    chunk_index=chunk_index
                ))
                chunk_index += 1

        return chunks

    def _cosine_similarity(self, a: list, b: list) -> float:
        import numpy as np
        a = np.array(a)
        b = np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def _split_large_chunk(
        self,
        text: str,
        metadata: dict,
        start_chunk_index: int
    ) -> list[TextChunk]:
        """Split an oversized chunk with the fixed-size strategy."""
        chunker = FixedSizeChunker(
            chunk_size=self.max_chunk_size,
            chunk_overlap=100
        )
        sub_chunks = chunker.chunk(text, metadata)
        # Renumber sub-chunks to continue from the parent chunk index
        for i, chunk in enumerate(sub_chunks):
            chunk.chunk_index = start_chunk_index + i
        return sub_chunks
class RecursiveChunker(ChunkingStrategy):
    """Recursively chunk by structure."""

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: list[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or [
            "\n\n",  # Paragraphs
            "\n",    # Lines
            ". ",    # Sentences
            " ",     # Words
        ]

    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Recursively split text."""
        return self._recursive_split(text, self.separators, metadata or {})

    def _recursive_split(
        self,
        text: str,
        separators: list[str],
        metadata: dict
    ) -> list[TextChunk]:
        """Recursively split using separators."""
        if len(text) <= self.chunk_size:
            return [TextChunk(
                content=text,
                metadata=metadata,
                start_index=0,
                end_index=len(text),
                chunk_index=0
            )]

        if not separators:
            # No more separators, force split
            return self._force_split(text, metadata)

        separator = separators[0]
        remaining_separators = separators[1:]
        splits = text.split(separator)

        chunks = []
        current_chunk = ""
        chunk_index = 0

        for split in splits:
            if len(current_chunk) + len(split) + len(separator) <= self.chunk_size:
                current_chunk += split + separator
            else:
                if current_chunk:
                    # Recursively process if still too large
                    if len(current_chunk) > self.chunk_size:
                        sub_chunks = self._recursive_split(
                            current_chunk,
                            remaining_separators,
                            metadata
                        )
                        for sc in sub_chunks:
                            sc.chunk_index = chunk_index
                            chunk_index += 1
                        chunks.extend(sub_chunks)
                    else:
                        chunks.append(TextChunk(
                            content=current_chunk.strip(),
                            metadata={**metadata, "chunk_index": chunk_index},
                            start_index=0,
                            end_index=len(current_chunk),
                            chunk_index=chunk_index
                        ))
                        chunk_index += 1
                current_chunk = split + separator

        # Handle remaining text
        if current_chunk.strip():
            chunks.append(TextChunk(
                content=current_chunk.strip(),
                metadata={**metadata, "chunk_index": chunk_index},
                start_index=0,
                end_index=len(current_chunk),
                chunk_index=chunk_index
            ))

        return chunks

    def _force_split(self, text: str, metadata: dict) -> list[TextChunk]:
        """Force split at chunk_size when no separator fits."""
        chunks = []
        start = 0
        chunk_index = 0

        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunks.append(TextChunk(
                content=text[start:end],
                metadata={**metadata, "chunk_index": chunk_index},
                start_index=start,
                end_index=end,
                chunk_index=chunk_index
            ))
            chunk_index += 1
            if end >= len(text):
                break
            start = end - self.chunk_overlap

        return chunks
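
Before wiring a chunker into a pipeline, it helps to compare strategies on the same input. A minimal sketch, assuming only the classes defined above; the sample text is arbitrary:

sample = (
    "RAG systems retrieve context before generating answers. "
    "Retrieval quality depends on how documents are chunked. "
    "Overlap between chunks preserves context across boundaries. "
) * 20

for strategy in [
    FixedSizeChunker(chunk_size=300, chunk_overlap=50),
    SentenceChunker(sentences_per_chunk=4, overlap_sentences=1),
    RecursiveChunker(chunk_size=300, chunk_overlap=50),
]:
    chunks = strategy.chunk(sample, metadata={"source": "example"})
    sizes = [len(c.content) for c in chunks]
    print(f"{strategy.__class__.__name__}: {len(chunks)} chunks, "
          f"avg {sum(sizes) // len(sizes)} chars")

The SemanticChunker is left out of this loop because its chunk method is async and needs a real embedding model.
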
Document Pipeline
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import asyncio


@dataclass
class ProcessedDocument:
    """Fully processed document."""
    document_id: str
    source_path: str
    chunks: list[TextChunk]
    embeddings: list[list[float]]
    metadata: dict


class DocumentPipeline:
    """Complete document processing pipeline."""

    def __init__(
        self,
        extractor: UniversalExtractor,
        chunker: ChunkingStrategy,
        embedding_model: Any
    ):
        self.extractor = extractor
        self.chunker = chunker
        self.embedding_model = embedding_model
        self.preprocessors: list[Callable] = []
        self.postprocessors: list[Callable] = []

    def add_preprocessor(self, func: Callable[[str], str]):
        """Add text preprocessor."""
        self.preprocessors.append(func)

    def add_postprocessor(self, func: Callable[[TextChunk], TextChunk]):
        """Add chunk postprocessor."""
        self.postprocessors.append(func)

    async def process(
        self,
        file_path: str,
        document_id: str = None
    ) -> ProcessedDocument:
        """Process a single document."""
        import uuid

        document_id = document_id or str(uuid.uuid4())

        # Extract content
        extracted = self.extractor.extract(file_path)

        # Preprocess text
        text = extracted.content
        for preprocessor in self.preprocessors:
            text = preprocessor(text)

        # Chunk text (SemanticChunker.chunk is async, so await it if needed)
        result = self.chunker.chunk(text, extracted.metadata)
        chunks = await result if asyncio.iscoroutine(result) else result

        # Postprocess chunks
        for postprocessor in self.postprocessors:
            chunks = [postprocessor(chunk) for chunk in chunks]

        # Generate embeddings
        chunk_texts = [chunk.content for chunk in chunks]
        embeddings = await self.embedding_model.embed_batch(chunk_texts)

        return ProcessedDocument(
            document_id=document_id,
            source_path=file_path,
            chunks=chunks,
            embeddings=embeddings,
            metadata=extracted.metadata
        )

    async def process_batch(
        self,
        file_paths: list[str],
        concurrency: int = 5
    ) -> list[ProcessedDocument]:
        """Process multiple documents with bounded concurrency."""
        semaphore = asyncio.Semaphore(concurrency)

        async def process_with_semaphore(path: str) -> ProcessedDocument:
            async with semaphore:
                return await self.process(path)

        tasks = [process_with_semaphore(path) for path in file_paths]
        return await asyncio.gather(*tasks)
class IncrementalPipeline:
    """Pipeline with incremental updates."""

    def __init__(
        self,
        pipeline: DocumentPipeline,
        vector_store: Any
    ):
        self.pipeline = pipeline
        self.vector_store = vector_store
        self.processed_docs: dict[str, str] = {}  # path -> hash

    def _file_hash(self, file_path: str) -> str:
        """Calculate file hash."""
        import hashlib
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

    async def sync(self, file_paths: list[str]) -> dict:
        """Sync documents with the vector store."""
        results = {
            "added": [],
            "updated": [],
            "unchanged": [],
            "removed": []
        }
        current_paths = set(file_paths)

        # Find removed documents
        for path in list(self.processed_docs.keys()):
            if path not in current_paths:
                # Remove from vector store
                await self.vector_store.delete(
                    filter={"source_path": path}
                )
                del self.processed_docs[path]
                results["removed"].append(path)

        # Process new and updated documents
        for path in file_paths:
            file_hash = self._file_hash(path)

            if path not in self.processed_docs:
                # New document
                doc = await self.pipeline.process(path)
                await self._index_document(doc)
                self.processed_docs[path] = file_hash
                results["added"].append(path)
            elif self.processed_docs[path] != file_hash:
                # Updated document: delete old chunks and re-index
                await self.vector_store.delete(
                    filter={"source_path": path}
                )
                doc = await self.pipeline.process(path)
                await self._index_document(doc)
                self.processed_docs[path] = file_hash
                results["updated"].append(path)
            else:
                results["unchanged"].append(path)

        return results

    async def _index_document(self, doc: ProcessedDocument):
        """Index document in vector store."""
        for chunk, embedding in zip(doc.chunks, doc.embeddings):
            await self.vector_store.upsert(
                id=f"{doc.document_id}_{chunk.chunk_index}",
                embedding=embedding,
                metadata={
                    "document_id": doc.document_id,
                    "source_path": doc.source_path,
                    "chunk_index": chunk.chunk_index,
                    "content": chunk.content,
                    **chunk.metadata
                }
            )
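
To tie the pieces together, here is a minimal end-to-end sketch. The FakeEmbeddings class and the docs/guide.md path are placeholders I am assuming for illustration; swap in your real embedding client and an actual file.

import asyncio


class FakeEmbeddings:
    """Stand-in embedding model with the embed_batch interface the pipeline expects."""
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        # Deterministic dummy vectors; replace with real embedding API calls
        return [[float(len(t) % 7), 1.0, 0.0] for t in texts]


async def main():
    pipeline = DocumentPipeline(
        extractor=UniversalExtractor(),
        chunker=RecursiveChunker(chunk_size=800, chunk_overlap=150),
        embedding_model=FakeEmbeddings()
    )
    doc = await pipeline.process("docs/guide.md")  # placeholder path
    print(doc.document_id, len(doc.chunks), "chunks,", len(doc.embeddings), "embeddings")


asyncio.run(main())
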
Production Processing Service
from fastapi import FastAPI, HTTPException, UploadFile, File, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List, Dict
import tempfile
import os

app = FastAPI()


class ProcessRequest(BaseModel):
    file_paths: List[str]
    chunk_size: int = 1000
    chunk_overlap: int = 200


class ProcessStatus(BaseModel):
    job_id: str
    status: str
    processed: int
    total: int
    errors: List[str]


# Initialize components
extractor = UniversalExtractor()
chunker = RecursiveChunker()

# Job tracking (in-memory)
jobs: dict[str, ProcessStatus] = {}
@app.post("/v1/process")
async def process_documents(
request: ProcessRequest,
background_tasks: BackgroundTasks
) -> dict:
"""Start document processing job."""
import uuid
job_id = str(uuid.uuid4())
jobs[job_id] = ProcessStatus(
job_id=job_id,
status="pending",
processed=0,
total=len(request.file_paths),
errors=[]
)
background_tasks.add_task(
process_documents_task,
job_id,
request.file_paths,
request.chunk_size,
request.chunk_overlap
)
return {"job_id": job_id}
async def process_documents_task(
job_id: str,
file_paths: list[str],
chunk_size: int,
chunk_overlap: int
):
"""Background task for processing."""
jobs[job_id].status = "processing"
chunker = RecursiveChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
for path in file_paths:
try:
# Extract
extracted = extractor.extract(path)
# Chunk
chunks = chunker.chunk(extracted.content, extracted.metadata)
# Store results (would save to database)
jobs[job_id].processed += 1
except Exception as e:
jobs[job_id].errors.append(f"{path}: {str(e)}")
jobs[job_id].status = "completed"
@app.get("/v1/jobs/{job_id}")
async def get_job_status(job_id: str) -> ProcessStatus:
"""Get job status."""
if job_id not in jobs:
raise HTTPException(status_code=404, detail="Job not found")
return jobs[job_id]
@app.post("/v1/upload")
async def upload_and_process(
file: UploadFile = File(...)
) -> dict:
"""Upload and process a single file."""
# Save to temp file
with tempfile.NamedTemporaryFile(delete=False, suffix=file.filename) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
# Extract
extracted = extractor.extract(tmp_path)
# Chunk
chunks = chunker.chunk(extracted.content, extracted.metadata)
return {
"filename": file.filename,
"content_length": len(extracted.content),
"chunk_count": len(chunks),
"metadata": extracted.metadata,
"chunks": [
{
"index": c.chunk_index,
"length": len(c.content),
"preview": c.content[:200]
}
for c in chunks[:5] # First 5 chunks
]
}
finally:
os.unlink(tmp_path)
@app.post("/v1/chunk")
async def chunk_text(
text: str,
chunk_size: int = 1000,
chunk_overlap: int = 200,
strategy: str = "recursive"
) -> list[dict]:
"""Chunk raw text."""
if strategy == "recursive":
chunker = RecursiveChunker(chunk_size, chunk_overlap)
elif strategy == "sentence":
chunker = SentenceChunker()
else:
chunker = FixedSizeChunker(chunk_size, chunk_overlap)
chunks = chunker.chunk(text)
return [
{
"index": c.chunk_index,
"content": c.content,
"length": len(c.content)
}
for c in chunks
]
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- LangChain Text Splitters: https://python.langchain.com/docs/modules/data_connection/document_transformers/
- LlamaIndex Node Parsers: https://docs.llamaindex.ai/en/stable/module_guides/loading/node_parsers/
- Unstructured: https://unstructured.io/
- PyMuPDF: https://pymupdf.readthedocs.io/
- Beautiful Soup: https://www.crummy.com/software/BeautifulSoup/bs4/doc/
Conclusion
Document processing is where RAG quality begins. A well-designed pipeline extracts content accurately from diverse formats, preserves important structure and metadata, and chunks text in ways that maintain semantic coherence.

Start with robust extraction: use specialized libraries for each format (PyMuPDF for PDFs, python-docx for Word, BeautifulSoup for HTML) and handle edge cases gracefully. Chunking strategy matters more than you might expect: fixed-size chunks are simple but can split sentences mid-thought; semantic chunking preserves meaning but requires embedding calls; recursive chunking offers a good balance by respecting document structure. Always include overlap between chunks to maintain context across boundaries.

For production systems, build incremental pipelines that only reprocess changed documents, implement proper error handling and retry logic, and monitor processing metrics. Consider preprocessing steps like removing boilerplate, normalizing whitespace, and handling special characters. The goal is to transform raw documents into clean, well-structured chunks that give your embedding model and LLM the best possible input for accurate retrieval and generation.
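
As a concrete example of that preprocessing step, here is a small sketch of two preprocessors registered on a DocumentPipeline instance. The boilerplate pattern is purely illustrative; tune it to the documents you actually ingest.

import re


def strip_boilerplate(text: str) -> str:
    # Illustrative pattern: drop lines that look like page footers or copyright notices
    lines = [
        line for line in text.splitlines()
        if not re.match(r'^\s*(Page \d+|Copyright .*|All rights reserved.*)\s*$', line, re.IGNORECASE)
    ]
    return "\n".join(lines)


def normalize_whitespace(text: str) -> str:
    # Collapse runs of spaces and tabs, but keep paragraph breaks intact
    text = re.sub(r'[ \t]+', ' ', text)
    return re.sub(r'\n{3,}', '\n\n', text)


# "pipeline" is a DocumentPipeline instance like the one built in the Document Pipeline section
pipeline.add_preprocessor(strip_boilerplate)
pipeline.add_preprocessor(normalize_whitespace)
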
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.