Document Processing Pipelines: From Raw Files to Vector-Ready Chunks

Introduction: Document processing is the foundation of any RAG (Retrieval-Augmented Generation) system. Before you can search and retrieve relevant information, you need to extract text from various file formats, split it into meaningful chunks, and generate embeddings for vector search. The quality of your document processing pipeline directly impacts retrieval accuracy and ultimately the quality of your AI application’s responses. A well-designed pipeline handles diverse formats (PDF, Word, HTML, Markdown), preserves document structure and metadata, chunks text intelligently to maintain semantic coherence, and processes documents efficiently at scale. This guide covers practical patterns for building robust document processing pipelines: from basic text extraction to sophisticated chunking strategies and production-ready ingestion systems.

[Figure: Document Processing: Extract, Chunk, Embed]

Document Extraction

from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from abc import ABC, abstractmethod
from pathlib import Path
from enum import Enum
import mimetypes

class DocumentType(Enum):
    """Supported document types."""
    
    PDF = "pdf"
    DOCX = "docx"
    HTML = "html"
    MARKDOWN = "markdown"
    TEXT = "text"
    CSV = "csv"
    JSON = "json"

@dataclass
class ExtractedDocument:
    """Extracted document content."""
    
    content: str
    metadata: dict = field(default_factory=dict)
    pages: list[str] = field(default_factory=list)
    tables: list[dict] = field(default_factory=list)
    images: list[dict] = field(default_factory=list)
    
    @property
    def page_count(self) -> int:
        return len(self.pages) if self.pages else 1

class DocumentExtractor(ABC):
    """Abstract document extractor."""
    
    @abstractmethod
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract content from document."""
        pass
    
    @abstractmethod
    def supports(self, file_path: str) -> bool:
        """Check if extractor supports file type."""
        pass

class PDFExtractor(DocumentExtractor):
    """Extract content from PDF files."""
    
    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith('.pdf')
    
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from PDF."""
        
        import fitz  # PyMuPDF
        
        pages = []
        tables = []
        images = []
        
        # The context manager closes the file even if extraction raises
        with fitz.open(file_path) as doc:
            for page_num, page in enumerate(doc):
                # Extract text
                text = page.get_text()
                pages.append(text)
                
                # Extract tables (simplified; find_tables() needs a recent PyMuPDF)
                for table in page.find_tables().tables:
                    tables.append({
                        "page": page_num,
                        "data": table.extract()
                    })
                
                # Extract image references (xref only, not the image bytes)
                for img_index, img in enumerate(page.get_images()):
                    images.append({
                        "page": page_num,
                        "index": img_index,
                        "xref": img[0]
                    })
            
            # Metadata values can be None, so fall back to empty strings
            doc_meta = doc.metadata or {}
            metadata = {
                "title": doc_meta.get("title") or "",
                "author": doc_meta.get("author") or "",
                "page_count": len(pages),
                "file_path": file_path
            }
        
        return ExtractedDocument(
            content="\n\n".join(pages),
            metadata=metadata,
            pages=pages,
            tables=tables,
            images=images
        )

class DocxExtractor(DocumentExtractor):
    """Extract content from Word documents."""
    
    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith('.docx')
    
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from DOCX."""
        
        from docx import Document
        
        doc = Document(file_path)
        
        paragraphs = []
        tables = []
        
        for para in doc.paragraphs:
            if para.text.strip():
                paragraphs.append(para.text)
        
        for table_idx, table in enumerate(doc.tables):
            table_data = []
            for row in table.rows:
                row_data = [cell.text for cell in row.cells]
                table_data.append(row_data)
            tables.append({
                "index": table_idx,
                "data": table_data
            })
        
        metadata = {
            "file_path": file_path,
            "paragraph_count": len(paragraphs)
        }
        
        return ExtractedDocument(
            content="\n\n".join(paragraphs),
            metadata=metadata,
            tables=tables
        )

class HTMLExtractor(DocumentExtractor):
    """Extract content from HTML files."""
    
    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith(('.html', '.htm'))
    
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from HTML."""
        
        from bs4 import BeautifulSoup
        
        with open(file_path, 'r', encoding='utf-8') as f:
            html = f.read()
        
        soup = BeautifulSoup(html, 'html.parser')
        
        # Remove non-content elements (scripts, styles, navigation, footer)
        for element in soup(['script', 'style', 'nav', 'footer']):
            element.decompose()
        
        # Extract text
        text = soup.get_text(separator='\n', strip=True)
        
        # Extract title (.string is None when <title> is empty or has nested tags)
        title = (soup.title.string or "").strip() if soup.title else ""
        
        # Extract tables
        tables = []
        for table_idx, table in enumerate(soup.find_all('table')):
            rows = []
            for row in table.find_all('tr'):
                cells = [cell.get_text(strip=True) for cell in row.find_all(['td', 'th'])]
                rows.append(cells)
            tables.append({
                "index": table_idx,
                "data": rows
            })
        
        metadata = {
            "title": title,
            "file_path": file_path
        }
        
        return ExtractedDocument(
            content=text,
            metadata=metadata,
            tables=tables
        )

class MarkdownExtractor(DocumentExtractor):
    """Extract content from Markdown files."""
    
    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith(('.md', '.markdown'))
    
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract text from Markdown."""
        
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # Extract headers for structure
        import re
        headers = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE)
        
        metadata = {
            "file_path": file_path,
            "headers": headers
        }
        
        return ExtractedDocument(
            content=content,
            metadata=metadata
        )

class UniversalExtractor:
    """Universal document extractor."""
    
    def __init__(self):
        self.extractors: list[DocumentExtractor] = [
            PDFExtractor(),
            DocxExtractor(),
            HTMLExtractor(),
            MarkdownExtractor()
        ]
    
    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract content from any supported file."""
        
        for extractor in self.extractors:
            if extractor.supports(file_path):
                return extractor.extract(file_path)
        
        # Fallback to plain text
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        
        return ExtractedDocument(
            content=content,
            metadata={"file_path": file_path}
        )
    
    def add_extractor(self, extractor: DocumentExtractor):
        """Add custom extractor."""
        
        self.extractors.insert(0, extractor)
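
The DocumentType enum lists CSV, yet no CSV extractor appears above. As a minimal sketch of how a custom extractor could plug into add_extractor, the block below follows the same supports/extract shape. To stay runnable on its own it redefines a trimmed-down ExtractedDocument stand-in; CSVExtractor and its row-to-text format are illustrative assumptions, not part of the original code.

```python
import csv
from dataclasses import dataclass, field


@dataclass
class ExtractedDocument:
    """Trimmed stand-in for the ExtractedDocument dataclass shown earlier."""
    content: str
    metadata: dict = field(default_factory=dict)
    tables: list = field(default_factory=list)


class CSVExtractor:
    """Hypothetical extractor: one table plus a row-per-line text rendering."""

    def supports(self, file_path: str) -> bool:
        return file_path.lower().endswith(".csv")

    def extract(self, file_path: str) -> ExtractedDocument:
        # newline="" is what the csv module expects for correct newline handling
        with open(file_path, "r", encoding="utf-8", newline="") as f:
            rows = list(csv.reader(f))

        header, body = (rows[0], rows[1:]) if rows else ([], [])

        # Render each row as "column: value" pairs so a chunk keeps its context
        lines = [
            "; ".join(f"{name}: {value}" for name, value in zip(header, row))
            for row in body
        ]

        return ExtractedDocument(
            content="\n".join(lines),
            metadata={
                "file_path": file_path,
                "columns": header,
                "row_count": len(body),
            },
            tables=[{"index": 0, "data": rows}],
        )
```

In the article's own code the class would subclass DocumentExtractor, and `UniversalExtractor().add_extractor(CSVExtractor())` would register it ahead of the built-in extractors.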

Text Chunking Strategies

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from abc import ABC, abstractmethod
import re

@dataclass
class TextChunk:
    """A chunk of text."""
    
    content: str
    metadata: dict
    start_index: int
    end_index: int
    chunk_index: int

class ChunkingStrategy(ABC):
    """Abstract chunking strategy."""
    
    @abstractmethod
    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text into chunks."""
        pass

class FixedSizeChunker(ChunkingStrategy):
    """Fixed size chunking with overlap."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text into fixed-size chunks."""
        
        chunks = []
        start = 0
        chunk_index = 0
        
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            
            # Try to break at sentence boundary
            if end < len(text):
                # Look for sentence end near chunk boundary
                search_start = max(end - 100, start)
                search_text = text[search_start:end + 100]
                
                sentence_ends = [m.end() for m in re.finditer(r'[.!?]\s', search_text)]
                
                if sentence_ends:
                    # Find closest to target
                    target = end - search_start
                    closest = min(sentence_ends, key=lambda x: abs(x - target))
                    end = search_start + closest
            
            chunk_text = text[start:end].strip()
            
            if chunk_text:
                chunks.append(TextChunk(
                    content=chunk_text,
                    metadata={**(metadata or {}), "chunk_index": chunk_index},
                    start_index=start,
                    end_index=end,
                    chunk_index=chunk_index
                ))
                chunk_index += 1
            
            # Stop at the end of the text; stepping back by the overlap here
            # would only emit a redundant tail chunk
            if end >= len(text):
                break
            
            # Step back by the overlap, but always move forward so the loop terminates
            start = max(end - self.chunk_overlap, start + 1)
        
        return chunks

class SentenceChunker(ChunkingStrategy):
    """Chunk by sentences."""
    
    def __init__(
        self,
        sentences_per_chunk: int = 5,
        overlap_sentences: int = 1
    ):
        self.sentences_per_chunk = sentences_per_chunk
        self.overlap_sentences = overlap_sentences
    
    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text into sentence-based chunks."""
        
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
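        # Record where each sentence starts in the original text, searching
        # forward so repeated sentences do not map to their first occurrence
        offsets = []
        position = 0
        for sentence in sentences:
            position = text.find(sentence, position)
            offsets.append(position)
            position += len(sentence)
        
        # Guard against a non-positive step when overlap >= sentences_per_chunk
        step = max(self.sentences_per_chunk - self.overlap_sentences, 1)
        
        chunks = []
        chunk_index = 0
        i = 0
        
        while i < len(sentences):
            last = min(i + self.sentences_per_chunk, len(sentences)) - 1
            chunk_sentences = sentences[i:last + 1]
            chunk_text = ' '.join(chunk_sentences)
            
            chunks.append(TextChunk(
                content=chunk_text,
                metadata={**(metadata or {}), "chunk_index": chunk_index},
                start_index=offsets[i],
                end_index=offsets[last] + len(sentences[last]),
                chunk_index=chunk_index
            ))
            
            chunk_index += 1
            
            # Stop once the final sentence is covered to avoid a redundant tail chunk
            if last == len(sentences) - 1:
                break
            i += step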
        
        return chunks

class SemanticChunker(ChunkingStrategy):
    """Chunk by semantic boundaries."""
    
    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.5,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    async def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Split text at semantic boundaries.
        
        Unlike the other strategies this is a coroutine (it awaits the
        embedding model), so callers must await the result.
        """
        
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        if len(sentences) <= 1:
            return [TextChunk(
                content=text,
                metadata=metadata or {},
                start_index=0,
                end_index=len(text),
                chunk_index=0
            )]
        
        # Get embeddings for each sentence
        embeddings = await self.embedding_model.embed_batch(sentences)
        
        # Find semantic boundaries
        boundaries = [0]
        
        for i in range(1, len(sentences)):
            similarity = self._cosine_similarity(embeddings[i-1], embeddings[i])
            
            if similarity < self.similarity_threshold:
                boundaries.append(i)
        
        boundaries.append(len(sentences))
        
        # Create chunks from boundaries
        chunks = []
        chunk_index = 0
        pending = []  # sentences carried forward from segments below min_chunk_size
        
        for i in range(len(boundaries) - 1):
            pending.extend(sentences[boundaries[i]:boundaries[i + 1]])
            chunk_text = ' '.join(pending)
            is_last = i == len(boundaries) - 2
            
            # Merge short segments into the next one instead of dropping their text
            if len(chunk_text) < self.min_chunk_size and not is_last:
                continue
            
            # Enforce size limits
            if len(chunk_text) > self.max_chunk_size:
                # Split large chunks
                sub_chunks = self._split_large_chunk(chunk_text, metadata, chunk_index)
                chunks.extend(sub_chunks)
                chunk_index += len(sub_chunks)
            else:
                chunks.append(TextChunk(
                    content=chunk_text,
                    metadata={**(metadata or {}), "chunk_index": chunk_index},
                    # find() returns the first occurrence, so these offsets
                    # are approximate when sentences repeat
                    start_index=text.find(pending[0]),
                    end_index=text.find(pending[-1]) + len(pending[-1]),
                    chunk_index=chunk_index
                ))
                chunk_index += 1
            
            pending = []
        
        return chunks
    
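    def _cosine_similarity(self, a: list, b: list) -> float:
        import numpy as np
        a = np.array(a)
        b = np.array(b)
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        # A zero vector has no direction; treat it as dissimilar instead of dividing by zero
        if denominator == 0:
            return 0.0
        return float(np.dot(a, b) / denominator)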
    
    def _split_large_chunk(
        self,
        text: str,
        metadata: dict,
        start_index: int
    ) -> list[TextChunk]:
        """Split oversized chunk."""
        
        chunker = FixedSizeChunker(
            chunk_size=self.max_chunk_size,
            chunk_overlap=100
        )
        
        sub_chunks = chunker.chunk(text, metadata)
        
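        # Renumber so indices continue the parent sequence; start_index and
        # end_index stay relative to the oversized chunk, not the full text
        for i, chunk in enumerate(sub_chunks):
            chunk.chunk_index = start_index + i
            chunk.metadata["chunk_index"] = start_index + i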
        
        return sub_chunks

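class RecursiveChunker(ChunkingStrategy):
    """Recursively chunk by structure.
    
    Simplified: start_index/end_index are relative to the piece being split
    rather than the original text, and chunk_overlap is applied only by the
    forced character split.
    """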
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: list[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or [
            "\n\n",  # Paragraphs
            "\n",    # Lines
            ". ",    # Sentences
            " ",     # Words
        ]
    
    def chunk(self, text: str, metadata: dict = None) -> list[TextChunk]:
        """Recursively split text."""
        
        return self._recursive_split(text, self.separators, metadata or {})
    
    def _recursive_split(
        self,
        text: str,
        separators: list[str],
        metadata: dict
    ) -> list[TextChunk]:
        """Recursively split using separators."""
        
        if len(text) <= self.chunk_size:
            return [TextChunk(
                content=text,
                # Copy the metadata so chunks never share the caller's dict
                metadata={**metadata, "chunk_index": 0},
                start_index=0,
                end_index=len(text),
                chunk_index=0
            )]
        
        if not separators:
            # No more separators, force split
            return self._force_split(text, metadata)
        
        separator = separators[0]
        remaining_separators = separators[1:]
        
        splits = text.split(separator)
        
        chunks = []
        current_chunk = ""
        chunk_index = 0
        
        for split in splits:
            if len(current_chunk) + len(split) + len(separator) <= self.chunk_size:
                current_chunk += split + separator
            else:
                if current_chunk:
                    # Recursively process if still too large
                    if len(current_chunk) > self.chunk_size:
                        sub_chunks = self._recursive_split(
                            current_chunk,
                            remaining_separators,
                            metadata
                        )
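                        for sc in sub_chunks:
                            sc.chunk_index = chunk_index
                            # Reassign instead of mutating: the dict may be shared
                            sc.metadata = {**sc.metadata, "chunk_index": chunk_index}
                            chunk_index += 1
                        chunks.extend(sub_chunks)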
                    else:
                        chunks.append(TextChunk(
                            content=current_chunk.strip(),
                            metadata={**metadata, "chunk_index": chunk_index},
                            start_index=0,
                            end_index=len(current_chunk),
                            chunk_index=chunk_index
                        ))
                        chunk_index += 1
                
                current_chunk = split + separator
        
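        # Handle remaining text, recursing if a single oversized split is left over
        if current_chunk.strip():
            if len(current_chunk) > self.chunk_size:
                sub_chunks = self._recursive_split(
                    current_chunk,
                    remaining_separators,
                    metadata
                )
                for sc in sub_chunks:
                    sc.chunk_index = chunk_index
                    sc.metadata = {**sc.metadata, "chunk_index": chunk_index}
                    chunk_index += 1
                chunks.extend(sub_chunks)
            else:
                chunks.append(TextChunk(
                    content=current_chunk.strip(),
                    metadata={**metadata, "chunk_index": chunk_index},
                    start_index=0,
                    end_index=len(current_chunk),
                    chunk_index=chunk_index
                ))
        
        return chunks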
    
    def _force_split(self, text: str, metadata: dict) -> list[TextChunk]:
        """Force split at chunk_size."""
        
        chunks = []
        start = 0
        chunk_index = 0
        
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            
            chunks.append(TextChunk(
                content=text[start:end],
                metadata={**metadata, "chunk_index": chunk_index},
                start_index=start,
                end_index=end,
                chunk_index=chunk_index
            ))
            
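            chunk_index += 1
            
            # Stop at the end of the text; stepping back by the overlap here
            # would re-emit the same tail forever
            if end == len(text):
                break
            start = max(end - self.chunk_overlap, start + 1)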
        
        return chunks
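
Both FixedSizeChunker and _force_split advance by chunk_size minus chunk_overlap characters per step. A small sketch of that window arithmetic, independent of the classes above, shows which spans come out and why the loop has to stop once it reaches the end of the text. The helper names chunk_spans and expected_chunk_count are hypothetical, and sentence-boundary snapping is deliberately left out.

```python
import math


def chunk_spans(text_length: int, chunk_size: int, chunk_overlap: int) -> list[tuple[int, int]]:
    """Return (start, end) character spans for overlapping fixed-size windows."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    stride = chunk_size - chunk_overlap  # characters of new text per chunk
    spans = []
    start = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        spans.append((start, end))
        if end == text_length:
            break  # the tail is covered; another step would only repeat it
        start += stride
    return spans


def expected_chunk_count(text_length: int, chunk_size: int, chunk_overlap: int) -> int:
    """Closed form for the number of spans chunk_spans() produces."""
    if text_length <= 0:
        return 0
    if text_length <= chunk_size:
        return 1
    return math.ceil((text_length - chunk_overlap) / (chunk_size - chunk_overlap))


if __name__ == "__main__":
    # 2500 characters, 1000-character windows, 200 characters of overlap
    print(chunk_spans(2500, 1000, 200))
    # [(0, 1000), (800, 1800), (1600, 2500)]
```

Rejecting an overlap that is greater than or equal to the chunk size up front is one way to rule out a zero or negative stride, which would otherwise keep the loop from advancing.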

Document Pipeline

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import asyncio

@dataclass
class ProcessedDocument:
    """Fully processed document."""
    
    document_id: str
    source_path: str
    chunks: list[TextChunk]
    embeddings: list[list[float]]
    metadata: dict

class DocumentPipeline:
    """Complete document processing pipeline."""
    
    def __init__(
        self,
        extractor: UniversalExtractor,
        chunker: ChunkingStrategy,
        embedding_model: Any
    ):
        self.extractor = extractor
        self.chunker = chunker
        self.embedding_model = embedding_model
        self.preprocessors: list[Callable] = []
        self.postprocessors: list[Callable] = []
    
    def add_preprocessor(self, func: Callable[[str], str]):
        """Add text preprocessor."""
        
        self.preprocessors.append(func)
    
    def add_postprocessor(self, func: Callable[[TextChunk], TextChunk]):
        """Add chunk postprocessor."""
        
        self.postprocessors.append(func)
    
    async def process(
        self,
        file_path: str,
        document_id: str = None
    ) -> ProcessedDocument:
        """Process a single document."""
        
        import uuid
        
        document_id = document_id or str(uuid.uuid4())
        
        # Extract content in a worker thread so file parsing does not block the event loop
        extracted = await asyncio.to_thread(self.extractor.extract, file_path)
        
        # Preprocess text
        text = extracted.content
        for preprocessor in self.preprocessors:
            text = preprocessor(text)
        
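        # Chunk text (SemanticChunker.chunk is a coroutine, so await it when needed)
        chunks = self.chunker.chunk(text, extracted.metadata)
        if asyncio.iscoroutine(chunks):
            chunks = await chunks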
        
        # Postprocess chunks
        for postprocessor in self.postprocessors:
            chunks = [postprocessor(chunk) for chunk in chunks]
        
        # Generate embeddings
        chunk_texts = [chunk.content for chunk in chunks]
        embeddings = await self.embedding_model.embed_batch(chunk_texts)
        
        return ProcessedDocument(
            document_id=document_id,
            source_path=file_path,
            chunks=chunks,
            embeddings=embeddings,
            metadata=extracted.metadata
        )
    
    async def process_batch(
        self,
        file_paths: list[str],
        concurrency: int = 5
    ) -> list[ProcessedDocument]:
        """Process multiple documents."""
        
        semaphore = asyncio.Semaphore(concurrency)
        
        async def process_with_semaphore(path: str) -> ProcessedDocument:
            async with semaphore:
                return await self.process(path)
        
        tasks = [process_with_semaphore(path) for path in file_paths]
        
        return await asyncio.gather(*tasks)

class IncrementalPipeline:
    """Pipeline with incremental updates."""
    
    def __init__(
        self,
        pipeline: DocumentPipeline,
        vector_store: Any
    ):
        self.pipeline = pipeline
        self.vector_store = vector_store
        self.processed_docs: dict[str, str] = {}  # path -> hash
    
    def _file_hash(self, file_path: str) -> str:
        """Calculate file hash."""
        
        import hashlib
        
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
    
    async def sync(self, file_paths: list[str]) -> dict:
        """Sync documents with vector store."""
        
        results = {
            "added": [],
            "updated": [],
            "unchanged": [],
            "removed": []
        }
        
        current_paths = set(file_paths)
        
        # Find removed documents
        for path in list(self.processed_docs.keys()):
            if path not in current_paths:
                # Remove from vector store
                await self.vector_store.delete(
                    filter={"source_path": path}
                )
                del self.processed_docs[path]
                results["removed"].append(path)
        
        # Process new and updated documents
        for path in file_paths:
            file_hash = self._file_hash(path)
            
            if path not in self.processed_docs:
                # New document
                doc = await self.pipeline.process(path)
                await self._index_document(doc)
                self.processed_docs[path] = file_hash
                results["added"].append(path)
            
            elif self.processed_docs[path] != file_hash:
                # Updated document
                await self.vector_store.delete(
                    filter={"source_path": path}
                )
                doc = await self.pipeline.process(path)
                await self._index_document(doc)
                self.processed_docs[path] = file_hash
                results["updated"].append(path)
            
            else:
                results["unchanged"].append(path)
        
        return results
    
    async def _index_document(self, doc: ProcessedDocument):
        """Index document in vector store."""
        
        for chunk, embedding in zip(doc.chunks, doc.embeddings):
            await self.vector_store.upsert(
                id=f"{doc.document_id}_{chunk.chunk_index}",
                embedding=embedding,
                metadata={
                    "document_id": doc.document_id,
                    "source_path": doc.source_path,
                    "chunk_index": chunk.chunk_index,
                    "content": chunk.content,
                    **chunk.metadata
                }
            )
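
IncrementalPipeline keeps its path-to-hash map in memory, so a restart would reprocess every file. One option, sketched below under the assumption that a JSON file on local disk is acceptable, is to persist the map and compute the diff before calling the pipeline. The helper names (hash_file, load_manifest, save_manifest, diff_manifest) are hypothetical, and SHA-256 with streamed reads is a swapped-in choice rather than the MD5 call used above.

```python
import hashlib
import json
from pathlib import Path


def hash_file(path: str, block_size: int = 1 << 20) -> str:
    """Hash a file in blocks so large documents are not read into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def load_manifest(manifest_path: str) -> dict[str, str]:
    """Load the path -> hash map, or an empty map on first run."""
    p = Path(manifest_path)
    if not p.exists():
        return {}
    return json.loads(p.read_text(encoding="utf-8"))


def save_manifest(manifest_path: str, manifest: dict[str, str]) -> None:
    """Write the path -> hash map as JSON."""
    Path(manifest_path).write_text(
        json.dumps(manifest, indent=2, sort_keys=True), encoding="utf-8"
    )


def diff_manifest(previous: dict[str, str], current: dict[str, str]) -> dict[str, list[str]]:
    """Classify paths the same way IncrementalPipeline.sync() reports them."""
    return {
        "added": sorted(p for p in current if p not in previous),
        "updated": sorted(
            p for p in current if p in previous and previous[p] != current[p]
        ),
        "unchanged": sorted(
            p for p in current if p in previous and previous[p] == current[p]
        ),
        "removed": sorted(p for p in previous if p not in current),
    }
```

Saving the manifest only after the vector store writes succeed keeps a crash mid-run from marking unindexed files as processed.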

Production Processing Service

from fastapi import FastAPI, HTTPException, UploadFile, File, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List, Dict
import tempfile
import os

app = FastAPI()

class ProcessRequest(BaseModel):
    file_paths: List[str]
    chunk_size: int = 1000
    chunk_overlap: int = 200

class ProcessStatus(BaseModel):
    job_id: str
    status: str
    processed: int
    total: int
    errors: List[str]

# Initialize components
extractor = UniversalExtractor()
chunker = RecursiveChunker()

# Job tracking
jobs: dict[str, ProcessStatus] = {}

@app.post("/v1/process")
async def process_documents(
    request: ProcessRequest,
    background_tasks: BackgroundTasks
) -> dict:
    """Start document processing job."""
    
    import uuid
    
    job_id = str(uuid.uuid4())
    
    jobs[job_id] = ProcessStatus(
        job_id=job_id,
        status="pending",
        processed=0,
        total=len(request.file_paths),
        errors=[]
    )
    
    background_tasks.add_task(
        process_documents_task,
        job_id,
        request.file_paths,
        request.chunk_size,
        request.chunk_overlap
    )
    
    return {"job_id": job_id}

async def process_documents_task(
    job_id: str,
    file_paths: list[str],
    chunk_size: int,
    chunk_overlap: int
):
    """Background task for processing."""
    
    jobs[job_id].status = "processing"
    
    chunker = RecursiveChunker(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    
    for path in file_paths:
        try:
            # Extract
            extracted = extractor.extract(path)
            
            # Chunk
            chunks = chunker.chunk(extracted.content, extracted.metadata)
            
            # Store results (would save to database)
            jobs[job_id].processed += 1
            
        except Exception as e:
            jobs[job_id].errors.append(f"{path}: {str(e)}")
    
    jobs[job_id].status = "completed"

@app.get("/v1/jobs/{job_id}")
async def get_job_status(job_id: str) -> ProcessStatus:
    """Get job status."""
    
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    
    return jobs[job_id]

@app.post("/v1/upload")
async def upload_and_process(
    file: UploadFile = File(...)
) -> dict:
    """Upload and process a single file."""
    
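    # Save to temp file, keeping only the extension so the extractor can detect
    # the type and the client-supplied name cannot inject a path
    suffix = os.path.splitext(file.filename or "")[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name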
    
    try:
        # Extract
        extracted = extractor.extract(tmp_path)
        
        # Chunk
        chunks = chunker.chunk(extracted.content, extracted.metadata)
        
        return {
            "filename": file.filename,
            "content_length": len(extracted.content),
            "chunk_count": len(chunks),
            "metadata": extracted.metadata,
            "chunks": [
                {
                    "index": c.chunk_index,
                    "length": len(c.content),
                    "preview": c.content[:200]
                }
                for c in chunks[:5]  # First 5 chunks
            ]
        }
    
    finally:
        os.unlink(tmp_path)

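class ChunkRequest(BaseModel):
    text: str
    chunk_size: int = 1000
    chunk_overlap: int = 200
    strategy: str = "recursive"

@app.post("/v1/chunk")
async def chunk_text(request: ChunkRequest) -> list[dict]:
    """Chunk raw text sent in the JSON request body."""
    
    # Bare str parameters would be read from the query string, which is a poor
    # fit for long text, so the input is modeled as a request body instead
    if request.strategy == "recursive":
        chunker = RecursiveChunker(request.chunk_size, request.chunk_overlap)
    elif request.strategy == "sentence":
        chunker = SentenceChunker()
    else:
        chunker = FixedSizeChunker(request.chunk_size, request.chunk_overlap)
    
    chunks = chunker.chunk(request.text)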
    
    return [
        {
            "index": c.chunk_index,
            "content": c.content,
            "length": len(c.content)
        }
        for c in chunks
    ]

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Document processing is where RAG quality begins. A well-designed pipeline extracts content accurately from diverse formats, preserves important structure and metadata, and chunks text in ways that maintain semantic coherence.

Start with robust extraction: use specialized libraries for each format (PyMuPDF for PDFs, python-docx for Word, BeautifulSoup for HTML) and handle edge cases gracefully. Chunking strategy matters more than you might expect. Fixed-size chunks are simple but can split sentences mid-thought; semantic chunking preserves meaning but requires embedding calls; recursive chunking offers a good balance by respecting document structure. Always include overlap between chunks to maintain context across boundaries.

For production systems, build incremental pipelines that only reprocess changed documents, implement proper error handling and retry logic, and monitor processing metrics. Consider preprocessing steps like removing boilerplate, normalizing whitespace, and handling special characters. The goal is to transform raw documents into clean, well-structured chunks that give your embedding model and LLM the best possible input for accurate retrieval and generation.
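
The preprocessing steps mentioned above can be written as plain str -> str functions, which is the shape add_preprocessor expects. Here is a minimal sketch using only the standard library; the function names and the exact cleanup rules are assumptions to adapt to your corpus.

```python
import re
import unicodedata


def normalize_unicode(text: str) -> str:
    """Apply NFKC so ligatures and full-width forms become their plain equivalents."""
    return unicodedata.normalize("NFKC", text)


def strip_control_chars(text: str) -> str:
    """Drop control characters (category Cc) except newline, carriage return, and tab."""
    return "".join(
        ch for ch in text
        if ch in "\n\r\t" or unicodedata.category(ch) != "Cc"
    )


def normalize_whitespace(text: str) -> str:
    """Collapse runs of spaces and tabs, and limit consecutive blank lines to one."""
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"[ \t]+", " ", text)      # runs of spaces/tabs -> single space
    text = re.sub(r" ?\n ?", "\n", text)     # trim spaces around line breaks
    text = re.sub(r"\n{3,}", "\n\n", text)   # keep paragraph breaks, drop extras
    return text.strip()


def preprocess(text: str) -> str:
    """Run the three steps in order; each one also works on its own."""
    for step in (normalize_unicode, strip_control_chars, normalize_whitespace):
        text = step(text)
    return text
```

Each function could be registered separately, for example `pipeline.add_preprocessor(normalize_whitespace)`. Paragraph breaks are kept because RecursiveChunker splits on them first.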


About the Author

I am a Cloud Architect and Developer passionate about solving complex problems with modern technology. My blog explores the intersection of Cloud Architecture, Artificial Intelligence, and Software Engineering. I share tutorials, deep dives, and insights into building scalable, intelligent systems.

© 2025 Code, Cloud & Context | Built by Nithin Mohan TK | Powered by Passion