Introduction: LLM APIs have strict rate limits—requests per minute, tokens per minute, and concurrent request limits. Hit these limits and your application grinds to a halt with 429 errors. Effective rate limiting isn’t just about staying under limits; it’s about maximizing throughput while maintaining reliability. This guide covers practical rate limiting patterns: token bucket algorithms for smooth request distribution, sliding window counters for accurate tracking, adaptive limiters that respond to API feedback, and distributed rate limiting for multi-instance deployments. Whether you’re building a chatbot serving thousands of users or a batch processing pipeline, these patterns will help you maximize API utilization while avoiding throttling and ensuring fair resource allocation across your application.

Token Bucket Algorithm
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
from abc import ABC, abstractmethod


@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000
    max_concurrent: int = 10
    burst_multiplier: float = 1.5


class RateLimiter(ABC):
    """Abstract rate limiter."""

    @abstractmethod
    async def acquire(self, tokens: int = 1) -> bool:
        """Acquire permission to make a request."""
        pass

    @abstractmethod
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until permission is granted, return wait time."""
        pass


class TokenBucket(RateLimiter):
    """Token bucket rate limiter."""

    def __init__(
        self,
        rate: float,    # tokens per second
        capacity: int   # max tokens
    ):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self._lock = asyncio.Lock()

    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_update
        # Add tokens based on time elapsed, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.rate
        )
        self.last_update = now

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens without waiting."""
        async with self._lock:
            await self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until tokens are available."""
        total_wait = 0.0
        while True:
            async with self._lock:
                await self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return total_wait
                # Calculate how long until enough tokens accumulate
                needed = tokens - self.tokens
                wait_time = needed / self.rate
            # Sleep outside the lock so other callers can proceed
            await asyncio.sleep(wait_time)
            total_wait += wait_time

    @property
    def available_tokens(self) -> float:
        """Get current available tokens."""
        return self.tokens
```
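A quick usage sketch (the rate and capacity here are arbitrary illustrative values): `acquire` returns immediately with a yes/no answer, while `wait_and_acquire` blocks until the bucket has refilled enough.

```python
import asyncio

async def demo():
    # 2 tokens/second with a burst capacity of 5 (illustrative values)
    bucket = TokenBucket(rate=2.0, capacity=5)
    for i in range(8):
        waited = await bucket.wait_and_acquire(1)
        print(f"request {i} proceeded after waiting {waited:.2f}s")

asyncio.run(demo())
```

The first five requests pass instantly from the burst capacity; after that, each waits roughly half a second as tokens trickle in at the configured rate.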
```python
class DualTokenBucket(RateLimiter):
    """Separate buckets for requests and tokens."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 100000
    ):
        # Request bucket
        self.request_bucket = TokenBucket(
            rate=requests_per_minute / 60,
            capacity=int(requests_per_minute * 1.5)  # Allow burst
        )
        # Token bucket
        self.token_bucket = TokenBucket(
            rate=tokens_per_minute / 60,
            capacity=int(tokens_per_minute * 1.5)
        )

    async def acquire(self, tokens: int = 1) -> bool:
        """Acquire both request and token permits."""
        # Check both buckets
        request_ok = await self.request_bucket.acquire(1)
        if not request_ok:
            return False
        token_ok = await self.token_bucket.acquire(tokens)
        if not token_ok:
            # Refund the request permit so it isn't lost
            self.request_bucket.tokens += 1
            return False
        return True

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for both permits."""
        # Wait for request permit
        request_wait = await self.request_bucket.wait_and_acquire(1)
        # Wait for token permit
        token_wait = await self.token_bucket.wait_and_acquire(tokens)
        return request_wait + token_wait
```
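The point of the dual bucket is that LLM providers meter requests and tokens independently, so a single large request can exhaust the token budget while the request budget is barely touched. A brief sketch (the 4-characters-per-token estimate is a rough heuristic, not an exact tokenizer):

```python
import asyncio

async def main():
    limiter = DualTokenBucket(requests_per_minute=60, tokens_per_minute=100_000)
    prompt = "Summarize the following document..."
    # Rough heuristic: ~4 chars per token, plus an output budget
    estimated = len(prompt) // 4 + 500
    wait = await limiter.wait_and_acquire(tokens=estimated)
    print(f"cleared both buckets after {wait:.2f}s")

asyncio.run(main())
```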
```python
class LeakyBucket(RateLimiter):
    """Leaky bucket for a smooth output rate."""

    def __init__(
        self,
        rate: float,   # requests per second
        capacity: int  # max queue size
    ):
        self.rate = rate
        self.capacity = capacity
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
        self._running = False
        self._processor: Optional[asyncio.Task] = None

    async def start(self):
        """Start the leaky bucket processor."""
        self._running = True
        self._processor = asyncio.create_task(self._process_loop())

    async def stop(self):
        """Stop the processor."""
        self._running = False
        if self._processor:
            self._processor.cancel()
            try:
                await self._processor
            except asyncio.CancelledError:
                pass

    async def _process_loop(self):
        """Release one waiter per interval to enforce the fixed drain rate."""
        interval = 1.0 / self.rate
        while self._running:
            try:
                future = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=interval
                )
                if not future.done():
                    future.set_result(True)
                # Sleep between releases; without this the queue would
                # drain instantly instead of at the configured rate
                await asyncio.sleep(interval)
            except asyncio.TimeoutError:
                pass

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to enqueue without blocking; True means a slot was reserved.

        The permit is only actually granted when the processor drains it;
        use wait_and_acquire to block until that happens.
        """
        try:
            future = asyncio.get_running_loop().create_future()
            self.queue.put_nowait(future)
            return True
        except asyncio.QueueFull:
            return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for a queue slot and for the processor to release us."""
        start = time.time()
        future = asyncio.get_running_loop().create_future()
        await self.queue.put(future)
        await future
        return time.time() - start
```
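Because the leaky bucket runs a background drain task, it has an explicit lifecycle; a minimal sketch with illustrative values:

```python
import asyncio

async def main():
    bucket = LeakyBucket(rate=2.0, capacity=100)  # 2 releases/second
    await bucket.start()
    try:
        waits = await asyncio.gather(
            *(bucket.wait_and_acquire() for _ in range(5))
        )
        print([f"{w:.2f}s" for w in waits])
    finally:
        await bucket.stop()

asyncio.run(main())
```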
Sliding Window Algorithms
```python
import asyncio
import time
from collections import deque


class SlidingWindowLog(RateLimiter):
    """Sliding window log rate limiter."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.requests: deque = deque()
        self._lock = asyncio.Lock()

    async def _cleanup(self):
        """Remove requests that have aged out of the window."""
        cutoff = time.time() - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire a permit."""
        async with self._lock:
            await self._cleanup()
            if len(self.requests) < self.limit:
                self.requests.append(time.time())
                return True
            return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until a permit is available."""
        total_wait = 0.0
        while True:
            async with self._lock:
                await self._cleanup()
                if len(self.requests) < self.limit:
                    self.requests.append(time.time())
                    return total_wait
                # Wait until the oldest request expires from the window
                oldest = self.requests[0]
                wait_time = (oldest + self.window_seconds) - time.time()
                wait_time = max(0.01, wait_time)
            await asyncio.sleep(wait_time)
            total_wait += wait_time
class SlidingWindowCounter(RateLimiter):
    """Sliding window counter (more memory efficient)."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        # Current and previous window counts
        self.current_count = 0
        self.previous_count = 0
        self.current_window_start = time.time()
        self._lock = asyncio.Lock()

    async def _get_count(self) -> float:
        """Get the weighted count across the two windows."""
        now = time.time()
        window_start = now - (now % self.window_seconds)
        # Check if we've moved into a new window
        if window_start > self.current_window_start:
            if window_start > self.current_window_start + self.window_seconds:
                # Skipped an entire window
                self.previous_count = 0
            else:
                self.previous_count = self.current_count
            self.current_count = 0
            self.current_window_start = window_start
        # Weight the previous window by how much of it still overlaps
        elapsed_in_window = now - self.current_window_start
        weight = elapsed_in_window / self.window_seconds
        return self.previous_count * (1 - weight) + self.current_count

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire a permit."""
        async with self._lock:
            count = await self._get_count()
            if count + tokens <= self.limit:
                self.current_count += tokens
                return True
            return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until a permit is available."""
        total_wait = 0.0
        while True:
            async with self._lock:
                count = await self._get_count()
                if count + tokens <= self.limit:
                    self.current_count += tokens
                    return total_wait
            # Poll in short increments and retry
            wait_time = 0.1
            await asyncio.sleep(wait_time)
            total_wait += wait_time
```
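To make the weighting concrete, a worked example with illustrative numbers: with a 60 s window, suppose 40 requests landed in the previous window, 10 have landed so far in the current one, and 15 s of the current window have elapsed. The weight is 15/60 = 0.25, so the estimated count is 40 × (1 − 0.25) + 10 = 40. With a limit of 50, a request costing 1 is admitted (41 ≤ 50). The approximation assumes the previous window's requests were evenly distributed, which is what makes the counter so much cheaper than the exact log.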
```python
class FixedWindowCounter(RateLimiter):
    """Simple fixed window counter."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.count = 0
        self.window_start = time.time()
        self._lock = asyncio.Lock()

    async def _check_window(self):
        """Reset the counter if the window has expired."""
        now = time.time()
        if now - self.window_start >= self.window_seconds:
            self.count = 0
            self.window_start = now

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire a permit."""
        async with self._lock:
            await self._check_window()
            if self.count + tokens <= self.limit:
                self.count += tokens
                return True
            return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until a permit is available."""
        total_wait = 0.0
        while True:
            async with self._lock:
                await self._check_window()
                if self.count + tokens <= self.limit:
                    self.count += tokens
                    return total_wait
                # Wait until the window resets
                wait_time = self.window_seconds - (time.time() - self.window_start)
                wait_time = max(0.01, wait_time)
            await asyncio.sleep(wait_time)
            total_wait += wait_time
```
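The three variants trade accuracy for memory: the log is exact but stores one timestamp per request, the sliding counter approximates with two integers, and the fixed window is cheapest but can admit up to twice the limit in bursts straddling a window boundary. A quick smoke test (illustrative limits):

```python
import asyncio

async def main():
    limiters = {
        "log": SlidingWindowLog(limit=5, window_seconds=1),
        "sliding": SlidingWindowCounter(limit=5, window_seconds=1),
        "fixed": FixedWindowCounter(limit=5, window_seconds=1),
    }
    for name, limiter in limiters.items():
        granted = sum([await limiter.acquire() for _ in range(10)])
        print(f"{name}: granted {granted}/10 immediately")

asyncio.run(main())
```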
Adaptive Rate Limiting
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
from enum import Enum


class RateLimitState(Enum):
    """Rate limiter state."""
    NORMAL = "normal"
    THROTTLED = "throttled"
    RECOVERING = "recovering"


@dataclass
class AdaptiveConfig:
    """Adaptive rate limiter config."""
    initial_rate: float = 1.0  # requests per second
    min_rate: float = 0.1
    max_rate: float = 10.0
    increase_factor: float = 1.1
    decrease_factor: float = 0.5
    recovery_time: float = 30.0


class AdaptiveRateLimiter(RateLimiter):
    """Adapts its rate based on API responses."""

    def __init__(self, config: Optional[AdaptiveConfig] = None):
        self.config = config or AdaptiveConfig()
        self.current_rate = self.config.initial_rate
        self.state = RateLimitState.NORMAL
        self.last_throttle = 0.0
        self.bucket = TokenBucket(
            rate=self.current_rate,
            # Keep capacity at least 1 so very low rates can still pass
            capacity=max(1, int(self.current_rate * 2))
        )
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire a permit."""
        return await self.bucket.acquire(tokens)

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for a permit."""
        return await self.bucket.wait_and_acquire(tokens)

    async def on_success(self):
        """Called on a successful request."""
        async with self._lock:
            if self.state == RateLimitState.RECOVERING:
                # Check whether the recovery period has passed
                if time.time() - self.last_throttle >= self.config.recovery_time:
                    self.state = RateLimitState.NORMAL
            if self.state == RateLimitState.NORMAL:
                # Gradually increase the rate
                new_rate = min(
                    self.current_rate * self.config.increase_factor,
                    self.config.max_rate
                )
                if new_rate != self.current_rate:
                    self.current_rate = new_rate
                    self._update_bucket()

    async def on_rate_limit(self, retry_after: Optional[float] = None):
        """Called when rate limited (429)."""
        async with self._lock:
            self.state = RateLimitState.THROTTLED
            self.last_throttle = time.time()
            # Multiplicative decrease
            self.current_rate = max(
                self.current_rate * self.config.decrease_factor,
                self.config.min_rate
            )
            self._update_bucket()
        # Honor Retry-After outside the lock so other callers aren't blocked
        if retry_after:
            await asyncio.sleep(retry_after)
        self.state = RateLimitState.RECOVERING

    def _update_bucket(self):
        """Apply the new rate to the underlying bucket."""
        self.bucket.rate = self.current_rate
        self.bucket.capacity = max(1, int(self.current_rate * 2))
class HeaderBasedLimiter(RateLimiter):
    """Adjust limits based on rate limit response headers."""

    def __init__(
        self,
        initial_limit: int = 60,
        initial_remaining: int = 60
    ):
        self.limit = initial_limit
        self.remaining = initial_remaining
        self.reset_time = time.time() + 60
        self._lock = asyncio.Lock()

    async def update_from_headers(self, headers: dict):
        """Update limits from response headers (OpenAI-style names)."""
        async with self._lock:
            if 'x-ratelimit-limit-requests' in headers:
                self.limit = int(headers['x-ratelimit-limit-requests'])
            if 'x-ratelimit-remaining-requests' in headers:
                self.remaining = int(headers['x-ratelimit-remaining-requests'])
            if 'x-ratelimit-reset-requests' in headers:
                # Parse durations like "12s" or "250ms"; check 'ms' before
                # 's', since "250ms" also ends with 's'
                reset_str = headers['x-ratelimit-reset-requests']
                if reset_str.endswith('ms'):
                    self.reset_time = time.time() + float(reset_str[:-2]) / 1000
                elif reset_str.endswith('s'):
                    self.reset_time = time.time() + float(reset_str[:-1])

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire a permit."""
        async with self._lock:
            # Restore quota once the reset time has passed
            if time.time() >= self.reset_time:
                self.remaining = self.limit
                self.reset_time = time.time() + 60
            if self.remaining > 0:
                self.remaining -= 1
                return True
            return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for a permit."""
        total_wait = 0.0
        while True:
            async with self._lock:
                if time.time() >= self.reset_time:
                    self.remaining = self.limit
                    self.reset_time = time.time() + 60
                if self.remaining > 0:
                    self.remaining -= 1
                    return total_wait
                wait_time = self.reset_time - time.time()
                wait_time = max(0.01, wait_time)
            await asyncio.sleep(wait_time)
            total_wait += wait_time
```
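A small sketch of feeding captured response headers back into the limiter (the header values here are made-up examples):

```python
import asyncio

async def main():
    limiter = HeaderBasedLimiter()
    await limiter.update_from_headers({
        'x-ratelimit-limit-requests': '60',
        'x-ratelimit-remaining-requests': '3',
        'x-ratelimit-reset-requests': '12s',
    })
    print(await limiter.acquire())  # True while remaining > 0
    print(limiter.remaining)        # 2

asyncio.run(main())
```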
```python
class ConcurrencyLimiter:
    """Limit concurrent in-flight requests."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.current = 0
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self.semaphore.acquire()
        async with self._lock:
            self.current += 1
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        async with self._lock:
            self.current -= 1
        self.semaphore.release()

    @property
    def available(self) -> int:
        """Get available slots (relies on the semaphore's private counter)."""
        return self.semaphore._value
```
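Tying the pieces together: the adaptive limiter paces requests, the concurrency limiter caps in-flight work, and the response outcome feeds back into the rate. A hedged sketch where `make_request` stands in for whatever request coroutine you use (assumed to return a status code and payload):

```python
async def guarded_call(make_request,
                       adaptive: AdaptiveRateLimiter,
                       concurrency: ConcurrencyLimiter):
    """make_request: any coroutine function returning (status_code, data)."""
    await adaptive.wait_and_acquire(1)
    async with concurrency:
        status, data = await make_request()
        if status == 429:
            # Feed the throttle signal back so the rate decreases
            await adaptive.on_rate_limit()
            raise RuntimeError("rate limited")
        await adaptive.on_success()
        return data
```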
Distributed Rate Limiting
```python
import asyncio
import time
import hashlib
from typing import Any


class RedisRateLimiter(RateLimiter):
    """Distributed fixed-window rate limiter using Redis."""

    def __init__(
        self,
        redis_client: Any,  # e.g. a redis.asyncio.Redis instance
        key_prefix: str,
        limit: int,
        window_seconds: float = 60
    ):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.limit = limit
        self.window_seconds = window_seconds

    def _get_key(self) -> str:
        """Get the key for the current window."""
        window = int(time.time() / self.window_seconds)
        return f"{self.key_prefix}:{window}"

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire a permit using Redis."""
        key = self._get_key()
        # Atomic increment and expire in one round trip
        pipe = self.redis.pipeline()
        pipe.incrby(key, tokens)
        pipe.expire(key, int(self.window_seconds) + 1)
        results = await pipe.execute()
        count = results[0]
        if count <= self.limit:
            return True
        # Over the limit: give the increment back
        await self.redis.decrby(key, tokens)
        return False

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for a permit."""
        total_wait = 0.0
        while True:
            if await self.acquire(tokens):
                return total_wait
            # Wait (at most 1s at a time) until the next window
            current_window = int(time.time() / self.window_seconds)
            next_window = (current_window + 1) * self.window_seconds
            wait_time = next_window - time.time()
            wait_time = max(0.01, min(wait_time, 1.0))
            await asyncio.sleep(wait_time)
            total_wait += wait_time
```
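A usage sketch assuming the `redis` package's asyncio client (`redis.asyncio`); the connection URL is a placeholder:

```python
import asyncio
import redis.asyncio as aioredis

async def main():
    client = aioredis.from_url("redis://localhost:6379")  # placeholder URL
    limiter = RedisRateLimiter(client, key_prefix="llm:global", limit=500)
    granted = await limiter.acquire()
    print("permit granted:", granted)
    await client.aclose()  # use close() on redis-py < 5

asyncio.run(main())
```

Because every instance increments the same window key, the limit is shared across all processes pointing at the same Redis.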
```python
class RedisSlidingWindow(RateLimiter):
    """Redis sliding window using sorted sets."""

    def __init__(
        self,
        redis_client: Any,
        key: str,
        limit: int,
        window_seconds: float = 60
    ):
        self.redis = redis_client
        self.key = key
        self.limit = limit
        self.window_seconds = window_seconds

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire using a sorted set of timestamps."""
        now = time.time()
        window_start = now - self.window_seconds
        # Lua script so remove-count-add happens atomically
        script = """
        local key = KEYS[1]
        local window_start = tonumber(ARGV[1])
        local now = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local ttl = tonumber(ARGV[4])
        -- Remove old entries
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
        -- Count current entries
        local count = redis.call('ZCARD', key)
        if count < limit then
            -- Add new entry
            redis.call('ZADD', key, now, now .. '-' .. math.random())
            redis.call('EXPIRE', key, ttl)
            return 1
        end
        return 0
        """
        result = await self.redis.eval(
            script,
            1,
            self.key,
            window_start,
            now,
            self.limit,
            int(self.window_seconds) + 1
        )
        return result == 1

    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for a permit."""
        total_wait = 0.0
        while True:
            if await self.acquire(tokens):
                return total_wait
            await asyncio.sleep(0.1)
            total_wait += 0.1
class ConsistentHashLimiter:
    """Distribute rate limits across instances via consistent hashing."""

    def __init__(
        self,
        limiters: dict[str, RateLimiter],
        replicas: int = 100
    ):
        self.limiters = limiters
        self.replicas = replicas
        self.ring: list[tuple[int, str]] = []
        self._build_ring()

    def _build_ring(self):
        """Build the consistent hash ring."""
        for name in self.limiters:
            for i in range(self.replicas):
                key = f"{name}:{i}"
                hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
                self.ring.append((hash_val, name))
        self.ring.sort(key=lambda x: x[0])

    def _get_limiter(self, key: str) -> RateLimiter:
        """Get the limiter for a key: first ring node at or past its hash."""
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        for ring_hash, name in self.ring:
            if hash_val <= ring_hash:
                return self.limiters[name]
        # Wrap around to the start of the ring
        return self.limiters[self.ring[0][1]]

    async def acquire(self, key: str, tokens: int = 1) -> bool:
        """Acquire from the appropriate limiter."""
        limiter = self._get_limiter(key)
        return await limiter.acquire(tokens)

    async def wait_and_acquire(self, key: str, tokens: int = 1) -> float:
        """Wait and acquire from the appropriate limiter."""
        limiter = self._get_limiter(key)
        return await limiter.wait_and_acquire(tokens)
```
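A quick sketch of routing per-user keys onto a fixed set of named limiters (the shard names and rates are illustrative; in production each named limiter would typically be Redis-backed so all instances see the same shards):

```python
import asyncio

async def main():
    sharded = ConsistentHashLimiter({
        "shard-a": TokenBucket(rate=1.0, capacity=5),
        "shard-b": TokenBucket(rate=1.0, capacity=5),
    })
    for user in ["alice", "bob", "carol"]:
        ok = await sharded.acquire(key=user)
        print(user, "->", ok)

asyncio.run(main())
```

The same key always hashes to the same shard, so each user's traffic is consistently counted against one limiter no matter which instance handles the request.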
Rate Limited LLM Client
```python
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitedConfig:
    """Rate limited client config."""
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000
    max_concurrent: int = 10
    adaptive: bool = True


class RateLimitedLLMClient:
    """LLM client with comprehensive rate limiting."""

    def __init__(
        self,
        api_key: str,
        config: Optional[RateLimitedConfig] = None
    ):
        self.api_key = api_key
        self.config = config or RateLimitedConfig()
        # Request rate limiter
        self.request_limiter = SlidingWindowCounter(
            limit=self.config.requests_per_minute,
            window_seconds=60
        )
        # Token rate limiter
        self.token_limiter = TokenBucket(
            rate=self.config.tokens_per_minute / 60,
            capacity=self.config.tokens_per_minute
        )
        # Concurrency limiter
        self.concurrency = ConcurrencyLimiter(self.config.max_concurrent)
        # Adaptive limiter
        self.adaptive = AdaptiveRateLimiter() if self.config.adaptive else None
        # Header-based limiter
        self.header_limiter = HeaderBasedLimiter()
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create the HTTP session."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
        return self._session

    def _estimate_tokens(self, prompt: str) -> int:
        """Rough token estimate (~4 characters per token)."""
        return len(prompt) // 4
    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4",
        max_tokens: int = 1000
    ) -> dict:
        """Make a rate-limited completion request."""
        estimated_tokens = self._estimate_tokens(prompt) + max_tokens
        # Wait for all rate limiters
        await self.request_limiter.wait_and_acquire(1)
        await self.token_limiter.wait_and_acquire(estimated_tokens)
        if self.adaptive:
            await self.adaptive.wait_and_acquire(1)
        async with self.concurrency:
            session = await self._get_session()
            try:
                async with session.post(
                    "https://api.openai.com/v1/chat/completions",
                    json={
                        "model": model,
                        "messages": [{"role": "user", "content": prompt}],
                        "max_tokens": max_tokens
                    }
                ) as response:
                    # Update the header-based limiter
                    await self.header_limiter.update_from_headers(
                        dict(response.headers)
                    )
                    if response.status == 429:
                        # Rate limited by the API
                        retry_after = response.headers.get('retry-after')
                        if self.adaptive:
                            await self.adaptive.on_rate_limit(
                                float(retry_after) if retry_after else None
                            )
                        raise Exception("Rate limited")
                    data = await response.json()
                    if self.adaptive:
                        await self.adaptive.on_success()
                    return data
            except aiohttp.ClientError:
                # Treat transport errors as a signal to back off
                if self.adaptive:
                    await self.adaptive.on_rate_limit()
                raise
    async def close(self):
        """Close the HTTP session."""
        if self._session and not self._session.closed:
            await self._session.close()

    def get_stats(self) -> dict:
        """Get rate limiter stats."""
        return {
            "request_limiter": {
                "current_count": self.request_limiter.current_count
            },
            "token_limiter": {
                "available": self.token_limiter.available_tokens
            },
            "concurrency": {
                "available": self.concurrency.available,
                "current": self.concurrency.current
            },
            "adaptive": {
                "state": self.adaptive.state.value if self.adaptive else None,
                "rate": self.adaptive.current_rate if self.adaptive else None
            }
        }
```
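A minimal usage sketch (the API key is a placeholder; error handling trimmed for brevity):

```python
import asyncio

async def main():
    client = RateLimitedLLMClient(api_key="sk-...")  # placeholder key
    try:
        result = await client.complete("Explain token buckets in one sentence.")
        print(result["choices"][0]["message"]["content"])
        print(client.get_stats())
    finally:
        await client.close()

asyncio.run(main())
```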
Production Rate Limiting Service
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()


class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4"
    max_tokens: int = 1000
    user_id: Optional[str] = None


# Global rate limiter
global_limiter = SlidingWindowCounter(limit=1000, window_seconds=60)

# Per-user rate limiters (unbounded dict; evict idle entries in production)
user_limiters: dict[str, SlidingWindowCounter] = {}


def get_user_limiter(user_id: str) -> SlidingWindowCounter:
    """Get or create a user's rate limiter."""
    if user_id not in user_limiters:
        user_limiters[user_id] = SlidingWindowCounter(
            limit=60,  # 60 requests per minute per user
            window_seconds=60
        )
    return user_limiters[user_id]


# LLM client
llm_client = RateLimitedLLMClient(
    api_key="your-key",
    config=RateLimitedConfig(
        requests_per_minute=500,
        tokens_per_minute=500000,
        max_concurrent=50,
        adaptive=True
    )
)
@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
"""Create rate-limited completion."""
# Check global limit
if not await global_limiter.acquire(1):
raise HTTPException(
status_code=429,
detail="Global rate limit exceeded"
)
# Check user limit
if request.user_id:
user_limiter = get_user_limiter(request.user_id)
if not await user_limiter.acquire(1):
raise HTTPException(
status_code=429,
detail="User rate limit exceeded"
)
try:
result = await llm_client.complete(
prompt=request.prompt,
model=request.model,
max_tokens=request.max_tokens
)
return {
"content": result["choices"][0]["message"]["content"],
"usage": result.get("usage", {})
}
except Exception as e:
if "Rate limited" in str(e):
raise HTTPException(status_code=429, detail="API rate limited")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/v1/limits")
async def get_limits(user_id: Optional[str] = None):
"""Get current rate limit status."""
response = {
"global": {
"limit": global_limiter.limit,
"current": global_limiter.current_count
},
"llm_client": llm_client.get_stats()
}
if user_id and user_id in user_limiters:
response["user"] = {
"limit": user_limiters[user_id].limit,
"current": user_limiters[user_id].current_count
}
return response
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenAI Rate Limits: https://platform.openai.com/docs/guides/rate-limits
- Anthropic Rate Limits: https://docs.anthropic.com/en/api/rate-limits
- Token Bucket Algorithm: https://en.wikipedia.org/wiki/Token_bucket
- Redis Rate Limiting: https://redis.io/commands/incr/#pattern-rate-limiter
Conclusion
Effective rate limiting is essential for building reliable LLM applications. Start with token bucket or sliding window algorithms—they provide smooth request distribution and accurate tracking. Use dual limiters for both request count and token count, since LLM APIs typically enforce both.

Implement adaptive rate limiting that responds to 429 errors by backing off, then gradually recovering. For multi-instance deployments, use Redis-based distributed limiters to share state across instances. Always respect rate limit headers from API responses—they provide real-time feedback on your remaining quota.

Layer your rate limiting: global limits protect your API keys, per-user limits ensure fair access, and per-endpoint limits prevent abuse of expensive operations. Monitor your rate limiter metrics to understand usage patterns and adjust limits accordingly. The key insight is that rate limiting isn't just about avoiding errors—it's about maximizing throughput within constraints while maintaining a good user experience. Well-designed rate limiting lets you serve more users reliably while staying within API quotas and budget constraints.