
Document Processing with LLMs: From PDFs to Structured Data

Introduction: Documents are everywhere—PDFs, Word files, scanned images, spreadsheets. Extracting structured information from unstructured documents is one of the most valuable LLM applications. This guide covers building document processing pipelines: extracting text from various formats, chunking strategies for long documents, processing with LLMs for extraction and summarization, and handling edge cases like tables, images, and multi-column layouts. These patterns apply to invoice processing, contract analysis, research paper summarization, and any workflow involving document understanding.

Document Processing: From Raw Documents to Structured Output

Text Extraction

# pip install pypdf python-docx openpyxl

from pathlib import Path
from typing import Union
import io

def extract_from_pdf(file_path: Union[str, Path]) -> str:
    """Extract text from PDF."""
    
    from pypdf import PdfReader
    
    reader = PdfReader(file_path)
    
    text_parts = []
    for page in reader.pages:
        text = page.extract_text()
        if text:
            text_parts.append(text)
    
    return "\n\n".join(text_parts)

def extract_from_docx(file_path: Union[str, Path]) -> str:
    """Extract text from Word document."""
    
    from docx import Document
    
    doc = Document(file_path)
    
    text_parts = []
    for para in doc.paragraphs:
        if para.text.strip():
            text_parts.append(para.text)
    
    # Also extract from tables
    for table in doc.tables:
        for row in table.rows:
            row_text = " | ".join(cell.text for cell in row.cells)
            if row_text.strip():
                text_parts.append(row_text)
    
    return "\n\n".join(text_parts)

def extract_from_xlsx(file_path: Union[str, Path]) -> str:
    """Extract text from Excel spreadsheet."""
    
    from openpyxl import load_workbook
    
    wb = load_workbook(file_path, data_only=True)
    
    text_parts = []
    for sheet_name in wb.sheetnames:
        sheet = wb[sheet_name]
        text_parts.append(f"Sheet: {sheet_name}")
        
        for row in sheet.iter_rows(values_only=True):
            row_text = " | ".join(str(cell) if cell else "" for cell in row)
            if row_text.strip(" |"):
                text_parts.append(row_text)
    
    return "\n\n".join(text_parts)

def extract_text(file_path: Union[str, Path]) -> str:
    """Extract text from any supported document type."""
    
    path = Path(file_path)
    suffix = path.suffix.lower()
    
    extractors = {
        ".pdf": extract_from_pdf,
        ".docx": extract_from_docx,
        ".xlsx": extract_from_xlsx,
        ".txt": lambda p: Path(p).read_text(),
        ".md": lambda p: Path(p).read_text(),
    }
    
    if suffix not in extractors:
        raise ValueError(f"Unsupported file type: {suffix}")
    
    return extractors[suffix](path)

# Usage
text = extract_text("contract.pdf")
print(f"Extracted {len(text)} characters")

Smart Chunking

from dataclasses import dataclass
from typing import Iterator
import re

@dataclass
class Chunk:
    text: str
    index: int
    metadata: dict

class DocumentChunker:
    """Split documents into processable chunks."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: list[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", ". ", " "]
    
    def chunk_by_size(self, text: str) -> Iterator[Chunk]:
        """Simple size-based chunking with overlap."""
        
        start = 0
        index = 0
        
        while start < len(text):
            end = start + self.chunk_size
            
            # Try to break at a natural boundary
            if end < len(text):
                for sep in self.separators:
                    last_sep = text.rfind(sep, start, end)
                    if last_sep > start:
                        end = last_sep + len(sep)
                        break
            
            chunk_text = text[start:end].strip()
            
            if chunk_text:
                yield Chunk(
                    text=chunk_text,
                    index=index,
                    metadata={"start": start, "end": end}
                )
                index += 1
            
            # Stop after the final chunk; otherwise advance with overlap,
            # guaranteeing forward progress
            if end >= len(text):
                break
            start = max(end - self.chunk_overlap, start + 1)
    
    def chunk_by_sections(self, text: str) -> Iterator[Chunk]:
        """Chunk by document sections (headers)."""
        
        # Split by markdown-style headers
        pattern = r'(^#{1,3}\s+.+$)'
        parts = re.split(pattern, text, flags=re.MULTILINE)
        
        current_header = ""
        current_content = []
        index = 0
        
        for part in parts:
            if re.match(r'^#{1,3}\s+', part):
                # This is a header
                if current_content:
                    yield Chunk(
                        text="\n".join(current_content),
                        index=index,
                        metadata={"header": current_header}
                    )
                    index += 1
                
                current_header = part.strip()
                current_content = [current_header]
            else:
                current_content.append(part.strip())
        
        # Don't forget the last section
        if current_content:
            yield Chunk(
                text="\n".join(current_content),
                index=index,
                metadata={"header": current_header}
            )
    
    def chunk_semantic(self, text: str) -> Iterator[Chunk]:
        """Chunk by semantic similarity (paragraph grouping)."""
        
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        
        current_chunk = []
        current_size = 0
        index = 0
        
        for para in paragraphs:
            para_size = len(para)
            
            if current_size + para_size > self.chunk_size and current_chunk:
                yield Chunk(
                    text="\n\n".join(current_chunk),
                    index=index,
                    metadata={"paragraphs": len(current_chunk)}
                )
                index += 1
                
                # Keep last paragraph for context
                current_chunk = [current_chunk[-1]] if current_chunk else []
                current_size = len(current_chunk[0]) if current_chunk else 0
            
            current_chunk.append(para)
            current_size += para_size
        
        if current_chunk:
            yield Chunk(
                text="\n\n".join(current_chunk),
                index=index,
                metadata={"paragraphs": len(current_chunk)}
            )

# Usage
chunker = DocumentChunker(chunk_size=1500, chunk_overlap=200)

text = extract_text("long_document.pdf")
chunks = list(chunker.chunk_by_size(text))

print(f"Split into {len(chunks)} chunks")

LLM Document Processing

from openai import OpenAI
from pydantic import BaseModel
from typing import Optional, Callable, Any
import json

client = OpenAI()

class ExtractedEntity(BaseModel):
    name: str
    type: str
    value: str
    confidence: float

class DocumentSummary(BaseModel):
    title: str
    summary: str
    key_points: list[str]
    entities: list[ExtractedEntity]

def summarize_document(text: str, max_length: int = 500) -> str:
    """Summarize a document."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": f"Summarize documents concisely in under {max_length} words."
            },
            {
                "role": "user",
                "content": f"Summarize this document:\n\n{text[:10000]}"
            }
        ]
    )
    
    return response.choices[0].message.content

def extract_entities(text: str, entity_types: list[str]) -> list[ExtractedEntity]:
    """Extract specific entities from text."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": f"""Extract entities of these types: {', '.join(entity_types)}.
Return as JSON array: [{{"name": "...", "type": "...", "value": "...", "confidence": 0.0-1.0}}]"""
            },
            {
                "role": "user",
                "content": text[:8000]
            }
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    entities = data.get("entities", data) if isinstance(data, dict) else data
    
    return [ExtractedEntity(**e) for e in entities]

def process_document_chunks(
    chunks: list[Chunk],
    processor: Callable[[str], Any],
    combine: Optional[Callable] = None
) -> Any:
    """Process document chunks and optionally combine results."""
    
    results = []
    
    for chunk in chunks:
        result = processor(chunk.text)
        results.append({
            "chunk_index": chunk.index,
            "result": result,
            "metadata": chunk.metadata
        })
    
    if combine:
        return combine(results)
    
    return results

# Usage
text = extract_text("research_paper.pdf")
chunks = list(DocumentChunker(chunk_size=2000).chunk_by_size(text))

# Summarize each chunk
chunk_summaries = process_document_chunks(
    chunks,
    processor=lambda t: summarize_document(t, max_length=100)
)

# Combine into final summary
all_summaries = "\n\n".join(r["result"] for r in chunk_summaries)
final_summary = summarize_document(all_summaries, max_length=300)

print(final_summary)
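
extract_entities works the same way on individual chunks. A short sketch; the entity types here are illustrative:

# Pull entities from the first chunk
entities = extract_entities(
    chunks[0].text,
    entity_types=["person", "organization", "date", "amount"]
)

for e in entities:
    print(f"{e.type}: {e.value} (confidence {e.confidence:.2f})")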

Invoice Processing

from pydantic import BaseModel
from typing import Optional

class LineItem(BaseModel):
    description: str
    quantity: float
    unit_price: float
    total: float

class Invoice(BaseModel):
    invoice_number: str
    invoice_date: Optional[str]
    due_date: Optional[str]
    vendor_name: str
    vendor_address: Optional[str]
    customer_name: Optional[str]
    line_items: list[LineItem]
    subtotal: float
    tax: Optional[float]
    total: float
    currency: str = "USD"

def extract_invoice(text: str) -> Invoice:
    """Extract structured invoice data from text."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": """Extract invoice information into structured JSON.
Include: invoice_number, invoice_date, due_date, vendor_name, vendor_address,
customer_name, line_items (description, quantity, unit_price, total),
subtotal, tax, total, currency.
Use null for missing fields. Dates in YYYY-MM-DD format."""
            },
            {
                "role": "user",
                "content": f"Extract invoice data:\n\n{text}"
            }
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    return Invoice(**data)

def process_invoice_batch(file_paths: list[str]) -> list[dict]:
    """Process multiple invoices."""
    
    results = []
    
    for path in file_paths:
        try:
            text = extract_text(path)
            invoice = extract_invoice(text)
            
            results.append({
                "file": path,
                "status": "success",
                "invoice": invoice.model_dump()
            })
        except Exception as e:
            results.append({
                "file": path,
                "status": "error",
                "error": str(e)
            })
    
    return results

# Usage
invoice_files = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]
results = process_invoice_batch(invoice_files)

# Export to CSV
import csv

with open("invoices.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=[
        "file", "invoice_number", "vendor_name", "total", "currency"
    ])
    writer.writeheader()
    
    for r in results:
        if r["status"] == "success":
            inv = r["invoice"]
            writer.writerow({
                "file": r["file"],
                "invoice_number": inv["invoice_number"],
                "vendor_name": inv["vendor_name"],
                "total": inv["total"],
                "currency": inv["currency"]
            })

Contract Analysis

from pydantic import BaseModel
from typing import Optional
from enum import Enum

class RiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

class ContractClause(BaseModel):
    clause_type: str
    text: str
    risk_level: RiskLevel
    summary: str
    recommendations: list[str]

class ContractAnalysis(BaseModel):
    contract_type: str
    parties: list[str]
    effective_date: Optional[str]
    termination_date: Optional[str]
    key_terms: list[str]
    obligations: list[str]
    risky_clauses: list[ContractClause]
    overall_risk: RiskLevel
    summary: str

def analyze_contract(text: str) -> ContractAnalysis:
    """Analyze a contract for key terms and risks."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": """You are a legal contract analyst. Analyze contracts for:
1. Contract type and parties
2. Key dates (effective, termination)
3. Key terms and obligations
4. Risky clauses with risk levels (low/medium/high)
5. Overall risk assessment
6. Summary and recommendations

Return structured JSON matching the ContractAnalysis schema."""
            },
            {
                "role": "user",
                "content": f"Analyze this contract:\n\n{text[:15000]}"
            }
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    return ContractAnalysis(**data)

def compare_contracts(contract1: str, contract2: str) -> dict:
    """Compare two contracts for differences."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": """Compare two contracts and identify:
1. Key differences in terms
2. Added/removed clauses
3. Changed obligations
4. Risk implications of differences"""
            },
            {
                "role": "user",
                "content": f"""Contract 1:
{contract1[:7000]}

Contract 2:
{contract2[:7000]}

Compare these contracts:"""
            }
        ],
        response_format={"type": "json_object"}
    )
    
    return json.loads(response.choices[0].message.content)

# Usage
contract_text = extract_text("service_agreement.pdf")
analysis = analyze_contract(contract_text)

print(f"Contract Type: {analysis.contract_type}")
print(f"Parties: {', '.join(analysis.parties)}")
print(f"Overall Risk: {analysis.overall_risk}")

for clause in analysis.risky_clauses:
    if clause.risk_level == RiskLevel.HIGH:
        print(f"\nHIGH RISK: {clause.clause_type}")
        print(f"  {clause.summary}")
        for rec in clause.recommendations:
            print(f"  - {rec}")

Conclusion

Document processing with LLMs unlocks value from unstructured data at scale. Start with reliable text extraction—pypdf for PDFs, python-docx for Word files. Implement smart chunking that respects document structure rather than arbitrary character limits. Use structured output (JSON mode) for reliable entity extraction. Build specialized processors for common document types like invoices and contracts. For production systems, add error handling, validation, and human review workflows for high-stakes decisions. The combination of traditional document parsing and LLM understanding creates powerful automation for document-heavy workflows that previously required manual processing.

Building AI Agents with Tool Use: From ReAct to Production Systems

Introduction: AI agents represent the next evolution beyond simple chatbots—they can reason about problems, break them into steps, use external tools, and iterate until they achieve a goal. Unlike traditional LLM applications that respond to a single prompt, agents maintain state, make decisions, and take actions in the real world. The key innovation is tool use: giving LLMs the ability to search the web, execute code, query databases, and interact with APIs. This guide covers the fundamentals of building AI agents, from the ReAct pattern to production-ready implementations with LangChain and OpenAI’s function calling.

AI Agents: Reasoning, Planning, and Tool Execution

The ReAct Pattern

ReAct (Reasoning + Acting) is the foundational pattern for AI agents. The agent follows a loop: think about what to do next (Thought), take an action using a tool (Action), observe the result (Observation), and repeat until the task is complete. This explicit reasoning trace makes agents more interpretable and allows them to recover from errors.

from openai import OpenAI
import json

client = OpenAI()

# Define available tools
tools = [
    {
        "type": "function",
        "function": {
            "name": "search_web",
            "description": "Search the web for current information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"}
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "execute_python",
            "description": "Execute Python code and return the result",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {"type": "string", "description": "Python code to execute"}
                },
                "required": ["code"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "Read contents of a file",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "File path to read"}
                },
                "required": ["path"]
            }
        }
    }
]

# Tool implementations
def search_web(query: str) -> str:
    # In production, use a real search API
    return f"Search results for '{query}': [Result 1, Result 2, Result 3]"

def execute_python(code: str) -> str:
    try:
        # WARNING: In production, use a sandboxed environment
        local_vars = {}
        exec(code, {"__builtins__": {}}, local_vars)
        return str(local_vars.get("result", "Code executed successfully"))
    except Exception as e:
        return f"Error: {str(e)}"

def read_file(path: str) -> str:
    try:
        with open(path, 'r') as f:
            return f.read()[:1000]  # Limit output
    except Exception as e:
        return f"Error reading file: {str(e)}"

tool_functions = {
    "search_web": search_web,
    "execute_python": execute_python,
    "read_file": read_file
}

def run_agent(user_query: str, max_iterations: int = 10) -> str:
    """Run an agent loop until task completion."""
    
    messages = [
        {"role": "system", "content": """You are a helpful AI agent that can use tools to accomplish tasks.
Think step by step about what you need to do.
Use tools when needed to gather information or take actions.
When you have enough information to answer, provide your final response."""},
        {"role": "user", "content": user_query}
    ]
    
    for i in range(max_iterations):
        response = client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        
        message = response.choices[0].message
        messages.append(message)
        
        # Check if agent wants to use tools
        if message.tool_calls:
            for tool_call in message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)
                
                print(f"[Agent] Calling {function_name} with {arguments}")
                
                # Execute the tool
                result = tool_functions[function_name](**arguments)
                
                # Add tool result to messages
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result
                })
        else:
            # No tool calls - agent is done
            return message.content
    
    return "Max iterations reached"

# Example usage
result = run_agent("What's the current weather in Tokyo and convert 25°C to Fahrenheit?")
print(result)

Building Agents with LangChain

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool, StructuredTool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.tools import DuckDuckGoSearchRun
from pydantic import BaseModel, Field
import subprocess

# Initialize LLM
llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)

# Define tools
search = DuckDuckGoSearchRun()

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Safely evaluate a mathematical expression."""
    try:
        # Only allow safe operations
        allowed_chars = set("0123456789+-*/.() ")
        if not all(c in allowed_chars for c in expression):
            return "Error: Invalid characters in expression"
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"

class ShellInput(BaseModel):
    command: str = Field(description="Shell command to execute")

def run_shell(command: str) -> str:
    """Execute a shell command (use with caution)."""
    # Whitelist safe commands
    safe_commands = ["ls", "pwd", "date", "whoami", "cat", "head", "tail", "wc"]
    cmd_parts = command.split()
    
    if not cmd_parts:
        return "Error: Empty command"
    if cmd_parts[0] not in safe_commands:
        return f"Error: Command '{cmd_parts[0]}' not in allowed list"
    
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=10
        )
        return result.stdout or result.stderr
    except Exception as e:
        return f"Error: {str(e)}"

tools = [
    Tool(
        name="search",
        func=search.run,
        description="Search the web for current information. Input should be a search query."
    ),
    StructuredTool.from_function(
        func=calculate,
        name="calculator",
        description="Evaluate mathematical expressions. Input should be a valid math expression.",
        args_schema=CalculatorInput
    ),
    StructuredTool.from_function(
        func=run_shell,
        name="shell",
        description="Execute safe shell commands (ls, pwd, date, cat, etc.)",
        args_schema=ShellInput
    )
]

# Create agent prompt
prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful AI assistant with access to tools.
Use tools when you need to search for information, perform calculations, or interact with the system.
Always explain your reasoning before taking actions.
If you're unsure, search for more information before answering."""),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

# Create and run agent
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=10,
    handle_parsing_errors=True
)

# Run the agent
result = agent_executor.invoke({
    "input": "What is 15% of 847.50? Also, what files are in the current directory?"
})
print(result["output"])

Custom Tool Creation

from langchain.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Optional, Type
import requests
import sqlite3

# Custom tool with complex logic
class DatabaseQueryInput(BaseModel):
    query: str = Field(description="SQL SELECT query to execute")
    database: str = Field(default="main.db", description="Database file path")

class DatabaseQueryTool(BaseTool):
    name: str = "database_query"
    description: str = "Execute read-only SQL queries against a SQLite database"
    args_schema: Type[BaseModel] = DatabaseQueryInput
    
    def _run(self, query: str, database: str = "main.db") -> str:
        # Security: Only allow SELECT queries
        if not query.strip().upper().startswith("SELECT"):
            return "Error: Only SELECT queries are allowed"
        
        try:
            conn = sqlite3.connect(database)
            cursor = conn.cursor()
            cursor.execute(query)
            
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            
            # Format as table
            result = " | ".join(columns) + "\n"
            result += "-" * len(result) + "\n"
            for row in rows:
                result += " | ".join(str(v) for v in row) + "\n"
            
            conn.close()
            return result
        except Exception as e:
            return f"Database error: {str(e)}"

# API integration tool
class WeatherInput(BaseModel):
    city: str = Field(description="City name for weather lookup")

class WeatherTool(BaseTool):
    name: str = "get_weather"
    description: str = "Get current weather for a city"
    args_schema: Type[BaseModel] = WeatherInput
    api_key: str = ""
    
    def _run(self, city: str) -> str:
        try:
            # Using OpenWeatherMap API
            url = f"http://api.openweathermap.org/data/2.5/weather"
            params = {"q": city, "appid": self.api_key, "units": "metric"}
            response = requests.get(url, params=params, timeout=10)
            data = response.json()
            
            if response.status_code == 200:
                return f"""Weather in {city}:
Temperature: {data['main']['temp']}°C
Feels like: {data['main']['feels_like']}°C
Humidity: {data['main']['humidity']}%
Conditions: {data['weather'][0]['description']}"""
            else:
                return f"Error: {data.get('message', 'Unknown error')}"
        except Exception as e:
            return f"Error fetching weather: {str(e)}"

# File manipulation tool with safety checks
class FileWriteInput(BaseModel):
    path: str = Field(description="File path to write to")
    content: str = Field(description="Content to write")
    mode: str = Field(default="w", description="Write mode: 'w' for overwrite, 'a' for append")

class SafeFileWriteTool(BaseTool):
    name: str = "write_file"
    description: str = "Write content to a file (restricted to safe directories)"
    args_schema: Type[BaseModel] = FileWriteInput
    allowed_dirs: list = ["/tmp", "./output"]
    
    def _run(self, path: str, content: str, mode: str = "w") -> str:
        import os
        
        # Security checks
        abs_path = os.path.abspath(path)
        if not any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_dirs):
            return f"Error: Path must be in allowed directories: {self.allowed_dirs}"
        
        if mode not in ["w", "a"]:
            return "Error: Mode must be 'w' or 'a'"
        
        try:
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, mode) as f:
                f.write(content)
            return f"Successfully wrote {len(content)} characters to {path}"
        except Exception as e:
            return f"Error writing file: {str(e)}"

# Use custom tools
tools = [
    DatabaseQueryTool(),
    WeatherTool(api_key="your-api-key"),
    SafeFileWriteTool()
]

Agent Memory and Context

from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryMemory
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)

# Window memory - keeps last N interactions
window_memory = ConversationBufferWindowMemory(
    memory_key="chat_history",
    return_messages=True,
    k=10  # Keep last 10 exchanges
)

# Summary memory - summarizes older conversations
summary_memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="chat_history",
    return_messages=True
)

# Agent with memory
prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful AI assistant with memory of our conversation.
Use the chat history to maintain context and provide consistent responses.
Reference previous discussions when relevant."""),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

agent = create_openai_tools_agent(llm, tools, prompt)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=window_memory,
    verbose=True
)

# Multi-turn conversation
agent_executor.invoke({"input": "My name is Alex and I'm working on a Python project"})
agent_executor.invoke({"input": "What's my name and what am I working on?"})
agent_executor.invoke({"input": "Can you help me with error handling in my project?"})

Error Handling and Reliability

from langchain.agents import AgentExecutor
from langchain.callbacks import get_openai_callback
import time

def run_agent_with_retry(
    agent_executor: AgentExecutor,
    input_text: str,
    max_retries: int = 3,
    timeout_seconds: int = 60
) -> dict:
    """Run agent with retry logic and timeout."""
    
    last_error = None
    
    for attempt in range(max_retries):
        try:
            with get_openai_callback() as cb:
                start_time = time.time()
                
                result = agent_executor.invoke(
                    {"input": input_text},
                    config={"max_execution_time": timeout_seconds}
                )
                
                elapsed = time.time() - start_time
                
                return {
                    "success": True,
                    "output": result["output"],
                    "tokens_used": cb.total_tokens,
                    "cost": cb.total_cost,
                    "elapsed_seconds": elapsed,
                    "attempts": attempt + 1
                }
                
        except Exception as e:
            last_error = e
            print(f"Attempt {attempt + 1} failed: {str(e)}")
            
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
    
    return {
        "success": False,
        "error": str(last_error),
        "attempts": max_retries
    }

# Graceful degradation
class RobustAgent:
    def __init__(self, agent_executor: AgentExecutor, fallback_llm):
        self.agent = agent_executor
        self.fallback = fallback_llm
    
    def run(self, query: str) -> str:
        # Try agent first
        result = run_agent_with_retry(self.agent, query)
        
        if result["success"]:
            return result["output"]
        
        # Fall back to simple LLM response
        print("Agent failed, falling back to simple LLM...")
        response = self.fallback.invoke(query)
        return f"[Fallback response] {response.content}"

Conclusion

AI agents with tool use represent a fundamental shift from passive question-answering to active problem-solving. By combining LLM reasoning with the ability to search, compute, and interact with external systems, agents can tackle complex tasks that require multiple steps and real-world information. Start with simple tools like search and calculation, then gradually add more capabilities as you understand the patterns. Remember that reliability is crucial—implement proper error handling, timeouts, and fallbacks. The ReAct pattern provides a solid foundation, while frameworks like LangChain accelerate development. As you build more sophisticated agents, focus on safety: validate inputs, restrict tool capabilities, and always maintain human oversight for critical operations.

Token Management for LLM Applications: Counting, Budgeting, and Cost Control

Introduction: Token management is critical for LLM applications—tokens directly impact cost, latency, and whether your prompt fits within context limits. Understanding how to count tokens accurately, truncate context intelligently, and allocate token budgets across different parts of your prompt separates amateur implementations from production-ready systems. This guide covers practical token management: counting with tiktoken, smart truncation strategies, budget allocation patterns, and techniques for maximizing information density within token limits.

Token Management: Counting, Truncation, and Budget Allocation

Token Counting with Tiktoken

import tiktoken
from typing import Union

class TokenCounter:
    """Count tokens for different models."""
    
    # Model to encoding mapping
    MODEL_ENCODINGS = {
        "gpt-4o": "o200k_base",
        "gpt-4o-mini": "o200k_base",
        "gpt-4-turbo": "cl100k_base",
        "gpt-4": "cl100k_base",
        "gpt-3.5-turbo": "cl100k_base",
        "text-embedding-3-small": "cl100k_base",
        "text-embedding-3-large": "cl100k_base",
    }
    
    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        encoding_name = self.MODEL_ENCODINGS.get(model, "cl100k_base")
        self.encoding = tiktoken.get_encoding(encoding_name)
    
    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))
    
    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in chat messages."""
        
        # Base tokens per message (varies by model)
        tokens_per_message = 3  # For gpt-4o models
        tokens_per_name = 1
        
        total = 0
        
        for message in messages:
            total += tokens_per_message
            
            for key, value in message.items():
                total += self.count(str(value))
                
                if key == "name":
                    total += tokens_per_name
        
        # Every reply is primed with assistant
        total += 3
        
        return total
    
    def encode(self, text: str) -> list[int]:
        """Encode text to token IDs."""
        return self.encoding.encode(text)
    
    def decode(self, tokens: list[int]) -> str:
        """Decode token IDs to text."""
        return self.encoding.decode(tokens)
    
    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to max tokens."""
        
        tokens = self.encode(text)
        
        if len(tokens) <= max_tokens:
            return text
        
        return self.decode(tokens[:max_tokens])

# Usage
counter = TokenCounter("gpt-4o-mini")

text = "This is a sample text to count tokens."
print(f"Token count: {counter.count(text)}")

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Python?"}
]
print(f"Message tokens: {counter.count_messages(messages)}")

# Truncate long text
long_text = "Lorem ipsum " * 1000
truncated = counter.truncate_to_tokens(long_text, 100)
print(f"Truncated to {counter.count(truncated)} tokens")

Context Window Management

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class TruncationStrategy(str, Enum):
    KEEP_START = "keep_start"
    KEEP_END = "keep_end"
    KEEP_BOTH = "keep_both"
    SMART = "smart"

@dataclass
class ContextWindow:
    """Manage context window limits."""
    
    max_tokens: int
    reserved_output: int = 1000
    
    @property
    def available_input(self) -> int:
        return self.max_tokens - self.reserved_output

# Model context windows
CONTEXT_WINDOWS = {
    "gpt-4o": ContextWindow(128000, 16384),
    "gpt-4o-mini": ContextWindow(128000, 16384),
    "gpt-4-turbo": ContextWindow(128000, 4096),
    "gpt-4": ContextWindow(8192, 4096),
    "gpt-3.5-turbo": ContextWindow(16385, 4096),
    "claude-3-5-sonnet": ContextWindow(200000, 8192),
    "claude-3-opus": ContextWindow(200000, 4096),
}

class ContextManager:
    """Manage context within token limits."""
    
    def __init__(
        self,
        model: str = "gpt-4o-mini",
        counter: TokenCounter = None
    ):
        self.model = model
        self.window = CONTEXT_WINDOWS.get(model, ContextWindow(128000))
        self.counter = counter or TokenCounter(model)
    
    def truncate(
        self,
        text: str,
        max_tokens: int,
        strategy: TruncationStrategy = TruncationStrategy.KEEP_END
    ) -> str:
        """Truncate text using specified strategy."""
        
        current_tokens = self.counter.count(text)
        
        if current_tokens <= max_tokens:
            return text
        
        if strategy == TruncationStrategy.KEEP_START:
            return self._truncate_end(text, max_tokens)
        
        elif strategy == TruncationStrategy.KEEP_END:
            return self._truncate_start(text, max_tokens)
        
        elif strategy == TruncationStrategy.KEEP_BOTH:
            return self._truncate_middle(text, max_tokens)
        
        elif strategy == TruncationStrategy.SMART:
            return self._smart_truncate(text, max_tokens)
        
        return text
    
    def _truncate_end(self, text: str, max_tokens: int) -> str:
        """Keep start, truncate end."""
        
        tokens = self.counter.encode(text)
        truncated = self.counter.decode(tokens[:max_tokens])
        
        return truncated + "..."
    
    def _truncate_start(self, text: str, max_tokens: int) -> str:
        """Keep end, truncate start."""
        
        tokens = self.counter.encode(text)
        truncated = self.counter.decode(tokens[-max_tokens:])
        
        return "..." + truncated
    
    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep start and end, truncate middle."""
        
        tokens = self.counter.encode(text)
        half = max_tokens // 2
        
        start = self.counter.decode(tokens[:half])
        end = self.counter.decode(tokens[-half:])
        
        return start + "\n...[truncated]...\n" + end
    
    def _smart_truncate(self, text: str, max_tokens: int) -> str:
        """Smart truncation preserving sentence boundaries."""
        
        sentences = text.split(". ")
        result = []
        current_tokens = 0
        
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence + ". ")
            
            if current_tokens + sentence_tokens > max_tokens:
                break
            
            result.append(sentence)
            current_tokens += sentence_tokens
        
        return ". ".join(result) + "."
    
    def fit_messages(
        self,
        messages: list[dict],
        system_prompt: str = None
    ) -> list[dict]:
        """Fit messages within context window."""
        
        available = self.window.available_input
        
        # Reserve space for system prompt
        if system_prompt:
            system_tokens = self.counter.count(system_prompt) + 10
            available -= system_tokens
        
        # Calculate current usage
        total_tokens = self.counter.count_messages(messages)
        
        if total_tokens <= available:
            return messages
        
        # Truncate oldest messages first (keep recent context)
        result = []
        current_tokens = 0
        
        for message in reversed(messages):
            msg_tokens = self.counter.count(message.get("content", "")) + 10
            
            if current_tokens + msg_tokens > available:
                break
            
            result.insert(0, message)
            current_tokens += msg_tokens
        
        return result
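
A quick usage sketch for the context manager:

# Usage
manager = ContextManager("gpt-4o-mini")

long_context = "Background paragraph about the project. " * 2000
fitted = manager.truncate(long_context, max_tokens=500, strategy=TruncationStrategy.KEEP_BOTH)
print(f"Fitted context: {manager.counter.count(fitted)} tokens")

history = [{"role": "user", "content": f"Message {i}"} for i in range(100)]
trimmed = manager.fit_messages(history, system_prompt="You are a helpful assistant.")
print(f"Kept {len(trimmed)} of {len(history)} messages")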

Token Budget Allocation

from dataclasses import dataclass, field
from typing import Callable

@dataclass
class TokenBudget:
    """Allocate token budget across prompt components."""
    
    total: int
    system_prompt: int = 0
    context: int = 0
    examples: int = 0
    user_input: int = 0
    reserved_output: int = 0
    
    @property
    def allocated(self) -> int:
        return (
            self.system_prompt +
            self.context +
            self.examples +
            self.user_input +
            self.reserved_output
        )
    
    @property
    def remaining(self) -> int:
        return self.total - self.allocated

class BudgetAllocator:
    """Allocate token budgets for different prompt components."""
    
    def __init__(
        self,
        model: str = "gpt-4o-mini",
        counter: TokenCounter = None
    ):
        self.model = model
        self.window = CONTEXT_WINDOWS.get(model, ContextWindow(128000))
        self.counter = counter or TokenCounter(model)
    
    def allocate(
        self,
        system_prompt: str,
        context: str = "",
        examples: list[dict] = None,
        user_input: str = "",
        output_tokens: int = 1000
    ) -> TokenBudget:
        """Allocate budget based on actual content."""
        
        budget = TokenBudget(total=self.window.max_tokens)
        
        # Fixed allocations
        budget.reserved_output = output_tokens
        budget.system_prompt = self.counter.count(system_prompt)
        budget.user_input = self.counter.count(user_input)
        
        # Calculate remaining for context and examples
        remaining = budget.remaining
        
        if examples:
            examples_text = "\n".join([
                f"Q: {e.get('input', '')}\nA: {e.get('output', '')}"
                for e in examples
            ])
            budget.examples = min(
                self.counter.count(examples_text),
                remaining // 2  # Max 50% for examples
            )
            remaining = budget.remaining
        
        # Rest goes to context
        if context:
            budget.context = min(
                self.counter.count(context),
                remaining
            )
        
        return budget
    
    def allocate_proportional(
        self,
        total_tokens: int,
        proportions: dict[str, float]
    ) -> dict[str, int]:
        """Allocate tokens proportionally."""
        
        # Normalize proportions
        total_prop = sum(proportions.values())
        normalized = {k: v / total_prop for k, v in proportions.items()}
        
        # Allocate
        allocated = {}
        remaining = total_tokens
        
        for component, prop in normalized.items():
            tokens = int(total_tokens * prop)
            allocated[component] = min(tokens, remaining)
            remaining -= allocated[component]
        
        return allocated

class PromptBuilder:
    """Build prompts within token budget."""
    
    def __init__(
        self,
        model: str = "gpt-4o-mini",
        max_output_tokens: int = 1000
    ):
        self.model = model
        self.counter = TokenCounter(model)
        self.context_manager = ContextManager(model, self.counter)
        self.allocator = BudgetAllocator(model, self.counter)
        self.max_output = max_output_tokens
    
    def build(
        self,
        system_prompt: str,
        user_input: str,
        context: str = "",
        examples: list[dict] = None
    ) -> list[dict]:
        """Build messages within budget."""
        
        # Get budget allocation
        budget = self.allocator.allocate(
            system_prompt=system_prompt,
            context=context,
            examples=examples,
            user_input=user_input,
            output_tokens=self.max_output
        )
        
        messages = []
        
        # System prompt (always include full)
        messages.append({
            "role": "system",
            "content": system_prompt
        })
        
        # Build user message
        user_content_parts = []
        
        # Add context (truncated if needed)
        if context:
            truncated_context = self.context_manager.truncate(
                context,
                budget.context,
                TruncationStrategy.SMART
            )
            user_content_parts.append(f"Context:\n{truncated_context}")
        
        # Add examples
        if examples:
            examples_text = self._format_examples(examples, budget.examples)
            if examples_text:
                user_content_parts.append(f"Examples:\n{examples_text}")
        
        # Add user input
        user_content_parts.append(f"Question: {user_input}")
        
        messages.append({
            "role": "user",
            "content": "\n\n".join(user_content_parts)
        })
        
        return messages
    
    def _format_examples(self, examples: list[dict], max_tokens: int) -> str:
        """Format examples within token budget."""
        
        result = []
        current_tokens = 0
        
        for ex in examples:
            ex_text = f"Q: {ex.get('input', '')}\nA: {ex.get('output', '')}"
            ex_tokens = self.counter.count(ex_text)
            
            if current_tokens + ex_tokens > max_tokens:
                break
            
            result.append(ex_text)
            current_tokens += ex_tokens
        
        return "\n\n".join(result)

# Usage
builder = PromptBuilder("gpt-4o-mini", max_output_tokens=2000)

messages = builder.build(
    system_prompt="You are a helpful coding assistant.",
    user_input="How do I read a file in Python?",
    context="The user is working on a data processing script...",
    examples=[
        {"input": "How to print?", "output": "Use print('hello')"},
        {"input": "How to loop?", "output": "Use for i in range(10):"}
    ]
)

print(f"Built {len(messages)} messages")

Conversation History Management

from collections import deque
from datetime import datetime

class ConversationBuffer:
    """Manage conversation history within token limits."""
    
    def __init__(
        self,
        model: str = "gpt-4o-mini",
        max_history_tokens: int = 10000,
        max_messages: int = 50
    ):
        self.counter = TokenCounter(model)
        self.max_tokens = max_history_tokens
        self.max_messages = max_messages
        self.messages: deque = deque(maxlen=max_messages)
        self.current_tokens = 0
    
    def add_message(self, role: str, content: str):
        """Add message to buffer."""
        
        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        }
        
        msg_tokens = self.counter.count(content) + 10
        
        # Remove old messages if over budget
        while self.current_tokens + msg_tokens > self.max_tokens and self.messages:
            old_msg = self.messages.popleft()
            old_tokens = self.counter.count(old_msg["content"]) + 10
            self.current_tokens -= old_tokens
        
        self.messages.append(message)
        self.current_tokens += msg_tokens
    
    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        
        return [
            {"role": m["role"], "content": m["content"]}
            for m in self.messages
        ]
    
    def summarize_and_compress(self, client) -> str:
        """Summarize old messages to compress history."""
        
        if len(self.messages) < 10:
            return None
        
        # Get oldest messages to summarize
        old_messages = list(self.messages)[:len(self.messages) // 2]
        
        summary_prompt = "Summarize this conversation concisely:\n\n"
        for msg in old_messages:
            summary_prompt += f"{msg['role']}: {msg['content']}\n"
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": summary_prompt}]
        )
        
        summary = response.choices[0].message.content
        
        # Replace old messages with summary
        for _ in range(len(old_messages)):
            old_msg = self.messages.popleft()
            self.current_tokens -= self.counter.count(old_msg["content"]) + 10
        
        # Add summary as system context
        self.messages.appendleft({
            "role": "system",
            "content": f"Previous conversation summary: {summary}",
            "timestamp": datetime.now().isoformat()
        })
        self.current_tokens += self.counter.count(summary) + 20
        
        return summary

class SlidingWindowBuffer:
    """Sliding window approach to conversation history."""
    
    def __init__(
        self,
        model: str = "gpt-4o-mini",
        window_tokens: int = 8000
    ):
        self.counter = TokenCounter(model)
        self.window_tokens = window_tokens
        self.messages: list[dict] = []
    
    def add_message(self, role: str, content: str):
        """Add message and slide window if needed."""
        
        self.messages.append({"role": role, "content": content})
        self._slide_window()
    
    def _slide_window(self):
        """Remove oldest messages to fit window."""
        
        while self._total_tokens() > self.window_tokens and len(self.messages) > 1:
            self.messages.pop(0)
    
    def _total_tokens(self) -> int:
        """Calculate total tokens in buffer."""
        
        return sum(
            self.counter.count(m["content"]) + 10
            for m in self.messages
        )
    
    def get_messages(self) -> list[dict]:
        """Get current window of messages."""
        return self.messages.copy()
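
Both buffers are drop-in stores for chat history. A quick usage sketch:

# Usage
buffer = ConversationBuffer("gpt-4o-mini", max_history_tokens=4000)
buffer.add_message("user", "How do I parse JSON in Python?")
buffer.add_message("assistant", "Use the json module: json.loads(text).")
print(f"Buffer: {len(buffer.get_messages())} messages, ~{buffer.current_tokens} tokens")

window = SlidingWindowBuffer("gpt-4o-mini", window_tokens=2000)
for i in range(5):
    window.add_message("user", f"Question {i}")
print(f"Window holds {len(window.get_messages())} messages")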

Cost Estimation

from dataclasses import dataclass

@dataclass
class ModelPricing:
    """Pricing per 1K tokens."""
    input_cost: float
    output_cost: float
    cached_input_cost: float = None

# Pricing as of late 2024 (USD per 1K tokens)
MODEL_PRICING = {
    "gpt-4o": ModelPricing(0.0025, 0.01, 0.00125),
    "gpt-4o-mini": ModelPricing(0.00015, 0.0006, 0.000075),
    "gpt-4-turbo": ModelPricing(0.01, 0.03),
    "gpt-4": ModelPricing(0.03, 0.06),
    "gpt-3.5-turbo": ModelPricing(0.0005, 0.0015),
    "claude-3-5-sonnet": ModelPricing(0.003, 0.015),
    "claude-3-opus": ModelPricing(0.015, 0.075),
}

class CostEstimator:
    """Estimate API costs based on token usage."""
    
    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        self.pricing = MODEL_PRICING.get(model)
        self.counter = TokenCounter(model)
        
        # Track usage
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost = 0.0
    
    def estimate_cost(
        self,
        input_tokens: int,
        output_tokens: int,
        cached_input: int = 0
    ) -> float:
        """Estimate cost for token usage."""
        
        if not self.pricing:
            return 0.0
        
        regular_input = input_tokens - cached_input
        
        cost = (
            (regular_input / 1000) * self.pricing.input_cost +
            (output_tokens / 1000) * self.pricing.output_cost
        )
        
        if cached_input > 0 and self.pricing.cached_input_cost:
            cost += (cached_input / 1000) * self.pricing.cached_input_cost
        
        return cost
    
    def estimate_from_prompt(
        self,
        prompt: str,
        expected_output_tokens: int = 500
    ) -> dict:
        """Estimate cost from prompt text."""
        
        input_tokens = self.counter.count(prompt)
        cost = self.estimate_cost(input_tokens, expected_output_tokens)
        
        return {
            "input_tokens": input_tokens,
            "expected_output_tokens": expected_output_tokens,
            "estimated_cost": cost
        }
    
    def record_usage(self, input_tokens: int, output_tokens: int):
        """Record actual usage."""
        
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.total_cost += self.estimate_cost(input_tokens, output_tokens)
    
    def get_usage_report(self) -> dict:
        """Get usage report."""
        
        return {
            "model": self.model,
            "total_input_tokens": self.total_input_tokens,
            "total_output_tokens": self.total_output_tokens,
            "total_tokens": self.total_input_tokens + self.total_output_tokens,
            "total_cost": round(self.total_cost, 6)
        }

# Usage
estimator = CostEstimator("gpt-4o-mini")

# Estimate before calling
estimate = estimator.estimate_from_prompt(
    "Explain machine learning in detail",
    expected_output_tokens=1000
)
print(f"Estimated cost: ${estimate['estimated_cost']:.6f}")

# Record actual usage
estimator.record_usage(input_tokens=50, output_tokens=800)
estimator.record_usage(input_tokens=100, output_tokens=500)

# Get report
report = estimator.get_usage_report()
print(f"Total cost: ${report['total_cost']:.6f}")

Production Token Management Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
counter = TokenCounter("gpt-4o-mini")
context_manager = ContextManager("gpt-4o-mini")
cost_estimator = CostEstimator("gpt-4o-mini")

class TokenCountRequest(BaseModel):
    text: str
    model: str = "gpt-4o-mini"

class TruncateRequest(BaseModel):
    text: str
    max_tokens: int
    strategy: str = "smart"

class BuildPromptRequest(BaseModel):
    system_prompt: str
    user_input: str
    context: Optional[str] = ""
    max_output_tokens: int = 1000

@app.post("/tokens/count")
async def count_tokens(request: TokenCountRequest):
    """Count tokens in text."""
    
    local_counter = TokenCounter(request.model)
    count = local_counter.count(request.text)
    
    return {
        "text_length": len(request.text),
        "token_count": count,
        "model": request.model
    }

@app.post("/tokens/truncate")
async def truncate_text(request: TruncateRequest):
    """Truncate text to token limit."""
    
    strategy = TruncationStrategy(request.strategy)
    truncated = context_manager.truncate(
        request.text,
        request.max_tokens,
        strategy
    )
    
    return {
        "original_tokens": counter.count(request.text),
        "truncated_tokens": counter.count(truncated),
        "truncated_text": truncated
    }

@app.post("/prompt/build")
async def build_prompt(request: BuildPromptRequest):
    """Build optimized prompt within budget."""
    
    builder = PromptBuilder("gpt-4o-mini", request.max_output_tokens)
    
    messages = builder.build(
        system_prompt=request.system_prompt,
        user_input=request.user_input,
        context=request.context or ""
    )
    
    total_tokens = counter.count_messages(messages)
    
    return {
        "messages": messages,
        "total_tokens": total_tokens,
        "available_output": request.max_output_tokens
    }

@app.post("/cost/estimate")
async def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    model: str = "gpt-4o-mini"
):
    """Estimate API cost."""
    
    local_estimator = CostEstimator(model)
    cost = local_estimator.estimate_cost(input_tokens, output_tokens)
    
    return {
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "estimated_cost": round(cost, 6)
    }

@app.get("/usage/report")
async def usage_report():
    """Get usage report."""
    return cost_estimator.get_usage_report()
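
A minimal sketch of calling the service locally, assuming the code is saved as main.py and served with uvicorn (the module name is a placeholder):

# Start the service:  uvicorn main:app --reload
# Then call it, for example with requests:
import requests

resp = requests.post(
    "http://localhost:8000/tokens/count",
    json={"text": "How many tokens is this sentence?", "model": "gpt-4o-mini"}
)
print(resp.json())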

Conclusion

Effective token management is essential for production LLM applications. Use tiktoken for accurate token counting—don't estimate based on word count. Implement smart truncation strategies that preserve important information: keep recent context for conversations, use sentence-boundary truncation for documents, and consider summarization for very long contexts. Allocate token budgets explicitly across prompt components to ensure each part gets appropriate space. Track costs continuously and set alerts for unexpected usage spikes. For conversations, use sliding windows or periodic summarization to maintain context within limits. The goal is maximizing information density while staying within context windows and cost budgets.

A Comparative Guide to Generative AI Frameworks for Chatbot Development

Generative AI Chatbot Frameworks Decision Architecture

After two decades of building conversational systems, I have watched the chatbot landscape transform from simple rule-based decision trees to sophisticated AI-powered agents capable of nuanced, context-aware dialogue. The explosion of generative AI frameworks has created both unprecedented opportunities and significant decision paralysis for engineering teams. This guide distills my production experience across dozens of enterprise chatbot deployments into a practical decision framework.

The Modern Chatbot Architecture Landscape

Today’s chatbot frameworks fall into two fundamental categories: open-source frameworks that provide maximum control and customization, and cloud-managed platforms that offer rapid deployment with managed infrastructure. Understanding this distinction is crucial because it determines not just your technical architecture, but your operational model, cost structure, and long-term flexibility.

Open-source frameworks like LangChain, Rasa, Haystack, and Botpress give you complete ownership of your conversational AI stack. You control the models, the data, the deployment infrastructure, and every aspect of the user experience. This control comes with responsibility: you manage scaling, security, model updates, and operational monitoring.

Cloud-managed platforms like Google Dialogflow, Azure Bot Framework, and Amazon Lex abstract away infrastructure complexity. They provide pre-built NLU capabilities, managed scaling, and integration with their respective cloud ecosystems. The trade-off is reduced flexibility and potential vendor lock-in.

When to Use What: A Decision Framework

LangChain: The AI Agent Builder’s Choice

LangChain has emerged as the dominant framework for building LLM-powered applications, and for good reason. Its composable architecture makes it ideal for complex conversational agents that need to orchestrate multiple AI capabilities, retrieve information from diverse sources, and maintain sophisticated conversation state.

Use LangChain when: You are building RAG-powered chatbots that need to query your knowledge base, you want to leverage multiple LLM providers (OpenAI, Anthropic, local models), you need agentic capabilities where the bot can use tools and APIs, or you are prototyping rapidly and need maximum flexibility. LangChain excels at startups and innovation teams who need to move fast and iterate on conversational AI experiences.

Avoid LangChain when: You need a simple FAQ bot with deterministic responses, your team lacks Python expertise, or you need enterprise-grade support with SLAs. LangChain’s rapid evolution means APIs change frequently, which can create maintenance burden.
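
To make the composability point concrete, here is a minimal retrieval-flavoured sketch in the LCEL pipe style. It assumes the langchain-openai and langchain-core packages and an OPENAI_API_KEY in the environment; as noted above, the exact imports and interfaces shift between releases, so treat this as a sketch rather than a reference implementation.

# pip install langchain-openai langchain-core  (assumed packages; APIs change between releases)
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the question using only the provided context."),
    ("human", "Context:\n{context}\n\nQuestion: {question}"),
])

# Composable pipeline: prompt -> model -> plain string
chain = prompt | llm | StrOutputParser()

answer = chain.invoke({
    "context": "Password resets are available under Settings > Security.",  # would come from your retriever
    "question": "How do I reset my password?",
})
print(answer)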

Rasa: Enterprise-Grade Open Source

Rasa represents the gold standard for organizations that need production-grade conversational AI with complete data sovereignty. Its dual-model architecture (NLU for intent classification, Core for dialogue management) provides fine-grained control over conversation flow.

Use Rasa when: Data privacy is paramount and you cannot send conversation data to third-party APIs, you need deterministic dialogue flows for regulated industries (healthcare, finance), you have ML engineering capacity to train and maintain custom models, or you are building multi-turn, task-oriented assistants. Rasa is particularly strong for enterprise deployments where compliance and auditability matter.

Avoid Rasa when: You need rapid prototyping without ML expertise, your use case is primarily generative (creative writing, open-ended conversation), or you lack infrastructure to host and scale the models.

Haystack: The Search-First Approach

Haystack from deepset excels at building question-answering systems over large document collections. If your chatbot’s primary function is helping users find information in your knowledge base, Haystack’s pipeline architecture makes it straightforward to build sophisticated retrieval systems.

Use Haystack when: Your chatbot is primarily a knowledge retrieval system, you have large document collections (technical documentation, legal documents, research papers), you need hybrid search combining semantic and keyword approaches, or you want tight integration with vector databases like Pinecone, Weaviate, or Qdrant.

Google Dialogflow: The Enterprise Cloud Choice

Dialogflow (especially Dialogflow CX) provides a mature, enterprise-ready platform for building conversational experiences. Its visual flow builder makes it accessible to non-developers while providing sophisticated capabilities for complex use cases.

Use Dialogflow when: You are already invested in Google Cloud Platform, you need multi-language support out of the box, your team includes non-technical conversation designers, or you need voice integration (telephony, Google Assistant). Dialogflow CX’s state machine approach is excellent for complex, multi-turn conversations with many branches.

Azure Bot Framework: The Microsoft Ecosystem Play

Microsoft’s Bot Framework shines when you need deep integration with the Microsoft ecosystem. Teams integration, Azure Cognitive Services, and the broader Microsoft 365 platform make it the natural choice for enterprise Microsoft shops.

Use Azure Bot Framework when: You need native Microsoft Teams integration, you are building internal enterprise bots for Microsoft 365 users, you want to leverage Azure OpenAI Service for GPT models with enterprise compliance, or you need the Bot Framework Composer for visual bot building.

Amazon Lex: AWS-Native Conversational AI

Amazon Lex provides tight integration with the AWS ecosystem, making it ideal for organizations already running workloads on AWS. Its integration with Lambda, Connect, and other AWS services enables sophisticated conversational applications.

Use Amazon Lex when: You are building contact center solutions with Amazon Connect, your infrastructure is AWS-native and you want seamless integration, you need voice-first experiences with Amazon Polly, or you want pay-per-use pricing without upfront commitments.

Cost and Scalability Considerations

Cost structures vary dramatically across these frameworks. Open-source options like LangChain and Rasa have no licensing costs but require infrastructure investment and engineering time. Cloud platforms charge per request or per conversation, which can become expensive at scale but eliminate operational overhead.

For startups and MVPs, I typically recommend starting with LangChain or Botpress for rapid iteration, then evaluating whether to migrate to a more structured platform as requirements solidify. For enterprises with existing cloud commitments, leveraging the native conversational AI service (Dialogflow for GCP, Bot Framework for Azure, Lex for AWS) often provides the best total cost of ownership when factoring in integration and operational costs.

Production Lessons Learned

Across my chatbot deployments, several patterns consistently emerge. First, invest heavily in conversation design before writing code. The best framework cannot compensate for poorly designed dialogue flows. Second, implement comprehensive logging and analytics from day one. Understanding how users actually interact with your bot is essential for iterative improvement. Third, plan for graceful degradation. LLM APIs fail, models hallucinate, and users ask unexpected questions. Your bot should handle these situations elegantly.

The chatbot framework landscape will continue evolving rapidly as generative AI capabilities advance. The frameworks that thrive will be those that balance cutting-edge AI capabilities with production reliability and developer experience. Choose based on your team’s capabilities, your organization’s constraints, and your users’ needs rather than chasing the newest technology.

Generative AI in Natural Language Processing: Chatbots and Beyond

Generative AI in NLP Applications Architecture

After two decades of building language-aware systems, I have witnessed the most profound transformation in how machines understand and generate human language. The emergence of generative AI has fundamentally altered the NLP landscape, moving us from rigid rule-based systems to fluid, context-aware models that can engage in nuanced dialogue, create compelling content, and reason about complex linguistic structures. This evolution represents not just a technological shift but a paradigm change in human-computer interaction.

The Foundation: Understanding Modern NLP Architecture

The transformer architecture, introduced in the seminal “Attention is All You Need” paper, revolutionized how we approach language understanding. Unlike earlier recurrent neural networks that processed text sequentially, transformers use self-attention mechanisms to consider all words in a sentence simultaneously, capturing long-range dependencies that were previously difficult to model. This architectural innovation enabled the development of increasingly powerful language models.
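
As a rough illustration (independent of any particular model), the toy sketch below computes scaled dot-product attention, softmax(Q K^T / sqrt(d_k)) V, so every position is weighted against every other position in a single step rather than sequentially.

import numpy as np

def scaled_dot_product_attention(Q, K, V):
    """Toy attention: softmax(Q K^T / sqrt(d_k)) V over all positions at once."""
    d_k = Q.shape[-1]
    scores = Q @ K.swapaxes(-2, -1) / np.sqrt(d_k)   # pairwise similarity between positions
    weights = np.exp(scores - scores.max(axis=-1, keepdims=True))
    weights /= weights.sum(axis=-1, keepdims=True)   # softmax over the sequence
    return weights @ V                               # each output mixes information from every position

# 4 tokens with 8-dimensional embeddings; self-attention feeds the same matrix in as Q, K, and V
x = np.random.randn(4, 8)
out = scaled_dot_product_attention(x, x, x)          # shape (4, 8)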

GPT-4 and its successors represent the current state of the art, demonstrating emergent capabilities that surprised even their creators. These models can perform complex reasoning, follow nuanced instructions, and generate text that is often indistinguishable from human writing. The progression from GPT-3’s 175 billion parameters to more efficient architectures shows that raw scale is being complemented by architectural innovations and training methodology improvements.

Beyond Chatbots: The Expanding Application Landscape

While conversational AI captures headlines, the applications of generative NLP extend far beyond chatbots. In my production deployments, I have implemented systems for automated document summarization that reduce legal review time by 60%, code generation assistants that accelerate developer productivity, and content localization pipelines that maintain brand voice across 40+ languages. Each application requires careful consideration of the specific NLP capabilities needed and the production constraints involved.

Machine translation has evolved from phrase-based statistical methods to neural approaches that understand context and idiom. Modern translation systems can preserve tone, handle domain-specific terminology, and even adapt formality levels based on the target audience. The quality improvements have made real-time translation viable for business-critical communications.

Production Considerations: Hallucination and Reliability

The most significant challenge in deploying generative AI for NLP applications is managing hallucination, where models generate plausible-sounding but factually incorrect information. In enterprise deployments, I implement multiple mitigation strategies including retrieval-augmented generation (RAG) to ground responses in verified data sources, confidence scoring to flag uncertain outputs, and human-in-the-loop workflows for high-stakes decisions.

Latency and cost optimization become critical at scale. Techniques like model distillation, quantization, and intelligent caching can reduce inference costs by 80% while maintaining acceptable quality. The choice between cloud-hosted APIs and self-hosted models depends on data sensitivity, volume, and latency requirements specific to each use case.

Ethical Dimensions and Responsible Deployment

Bias in language models reflects and can amplify biases present in training data. Responsible deployment requires systematic bias auditing, diverse evaluation datasets, and ongoing monitoring of model outputs. The potential for misuse in generating misinformation or manipulative content demands robust content policies and technical safeguards.

As we continue to push the boundaries of what generative AI can accomplish in NLP, the focus must remain on building systems that augment human capabilities while maintaining transparency, accountability, and ethical standards. The technology is powerful, but its value ultimately depends on how thoughtfully we deploy it.

Context Window Management: Token Budgets, Prioritization, and Compression

Introduction: Context windows define how much information an LLM can process at once—from 4K tokens in older models to 128K+ in modern ones. Effective context management means fitting the most relevant information within these limits while leaving room for generation. This guide covers practical context window strategies: token counting and budget allocation, content prioritization, compression techniques, dynamic context assembly, and handling conversations that exceed window limits gracefully.

Context Window
Context Management: Prioritization, Compression, Allocation

Token Counting and Budget Allocation

import tiktoken
from dataclasses import dataclass
from typing import Optional

@dataclass
class TokenBudget:
    """Token budget allocation."""
    
    total: int
    system_prompt: int
    conversation_history: int
    retrieved_context: int
    user_input: int
    reserved_for_output: int
    
    @property
    def available(self) -> int:
        """Available tokens after allocations."""
        
        used = (
            self.system_prompt +
            self.conversation_history +
            self.retrieved_context +
            self.user_input
        )
        return self.total - used - self.reserved_for_output

class TokenCounter:
    """Count tokens for different models."""
    
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.encoding = self._get_encoding(model)
    
    def _get_encoding(self, model: str):
        """Get tiktoken encoding for model."""
        
        try:
            return tiktoken.encoding_for_model(model)
        except KeyError:
            return tiktoken.get_encoding("cl100k_base")
    
    def count(self, text: str) -> int:
        """Count tokens in text."""
        
        return len(self.encoding.encode(text))
    
    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in message list."""
        
        # OpenAI message format overhead
        tokens_per_message = 3
        tokens_per_name = 1
        
        total = 0
        
        for message in messages:
            total += tokens_per_message
            
            for key, value in message.items():
                total += self.count(str(value))
                
                if key == "name":
                    total += tokens_per_name
        
        total += 3  # Reply priming
        return total
    
    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to fit token limit."""
        
        tokens = self.encoding.encode(text)
        
        if len(tokens) <= max_tokens:
            return text
        
        truncated_tokens = tokens[:max_tokens]
        return self.encoding.decode(truncated_tokens)

class BudgetAllocator:
    """Allocate token budget across components."""
    
    def __init__(
        self,
        model: str = "gpt-4o",
        max_context: int = 128000,
        output_reserve: int = 4096
    ):
        self.counter = TokenCounter(model)
        self.max_context = max_context
        self.output_reserve = output_reserve
    
    def allocate(
        self,
        system_prompt: str,
        user_input: str,
        history_priority: float = 0.3,
        context_priority: float = 0.5
    ) -> TokenBudget:
        """Allocate budget based on priorities."""
        
        # Fixed allocations
        system_tokens = self.counter.count(system_prompt)
        input_tokens = self.counter.count(user_input)
        
        # Available for dynamic allocation
        available = (
            self.max_context -
            system_tokens -
            input_tokens -
            self.output_reserve
        )
        
        # Allocate based on priorities
        history_budget = int(available * history_priority)
        context_budget = int(available * context_priority)
        
        return TokenBudget(
            total=self.max_context,
            system_prompt=system_tokens,
            conversation_history=history_budget,
            retrieved_context=context_budget,
            user_input=input_tokens,
            reserved_for_output=self.output_reserve
        )
    
    def fit_content(
        self,
        content: str,
        budget: int
    ) -> str:
        """Fit content within budget."""
        
        current_tokens = self.counter.count(content)
        
        if current_tokens <= budget:
            return content
        
        return self.counter.truncate_to_tokens(content, budget)
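
A quick usage sketch of the counter and allocator above; the prompts, priority weights, and stand-in document are purely illustrative.

allocator = BudgetAllocator("gpt-4o", max_context=128000, output_reserve=4096)

budget = allocator.allocate(
    system_prompt="You are a concise analyst. Answer only from the provided context.",
    user_input="What changed in Q3?",
    history_priority=0.3,   # 30% of the flexible space goes to conversation history
    context_priority=0.5    # 50% goes to retrieved documents
)

print(f"History budget: {budget.conversation_history} tokens")
print(f"Context budget: {budget.retrieved_context} tokens")
print(f"Unallocated:    {budget.available} tokens")

# Trim an oversized document into its allocation
long_doc = "Revenue grew 12 percent quarter over quarter. " * 500  # stand-in for a long report
fitted = allocator.fit_content(long_doc, budget.retrieved_context)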

Content Prioritization

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class Priority(int, Enum):
    CRITICAL = 100
    HIGH = 75
    MEDIUM = 50
    LOW = 25
    OPTIONAL = 10

@dataclass
class ContextItem:
    """An item that can be included in context."""
    
    content: str
    priority: Priority
    tokens: int
    source: str
    metadata: Optional[dict] = None

class ContentPrioritizer:
    """Prioritize content for context inclusion."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def prioritize(
        self,
        items: list[ContextItem],
        budget: int
    ) -> list[ContextItem]:
        """Select items that fit within budget."""
        
        # Sort by priority (highest first)
        sorted_items = sorted(
            items,
            key=lambda x: x.priority.value,
            reverse=True
        )
        
        selected = []
        used_tokens = 0
        
        for item in sorted_items:
            if used_tokens + item.tokens <= budget:
                selected.append(item)
                used_tokens += item.tokens
        
        return selected
    
    def prioritize_with_recency(
        self,
        items: list[ContextItem],
        budget: int,
        recency_weight: float = 0.3
    ) -> list[ContextItem]:
        """Prioritize with recency bonus."""
        
        # Add recency score (later items get bonus)
        scored_items = []
        
        for i, item in enumerate(items):
            recency_score = (i / len(items)) * 100 * recency_weight
            combined_score = item.priority.value + recency_score
            scored_items.append((item, combined_score))
        
        # Sort by combined score
        scored_items.sort(key=lambda x: x[1], reverse=True)
        
        selected = []
        used_tokens = 0
        
        for item, _ in scored_items:
            if used_tokens + item.tokens <= budget:
                selected.append(item)
                used_tokens += item.tokens
        
        return selected

class MessagePrioritizer:
    """Prioritize conversation messages."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def select_messages(
        self,
        messages: list[dict],
        budget: int,
        keep_recent: int = 4
    ) -> list[dict]:
        """Select messages within budget, keeping recent ones."""
        
        if not messages:
            return []
        
        # Always keep most recent messages
        recent = messages[-keep_recent:] if len(messages) >= keep_recent else messages
        recent_tokens = sum(
            self.counter.count(m.get("content", ""))
            for m in recent
        )
        
        if recent_tokens >= budget:
            # Truncate recent messages if needed
            return self._truncate_messages(recent, budget)
        
        # Add older messages if budget allows
        remaining_budget = budget - recent_tokens
        older = messages[:-keep_recent] if len(messages) > keep_recent else []
        
        # Add from most recent older messages
        selected_older = []
        
        for msg in reversed(older):
            msg_tokens = self.counter.count(msg.get("content", ""))
            
            if msg_tokens <= remaining_budget:
                selected_older.insert(0, msg)
                remaining_budget -= msg_tokens
        
        return selected_older + recent
    
    def _truncate_messages(
        self,
        messages: list[dict],
        budget: int
    ) -> list[dict]:
        """Truncate messages to fit budget."""
        
        result = []
        remaining = budget
        
        for msg in reversed(messages):
            content = msg.get("content", "")
            tokens = self.counter.count(content)
            
            if tokens <= remaining:
                result.insert(0, msg)
                remaining -= tokens
            else:
                # Truncate this message
                truncated = self.counter.truncate_to_tokens(content, remaining)
                result.insert(0, {**msg, "content": truncated})
                break
        
        return result
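
For example, trimming a conversation with MessagePrioritizer might look like this; the budget and messages are illustrative.

counter = TokenCounter("gpt-4o")
prioritizer = MessagePrioritizer(counter)

history = [
    {"role": "user", "content": "Which plans do you offer?"},
    {"role": "assistant", "content": "Basic, Pro, and Enterprise."},
    {"role": "user", "content": "What does Pro add?"},
    {"role": "assistant", "content": "Priority support, SSO, and audit logs."},
    {"role": "user", "content": "And the per-seat price?"},
]

# Keep the 4 most recent turns, then backfill older turns into whatever budget remains
trimmed = prioritizer.select_messages(history, budget=2000, keep_recent=4)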

Context Compression

from dataclasses import dataclass

@dataclass
class CompressionResult:
    """Result of context compression."""
    
    original_tokens: int
    compressed_tokens: int
    content: str
    compression_ratio: float

class LLMCompressor:
    """Compress context using LLM summarization."""
    
    def __init__(self, client, counter: TokenCounter):
        self.client = client
        self.counter = counter
    
    def compress(
        self,
        content: str,
        target_tokens: int,
        preserve_key_info: bool = True
    ) -> CompressionResult:
        """Compress content to target token count."""
        
        original_tokens = self.counter.count(content)
        
        if original_tokens <= target_tokens:
            return CompressionResult(
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                content=content,
                compression_ratio=1.0
            )
        
        # Calculate target word count (rough estimate)
        target_words = int(target_tokens * 0.75)
        
        prompt = f"""Compress the following text to approximately {target_words} words.
{"Preserve all key facts, names, dates, and specific details." if preserve_key_info else "Focus on main ideas only."}

Text to compress:
{content}

Compressed version:"""
        
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=target_tokens + 100
        )
        
        compressed = response.choices[0].message.content
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            content=compressed,
            compression_ratio=compressed_tokens / original_tokens
        )
    
    def compress_conversation(
        self,
        messages: list[dict],
        target_tokens: int
    ) -> str:
        """Compress conversation history."""
        
        # Format conversation
        formatted = "\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in messages
        ])
        
        prompt = f"""Summarize this conversation, preserving:
- Key decisions and conclusions
- Important facts mentioned
- Current topic and context
- Any pending questions or tasks

Target length: approximately {int(target_tokens * 0.75)} words.

Conversation:
{formatted}

Summary:"""
        
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=target_tokens + 100
        )
        
        return response.choices[0].message.content

class ExtractiveCompressor:
    """Compress by extracting key sentences."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def compress(
        self,
        content: str,
        target_tokens: int
    ) -> CompressionResult:
        """Extract key sentences to fit budget."""
        
        import re
        
        original_tokens = self.counter.count(content)
        
        if original_tokens <= target_tokens:
            return CompressionResult(
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                content=content,
                compression_ratio=1.0
            )
        
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', content)
        
        # Score sentences by position and length
        scored = []
        for i, sent in enumerate(sentences):
            # Prefer first and last sentences
            position_score = 1.0
            if i < 3:
                position_score = 1.5
            elif i >= len(sentences) - 2:
                position_score = 1.3
            
            # Prefer medium-length sentences
            words = len(sent.split())
            length_score = 1.0 if 10 <= words <= 30 else 0.8
            
            scored.append((sent, position_score * length_score, i))
        
        # Sort by score, keeping original order for ties
        scored.sort(key=lambda x: (-x[1], x[2]))
        
        # Select sentences within budget
        selected = []
        used_tokens = 0
        
        for sent, _, orig_idx in scored:
            sent_tokens = self.counter.count(sent)
            
            if used_tokens + sent_tokens <= target_tokens:
                selected.append((sent, orig_idx))
                used_tokens += sent_tokens
        
        # Restore original order
        selected.sort(key=lambda x: x[1])
        compressed = " ".join(s for s, _ in selected)
        
        return CompressionResult(
            original_tokens=original_tokens,
            compressed_tokens=used_tokens,
            content=compressed,
            compression_ratio=used_tokens / original_tokens
        )
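
A short usage sketch of the extractive compressor; the stand-in report and target budget are illustrative, and LLMCompressor exposes the same compress() shape when abstractive summarization is preferred.

counter = TokenCounter("gpt-4o")
extractive = ExtractiveCompressor(counter)

report = (
    "The data migration finished two days ahead of schedule. "
    "Median query latency dropped from 220ms to 130ms after the index rebuild. "
    "Three low-severity incidents were logged during cutover; none affected customers. "
) * 20  # stand-in for a long status report

result = extractive.compress(report, target_tokens=150)
print(f"{result.original_tokens} -> {result.compressed_tokens} tokens "
      f"({result.compression_ratio:.0%} of the original)")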

Dynamic Context Assembly

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ContextSection:
    """A section of the context."""
    
    name: str
    content: str
    priority: Priority
    min_tokens: int = 0
    max_tokens: Optional[int] = None
    compressible: bool = True

@dataclass
class AssembledContext:
    """Assembled context ready for LLM."""
    
    sections: dict[str, str]
    total_tokens: int
    budget_used: float

class DynamicContextAssembler:
    """Assemble context dynamically based on budget."""
    
    def __init__(
        self,
        client,
        counter: TokenCounter,
        max_context: int = 128000
    ):
        self.client = client
        self.counter = counter
        self.max_context = max_context
        self.compressor = LLMCompressor(client, counter)
    
    def assemble(
        self,
        sections: list[ContextSection],
        output_reserve: int = 4096
    ) -> AssembledContext:
        """Assemble context from sections."""
        
        budget = self.max_context - output_reserve
        
        # Sort by priority
        sorted_sections = sorted(
            sections,
            key=lambda x: x.priority.value,
            reverse=True
        )
        
        # First pass: allocate minimum tokens
        result = {}
        used_tokens = 0
        
        for section in sorted_sections:
            tokens = self.counter.count(section.content)
            
            if section.min_tokens > 0:
                # Must include at least min_tokens
                if tokens <= section.min_tokens:
                    result[section.name] = section.content
                    used_tokens += tokens
                else:
                    # Compress to min_tokens
                    compressed = self.compressor.compress(
                        section.content,
                        section.min_tokens
                    )
                    result[section.name] = compressed.content
                    used_tokens += compressed.compressed_tokens
        
        # Second pass: expand sections if budget allows
        remaining = budget - used_tokens
        
        for section in sorted_sections:
            if section.name in result:
                current_tokens = self.counter.count(result[section.name])
                original_tokens = self.counter.count(section.content)
                
                if current_tokens < original_tokens:
                    # Can we expand?
                    max_expand = section.max_tokens or original_tokens
                    expand_to = min(
                        max_expand,
                        current_tokens + remaining
                    )
                    
                    if expand_to > current_tokens:
                        if expand_to >= original_tokens:
                            result[section.name] = section.content
                            remaining -= (original_tokens - current_tokens)
                        else:
                            compressed = self.compressor.compress(
                                section.content,
                                expand_to
                            )
                            remaining -= (compressed.compressed_tokens - current_tokens)
                            result[section.name] = compressed.content
            else:
                # Section not yet included
                tokens = self.counter.count(section.content)
                
                if tokens <= remaining:
                    result[section.name] = section.content
                    remaining -= tokens
                elif section.compressible and remaining > 100:
                    compressed = self.compressor.compress(
                        section.content,
                        remaining
                    )
                    result[section.name] = compressed.content
                    remaining -= compressed.compressed_tokens
        
        total_used = budget - remaining
        
        return AssembledContext(
            sections=result,
            total_tokens=total_used,
            budget_used=total_used / budget
        )
    
    def build_messages(
        self,
        assembled: AssembledContext,
        system_prompt: str,
        user_message: str
    ) -> list[dict]:
        """Build message list from assembled context."""
        
        messages = [{"role": "system", "content": system_prompt}]
        
        # Add context sections
        if "conversation_summary" in assembled.sections:
            messages.append({
                "role": "system",
                "content": f"Previous conversation summary:\n{assembled.sections['conversation_summary']}"
            })
        
        if "retrieved_context" in assembled.sections:
            messages.append({
                "role": "system",
                "content": f"Relevant context:\n{assembled.sections['retrieved_context']}"
            })
        
        # Add conversation history if present
        if "history" in assembled.sections:
            # Parse history back into messages
            # (simplified - in practice, store structured)
            messages.append({
                "role": "system",
                "content": f"Recent conversation:\n{assembled.sections['history']}"
            })
        
        messages.append({"role": "user", "content": user_message})
        
        return messages
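
Putting the assembler to work might look like the sketch below. The section contents are illustrative, and it assumes an OpenAI client with OPENAI_API_KEY set, since the assembler falls back to LLM compression when a section does not fit.

from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set
counter = TokenCounter("gpt-4o")
assembler = DynamicContextAssembler(client, counter, max_context=128000)

sections = [
    ContextSection(
        name="retrieved_context",
        content="Refund policy: purchases can be refunded within 30 days with proof of purchase.",
        priority=Priority.MEDIUM
    ),
    ContextSection(
        name="conversation_summary",
        content="The user previously asked about upgrade pricing and mentioned an annual plan.",
        priority=Priority.HIGH
    ),
]

assembled = assembler.assemble(sections, output_reserve=4096)
messages = assembler.build_messages(
    assembled,
    system_prompt="You are a support assistant.",
    user_message="Can I get a refund on last month's order?"
)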

Production Context Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
from openai import OpenAI
client = OpenAI()

counter = TokenCounter("gpt-4o")
allocator = BudgetAllocator("gpt-4o", max_context=128000)
assembler = DynamicContextAssembler(client, counter)
compressor = LLMCompressor(client, counter)

class ContextRequest(BaseModel):
    system_prompt: str
    user_message: str
    conversation_history: list[dict] = []
    retrieved_documents: list[str] = []
    max_output_tokens: int = 4096

class CompressRequest(BaseModel):
    content: str
    target_tokens: int
    preserve_key_info: bool = True

@app.post("/v1/context/build")
async def build_context(request: ContextRequest):
    """Build optimized context for LLM."""
    
    # Allocate budget
    budget = allocator.allocate(
        system_prompt=request.system_prompt,
        user_input=request.user_message
    )
    
    # Create context sections
    sections = []
    
    # System prompt (critical)
    sections.append(ContextSection(
        name="system",
        content=request.system_prompt,
        priority=Priority.CRITICAL,
        compressible=False
    ))
    
    # Conversation history
    if request.conversation_history:
        history_text = "\n".join([
            f"{m['role']}: {m['content']}"
            for m in request.conversation_history
        ])
        
        sections.append(ContextSection(
            name="history",
            content=history_text,
            priority=Priority.HIGH,
            min_tokens=500,
            max_tokens=budget.conversation_history
        ))
    
    # Retrieved documents
    if request.retrieved_documents:
        docs_text = "\n\n---\n\n".join(request.retrieved_documents)
        
        sections.append(ContextSection(
            name="retrieved_context",
            content=docs_text,
            priority=Priority.MEDIUM,
            min_tokens=200,
            max_tokens=budget.retrieved_context
        ))
    
    # Assemble context
    assembled = assembler.assemble(
        sections,
        output_reserve=request.max_output_tokens
    )
    
    # Build messages
    messages = assembler.build_messages(
        assembled,
        request.system_prompt,
        request.user_message
    )
    
    return {
        "messages": messages,
        "total_tokens": assembled.total_tokens,
        "budget_used": assembled.budget_used,
        "sections_included": list(assembled.sections.keys())
    }

@app.post("/v1/context/compress")
async def compress_content(request: CompressRequest):
    """Compress content to target tokens."""
    
    result = compressor.compress(
        request.content,
        request.target_tokens,
        request.preserve_key_info
    )
    
    return {
        "original_tokens": result.original_tokens,
        "compressed_tokens": result.compressed_tokens,
        "compression_ratio": result.compression_ratio,
        "content": result.content
    }

@app.post("/v1/context/count")
async def count_tokens(content: str):
    """Count tokens in content."""
    
    return {
        "tokens": counter.count(content),
        "model": counter.model
    }

@app.get("/v1/context/budget")
async def get_budget(
    system_tokens: int = 500,
    input_tokens: int = 100
):
    """Get token budget allocation."""
    
    # Create dummy content for budget calculation; repeated words come out close to
    # one token per word, whereas long runs of a single character tokenize unpredictably
    budget = allocator.allocate(
        system_prompt=" ".join(["token"] * system_tokens),
        user_input=" ".join(["token"] * input_tokens)
    )
    
    return {
        "total": budget.total,
        "system_prompt": budget.system_prompt,
        "conversation_history": budget.conversation_history,
        "retrieved_context": budget.retrieved_context,
        "user_input": budget.user_input,
        "reserved_for_output": budget.reserved_for_output,
        "available": budget.available
    }

@app.get("/health")
async def health():
    return {"status": "healthy", "model": counter.model}

Conclusion

Context window management is about making the most of limited space. Start with accurate token counting using tiktoken or model-specific tokenizers. Allocate budget across components based on their importance—system prompts and recent messages typically get priority. Implement content prioritization to select the most relevant information when you can't fit everything. Use compression techniques—both extractive (selecting key sentences) and abstractive (LLM summarization)—to fit more information in less space. Build dynamic context assembly that adapts to varying input sizes. The goal is maximizing information density while maintaining coherence—every token should contribute to better responses.