LLM Rate Limiting: Maximizing API Throughput Without Getting Throttled

Introduction: LLM APIs have strict rate limits—requests per minute, tokens per minute, and concurrent request limits. Hit these limits and your application grinds to a halt with 429 errors. Effective rate limiting isn’t just about staying under limits; it’s about maximizing throughput while maintaining reliability. This guide covers practical rate limiting patterns: token bucket algorithms for smooth request distribution, sliding window counters for accurate tracking, adaptive limiters that respond to API feedback, and distributed rate limiting for multi-instance deployments. Whether you’re building a chatbot serving thousands of users or a batch processing pipeline, these patterns will help you maximize API utilization while avoiding throttling and ensuring fair resource allocation across your application.

Token Bucket Algorithm

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod

@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000
    max_concurrent: int = 10
    burst_multiplier: float = 1.5

class RateLimiter(ABC):
    """Abstract rate limiter."""
    
    @abstractmethod
    async def acquire(self, tokens: int = 1) -> bool:
        """Acquire permission to make request."""
        pass
    
    @abstractmethod
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until permission granted, return wait time."""
        pass

class TokenBucket(RateLimiter):
    """Token bucket rate limiter."""
    
    def __init__(
        self,
        rate: float,  # tokens per second
        capacity: int  # max tokens
    ):
        self.rate = rate
        self.capacity = capacity
        # Start full so an initial burst is allowed
        self.tokens: float = float(capacity)
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, DST)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()
    
    async def _refill(self):
        """Refill tokens based on elapsed time. Caller must hold the lock."""
        
        now = time.monotonic()
        elapsed = now - self.last_update
        
        # Add tokens based on time elapsed, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.rate
        )
        
        self.last_update = now
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens."""
        
        async with self._lock:
            await self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False
    
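    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until tokens available."""
        
        # A request larger than the bucket can never be satisfied;
        # fail fast instead of looping forever.
        if tokens > self.capacity:
            raise ValueError(
                f"requested {tokens} tokens, bucket capacity is {self.capacity}"
            )
        
        total_wait = 0.0
        
        while True:
            async with self._lock:
                await self._refill()
                
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return total_wait
                
                # Time needed for the refill to cover the shortfall
                needed = tokens - self.tokens
                wait_time = needed / self.rate
            
            # Sleep outside the lock so other callers can proceed
            await asyncio.sleep(wait_time)
            total_wait += wait_time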
    
    @property
    def available_tokens(self) -> float:
        """Get current available tokens."""
        return self.tokens

class DualTokenBucket(RateLimiter):
    """Separate buckets for requests and tokens."""
    
    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 100000
    ):
        # Request bucket
        self.request_bucket = TokenBucket(
            rate=requests_per_minute / 60,
            capacity=int(requests_per_minute * 1.5)  # Allow burst
        )
        
        # Token bucket
        self.token_bucket = TokenBucket(
            rate=tokens_per_minute / 60,
            capacity=int(tokens_per_minute * 1.5)
        )
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Acquire both request and token permits."""
        
        # Check both buckets
        request_ok = await self.request_bucket.acquire(1)
        
        if not request_ok:
            return False
        
        token_ok = await self.token_bucket.acquire(tokens)
        
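        if not token_ok:
            # Refund the request permit taken above, under the bucket's
            # lock and without exceeding its capacity
            async with self.request_bucket._lock:
                self.request_bucket.tokens = min(
                    self.request_bucket.capacity,
                    self.request_bucket.tokens + 1
                )
            return False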
        
        return True
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for both permits."""
        
        # Wait for request permit
        request_wait = await self.request_bucket.wait_and_acquire(1)
        
        # Wait for token permit
        token_wait = await self.token_bucket.wait_and_acquire(tokens)
        
        return request_wait + token_wait

class LeakyBucket(RateLimiter):
    """Leaky bucket for smooth output rate."""
    
    def __init__(
        self,
        rate: float,  # requests per second
        capacity: int  # max queue size
    ):
        self.rate = rate
        self.capacity = capacity
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
        self._running = False
        self._processor: Optional[asyncio.Task] = None
    
    async def start(self):
        """Start the leaky bucket processor."""
        
        self._running = True
        self._processor = asyncio.create_task(self._process_loop())
    
    async def stop(self):
        """Stop the processor."""
        
        self._running = False
        
        if self._processor:
            self._processor.cancel()
            try:
                await self._processor
            except asyncio.CancelledError:
                pass
    
    async def _process_loop(self):
        """Release queued waiters at a fixed rate."""
        
        interval = 1.0 / self.rate
        
        while self._running:
            future = await self.queue.get()
            
            # Skip waiters that were cancelled while queued
            if future.done():
                continue
            
            future.set_result(True)
            # Pace the drain: without this sleep a backlog would be
            # released all at once
            await asyncio.sleep(interval)
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to add to queue.
        
        Returns True once the request is queued, not once it is released;
        use wait_and_acquire() to block until the bucket lets it through.
        """
        
        try:
            future = asyncio.get_running_loop().create_future()
            self.queue.put_nowait(future)
            return True
        except asyncio.QueueFull:
            return False
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for queue slot and processing."""
        
        start = time.monotonic()
        
        future = asyncio.get_running_loop().create_future()
        await self.queue.put(future)
        await future
        
        return time.monotonic() - start
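
To make the refill arithmetic concrete, here is a minimal synchronous sketch that is separate from the classes above. `SimpleBucket`, `FakeClock`, and the injectable `clock` parameter are hypothetical names introduced only for illustration; the fake clock lets you step time by hand instead of sleeping.

```python
import time
from typing import Callable


class SimpleBucket:
    """Synchronous token bucket with an injectable clock (illustrative only)."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n: float = 1) -> bool:
        # Refill for the elapsed time, capped at capacity, then try to spend
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

    def seconds_until(self, n: float = 1) -> float:
        """Time until n tokens are available, assuming no other consumers."""
        now = self.clock()
        current = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        return max(0.0, (n - current) / self.rate)


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
# 60 requests per minute is 1 token per second; allow a burst of 3
bucket = SimpleBucket(rate=1.0, capacity=3, clock=clock)

burst = [bucket.try_acquire() for _ in range(4)]   # fourth call finds the bucket empty
wait = bucket.seconds_until(1)                      # one token at 1 token/s
clock.advance(2.0)                                  # two tokens refill
after_wait = bucket.try_acquire()
```

The burst drains the bucket immediately, after which callers are admitted only as fast as `rate` refills it. That is the behavior the async `TokenBucket` above provides, with the wait handled by `asyncio.sleep`.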

Sliding Window Algorithms

import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional
from collections import deque

class SlidingWindowLog(RateLimiter):
    """Sliding window log rate limiter."""
    
    def __init__(
        self,
        limit: int,
        window_seconds: float = 60
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.requests: deque = deque()
        self._lock = asyncio.Lock()
    
    async def _cleanup(self):
        """Remove old requests outside window."""
        
        cutoff = time.time() - self.window_seconds
        
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire permit."""
        
        async with self._lock:
            await self._cleanup()
            
            if len(self.requests) < self.limit:
                self.requests.append(time.time())
                return True
            
            return False
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until permit available."""
        
        total_wait = 0
        
        while True:
            async with self._lock:
                await self._cleanup()
                
                if len(self.requests) < self.limit:
                    self.requests.append(time.time())
                    return total_wait
                
                # Calculate wait time until oldest request expires
                oldest = self.requests[0]
                wait_time = (oldest + self.window_seconds) - time.time()
                wait_time = max(0.01, wait_time)
            
            await asyncio.sleep(wait_time)
            total_wait += wait_time

class SlidingWindowCounter(RateLimiter):
    """Sliding window counter (more memory efficient)."""
    
    def __init__(
        self,
        limit: int,
        window_seconds: float = 60
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        
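        # Current and previous window counts
        self.current_count = 0
        self.previous_count = 0
        # Align to the same window boundaries that _get_count() uses,
        # so the first window is weighted like every later one
        now = time.time()
        self.current_window_start = now - (now % self.window_seconds)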
        
        self._lock = asyncio.Lock()
    
    async def _get_count(self) -> float:
        """Get weighted count across windows."""
        
        now = time.time()
        window_start = now - (now % self.window_seconds)
        
        # Check if we've moved to a new window
        if window_start > self.current_window_start:
            if window_start > self.current_window_start + self.window_seconds:
                # Skipped a window
                self.previous_count = 0
            else:
                self.previous_count = self.current_count
            
            self.current_count = 0
            self.current_window_start = window_start
        
        # Calculate weighted count
        elapsed_in_window = now - self.current_window_start
        weight = elapsed_in_window / self.window_seconds
        
        return self.previous_count * (1 - weight) + self.current_count
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire permit."""
        
        async with self._lock:
            count = await self._get_count()
            
            if count + tokens <= self.limit:
                self.current_count += tokens
                return True
            
            return False
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until permit available."""
        
        total_wait = 0
        
        while True:
            async with self._lock:
                count = await self._get_count()
                
                if count + tokens <= self.limit:
                    self.current_count += tokens
                    return total_wait
            
            # Wait a bit and retry
            wait_time = 0.1
            await asyncio.sleep(wait_time)
            total_wait += wait_time

class FixedWindowCounter(RateLimiter):
    """Simple fixed window counter."""
    
    def __init__(
        self,
        limit: int,
        window_seconds: float = 60
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.count = 0
        self.window_start = time.time()
        self._lock = asyncio.Lock()
    
    async def _check_window(self):
        """Reset if window expired."""
        
        now = time.time()
        
        if now - self.window_start >= self.window_seconds:
            self.count = 0
            self.window_start = now
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire permit."""
        
        async with self._lock:
            await self._check_window()
            
            if self.count + tokens <= self.limit:
                self.count += tokens
                return True
            
            return False
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait until permit available."""
        
        total_wait = 0
        
        while True:
            async with self._lock:
                await self._check_window()
                
                if self.count + tokens <= self.limit:
                    self.count += tokens
                    return total_wait
                
                # Wait until window resets
                wait_time = self.window_seconds - (time.time() - self.window_start)
                wait_time = max(0.01, wait_time)
            
            await asyncio.sleep(wait_time)
            total_wait += wait_time
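
The weighted estimate in `SlidingWindowCounter._get_count()` is easier to follow with numbers plugged in. The sketch below restates that one formula as a standalone function (`weighted_count` is a name chosen here for illustration) and compares it with a fixed window at a window boundary.

```python
def weighted_count(previous_count: int, current_count: int,
                   elapsed_in_window: float, window_seconds: float) -> float:
    """Estimate requests in the trailing window from two adjacent counters."""
    weight = elapsed_in_window / window_seconds
    # The previous window contributes only the share that still overlaps
    # the trailing window; the current window counts in full
    return previous_count * (1 - weight) + current_count


limit = 60

# 15 s into a 60 s window: 40 requests last window, 10 so far in this one.
# 75% of the previous window still overlaps, so 40 * 0.75 + 10 = 40.
estimate = weighted_count(40, 10, 15.0, 60.0)
can_admit = estimate + 1 <= limit

# Boundary case: the previous window was fully used (60 requests) and the
# new window started 1 s ago. A fixed window would allow another 60 requests
# right away; the weighted estimate is still about 59, so only one fits.
boundary = weighted_count(60, 0, 1.0, 60.0)
room_at_boundary = int(limit - boundary)
```

This is why the sliding window counter avoids the double burst that a fixed window permits around each reset, while storing only two integers instead of one timestamp per request.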

Adaptive Rate Limiting

import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class RateLimitState(Enum):
    """Rate limiter state."""
    
    NORMAL = "normal"
    THROTTLED = "throttled"
    RECOVERING = "recovering"

@dataclass
class AdaptiveConfig:
    """Adaptive rate limiter config."""
    
    initial_rate: float = 1.0  # requests per second
    min_rate: float = 0.1
    max_rate: float = 10.0
    increase_factor: float = 1.1
    decrease_factor: float = 0.5
    recovery_time: float = 30.0

class AdaptiveRateLimiter(RateLimiter):
    """Adapts rate based on API responses."""
    
    def __init__(self, config: Optional[AdaptiveConfig] = None):
        self.config = config or AdaptiveConfig()
        self.current_rate = self.config.initial_rate
        self.state = RateLimitState.NORMAL
        self.last_throttle = 0.0
        
        self.bucket = TokenBucket(
            rate=self.current_rate,
            capacity=self._capacity_for(self.current_rate)
        )
        
        self._lock = asyncio.Lock()
    
    @staticmethod
    def _capacity_for(rate: float) -> int:
        """Burst capacity: two seconds of traffic, never below one permit."""
        # int(rate * 2) is 0 for rates under 0.5/s, which would make the
        # bucket unable to ever grant a permit
        return max(1, int(rate * 2))
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire permit."""
        return await self.bucket.acquire(tokens)
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for permit."""
        return await self.bucket.wait_and_acquire(tokens)
    
    async def on_success(self):
        """Called on successful request."""
        
        async with self._lock:
            if self.state == RateLimitState.RECOVERING:
                # Check if recovery period passed
                if time.time() - self.last_throttle >= self.config.recovery_time:
                    self.state = RateLimitState.NORMAL
            
            if self.state == RateLimitState.NORMAL:
                # Gradually increase rate
                new_rate = min(
                    self.current_rate * self.config.increase_factor,
                    self.config.max_rate
                )
                
                if new_rate != self.current_rate:
                    self.current_rate = new_rate
                    self._update_bucket()
    
    async def on_rate_limit(self, retry_after: Optional[float] = None):
        """Called when rate limited (429)."""
        
        async with self._lock:
            self.state = RateLimitState.THROTTLED
            self.last_throttle = time.time()
            
            # Decrease rate
            self.current_rate = max(
                self.current_rate * self.config.decrease_factor,
                self.config.min_rate
            )
            
            self._update_bucket()
        
        # Honor retry_after outside the lock so concurrent on_success()
        # callers are not blocked for the whole delay
        if retry_after:
            await asyncio.sleep(retry_after)
        
        async with self._lock:
            self.state = RateLimitState.RECOVERING
    
    def _update_bucket(self):
        """Update bucket with new rate."""
        
        self.bucket.rate = self.current_rate
        self.bucket.capacity = self._capacity_for(self.current_rate)

class HeaderBasedLimiter(RateLimiter):
    """Adjust based on rate limit headers."""
    
    def __init__(
        self,
        initial_limit: int = 60,
        initial_remaining: int = 60
    ):
        self.limit = initial_limit
        self.remaining = initial_remaining
        self.reset_time = time.time() + 60
        self._lock = asyncio.Lock()
    
    async def update_from_headers(self, headers: dict):
        """Update limits from response headers."""
        
        # Header names are case-insensitive; normalize before lookup
        headers = {str(k).lower(): v for k, v in headers.items()}
        
        async with self._lock:
            # OpenAI style headers
            if 'x-ratelimit-limit-requests' in headers:
                self.limit = int(headers['x-ratelimit-limit-requests'])
            
            if 'x-ratelimit-remaining-requests' in headers:
                self.remaining = int(headers['x-ratelimit-remaining-requests'])
            
            if 'x-ratelimit-reset-requests' in headers:
                # Parse reset time. Check 'ms' before 's', because a value
                # like "20ms" also ends with 's'.
                reset_str = headers['x-ratelimit-reset-requests'].strip()
                try:
                    if reset_str.endswith('ms'):
                        self.reset_time = time.time() + float(reset_str[:-2]) / 1000
                    elif reset_str.endswith('s'):
                        self.reset_time = time.time() + float(reset_str[:-1])
                except ValueError:
                    # Compound values such as "6m0s" are not handled here;
                    # keep the previous reset_time
                    pass
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire permit."""
        
        async with self._lock:
            # Check if reset time passed
            if time.time() >= self.reset_time:
                self.remaining = self.limit
                self.reset_time = time.time() + 60
            
            if self.remaining > 0:
                self.remaining -= 1
                return True
            
            return False
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for permit."""
        
        total_wait = 0
        
        while True:
            async with self._lock:
                if time.time() >= self.reset_time:
                    self.remaining = self.limit
                    self.reset_time = time.time() + 60
                
                if self.remaining > 0:
                    self.remaining -= 1
                    return total_wait
                
                wait_time = self.reset_time - time.time()
                wait_time = max(0.01, wait_time)
            
            await asyncio.sleep(wait_time)
            total_wait += wait_time

class ConcurrencyLimiter:
    """Limit concurrent requests."""
    
    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.current = 0
    
    async def __aenter__(self):
        await self.semaphore.acquire()
        # No lock needed: there is no await between here and the return,
        # so the increment cannot interleave with another coroutine
        self.current += 1
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.current -= 1
        self.semaphore.release()
    
    @property
    def available(self) -> int:
        """Get available slots."""
        # Derived from our own counter rather than the private Semaphore._value
        return self.max_concurrent - self.current
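
The reset-header parsing in `HeaderBasedLimiter` only covers single-unit values such as `1s` or `20ms`. OpenAI-style reset headers can also carry compound durations such as `6m0s`; treat that format as an assumption and check it against the responses you actually receive. Under that assumption, a small parser might look like this (`parse_reset_duration` is a hypothetical helper, not part of any SDK):

```python
import re
from typing import Optional

# Seconds per unit; "ms" is listed first in the pattern so that "250ms"
# is not read as minutes followed by a stray "s"
_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")


def parse_reset_duration(value: str) -> Optional[float]:
    """Convert a duration like "6m0s" or "250ms" to seconds.

    Returns None when the value does not consist entirely of
    number-plus-unit parts.
    """
    value = value.strip()
    position = 0
    total = 0.0

    for match in _PART.finditer(value):
        if match.start() != position:
            return None  # unexpected characters between parts
        total += float(match.group(1)) * _UNIT_SECONDS[match.group(2)]
        position = match.end()

    # Reject empty input and trailing junk
    if position == 0 or position != len(value):
        return None

    return total


six_minutes = parse_reset_duration("6m0s")
one_second = parse_reset_duration("1s")
quarter_second = parse_reset_duration("250ms")
unparseable = parse_reset_duration("soon")
```

Returning `None` for anything unexpected lets the caller keep its previous `reset_time` instead of crashing on a header format it has not seen before.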

Distributed Rate Limiting

import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional
import hashlib

class RedisRateLimiter(RateLimiter):
    """Distributed rate limiter using Redis."""
    
    def __init__(
        self,
        redis_client: Any,
        key_prefix: str,
        limit: int,
        window_seconds: float = 60
    ):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.limit = limit
        self.window_seconds = window_seconds
    
    def _get_key(self) -> str:
        """Get current window key."""
        
        window = int(time.time() / self.window_seconds)
        return f"{self.key_prefix}:{window}"
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire permit using Redis."""
        
        key = self._get_key()
        
        # Increment and refresh the TTL in one pipeline, then check the result
        pipe = self.redis.pipeline()
        pipe.incrby(key, tokens)
        pipe.expire(key, int(self.window_seconds) + 1)
        
        results = await pipe.execute()
        count = results[0]
        
        if count <= self.limit:
            return True
        
        # Over limit: roll back our increment. Other instances may briefly
        # see the inflated count between the two calls.
        await self.redis.decrby(key, tokens)
        return False
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for permit."""
        
        total_wait = 0
        
        while True:
            if await self.acquire(tokens):
                return total_wait
            
            # Wait until next window
            current_window = int(time.time() / self.window_seconds)
            next_window = (current_window + 1) * self.window_seconds
            wait_time = next_window - time.time()
            wait_time = max(0.01, min(wait_time, 1.0))
            
            await asyncio.sleep(wait_time)
            total_wait += wait_time

class RedisSlidingWindow(RateLimiter):
    """Redis sliding window using sorted sets."""
    
    def __init__(
        self,
        redis_client: Any,
        key: str,
        limit: int,
        window_seconds: float = 60
    ):
        self.redis = redis_client
        self.key = key
        self.limit = limit
        self.window_seconds = window_seconds
    
    async def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire using sorted set."""
        
        now = time.time()
        window_start = now - self.window_seconds
        
        # Lua script for atomic operation
        script = """
        local key = KEYS[1]
        local window_start = tonumber(ARGV[1])
        local now = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local ttl = tonumber(ARGV[4])
        
        -- Remove old entries
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
        
        -- Count current entries
        local count = redis.call('ZCARD', key)
        
        if count < limit then
            -- Add new entry
            redis.call('ZADD', key, now, now .. '-' .. math.random())
            redis.call('EXPIRE', key, ttl)
            return 1
        end
        
        return 0
        """
        
        result = await self.redis.eval(
            script,
            1,
            self.key,
            window_start,
            now,
            self.limit,
            int(self.window_seconds) + 1
        )
        
        return result == 1
    
    async def wait_and_acquire(self, tokens: int = 1) -> float:
        """Wait for permit."""
        
        total_wait = 0
        
        while True:
            if await self.acquire(tokens):
                return total_wait
            
            await asyncio.sleep(0.1)
            total_wait += 0.1

class ConsistentHashLimiter:
    """Distribute rate limits across instances."""
    
    def __init__(
        self,
        limiters: dict[str, RateLimiter],
        replicas: int = 100
    ):
        self.limiters = limiters
        self.replicas = replicas
        self.ring: list[tuple[int, str]] = []
        
        self._build_ring()
    
    def _build_ring(self):
        """Build consistent hash ring."""
        
        for name in self.limiters:
            for i in range(self.replicas):
                key = f"{name}:{i}"
                hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
                self.ring.append((hash_val, name))
        
        self.ring.sort(key=lambda x: x[0])
    
    def _get_limiter(self, key: str) -> RateLimiter:
        """Get limiter for key using consistent hashing."""
        
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        
        for ring_hash, name in self.ring:
            if hash_val <= ring_hash:
                return self.limiters[name]
        
        return self.limiters[self.ring[0][1]]
    
    async def acquire(self, key: str, tokens: int = 1) -> bool:
        """Acquire from appropriate limiter."""
        
        limiter = self._get_limiter(key)
        return await limiter.acquire(tokens)
    
    async def wait_and_acquire(self, key: str, tokens: int = 1) -> float:
        """Wait and acquire from appropriate limiter."""
        
        limiter = self._get_limiter(key)
        return await limiter.wait_and_acquire(tokens)
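
The point of the hash ring in `ConsistentHashLimiter` is that adding or removing a limiter moves only a fraction of the keys. The sketch below shows that property in isolation. `HashRing` is a hypothetical stand-in that maps keys to node names rather than to limiter objects, and it uses `bisect` for the lookup instead of the linear scan above.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    """Same hashing scheme as the ring above: MD5 digest as an integer."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


class HashRing:
    """Consistent hash ring mapping string keys to node names."""

    def __init__(self, nodes: list[str], replicas: int = 100):
        # Each node gets `replicas` virtual points on the ring
        points = sorted(
            (_hash(f"{node}:{i}"), node)
            for node in nodes
            for i in range(replicas)
        )
        self._hashes = [h for h, _ in points]
        self._nodes = [n for _, n in points]

    def lookup(self, key: str) -> str:
        # First ring point at or after the key's hash, wrapping to the start
        index = bisect.bisect_left(self._hashes, _hash(key)) % len(self._hashes)
        return self._nodes[index]


keys = [f"user-{i}" for i in range(1000)]
before = HashRing(["a", "b", "c"])
after = HashRing(["a", "b", "c", "d"])

# Keys whose assignment changed when node "d" joined
moved = [k for k in keys if before.lookup(k) != after.lookup(k)]
moved_targets = {after.lookup(k) for k in moved}
```

Every key that moves lands on the new node, and the rest keep their original limiter, so existing per-key counters stay valid when you scale out. The binary search also brings each lookup down from O(n) to O(log n) in the number of ring points.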

Rate Limited LLM Client

import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class RateLimitedConfig:
    """Rate limited client config."""
    
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000
    max_concurrent: int = 10
    adaptive: bool = True

class RateLimitedLLMClient:
    """LLM client with comprehensive rate limiting."""
    
    def __init__(
        self,
        api_key: str,
        config: Optional[RateLimitedConfig] = None
    ):
        self.api_key = api_key
        self.config = config or RateLimitedConfig()
        
        # Request rate limiter
        self.request_limiter = SlidingWindowCounter(
            limit=self.config.requests_per_minute,
            window_seconds=60
        )
        
        # Token rate limiter
        self.token_limiter = TokenBucket(
            rate=self.config.tokens_per_minute / 60,
            capacity=self.config.tokens_per_minute
        )
        
        # Concurrency limiter
        self.concurrency = ConcurrencyLimiter(self.config.max_concurrent)
        
        # Adaptive limiter
        if self.config.adaptive:
            self.adaptive = AdaptiveRateLimiter()
        else:
            self.adaptive = None
        
        # Header-based limiter
        self.header_limiter = HeaderBasedLimiter()
        
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
        return self._session
    
    def _estimate_tokens(self, prompt: str) -> int:
        """Estimate token count (rough heuristic: about 4 characters per token)."""
        return max(1, len(prompt) // 4)
    
    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4",
        max_tokens: int = 1000
    ) -> dict:
        """Make rate-limited completion request."""
        
        estimated_tokens = self._estimate_tokens(prompt) + max_tokens
        
        # Wait for all rate limiters
        await self.request_limiter.wait_and_acquire(1)
        await self.token_limiter.wait_and_acquire(estimated_tokens)
        
        if self.adaptive:
            await self.adaptive.wait_and_acquire(1)
        
        async with self.concurrency:
            session = await self._get_session()
            
            try:
                async with session.post(
                    "https://api.openai.com/v1/chat/completions",
                    json={
                        "model": model,
                        "messages": [{"role": "user", "content": prompt}],
                        "max_tokens": max_tokens
                    }
                ) as response:
                    # Update header-based limiter
                    await self.header_limiter.update_from_headers(
                        dict(response.headers)
                    )
                    
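                    if response.status == 429:
                        # Rate limited
                        retry_after = response.headers.get('retry-after')
                        try:
                            delay = float(retry_after) if retry_after else None
                        except ValueError:
                            # Retry-After may also be an HTTP date;
                            # that form is ignored here
                            delay = None
                        
                        if self.adaptive:
                            await self.adaptive.on_rate_limit(delay)
                        
                        raise Exception("Rate limited")
                    
                    # Surface other 4xx/5xx responses instead of counting
                    # them as successes
                    response.raise_for_status()
                    
                    data = await response.json()
                    
                    if self.adaptive:
                        await self.adaptive.on_success()
                    
                    return data
            
            except aiohttp.ClientConnectionError:
                # Back off on connection-level failures; HTTP error statuses
                # (ClientResponseError) propagate without slowing the limiter
                if self.adaptive:
                    await self.adaptive.on_rate_limit()
                raise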
    
    async def close(self):
        """Close session."""
        
        if self._session and not self._session.closed:
            await self._session.close()
    
    def get_stats(self) -> dict:
        """Get rate limiter stats."""
        
        return {
            "request_limiter": {
                "current_count": self.request_limiter.current_count
            },
            "token_limiter": {
                "available": self.token_limiter.available_tokens
            },
            "concurrency": {
                "available": self.concurrency.available,
                "current": self.concurrency.current
            },
            "adaptive": {
                "state": self.adaptive.state.value if self.adaptive else None,
                "rate": self.adaptive.current_rate if self.adaptive else None
            }
        }
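
`complete()` raises as soon as the API answers with a 429, so callers still need a retry policy. One common choice is exponential backoff with full jitter. The sketch below is one way to wrap any async call; `retry_with_backoff`, `backoff_delay`, and their parameters are hypothetical names, and the injectable `sleep` exists only so the behavior can be tested without real delays.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Upper bound of the delay for a 0-based attempt: base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `call`, retrying retryable failures with jittered exponential delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return await call()
        except Exception as exc:
            # Give up on the last attempt or on errors that retrying cannot fix
            if attempt == max_attempts - 1 or not is_retryable(exc):
                raise
            # Full jitter: a random delay in [0, bound] spreads out clients
            # that were throttled at the same moment
            await sleep(random.uniform(0, backoff_delay(attempt, base, cap)))

    raise AssertionError("unreachable: the loop always returns or raises")
```

With the client above, usage could look like `await retry_with_backoff(lambda: client.complete(prompt), lambda e: "Rate limited" in str(e))`. Matching on the message mirrors how the listings in this guide detect a 429; a dedicated exception type would be more robust.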

Production Rate Limiting Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4"
    max_tokens: int = 1000
    user_id: Optional[str] = None

# Global rate limiters
global_limiter = SlidingWindowCounter(limit=1000, window_seconds=60)

# Per-user rate limiters
user_limiters: dict[str, SlidingWindowCounter] = {}

def get_user_limiter(user_id: str) -> SlidingWindowCounter:
    """Get or create user rate limiter."""
    
    if user_id not in user_limiters:
        user_limiters[user_id] = SlidingWindowCounter(
            limit=60,  # 60 requests per minute per user
            window_seconds=60
        )
    
    return user_limiters[user_id]

# LLM client
llm_client = RateLimitedLLMClient(
    api_key="your-key",
    config=RateLimitedConfig(
        requests_per_minute=500,
        tokens_per_minute=500000,
        max_concurrent=50,
        adaptive=True
    )
)

@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
    """Create rate-limited completion."""
    
    # Check global limit
    if not await global_limiter.acquire(1):
        raise HTTPException(
            status_code=429,
            detail="Global rate limit exceeded"
        )
    
    # Check user limit
    if request.user_id:
        user_limiter = get_user_limiter(request.user_id)
        
        if not await user_limiter.acquire(1):
            raise HTTPException(
                status_code=429,
                detail="User rate limit exceeded"
            )
    
    try:
        result = await llm_client.complete(
            prompt=request.prompt,
            model=request.model,
            max_tokens=request.max_tokens
        )
        
        return {
            "content": result["choices"][0]["message"]["content"],
            "usage": result.get("usage", {})
        }
    
    except Exception as e:
        if "Rate limited" in str(e):
            raise HTTPException(status_code=429, detail="API rate limited")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/v1/limits")
async def get_limits(user_id: Optional[str] = None):
    """Get current rate limit status."""
    
    response = {
        "global": {
            "limit": global_limiter.limit,
            "current": global_limiter.current_count
        },
        "llm_client": llm_client.get_stats()
    }
    
    if user_id and user_id in user_limiters:
        response["user"] = {
            "limit": user_limiters[user_id].limit,
            "current": user_limiters[user_id].current_count
        }
    
    return response

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Effective rate limiting is essential for building reliable LLM applications. Start with token bucket or sliding window algorithms—they provide smooth request distribution and accurate tracking. Use dual limiters for both request count and token count, since LLM APIs typically enforce both. Implement adaptive rate limiting that responds to 429 errors by backing off, then gradually recovering. For multi-instance deployments, use Redis-based distributed limiters to share state across instances. Always respect rate limit headers from API responses—they provide real-time feedback on your remaining quota. Layer your rate limiting: global limits protect your API keys, per-user limits ensure fair access, and per-endpoint limits prevent abuse of expensive operations. Monitor your rate limiter metrics to understand usage patterns and adjust limits accordingly. The key insight is that rate limiting isn't just about avoiding errors—it's about maximizing throughput within constraints while maintaining a good user experience. Well-designed rate limiting lets you serve more users reliably while staying within API quotas and budget constraints.
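
The layering described above works best when a request is charged against every layer or against none, so that a rejection at the user level does not also use up global quota. Here is a minimal single-process sketch of that idea. `WindowCounter` and `admit` are hypothetical names, and the counters are simple fixed windows to keep the example short.

```python
import time
from typing import Callable, Optional


class WindowCounter:
    """Fixed-window counter with separate check and commit steps."""

    def __init__(self, limit: int, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.window_start = clock()
        self.count = 0

    def has_room(self) -> bool:
        # Start a fresh window once the current one has expired
        now = self.clock()
        if now - self.window_start >= self.window_seconds:
            self.window_start = now
            self.count = 0
        return self.count < self.limit

    def commit(self) -> None:
        self.count += 1


def admit(layers: dict[str, WindowCounter]) -> Optional[str]:
    """Charge every layer, or none.

    Returns the name of the first exhausted layer, or None if admitted.
    There is no await between the check and the commit, so within a single
    asyncio event loop the two steps cannot interleave with other requests.
    """
    for name, counter in layers.items():
        if not counter.has_room():
            return name
    for counter in layers.values():
        counter.commit()
    return None


global_limit = WindowCounter(limit=1000)
user_limit = WindowCounter(limit=2)        # deliberately tiny for the demo
endpoint_limit = WindowCounter(limit=100)
layers = {"global": global_limit, "user": user_limit, "endpoint": endpoint_limit}

results = [admit(layers) for _ in range(3)]   # third request hits the user limit
```

The third request is rejected by the user layer and leaves the global and endpoint counters untouched. Across multiple instances the same all-or-nothing rule would have to be enforced in the shared store, for example inside a single Lua script.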

