
Setting up Local NPM repository to Speedup Dev/CI Builds

As a modern-day JavaScript developer working with Node.js and npm, you have probably had to clean up your local node modules when a local build breaks. It is a tedious task to clear %appData%\npm-cache and do a fresh install of all the modules again. Depending on the number of modules your project has and your Internet bandwidth, you can be stuck anywhere from a few minutes to hours waiting for the installation to complete.

Another scenario is a build or CI server, where the modules are cleaned up during each build and 'npm install' starts from scratch every time, making the build take considerably longer to complete.

What if there were a simple way to cache these packages locally, so that we do not have to download them from the Internet every time? Below is a simple solution that, once set up, resolves some of these problems effectively.

Introducing Local-NPM


local-npm is a Node server that acts as a local npm registry. It serves modules, caches them, and updates them whenever they change. Basically it’s a local mirror, but without having to replicate the entire npm registry.

This allows your npm install commands to (mostly) work offline. Your installs also get faster and faster over time, as commonly installed modules are aggressively cached.

local-npm acts as a proxy between you and the main npm registry. You run npm install commands like normal, but under the hood, all requests are sent through the local server.

 

Getting Started with Local-NPM:

Step 1: Install the module ‘local-npm’
[code lang="js"]$ npm install -g local-npm[/code]
Step 2: Launch local-npm; this will start the local npm server.
[code lang="js"]$ local-npm[/code]
The server will be available at localhost:5080.
[code lang="js"]http://127.0.0.1:5080[/code]
PS: Please note that this step can take some time, as the module replicates the npm registry metadata from the remote skimdb to a local PouchDB instance for efficient caching. It will not eat up your disk space, though: package tarballs are cached based on usage only, so the entire npm repository is never downloaded.

Step 3: Validate the local-NPM registry

There is a basic npmjs-like UI for browsing the locally cached packages, which can be accessed at:

http://localhost:5080/_browse

Step 4: Set npm to point to the local server:
[code lang="js"]$ npm set registry http://127.0.0.1:5080[/code]
Step 5: Run "npm install" for your project and you will see that local-npm caches the modules you use regularly.

In case you want to switch back to the default npmjs registry, run:
[code lang="js"]$ npm set registry https://registry.npmjs.org[/code]

How does it work?

The npm registry is built on top of Apache CouchDB (a NoSQL database), so local-npm works by replicating the registry's "skimdb" metadata database to a local PouchDB server.

You can inspect the running database at http://127.0.0.1:16984/_utils.

References

To learn more about local-npm and its documentation, visit the module repository on GitHub: https://github.com/local-npm/local-npm

Introduction to Kubernetes

What is Kubernetes?

Kubernetes (a.k.a. K8s) is an open-source system for automating the deployment, scaling, and management of containerized applications. It was originally designed by Google and is now maintained by the Cloud Native Computing Foundation.

What can Kubernetes do?
Kubernetes plays a number of roles in the cloud computing world; it can be thought of as:

  • A container platform
  • A microservices platform
  • A portable cloud platform and a lot more

Kubernetes defines a set of building blocks (“primitives”) which collectively provide mechanisms for deploying, maintaining, and scaling applications. The components which make up Kubernetes are designed to be loosely coupled and extensible so that it can meet a wide variety of different workloads. The extensibility is provided in large part by the Kubernetes API, which is used by internal components as well as extensions and containers running on Kubernetes.
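
To make the API part concrete, here is a minimal sketch using the official Kubernetes Python client (an assumption on my part: `pip install kubernetes` and a working kubeconfig are available); it simply lists the pods in every namespace through that same API.

from kubernetes import client, config

# Load credentials from the local kubeconfig (e.g. ~/.kube/config).
config.load_kube_config()

v1 = client.CoreV1Api()

# Every interaction, including this pod listing, goes through the Kubernetes API.
for pod in v1.list_pod_for_all_namespaces().items:
    print(pod.metadata.namespace, pod.metadata.name, pod.status.phase)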

If you are interested in learning more, the official Kubernetes tutorials and online training courses are a good place to start.

Azure Cosmos DB name changes

An update from Microsoft Azure says that, as part of the transition from Azure DocumentDB to Azure Cosmos DB, the service and resource names are changing from "Azure DocumentDB" to "Azure Cosmos DB" on June 1, 2018.

How does that impact you?

When Microsoft introduced Cosmos DB, it ensured a smooth transition for existing DocumentDB customers and tenants by not changing the underlying service and resource names from "DocumentDB" to "Cosmos DB".

So, if you were an existing DocumentDB customer, all you noticed was the DocumentDB name disappearing and the old service simply showing up as Cosmos DB. You would not have felt much difference apart from some additional configuration options that came with the multi-model data source support.

Your ARM deployment templates might need some changes in resource sizing, resource location, and some other configuration aspects.

There is no pricing impact from this change, but you will have to update any billing parameters that reference the old names. With this deadline, Microsoft intends to deprecate the old DocumentDB naming and migrate all customers/tenants to the new naming for resource billing and sizing purposes.

To read more about the naming changes: https://azure.microsoft.com/en-us/updates/name-changes-cosmos-db/

Kubernetes vs Service Fabric

What is the difference between Kubernetes and Service Fabric?

It is a common question today among business stakeholders, infrastructure specialists, and IT architects.

To answer in simpler words, quoting from a Reddit thread:

  • Kubernetes manages/orchestrates containers and the applications within them.
  • Service Fabric is a framework for microservices based on one of three models: stateful, stateless, or actor. Service Fabric provides a framework for creating microservices, a runtime for managing distributed instances, and the 'fabric' that holds everything together.

A more detailed comparison, quoting from an MSDN blog post:

Azure Container Service: If you are looking to deploy your application in a Linux environment and are comfortable with an orchestrator such as Swarm, Kubernetes, or DC/OS, use ACS. A typical multi-tier application (such as a web front end, a caching layer, an API layer, and a database layer) can easily be containerized with a single Dockerfile (or docker-compose file) and then gradually decomposed into smaller services. This approach provides an immediate benefit: portability of the application. Containers are an open technology, and there is great community support around them.

Azure Service Fabric: If an application must have its state saved locally, then use Service Fabric. It is also a good choice if you are looking to deploy the application in the Windows Server ecosystem (Linux support is in the works as well!). Refer to the common workloads on Service Fabric for more discussion of applications that can benefit from it. The biggest benefit is that Service Fabric applications can run on-premises, on Azure, or even on other cloud platforms.

What’s Azure Container Service (ACS/AKS)

I will start with the history: sometime around 2016, Microsoft launched an IaaS offering called Azure Container Service (a.k.a. ACS). It serves as a bridge between the Azure ecosystem and the existing container ecosystem already used widely by the developer community around the world.

It acts as a gateway for infrastructure engineers and developers to manage the underlying infrastructure, such as virtual machines, storage, and network load-balancing services, separately from the application itself. The application developer does not have to worry about planet-scale operation; instead, a container orchestrator can scale your application environment up and down based on the peaks and troughs of your application's usage.

It offers a choice of the three major container orchestrators available today: DC/OS, Docker Swarm, and Kubernetes. ACS, along with your chosen orchestrator, works efficiently with the different container ecosystems to deliver on the promise of application virtualization.

To make it simpler, ACS is the super glue that gels your Azure infrastructure and your container orchestrator together, which means you can have a fully managed container cluster up and running on Azure in a matter of minutes.

ACS is about making your microservices dream come true: individual services scale up according to demand and automatically scale back down when usage is low. You don't have to worry; ACS and your container orchestrator take care of it.

If you are a beginner with container-based infrastructure, you don't have to take on the pain of setting up Kubernetes on your own. Instead, ACS simplifies the implementation to a couple of clicks, and your container infrastructure is ready to be fully managed by you. As simple as that.

What is Azure Container Service (AKS) then?

As I write this, Microsoft has a new fully managed PaaS offering called Azure Container Service (AKS), or managed Kubernetes, meaning that Kubernetes is your default fully managed container orchestrator if you choose AKS. You can still deploy other open-source container orchestrators if you prefer to run your own unmanaged Kubernetes, Docker Swarm, or DC/OS, and then add your specific management and monitoring tools.

This service is currently available in public preview; you can get started from here.

This means that, although it is a fully managed service, you still have the option to manage it on your own using your preferred set of tools and orchestrators.

Charging Model

Whether you manage your AKS cluster with your own set of tools and orchestrator or use the fully managed Kubernetes, you only pay for the resources you consume. There is no need to worry about per-cluster charges as with other providers.

Context Window Management: Maximizing LLM Input Utilization

Introduction: Context windows are the lifeblood of LLM applications—they determine how much information your model can process at once. Even with 128K+ token models, you’ll hit limits when dealing with long documents, conversation histories, or multi-document RAG. Poor context management leads to truncated information, lost context, and degraded responses. This guide covers practical strategies for maximizing context utilization: smart truncation that preserves important content, sliding window approaches for conversations, content prioritization based on relevance, and compression techniques that maintain semantic meaning. Whether you’re building a document Q&A system, long-running agent, or multi-turn chatbot, these patterns will help you make the most of every token in your context window.

Context Window Management
Context Window: Smart Truncation, Content Prioritization, Compression

Token Counting and Limits

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod

@dataclass
class TokenCount:
    """Token count result."""
    
    total: int
    by_section: dict[str, int] = field(default_factory=dict)

@dataclass
class ContextBudget:
    """Context window budget."""
    
    max_tokens: int
    reserved_for_output: int = 1000
    reserved_for_system: int = 500
    
    @property
    def available_for_input(self) -> int:
        """Tokens available for input content."""
        return self.max_tokens - self.reserved_for_output - self.reserved_for_system

class TokenCounter(ABC):
    """Abstract token counter."""
    
    @abstractmethod
    def count(self, text: str) -> int:
        """Count tokens in text."""
        pass
    
    @abstractmethod
    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in message list."""
        pass

class TiktokenCounter(TokenCounter):
    """Token counter using tiktoken."""
    
    def __init__(self, model: str = "gpt-4"):
        import tiktoken
        
        try:
            self.encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            self.encoding = tiktoken.get_encoding("cl100k_base")
        
        self.model = model
    
    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))
    
    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in messages with overhead."""
        
        # Message overhead varies by model
        tokens_per_message = 3  # <|start|>role<|end|>
        tokens_per_name = 1
        
        total = 0
        
        for message in messages:
            total += tokens_per_message
            
            for key, value in message.items():
                total += self.count(str(value))
                
                if key == "name":
                    total += tokens_per_name
        
        total += 3  # Reply priming
        
        return total

class ApproximateCounter(TokenCounter):
    """Fast approximate token counter."""
    
    def __init__(self, chars_per_token: float = 4.0):
        self.chars_per_token = chars_per_token
    
    def count(self, text: str) -> int:
        """Approximate token count."""
        return int(len(text) / self.chars_per_token)
    
    def count_messages(self, messages: list[dict]) -> int:
        """Approximate message token count."""
        
        total = 0
        
        for message in messages:
            # Add overhead per message
            total += 4
            
            for value in message.values():
                total += self.count(str(value))
        
        return total

class ContextManager:
    """Manage context window usage."""
    
    def __init__(
        self,
        budget: ContextBudget,
        counter: TokenCounter = None
    ):
        self.budget = budget
        self.counter = counter or ApproximateCounter()
    
    def check_fits(self, content: str) -> tuple[bool, int]:
        """Check if content fits in budget."""
        
        tokens = self.counter.count(content)
        fits = tokens <= self.budget.available_for_input
        
        return fits, tokens
    
    def get_usage(self, content: str) -> dict:
        """Get detailed usage stats."""
        
        tokens = self.counter.count(content)
        
        return {
            "used": tokens,
            "available": self.budget.available_for_input,
            "remaining": self.budget.available_for_input - tokens,
            "utilization": tokens / self.budget.available_for_input,
            "fits": tokens <= self.budget.available_for_input
        }
    
    def allocate(
        self,
        sections: dict[str, str],
        priorities: dict[str, int] = None
    ) -> dict[str, int]:
        """Allocate tokens to sections by priority."""
        
        if priorities is None:
            priorities = {name: 1 for name in sections}
        
        # Count tokens per section
        section_tokens = {
            name: self.counter.count(content)
            for name, content in sections.items()
        }
        
        total_needed = sum(section_tokens.values())
        available = self.budget.available_for_input
        
        if total_needed <= available:
            # Everything fits
            return section_tokens
        
        # Need to allocate proportionally by priority
        total_priority = sum(priorities.values())
        
        allocations = {}
        remaining = available
        
        # Sort by priority (highest first)
        sorted_sections = sorted(
            sections.keys(),
            key=lambda x: priorities.get(x, 1),
            reverse=True
        )
        
        for name in sorted_sections:
            priority_share = priorities.get(name, 1) / total_priority
            max_allocation = int(available * priority_share)
            
            actual = min(section_tokens[name], max_allocation, remaining)
            allocations[name] = actual
            remaining -= actual
        
        return allocations
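
As a quick illustration of how these pieces fit together, here is a minimal usage sketch (the budget numbers are arbitrary, and ApproximateCounter is used to avoid the tiktoken dependency):

# An 8K window with room reserved for the model's output and the system prompt.
budget = ContextBudget(max_tokens=8000, reserved_for_output=1000, reserved_for_system=500)
manager = ContextManager(budget, counter=ApproximateCounter())

document = "Some long document text. " * 500

fits, tokens = manager.check_fits(document)
print(fits, tokens, manager.get_usage(document))

# Allocate the input budget across sections, weighting the query highest.
allocations = manager.allocate(
    sections={
        "context": document,
        "history": "user: hi\nassistant: hello",
        "query": "What changed in the latest release?",
    },
    priorities={"context": 2, "history": 1, "query": 3},
)
print(allocations)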

Smart Truncation

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class TruncationStrategy(Enum):
    """Truncation strategies."""
    
    HEAD = "head"      # Keep beginning
    TAIL = "tail"      # Keep end
    MIDDLE = "middle"  # Keep beginning and end
    SMART = "smart"    # Content-aware

class Truncator:
    """Smart text truncation."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def truncate(
        self,
        text: str,
        max_tokens: int,
        strategy: TruncationStrategy = TruncationStrategy.SMART
    ) -> str:
        """Truncate text to fit token limit."""
        
        current_tokens = self.counter.count(text)
        
        if current_tokens <= max_tokens:
            return text
        
        if strategy == TruncationStrategy.HEAD:
            return self._truncate_head(text, max_tokens)
        elif strategy == TruncationStrategy.TAIL:
            return self._truncate_tail(text, max_tokens)
        elif strategy == TruncationStrategy.MIDDLE:
            return self._truncate_middle(text, max_tokens)
        else:
            return self._truncate_smart(text, max_tokens)
    
    def _truncate_head(self, text: str, max_tokens: int) -> str:
        """Keep beginning of text."""
        
        # Binary search for cutoff point
        words = text.split()
        low, high = 0, len(words)
        
        while low < high:
            mid = (low + high + 1) // 2
            candidate = " ".join(words[:mid])
            
            if self.counter.count(candidate) <= max_tokens:
                low = mid
            else:
                high = mid - 1
        
        result = " ".join(words[:low])
        return result + "..." if low < len(words) else result
    
    def _truncate_tail(self, text: str, max_tokens: int) -> str:
        """Keep end of text."""
        
        words = text.split()
        low, high = 0, len(words)
        
        while low < high:
            mid = (low + high + 1) // 2
            candidate = " ".join(words[-mid:])
            
            if self.counter.count(candidate) <= max_tokens:
                low = mid
            else:
                high = mid - 1
        
        result = " ".join(words[-low:]) if low > 0 else ""
        return "..." + result if low < len(words) else result
    
    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep beginning and end."""
        
        # Reserve tokens for ellipsis
        available = max_tokens - 5
        head_tokens = available // 2
        tail_tokens = available - head_tokens
        
        head = self._truncate_head(text, head_tokens)
        tail = self._truncate_tail(text, tail_tokens)
        
        return f"{head}\n\n[...content truncated...]\n\n{tail}"
    
    def _truncate_smart(self, text: str, max_tokens: int) -> str:
        """Content-aware truncation."""
        
        # Split into paragraphs
        paragraphs = text.split('\n\n')
        
        if len(paragraphs) <= 2:
            return self._truncate_middle(text, max_tokens)
        
        # Score paragraphs by importance
        scored = []
        for i, para in enumerate(paragraphs):
            score = self._score_paragraph(para, i, len(paragraphs))
            scored.append((i, para, score))
        
        # Sort by score
        scored.sort(key=lambda x: x[2], reverse=True)
        
        # Greedily add paragraphs
        selected = []
        used_tokens = 0
        
        for idx, para, score in scored:
            para_tokens = self.counter.count(para)
            
            if used_tokens + para_tokens <= max_tokens:
                selected.append((idx, para))
                used_tokens += para_tokens
        
        # Sort by original order
        selected.sort(key=lambda x: x[0])
        
        return "\n\n".join(para for _, para in selected)
    
    def _score_paragraph(
        self,
        paragraph: str,
        position: int,
        total: int
    ) -> float:
        """Score paragraph importance."""
        
        score = 0.0
        
        # Position score (first and last are important)
        if position == 0:
            score += 2.0
        elif position == total - 1:
            score += 1.5
        elif position == 1:
            score += 1.0
        
        # Length score (medium length preferred)
        words = len(paragraph.split())
        if 20 <= words <= 100:
            score += 1.0
        elif words > 100:
            score += 0.5
        
        # Content signals
        important_markers = [
            "important", "key", "main", "summary",
            "conclusion", "result", "finding"
        ]
        
        para_lower = paragraph.lower()
        for marker in important_markers:
            if marker in para_lower:
                score += 0.5
        
        return score

class SentenceTruncator:
    """Truncate at sentence boundaries."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def truncate(
        self,
        text: str,
        max_tokens: int,
        keep_end: bool = False
    ) -> str:
        """Truncate at sentence boundaries."""
        
        import re
        
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        
        if keep_end:
            sentences = list(reversed(sentences))
        
        selected = []
        used_tokens = 0
        
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence)
            
            if used_tokens + sentence_tokens <= max_tokens:
                selected.append(sentence)
                used_tokens += sentence_tokens
            else:
                break
        
        if keep_end:
            selected = list(reversed(selected))
        
        result = " ".join(selected)
        
        if len(selected) < len(sentences):
            if keep_end:
                result = "..." + result
            else:
                result = result + "..."
        
        return result
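
A short usage sketch, assuming the classes above are in scope; it runs the same text through each strategy and also shows sentence-boundary truncation:

counter = ApproximateCounter()
truncator = Truncator(counter)

long_text = "\n\n".join(
    f"Paragraph {i}: " + "important findings about context limits. " * 10
    for i in range(10)
)

# Compare how many tokens survive under each strategy.
for strategy in TruncationStrategy:
    truncated = truncator.truncate(long_text, max_tokens=200, strategy=strategy)
    print(strategy.value, counter.count(truncated))

# Sentence-level truncation keeps whole sentences instead of cutting mid-word.
sentence_truncator = SentenceTruncator(counter)
print(sentence_truncator.truncate("First point. Second point. Third point.", max_tokens=5))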

Conversation Window Management

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from collections import deque

@dataclass
class Message:
    """A conversation message."""
    
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    tokens: int = 0
    metadata: dict = field(default_factory=dict)

class SlidingWindowManager:
    """Manage conversation with sliding window."""
    
    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter,
        system_prompt: str = ""
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.system_prompt = system_prompt
        self.system_tokens = counter.count(system_prompt)
        
        self.messages: deque[Message] = deque()
        self.total_tokens = self.system_tokens
    
    def add_message(self, role: str, content: str) -> bool:
        """Add message to conversation."""
        
        tokens = self.counter.count(content) + 4  # Message overhead
        
        message = Message(
            role=role,
            content=content,
            tokens=tokens
        )
        
        # Check if we need to evict old messages
        while (self.total_tokens + tokens > self.max_tokens 
               and len(self.messages) > 0):
            evicted = self.messages.popleft()
            self.total_tokens -= evicted.tokens
        
        self.messages.append(message)
        self.total_tokens += tokens
        
        return True
    
    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        
        result = []
        
        if self.system_prompt:
            result.append({
                "role": "system",
                "content": self.system_prompt
            })
        
        for msg in self.messages:
            result.append({
                "role": msg.role,
                "content": msg.content
            })
        
        return result
    
    def get_usage(self) -> dict:
        """Get window usage stats."""
        
        return {
            "total_tokens": self.total_tokens,
            "max_tokens": self.max_tokens,
            "message_count": len(self.messages),
            "utilization": self.total_tokens / self.max_tokens
        }

class SummarizingWindowManager:
    """Summarize old messages to save space."""
    
    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter,
        summarizer: Any,  # LLM client
        summary_threshold: float = 0.8
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.summarizer = summarizer
        self.summary_threshold = summary_threshold
        
        self.messages: list[Message] = []
        self.summary: str = ""
        self.summary_tokens = 0
    
    async def add_message(self, role: str, content: str):
        """Add message, summarizing if needed."""
        
        tokens = self.counter.count(content) + 4
        
        message = Message(
            role=role,
            content=content,
            tokens=tokens
        )
        
        self.messages.append(message)
        
        # Check if we need to summarize
        total = self._get_total_tokens()
        
        if total / self.max_tokens > self.summary_threshold:
            await self._summarize_old_messages()
    
    def _get_total_tokens(self) -> int:
        """Get total token count."""
        
        return self.summary_tokens + sum(m.tokens for m in self.messages)
    
    async def _summarize_old_messages(self):
        """Summarize older messages."""
        
        if len(self.messages) < 4:
            return
        
        # Keep last 2 messages, summarize the rest
        to_summarize = self.messages[:-2]
        self.messages = self.messages[-2:]
        
        # Build summary prompt
        conversation = "\n".join(
            f"{m.role}: {m.content}"
            for m in to_summarize
        )
        
        prompt = f"""Summarize this conversation concisely, preserving key information:

{conversation}

Summary:"""
        
        response = await self.summarizer.complete(prompt)
        new_summary = response.content
        
        # Combine with existing summary
        if self.summary:
            self.summary = f"{self.summary}\n\n{new_summary}"
        else:
            self.summary = new_summary
        
        self.summary_tokens = self.counter.count(self.summary)
    
    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        
        result = []
        
        if self.summary:
            result.append({
                "role": "system",
                "content": f"Previous conversation summary:\n{self.summary}"
            })
        
        for msg in self.messages:
            result.append({
                "role": msg.role,
                "content": msg.content
            })
        
        return result

class ImportanceBasedWindow:
    """Keep messages based on importance."""
    
    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.messages: list[Message] = []
    
    def add_message(
        self,
        role: str,
        content: str,
        importance: float = 1.0
    ):
        """Add message with importance score."""
        
        tokens = self.counter.count(content) + 4
        
        message = Message(
            role=role,
            content=content,
            tokens=tokens,
            metadata={"importance": importance}
        )
        
        self.messages.append(message)
        self._prune_if_needed()
    
    def _prune_if_needed(self):
        """Remove low-importance messages if over budget."""
        
        total = sum(m.tokens for m in self.messages)
        
        while total > self.max_tokens and len(self.messages) > 2:
            # Find lowest importance message (not first or last)
            candidates = self.messages[1:-1]
            
            if not candidates:
                break
            
            lowest = min(
                candidates,
                key=lambda m: m.metadata.get("importance", 1.0)
            )
            
            self.messages.remove(lowest)
            total -= lowest.tokens
    
    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        
        return [
            {"role": m.role, "content": m.content}
            for m in self.messages
        ]
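
A brief sketch of the sliding window in action, assuming the classes above are in scope; the budget is deliberately tiny so an eviction is visible:

counter = ApproximateCounter()
window = SlidingWindowManager(max_tokens=40, counter=counter, system_prompt="You are helpful.")

window.add_message("user", "Tell me about context windows in large language models.")
window.add_message("assistant", "They bound how many tokens the model can attend to at once.")
window.add_message("user", "And what happens when the conversation grows too long?")

# The oldest message has been evicted to stay within the 40-token budget.
print(window.get_usage())
for message in window.get_messages():
    print(message["role"], ":", message["content"])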

RAG Context Optimization

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class RetrievedChunk:
    """A retrieved document chunk."""
    
    content: str
    score: float
    source: str
    tokens: int = 0

class RAGContextOptimizer:
    """Optimize context for RAG."""
    
    def __init__(
        self,
        max_context_tokens: int,
        counter: TokenCounter
    ):
        self.max_tokens = max_context_tokens
        self.counter = counter
    
    def select_chunks(
        self,
        chunks: list[RetrievedChunk],
        query: str
    ) -> list[RetrievedChunk]:
        """Select chunks that fit in context."""
        
        # Count tokens for each chunk
        for chunk in chunks:
            chunk.tokens = self.counter.count(chunk.content)
        
        # Greedy selection by score
        selected = []
        used_tokens = 0
        
        # Sort by score
        sorted_chunks = sorted(chunks, key=lambda c: c.score, reverse=True)
        
        for chunk in sorted_chunks:
            if used_tokens + chunk.tokens <= self.max_tokens:
                selected.append(chunk)
                used_tokens += chunk.tokens
        
        return selected
    
    def deduplicate_chunks(
        self,
        chunks: list[RetrievedChunk],
        similarity_threshold: float = 0.9
    ) -> list[RetrievedChunk]:
        """Remove near-duplicate chunks."""
        
        if len(chunks) <= 1:
            return chunks
        
        # Simple deduplication based on content overlap
        unique = [chunks[0]]
        
        for chunk in chunks[1:]:
            is_duplicate = False
            
            for existing in unique:
                overlap = self._compute_overlap(chunk.content, existing.content)
                
                if overlap > similarity_threshold:
                    is_duplicate = True
                    break
            
            if not is_duplicate:
                unique.append(chunk)
        
        return unique
    
    def _compute_overlap(self, text1: str, text2: str) -> float:
        """Compute word overlap ratio."""
        
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        
        if not words1 or not words2:
            return 0.0
        
        intersection = len(words1 & words2)
        union = len(words1 | words2)
        
        return intersection / union
    
    def reorder_chunks(
        self,
        chunks: list[RetrievedChunk],
        strategy: str = "score"
    ) -> list[RetrievedChunk]:
        """Reorder chunks for optimal context."""
        
        if strategy == "score":
            # Highest score first
            return sorted(chunks, key=lambda c: c.score, reverse=True)
        
        elif strategy == "lost_in_middle":
            # Important at beginning and end
            sorted_chunks = sorted(chunks, key=lambda c: c.score, reverse=True)
            
            if len(sorted_chunks) <= 2:
                return sorted_chunks
            
            # Alternate chunks between the front and the back so the
            # highest-scored chunks sit at both ends and the weakest
            # land in the middle.
            front: list[RetrievedChunk] = []
            back: list[RetrievedChunk] = []
            
            for i, chunk in enumerate(sorted_chunks):
                if i % 2 == 0:
                    front.append(chunk)
                else:
                    back.append(chunk)
            
            return front + back[::-1]
        
        elif strategy == "chronological":
            # By source order (if available)
            return sorted(
                chunks,
                key=lambda c: c.source
            )
        
        return chunks

class ContextBuilder:
    """Build optimized context from multiple sources."""
    
    def __init__(
        self,
        budget: ContextBudget,
        counter: TokenCounter
    ):
        self.budget = budget
        self.counter = counter
    
    def build(
        self,
        system_prompt: str,
        retrieved_context: str,
        conversation_history: list[dict],
        user_query: str
    ) -> list[dict]:
        """Build optimized message list."""
        
        # Calculate token budgets
        system_tokens = self.counter.count(system_prompt)
        query_tokens = self.counter.count(user_query)
        
        available = self.budget.available_for_input
        remaining = available - system_tokens - query_tokens - 50  # Buffer
        
        # Allocate remaining between context and history
        context_budget = int(remaining * 0.6)
        history_budget = remaining - context_budget
        
        # Truncate context if needed
        truncator = Truncator(self.counter)
        truncated_context = truncator.truncate(
            retrieved_context,
            context_budget,
            TruncationStrategy.SMART
        )
        
        # Truncate history if needed
        history_tokens = sum(
            self.counter.count(m["content"]) + 4
            for m in conversation_history
        )
        
        if history_tokens > history_budget:
            # Keep most recent messages
            kept_history = []
            used = 0
            
            for msg in reversed(conversation_history):
                msg_tokens = self.counter.count(msg["content"]) + 4
                
                if used + msg_tokens <= history_budget:
                    kept_history.insert(0, msg)
                    used += msg_tokens
                else:
                    break
            
            conversation_history = kept_history
        
        # Build final message list
        messages = [
            {"role": "system", "content": system_prompt}
        ]
        
        if truncated_context:
            messages.append({
                "role": "system",
                "content": f"Relevant context:\n{truncated_context}"
            })
        
        messages.extend(conversation_history)
        
        messages.append({
            "role": "user",
            "content": user_query
        })
        
        return messages
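
A small end-to-end sketch, assuming the classes above are in scope; the chunk contents, scores, and sources are made up purely for illustration:

counter = ApproximateCounter()
optimizer = RAGContextOptimizer(max_context_tokens=300, counter=counter)

chunks = [
    RetrievedChunk(content="Kubernetes automates deployment and scaling.", score=0.91, source="doc1"),
    RetrievedChunk(content="Kubernetes automates deployment and scaling.", score=0.89, source="doc2"),
    RetrievedChunk(content="Service Fabric is a framework for microservices.", score=0.75, source="doc3"),
]

# Drop duplicates, keep what fits the budget, then reorder against "lost in the middle".
unique = optimizer.deduplicate_chunks(chunks, similarity_threshold=0.8)
selected = optimizer.select_chunks(unique, query="How does Kubernetes scale apps?")
ordered = optimizer.reorder_chunks(selected, strategy="lost_in_middle")

context = "\n\n".join(chunk.content for chunk in ordered)

builder = ContextBuilder(ContextBudget(max_tokens=4000), counter)
messages = builder.build(
    system_prompt="Answer using only the provided context.",
    retrieved_context=context,
    conversation_history=[],
    user_query="How does Kubernetes scale apps?",
)
print(messages)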

Long Document Processing

from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator

@dataclass
class DocumentSection:
    """A document section."""
    
    title: str
    content: str
    level: int  # Heading level
    tokens: int = 0

class LongDocumentProcessor:
    """Process documents longer than context window."""
    
    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter,
        llm_client: Any
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.llm = llm_client
    
    def split_by_sections(self, document: str) -> list[DocumentSection]:
        """Split document into sections."""
        
        import re
        
        # Find markdown headers
        header_pattern = r'^(#{1,6})\s+(.+)$'
        
        lines = document.split('\n')
        sections = []
        current_section = DocumentSection(title="", content="", level=0)
        
        for line in lines:
            match = re.match(header_pattern, line)
            
            if match:
                # Save current section
                if current_section.content.strip():
                    current_section.tokens = self.counter.count(current_section.content)
                    sections.append(current_section)
                
                # Start new section
                level = len(match.group(1))
                title = match.group(2)
                current_section = DocumentSection(
                    title=title,
                    content="",
                    level=level
                )
            else:
                current_section.content += line + "\n"
        
        # Don't forget last section
        if current_section.content.strip():
            current_section.tokens = self.counter.count(current_section.content)
            sections.append(current_section)
        
        return sections
    
    async def map_reduce_summarize(
        self,
        document: str,
        query: str = None
    ) -> str:
        """Summarize using map-reduce."""
        
        sections = self.split_by_sections(document)
        
        # Map: Summarize each section
        section_summaries = []
        
        for section in sections:
            if section.tokens > self.max_tokens:
                # Section too large, chunk it
                chunks = self._chunk_text(section.content, self.max_tokens // 2)
                
                chunk_summaries = []
                for chunk in chunks:
                    summary = await self._summarize_chunk(chunk, query)
                    chunk_summaries.append(summary)
                
                section_summary = "\n".join(chunk_summaries)
            else:
                section_summary = await self._summarize_chunk(
                    section.content,
                    query
                )
            
            section_summaries.append(f"## {section.title}\n{section_summary}")
        
        # Reduce: Combine summaries
        combined = "\n\n".join(section_summaries)
        
        if self.counter.count(combined) > self.max_tokens:
            # Need another reduction pass
            return await self._final_reduce(combined, query)
        
        return combined
    
    async def _summarize_chunk(self, chunk: str, query: str = None) -> str:
        """Summarize a single chunk."""
        
        if query:
            prompt = f"""Summarize the following text, focusing on information relevant to: {query}

Text:
{chunk}

Summary:"""
        else:
            prompt = f"""Summarize the following text concisely:

Text:
{chunk}

Summary:"""
        
        response = await self.llm.complete(prompt)
        return response.content
    
    async def _final_reduce(self, summaries: str, query: str = None) -> str:
        """Final reduction of summaries."""
        
        prompt = f"""Combine these section summaries into a coherent overall summary:

{summaries}

Combined summary:"""
        
        response = await self.llm.complete(prompt)
        return response.content
    
    def _chunk_text(self, text: str, max_tokens: int) -> list[str]:
        """Chunk text into token-limited pieces at sentence boundaries."""
        
        import re
        
        # Split once into sentences, then pack them greedily into chunks.
        # A single sentence longer than max_tokens becomes its own chunk.
        sentences = re.split(r'(?<=[.!?])\s+', text)
        
        chunks: list[str] = []
        current: list[str] = []
        current_tokens = 0
        
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence)
            
            if current and current_tokens + sentence_tokens > max_tokens:
                chunks.append(" ".join(current))
                current = []
                current_tokens = 0
            
            current.append(sentence)
            current_tokens += sentence_tokens
        
        if current:
            chunks.append(" ".join(current))
        
        return chunks
    
    async def iterative_refinement(
        self,
        document: str,
        query: str
    ) -> AsyncIterator[str]:
        """Process document iteratively, yielding partial results."""
        
        sections = self.split_by_sections(document)
        
        accumulated_context = ""
        
        for section in sections:
            # Process section with accumulated context
            prompt = f"""Based on the document so far:
{accumulated_context}

And this new section:
{section.content}

Answer the question: {query}

Provide your current best answer:"""
            
            response = await self.llm.complete(prompt)
            
            yield response.content
            
            # Update accumulated context
            summary = await self._summarize_chunk(section.content, query)
            accumulated_context += f"\n\n{section.title}: {summary}"
            
            # Truncate if needed
            if self.counter.count(accumulated_context) > self.max_tokens // 2:
                accumulated_context = await self._summarize_chunk(
                    accumulated_context,
                    query
                )
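
The summarization paths need an LLM client with an async complete() method returning an object that exposes .content, which is all the code above assumes. Here is a runnable sketch with a stub client standing in for the real model:

import asyncio
from types import SimpleNamespace

class StubLLM:
    """Stand-in client: returns a fake 'summary' so the flow can be tested offline."""
    
    async def complete(self, prompt: str):
        return SimpleNamespace(content=f"[summary of a {len(prompt)}-char prompt]")

async def main():
    counter = ApproximateCounter()
    processor = LongDocumentProcessor(max_tokens=500, counter=counter, llm_client=StubLLM())
    
    document = "# Intro\nContext windows are finite.\n\n## Details\n" + "Long body text. " * 400
    
    # Inspect how the document is split before any model calls happen.
    for section in processor.split_by_sections(document):
        print(section.level, section.title, section.tokens)
    
    summary = await processor.map_reduce_summarize(document, query="What is the limit?")
    print(summary)

asyncio.run(main())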

Production Context Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class TruncateRequest(BaseModel):
    text: str
    max_tokens: int
    strategy: str = "smart"

class ContextRequest(BaseModel):
    system_prompt: str
    retrieved_context: str
    conversation_history: list[dict]
    user_query: str
    max_tokens: int = 8000

class WindowRequest(BaseModel):
    messages: list[dict]
    max_tokens: int = 4000
    new_message: Optional[dict] = None

# Initialize components
counter = ApproximateCounter()
truncator = Truncator(counter)

@app.post("/v1/truncate")
async def truncate_text(request: TruncateRequest) -> dict:
    """Truncate text to fit token limit."""
    
    strategy = TruncationStrategy[request.strategy.upper()]
    
    result = truncator.truncate(
        request.text,
        request.max_tokens,
        strategy
    )
    
    return {
        "truncated": result,
        "original_tokens": counter.count(request.text),
        "result_tokens": counter.count(result)
    }

@app.post("/v1/context/build")
async def build_context(request: ContextRequest) -> dict:
    """Build optimized context."""
    
    budget = ContextBudget(
        max_tokens=request.max_tokens,
        reserved_for_output=1000
    )
    
    builder = ContextBuilder(budget, counter)
    
    messages = builder.build(
        request.system_prompt,
        request.retrieved_context,
        request.conversation_history,
        request.user_query
    )
    
    total_tokens = sum(
        counter.count(m["content"]) + 4
        for m in messages
    )
    
    return {
        "messages": messages,
        "total_tokens": total_tokens,
        "utilization": total_tokens / budget.available_for_input
    }

@app.post("/v1/window/manage")
async def manage_window(request: WindowRequest) -> dict:
    """Manage conversation window."""
    
    window = SlidingWindowManager(
        max_tokens=request.max_tokens,
        counter=counter
    )
    
    # Add existing messages
    for msg in request.messages:
        window.add_message(msg["role"], msg["content"])
    
    # Add new message if provided
    if request.new_message:
        window.add_message(
            request.new_message["role"],
            request.new_message["content"]
        )
    
    return {
        "messages": window.get_messages(),
        "usage": window.get_usage()
    }

@app.post("/v1/tokens/count")
async def count_tokens(text: str) -> dict:
    """Count tokens in text."""
    
    return {
        "tokens": counter.count(text),
        "characters": len(text)
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}
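
To exercise the service from a client, here is a sketch using httpx, assuming the app above is saved as context_service.py and served with `uvicorn context_service:app --port 8000` (the module name and port are just placeholders):

import httpx

BASE = "http://localhost:8000"

# Truncate a long string down to roughly 100 tokens with the smart strategy.
resp = httpx.post(f"{BASE}/v1/truncate", json={
    "text": "Very long document text. " * 200,
    "max_tokens": 100,
    "strategy": "smart",
})
print(resp.json()["result_tokens"])

# Build an optimized prompt from retrieved context, history, and the user query.
resp = httpx.post(f"{BASE}/v1/context/build", json={
    "system_prompt": "You are a helpful assistant.",
    "retrieved_context": "Chunk one. Chunk two.",
    "conversation_history": [{"role": "user", "content": "Hi"}],
    "user_query": "Summarize the chunks.",
    "max_tokens": 8000,
})
print(resp.json()["total_tokens"])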

Conclusion

Context window management is a critical skill for building effective LLM applications. Start with accurate token counting—approximate methods work for rough estimates, but use tiktoken for precise budgeting. Implement smart truncation that preserves important content rather than blindly cutting at character limits. For conversations, use sliding windows with summarization to maintain context across long sessions. In RAG systems, deduplicate and reorder chunks to maximize information density. Be aware of the "lost in the middle" phenomenon—models pay more attention to content at the beginning and end of context. For documents longer than your context window, use map-reduce patterns to process in chunks and combine results. Monitor your context utilization and adjust budgets based on actual usage patterns. The key insight is that context management isn't just about fitting content—it's about maximizing the signal-to-noise ratio in your context window. Every token should earn its place by contributing to better responses.