Introduction

Token management is critical for LLM applications—tokens directly impact cost, latency, and whether your prompt fits within context limits. Understanding how to count tokens accurately, truncate context intelligently, and allocate token budgets across different parts of your prompt separates amateur implementations from production-ready systems. This guide covers practical token management: counting with tiktoken, smart truncation strategies, budget allocation patterns, and techniques for maximizing information density within token limits.

Token Counting with Tiktoken
import tiktoken


class TokenCounter:
    """Count tokens for different models."""

    # Model to encoding mapping
    MODEL_ENCODINGS = {
        "gpt-4o": "o200k_base",
        "gpt-4o-mini": "o200k_base",
        "gpt-4-turbo": "cl100k_base",
        "gpt-4": "cl100k_base",
        "gpt-3.5-turbo": "cl100k_base",
        "text-embedding-3-small": "cl100k_base",
        "text-embedding-3-large": "cl100k_base",
    }

    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        encoding_name = self.MODEL_ENCODINGS.get(model, "cl100k_base")
        self.encoding = tiktoken.get_encoding(encoding_name)

    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))

    def count_messages(self, messages: list[dict]) -> int:
        """Approximate token count for chat messages.

        Follows OpenAI's cookbook heuristic; the exact per-message
        overhead varies by model, so treat the result as an estimate.
        """
        tokens_per_message = 3  # Per-message overhead for gpt-4o-family models
        tokens_per_name = 1
        total = 0
        for message in messages:
            total += tokens_per_message
            for key, value in message.items():
                total += self.count(str(value))
                if key == "name":
                    total += tokens_per_name
        # Every reply is primed with an assistant turn
        total += 3
        return total

    def encode(self, text: str) -> list[int]:
        """Encode text to token IDs."""
        return self.encoding.encode(text)

    def decode(self, tokens: list[int]) -> str:
        """Decode token IDs to text."""
        return self.encoding.decode(tokens)

    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to at most max_tokens tokens."""
        tokens = self.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.decode(tokens[:max_tokens])
# Usage
counter = TokenCounter("gpt-4o-mini")

text = "This is a sample text to count tokens."
print(f"Token count: {counter.count(text)}")

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Python?"},
]
print(f"Message tokens: {counter.count_messages(messages)}")

# Truncate long text
long_text = "Lorem ipsum " * 1000
truncated = counter.truncate_to_tokens(long_text, 100)
print(f"Truncated to {counter.count(truncated)} tokens")
Context Window Management
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class TruncationStrategy(str, Enum):
    KEEP_START = "keep_start"
    KEEP_END = "keep_end"
    KEEP_BOTH = "keep_both"
    SMART = "smart"


@dataclass
class ContextWindow:
    """Manage context window limits."""
    max_tokens: int
    reserved_output: int = 1000

    @property
    def available_input(self) -> int:
        return self.max_tokens - self.reserved_output


# Model context windows (total tokens, tokens reserved for output)
CONTEXT_WINDOWS = {
    "gpt-4o": ContextWindow(128000, 16384),
    "gpt-4o-mini": ContextWindow(128000, 16384),
    "gpt-4-turbo": ContextWindow(128000, 4096),
    "gpt-4": ContextWindow(8192, 4096),
    "gpt-3.5-turbo": ContextWindow(16385, 4096),
    "claude-3-5-sonnet": ContextWindow(200000, 8192),
    "claude-3-opus": ContextWindow(200000, 4096),
}
class ContextManager:
    """Manage context within token limits."""

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        counter: Optional[TokenCounter] = None,
    ):
        self.model = model
        self.window = CONTEXT_WINDOWS.get(model, ContextWindow(128000))
        self.counter = counter or TokenCounter(model)

    def truncate(
        self,
        text: str,
        max_tokens: int,
        strategy: TruncationStrategy = TruncationStrategy.KEEP_END,
    ) -> str:
        """Truncate text using the specified strategy."""
        current_tokens = self.counter.count(text)
        if current_tokens <= max_tokens:
            return text
        if strategy == TruncationStrategy.KEEP_START:
            return self._truncate_end(text, max_tokens)
        elif strategy == TruncationStrategy.KEEP_END:
            return self._truncate_start(text, max_tokens)
        elif strategy == TruncationStrategy.KEEP_BOTH:
            return self._truncate_middle(text, max_tokens)
        elif strategy == TruncationStrategy.SMART:
            return self._smart_truncate(text, max_tokens)
        return text

    def _truncate_end(self, text: str, max_tokens: int) -> str:
        """Keep the start, truncate the end."""
        tokens = self.counter.encode(text)
        truncated = self.counter.decode(tokens[:max_tokens])
        return truncated + "..."  # The ellipsis adds a token or two beyond the limit

    def _truncate_start(self, text: str, max_tokens: int) -> str:
        """Keep the end, truncate the start."""
        tokens = self.counter.encode(text)
        truncated = self.counter.decode(tokens[-max_tokens:])
        return "..." + truncated

    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep the start and end, truncate the middle."""
        tokens = self.counter.encode(text)
        half = max_tokens // 2
        start = self.counter.decode(tokens[:half])
        end = self.counter.decode(tokens[-half:])
        return start + "\n...[truncated]...\n" + end

    def _smart_truncate(self, text: str, max_tokens: int) -> str:
        """Truncate while preserving sentence boundaries."""
        sentences = text.split(". ")
        result = []
        current_tokens = 0
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence + ". ")
            if current_tokens + sentence_tokens > max_tokens:
                break
            result.append(sentence)
            current_tokens += sentence_tokens
        if not result:
            # The first sentence alone exceeds the budget: fall back to a hard cut
            return self._truncate_end(text, max_tokens)
        return ". ".join(result) + "."

    def fit_messages(
        self,
        messages: list[dict],
        system_prompt: Optional[str] = None,
    ) -> list[dict]:
        """Fit messages within the context window, dropping the oldest first."""
        available = self.window.available_input

        # Reserve space for the system prompt (plus a small overhead margin)
        if system_prompt:
            available -= self.counter.count(system_prompt) + 10

        # If everything already fits, keep all messages
        if self.counter.count_messages(messages) <= available:
            return messages

        # Walk backward so the most recent context survives
        result = []
        current_tokens = 0
        for message in reversed(messages):
            msg_tokens = self.counter.count(message.get("content", "")) + 10
            if current_tokens + msg_tokens > available:
                break
            result.insert(0, message)
            current_tokens += msg_tokens
        return result
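The strategies above never get exercised in this section, so here is a minimal usage sketch (the repeated sentence is just filler text):

# Usage
manager = ContextManager("gpt-4o-mini")
document = "This sentence is filler for the demo. " * 200

for strategy in TruncationStrategy:
    result = manager.truncate(document, max_tokens=60, strategy=strategy)
    print(f"{strategy.value}: {manager.counter.count(result)} tokens")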
Token Budget Allocation
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenBudget:
    """Allocate token budget across prompt components."""
    total: int
    system_prompt: int = 0
    context: int = 0
    examples: int = 0
    user_input: int = 0
    reserved_output: int = 0

    @property
    def allocated(self) -> int:
        return (
            self.system_prompt
            + self.context
            + self.examples
            + self.user_input
            + self.reserved_output
        )

    @property
    def remaining(self) -> int:
        return self.total - self.allocated


class BudgetAllocator:
    """Allocate token budgets for different prompt components."""

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        counter: Optional[TokenCounter] = None,
    ):
        self.model = model
        self.window = CONTEXT_WINDOWS.get(model, ContextWindow(128000))
        self.counter = counter or TokenCounter(model)

    def allocate(
        self,
        system_prompt: str,
        context: str = "",
        examples: Optional[list[dict]] = None,
        user_input: str = "",
        output_tokens: int = 1000,
    ) -> TokenBudget:
        """Allocate budget based on actual content sizes."""
        budget = TokenBudget(total=self.window.max_tokens)

        # Fixed allocations: output reservation, system prompt, user input
        budget.reserved_output = output_tokens
        budget.system_prompt = self.counter.count(system_prompt)
        budget.user_input = self.counter.count(user_input)

        # Examples get at most half of what remains
        if examples:
            examples_text = "\n".join(
                f"Q: {e.get('input', '')}\nA: {e.get('output', '')}"
                for e in examples
            )
            budget.examples = min(
                self.counter.count(examples_text),
                budget.remaining // 2,
            )

        # The rest goes to context
        if context:
            budget.context = min(
                self.counter.count(context),
                budget.remaining,
            )
        return budget

    def allocate_proportional(
        self,
        total_tokens: int,
        proportions: dict[str, float],
    ) -> dict[str, int]:
        """Allocate tokens proportionally across named components."""
        # Normalize proportions so they sum to 1.0
        total_prop = sum(proportions.values())
        normalized = {k: v / total_prop for k, v in proportions.items()}

        # Allocate; int() truncation may leave a few tokens unassigned as slack
        allocated = {}
        remaining = total_tokens
        for component, prop in normalized.items():
            tokens = int(total_tokens * prop)
            allocated[component] = min(tokens, remaining)
            remaining -= allocated[component]
        return allocated
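For the proportional path, a short sketch (the proportions here are arbitrary, and int() rounding means totals can fall a token or two short):

allocator = BudgetAllocator("gpt-4o-mini")
split = allocator.allocate_proportional(
    total_tokens=8000,
    proportions={"system": 1, "context": 5, "examples": 2, "user_input": 2},
)
print(split)  # roughly {'system': 800, 'context': 4000, 'examples': 1600, 'user_input': 1600}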
class PromptBuilder:
    """Build prompts within a token budget."""

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        max_output_tokens: int = 1000,
    ):
        self.model = model
        self.counter = TokenCounter(model)
        self.context_manager = ContextManager(model, self.counter)
        self.allocator = BudgetAllocator(model, self.counter)
        self.max_output = max_output_tokens

    def build(
        self,
        system_prompt: str,
        user_input: str,
        context: str = "",
        examples: Optional[list[dict]] = None,
    ) -> list[dict]:
        """Build messages within budget."""
        # Get the budget allocation
        budget = self.allocator.allocate(
            system_prompt=system_prompt,
            context=context,
            examples=examples,
            user_input=user_input,
            output_tokens=self.max_output,
        )

        messages = []

        # System prompt (always included in full)
        messages.append({"role": "system", "content": system_prompt})

        # Build the user message
        user_content_parts = []

        # Add context (truncated if needed)
        if context:
            truncated_context = self.context_manager.truncate(
                context,
                budget.context,
                TruncationStrategy.SMART,
            )
            user_content_parts.append(f"Context:\n{truncated_context}")

        # Add examples
        if examples:
            examples_text = self._format_examples(examples, budget.examples)
            if examples_text:
                user_content_parts.append(f"Examples:\n{examples_text}")

        # Add user input
        user_content_parts.append(f"Question: {user_input}")

        messages.append({
            "role": "user",
            "content": "\n\n".join(user_content_parts),
        })
        return messages

    def _format_examples(self, examples: list[dict], max_tokens: int) -> str:
        """Format as many examples as fit within the token budget."""
        result = []
        current_tokens = 0
        for ex in examples:
            ex_text = f"Q: {ex.get('input', '')}\nA: {ex.get('output', '')}"
            ex_tokens = self.counter.count(ex_text)
            if current_tokens + ex_tokens > max_tokens:
                break
            result.append(ex_text)
            current_tokens += ex_tokens
        return "\n\n".join(result)
# Usage
builder = PromptBuilder("gpt-4o-mini", max_output_tokens=2000)
messages = builder.build(
    system_prompt="You are a helpful coding assistant.",
    user_input="How do I read a file in Python?",
    context="The user is working on a data processing script...",
    examples=[
        {"input": "How to print?", "output": "Use print('hello')"},
        {"input": "How to loop?", "output": "Use for i in range(10):"},
    ],
)
print(f"Built {len(messages)} messages")
Conversation History Management
from collections import deque
from datetime import datetime
from typing import Optional


class ConversationBuffer:
    """Manage conversation history within token limits."""

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        max_history_tokens: int = 10000,
        max_messages: int = 50,
    ):
        self.counter = TokenCounter(model)
        self.max_tokens = max_history_tokens
        self.max_messages = max_messages
        self.messages: deque = deque(maxlen=max_messages)
        self.current_tokens = 0

    def add_message(self, role: str, content: str):
        """Add a message, evicting the oldest messages if over budget."""
        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
        }
        msg_tokens = self.counter.count(content) + 10  # +10 for message overhead

        # Evict explicitly rather than relying on deque's maxlen, so that
        # current_tokens stays accurate when the message cap is reached
        while self.messages and (
            self.current_tokens + msg_tokens > self.max_tokens
            or len(self.messages) >= self.max_messages
        ):
            old_msg = self.messages.popleft()
            self.current_tokens -= self.counter.count(old_msg["content"]) + 10

        self.messages.append(message)
        self.current_tokens += msg_tokens

    def get_messages(self) -> list[dict]:
        """Get messages in API format (without timestamps)."""
        return [
            {"role": m["role"], "content": m["content"]}
            for m in self.messages
        ]

    def summarize_and_compress(self, client) -> Optional[str]:
        """Summarize the oldest half of the history to reclaim tokens."""
        if len(self.messages) < 10:
            return None

        # Gather the oldest messages to summarize
        old_messages = list(self.messages)[: len(self.messages) // 2]
        summary_prompt = "Summarize this conversation concisely:\n\n"
        for msg in old_messages:
            summary_prompt += f"{msg['role']}: {msg['content']}\n"

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": summary_prompt}],
        )
        summary = response.choices[0].message.content

        # Replace the old messages with the summary
        for _ in range(len(old_messages)):
            old_msg = self.messages.popleft()
            self.current_tokens -= self.counter.count(old_msg["content"]) + 10

        # Prepend the summary as system context
        self.messages.appendleft({
            "role": "system",
            "content": f"Previous conversation summary: {summary}",
            "timestamp": datetime.now().isoformat(),
        })
        self.current_tokens += self.counter.count(summary) + 20
        return summary
class SlidingWindowBuffer:
    """Sliding window approach to conversation history."""

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        window_tokens: int = 8000,
    ):
        self.counter = TokenCounter(model)
        self.window_tokens = window_tokens
        self.messages: list[dict] = []

    def add_message(self, role: str, content: str):
        """Add a message and slide the window if needed."""
        self.messages.append({"role": role, "content": content})
        self._slide_window()

    def _slide_window(self):
        """Remove the oldest messages until the buffer fits the window."""
        while self._total_tokens() > self.window_tokens and len(self.messages) > 1:
            self.messages.pop(0)

    def _total_tokens(self) -> int:
        """Calculate total tokens in the buffer (+10 per message overhead)."""
        return sum(
            self.counter.count(m["content"]) + 10
            for m in self.messages
        )

    def get_messages(self) -> list[dict]:
        """Get the current window of messages."""
        return self.messages.copy()
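Neither buffer gets a usage example above, so a brief sketch (the message content is made up):

window = SlidingWindowBuffer("gpt-4o-mini", window_tokens=200)
for i in range(25):
    window.add_message("user", f"Question {i}: what does step {i} of the pipeline do?")
    window.add_message("assistant", f"Step {i} transforms the input before step {i + 1}.")

kept = window.get_messages()
print(f"Window keeps {len(kept)} of 50 messages within 200 tokens")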
Cost Estimation
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModelPricing:
    """Pricing per 1K tokens."""
    input_cost: float
    output_cost: float
    cached_input_cost: Optional[float] = None


# Pricing as of late 2024 (USD per 1K tokens)
MODEL_PRICING = {
    "gpt-4o": ModelPricing(0.0025, 0.01, 0.00125),
    "gpt-4o-mini": ModelPricing(0.00015, 0.0006, 0.000075),
    "gpt-4-turbo": ModelPricing(0.01, 0.03),
    "gpt-4": ModelPricing(0.03, 0.06),
    "gpt-3.5-turbo": ModelPricing(0.0005, 0.0015),
    "claude-3-5-sonnet": ModelPricing(0.003, 0.015),
    "claude-3-opus": ModelPricing(0.015, 0.075),
}
class CostEstimator:
    """Estimate API costs based on token usage."""

    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        self.pricing = MODEL_PRICING.get(model)
        self.counter = TokenCounter(model)
        # Track cumulative usage
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost = 0.0

    def estimate_cost(
        self,
        input_tokens: int,
        output_tokens: int,
        cached_input: int = 0,
    ) -> float:
        """Estimate cost for token usage."""
        if not self.pricing:
            return 0.0
        regular_input = input_tokens - cached_input
        cost = (
            (regular_input / 1000) * self.pricing.input_cost
            + (output_tokens / 1000) * self.pricing.output_cost
        )
        if cached_input > 0 and self.pricing.cached_input_cost:
            cost += (cached_input / 1000) * self.pricing.cached_input_cost
        return cost

    def estimate_from_prompt(
        self,
        prompt: str,
        expected_output_tokens: int = 500,
    ) -> dict:
        """Estimate cost from prompt text."""
        input_tokens = self.counter.count(prompt)
        cost = self.estimate_cost(input_tokens, expected_output_tokens)
        return {
            "input_tokens": input_tokens,
            "expected_output_tokens": expected_output_tokens,
            "estimated_cost": cost,
        }

    def record_usage(self, input_tokens: int, output_tokens: int):
        """Record actual usage."""
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.total_cost += self.estimate_cost(input_tokens, output_tokens)

    def get_usage_report(self) -> dict:
        """Get a cumulative usage report."""
        return {
            "model": self.model,
            "total_input_tokens": self.total_input_tokens,
            "total_output_tokens": self.total_output_tokens,
            "total_tokens": self.total_input_tokens + self.total_output_tokens,
            "total_cost": round(self.total_cost, 6),
        }
# Usage
estimator = CostEstimator("gpt-4o-mini")

# Estimate before calling
estimate = estimator.estimate_from_prompt(
    "Explain machine learning in detail",
    expected_output_tokens=1000,
)
print(f"Estimated cost: ${estimate['estimated_cost']:.6f}")

# Record actual usage
estimator.record_usage(input_tokens=50, output_tokens=800)
estimator.record_usage(input_tokens=100, output_tokens=500)

# Get report
report = estimator.get_usage_report()
print(f"Total cost: ${report['total_cost']:.6f}")
Production Token Management Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Shared components (module-level singletons for simplicity)
counter = TokenCounter("gpt-4o-mini")
context_manager = ContextManager("gpt-4o-mini")
cost_estimator = CostEstimator("gpt-4o-mini")


class TokenCountRequest(BaseModel):
    text: str
    model: str = "gpt-4o-mini"


class TruncateRequest(BaseModel):
    text: str
    max_tokens: int
    strategy: str = "smart"


class BuildPromptRequest(BaseModel):
    system_prompt: str
    user_input: str
    context: Optional[str] = ""
    max_output_tokens: int = 1000


@app.post("/tokens/count")
async def count_tokens(request: TokenCountRequest):
    """Count tokens in text."""
    local_counter = TokenCounter(request.model)
    count = local_counter.count(request.text)
    return {
        "text_length": len(request.text),
        "token_count": count,
        "model": request.model,
    }


@app.post("/tokens/truncate")
async def truncate_text(request: TruncateRequest):
    """Truncate text to a token limit."""
    try:
        strategy = TruncationStrategy(request.strategy)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown strategy: {request.strategy}",
        )
    truncated = context_manager.truncate(
        request.text,
        request.max_tokens,
        strategy,
    )
    return {
        "original_tokens": counter.count(request.text),
        "truncated_tokens": counter.count(truncated),
        "truncated_text": truncated,
    }


@app.post("/prompt/build")
async def build_prompt(request: BuildPromptRequest):
    """Build an optimized prompt within budget."""
    builder = PromptBuilder("gpt-4o-mini", request.max_output_tokens)
    messages = builder.build(
        system_prompt=request.system_prompt,
        user_input=request.user_input,
        context=request.context or "",
    )
    total_tokens = counter.count_messages(messages)
    return {
        "messages": messages,
        "total_tokens": total_tokens,
        "available_output": request.max_output_tokens,
    }


@app.post("/cost/estimate")
async def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    model: str = "gpt-4o-mini",
):
    """Estimate API cost (scalar arguments arrive as query parameters)."""
    local_estimator = CostEstimator(model)
    cost = local_estimator.estimate_cost(input_tokens, output_tokens)
    return {
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "estimated_cost": round(cost, 6),
    }


@app.get("/usage/report")
async def usage_report():
    """Get the usage report (populated via cost_estimator.record_usage)."""
    return cost_estimator.get_usage_report()
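To smoke-test the service without a running server, FastAPI's TestClient (which requires the httpx package) can call the endpoints in-process. A minimal sketch, assuming the code above is importable as app:

from fastapi.testclient import TestClient

client = TestClient(app)

resp = client.post(
    "/tokens/count",
    json={"text": "How many tokens is this sentence?", "model": "gpt-4o-mini"},
)
print(resp.json())

resp = client.post("/cost/estimate?input_tokens=500&output_tokens=200&model=gpt-4o-mini")
print(resp.json())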
References
- Tiktoken: https://github.com/openai/tiktoken
- OpenAI Tokenizer: https://platform.openai.com/tokenizer
- OpenAI Pricing: https://openai.com/pricing
- Context Length Guide: https://platform.openai.com/docs/models
Conclusion
Effective token management is essential for production LLM applications. Use tiktoken for accurate token counting—don't estimate based on word count. Implement smart truncation strategies that preserve important information: keep recent context for conversations, use sentence-boundary truncation for documents, and consider summarization for very long contexts. Allocate token budgets explicitly across prompt components to ensure each part gets appropriate space. Track costs continuously and set alerts for unexpected usage spikes. For conversations, use sliding windows or periodic summarization to maintain context within limits. The goal is maximizing information density while staying within context windows and cost budgets.