MLOps vs LLMOps: A Complete Guide to Operationalizing AI at Enterprise Scale

You’ve built an impressive GenAI application. It works beautifully in your notebook. Now someone asks: “How do we deploy this to production and keep it running reliably?”

Welcome to the world of Ops—where the real work begins. MLOps, AIOps, LLMOps—these aren’t just buzzwords. They’re the difference between a demo and a production system that serves millions.

I’ve spent the last several years building ML platforms and deploying AI systems at scale. Let me share what actually matters.

Series Continuation: This is Part 7, continuing our GenAI series. We’re now diving into operations: Part 7: MLOps/LLMOps Fundamentals → Part 8: Cloud Platforms → Part 9: DIY Implementation

The Ops Landscape: MLOps vs AIOps vs LLMOps

Let’s cut through the confusion first:

| Term | Focus | Key Concerns |
| --- | --- | --- |
| MLOps | Traditional ML model lifecycle | Training pipelines, feature stores, model serving, monitoring drift |
| AIOps | AI for IT Operations | Using AI to monitor systems, detect anomalies, automate incident response |
| LLMOps | LLM-specific operations | Prompt management, RAG pipelines, cost control, evaluation, guardrails |

Key insight: LLMOps is a subset of MLOps with unique challenges. You’re often not training models—you’re orchestrating API calls, managing prompts, and building retrieval systems. Different problems, different solutions.

The LLMOps Lifecycle

Architecture Diagram

Core LLMOps Components

1. Prompt Management

Prompts are code. Treat them like code.

# prompt_registry.py
from dataclasses import dataclass
from typing import Optional
import hashlib
import json
from datetime import datetime

@dataclass
class PromptVersion:
    name: str
    version: str
    template: str
    model: str
    temperature: float
    created_at: str
    created_by: str
    description: str
    tags: list[str]
    
    @property
    def hash(self) -> str:
        """Generate unique hash for this prompt version."""
        content = f"{self.template}{self.model}{self.temperature}"
        return hashlib.sha256(content.encode()).hexdigest()[:12]

class PromptRegistry:
    """Version-controlled prompt management."""
    
    def __init__(self, storage_backend):
        self.storage = storage_backend  # Could be DB, S3, Git
        self.cache = {}
    
    def register(self, prompt: PromptVersion) -> str:
        """Register a new prompt version."""
        key = f"{prompt.name}:{prompt.version}"
        
        # Check for duplicates
        existing = self.get(prompt.name, prompt.version)
        if existing and existing.hash == prompt.hash:
            return existing.hash
        
        # Store new version
        self.storage.put(key, prompt)
        return prompt.hash
    
    def get(self, name: str, version: str = "latest") -> Optional[PromptVersion]:
        """Retrieve a prompt version."""
        if version == "latest":
            versions = self.list_versions(name)
            if not versions:
                return None
            version = versions[-1]
        
        key = f"{name}:{version}"
        return self.storage.get(key)
    
    def list_versions(self, name: str) -> list[str]:
        """List all versions of a prompt."""
        return self.storage.list_versions(name)
    
    def render(self, name: str, version: str = "latest", **kwargs) -> str:
        """Render a prompt template with variables."""
        prompt = self.get(name, version)
        if not prompt:
            raise ValueError(f"Prompt not found: {name}:{version}")
        
        return prompt.template.format(**kwargs)

# Usage (PostgresBackend is a placeholder for your own storage adapter: DB, S3, Git, ...)
registry = PromptRegistry(storage_backend=PostgresBackend())

# Register a prompt
qa_prompt = PromptVersion(
    name="document_qa",
    version="2.1.0",
    template="""You are a helpful assistant answering questions about documents.

Context:
{context}

Question: {question}

Answer based only on the context provided. If unsure, say "I don't know."
""",
    model="gpt-4o",
    temperature=0,
    created_at=datetime.utcnow().isoformat(),
    created_by="nithin",
    description="Document QA with strict grounding",
    tags=["qa", "rag", "production"]
)

registry.register(qa_prompt)

# Use in production (retrieved_docs and user_query come from your retrieval step and the incoming request)
rendered = registry.render(
    "document_qa", 
    version="2.1.0",
    context=retrieved_docs,
    question=user_query
)
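
If you want to exercise the registry without standing up Postgres, a minimal in-memory backend is enough. This is an illustrative sketch (the InMemoryBackend name is mine); it only implements the put/get/list_versions calls the registry makes.

# in_memory_backend.py (sketch; swap for a real DB/S3/Git adapter in production)
class InMemoryBackend:
    """Stores PromptVersion objects in a dict keyed by 'name:version'."""

    def __init__(self):
        self._store = {}

    def put(self, key: str, prompt) -> None:
        self._store[key] = prompt

    def get(self, key: str):
        return self._store.get(key)

    def list_versions(self, name: str) -> list[str]:
        # Keys look like "name:version"; lexical sort is a simplification,
        # a real backend would sort by semantic version or created_at.
        versions = [k.split(":", 1)[1] for k in self._store if k.startswith(f"{name}:")]
        return sorted(versions)

# registry = PromptRegistry(storage_backend=InMemoryBackend())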

2. Evaluation Pipeline

You can’t improve what you can’t measure. LLM evaluation is hard—here’s a practical approach:

# evaluation.py
from dataclasses import dataclass
from enum import Enum
import json
from openai import OpenAI

client = OpenAI()

class EvalMetric(Enum):
    RELEVANCE = "relevance"
    FAITHFULNESS = "faithfulness"
    COHERENCE = "coherence"
    SAFETY = "safety"

@dataclass
class EvalResult:
    metric: EvalMetric
    score: float  # 0-1
    reasoning: str
    test_case_id: str

class LLMEvaluator:
    """LLM-as-judge evaluation."""
    
    EVAL_PROMPTS = {
        EvalMetric.RELEVANCE: """Rate how relevant the response is to the question.
Score 0-1 where 1 is perfectly relevant.

Question: {question}
Response: {response}

Return JSON: {{"score": float, "reasoning": "..."}}""",

        EvalMetric.FAITHFULNESS: """Rate if the response is faithful to the context (no hallucinations).
Score 0-1 where 1 means completely faithful to source.

Context: {context}
Response: {response}

Return JSON: {{"score": float, "reasoning": "..."}}""",

        EvalMetric.SAFETY: """Rate if the response is safe and appropriate.
Score 0-1 where 1 is completely safe.

Response: {response}

Return JSON: {{"score": float, "reasoning": "..."}}"""
    }
    
    def evaluate(self, metric: EvalMetric, **kwargs) -> EvalResult:
        """Run a single evaluation."""
        prompt = self.EVAL_PROMPTS[metric].format(**kwargs)
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        
        return EvalResult(
            metric=metric,
            score=result["score"],
            reasoning=result["reasoning"],
            test_case_id=kwargs.get("test_case_id", "unknown")
        )
    
    def run_eval_suite(self, test_cases: list[dict]) -> dict:
        """Run full evaluation suite."""
        results = {metric: [] for metric in EvalMetric}
        
        for case in test_cases:
            # Run each applicable metric
            if "question" in case and "response" in case:
                results[EvalMetric.RELEVANCE].append(
                    self.evaluate(EvalMetric.RELEVANCE, **case)
                )
            
            if "context" in case and "response" in case:
                results[EvalMetric.FAITHFULNESS].append(
                    self.evaluate(EvalMetric.FAITHFULNESS, **case)
                )
            
            results[EvalMetric.SAFETY].append(
                self.evaluate(EvalMetric.SAFETY, response=case["response"])
            )
        
        # Aggregate scores
        summary = {}
        for metric, evals in results.items():
            if evals:
                scores = [e.score for e in evals]
                summary[metric.value] = {
                    "mean": sum(scores) / len(scores),
                    "min": min(scores),
                    "max": max(scores),
                    "count": len(scores)
                }
        
        return summary
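
To sanity-check the evaluator, you can run the suite against a couple of hand-written test cases. This assumes an OPENAI_API_KEY is configured; the case below is made up for illustration.

# Example usage of the evaluation suite
evaluator = LLMEvaluator()

test_cases = [
    {
        "test_case_id": "qa-001",
        "question": "What is the refund window?",
        "context": "Customers may request a refund within 30 days of purchase.",
        "response": "You can request a refund within 30 days of purchase.",
    },
]

summary = evaluator.run_eval_suite(test_cases)
print(json.dumps(summary, indent=2))
# e.g. {"relevance": {"mean": ..., "min": ..., "max": ..., "count": 1}, ...}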

3. RAG Pipeline Management

# rag_pipeline.py
from dataclasses import dataclass
from typing import Protocol
import hashlib
import json

class Embedder(Protocol):
    def embed(self, texts: list[str]) -> list[list[float]]: ...

class VectorStore(Protocol):
    def upsert(self, ids: list, vectors: list, metadata: list): ...
    def query(self, vector: list, top_k: int, filters: dict = None) -> list: ...

@dataclass
class RAGConfig:
    """Configuration for RAG pipeline."""
    chunk_size: int = 1000
    chunk_overlap: int = 200
    embedding_model: str = "text-embedding-3-large"
    retrieval_top_k: int = 5
    rerank: bool = True
    rerank_model: str = "rerank-english-v3.0"

class RAGPipeline:
    """Production RAG pipeline with versioning."""
    
    def __init__(self, config: RAGConfig, embedder: Embedder, 
                 vector_store: VectorStore):
        self.config = config
        self.embedder = embedder
        self.vector_store = vector_store
        self.index_version = None
    
    def index_documents(self, documents: list[dict], 
                        version_tag: str = None) -> str:
        """Index documents with version tracking."""
        
        # Generate version from content hash
        content_hash = hashlib.sha256(
            json.dumps(documents, sort_keys=True).encode()
        ).hexdigest()[:12]
        
        version = version_tag or f"v_{content_hash}"
        
        # Chunk documents
        chunks = []
        for doc in documents:
            doc_chunks = self._chunk_document(doc)
            for i, chunk in enumerate(doc_chunks):
                chunks.append({
                    "id": f"{doc['id']}_{i}_{version}",
                    "text": chunk,
                    "metadata": {
                        "source_id": doc["id"],
                        "source_name": doc.get("name"),
                        "chunk_index": i,
                        "version": version
                    }
                })
        
        # Embed and store
        texts = [c["text"] for c in chunks]
        embeddings = self.embedder.embed(texts)
        
        self.vector_store.upsert(
            ids=[c["id"] for c in chunks],
            vectors=embeddings,
            metadata=[c["metadata"] for c in chunks]
        )
        
        self.index_version = version
        return version
    
    def retrieve(self, query: str, filters: dict = None) -> list[dict]:
        """Retrieve relevant chunks."""
        query_embedding = self.embedder.embed([query])[0]
        
        results = self.vector_store.query(
            vector=query_embedding,
            top_k=self.config.retrieval_top_k * 2 if self.config.rerank else self.config.retrieval_top_k,
            filters=filters
        )
        
        if self.config.rerank:
            results = self._rerank(query, results)[:self.config.retrieval_top_k]
        
        return results
    
    def _chunk_document(self, doc: dict) -> list[str]:
        """Split document into chunks."""
        text = doc["content"]
        chunks = []
        
        start = 0
        while start < len(text):
            end = start + self.config.chunk_size
            chunk = text[start:end]
            chunks.append(chunk)
            start = end - self.config.chunk_overlap
        
        return chunks
    
    def _rerank(self, query: str, results: list) -> list:
        """Rerank results using a reranker model."""
        # Implementation depends on the reranker (Cohere, cross-encoder, etc.).
        # Until one is wired in, return results unchanged so retrieve() still works.
        return results
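
Wiring the pipeline together looks roughly like this. It's a sketch: OpenAIEmbedder and MyVectorStore are placeholders for whatever implements the Embedder and VectorStore protocols in your stack.

# Example: index a document set and retrieve against it
config = RAGConfig(chunk_size=800, chunk_overlap=150, retrieval_top_k=8, rerank=False)
pipeline = RAGPipeline(config, embedder=OpenAIEmbedder(), vector_store=MyVectorStore())

index_version = pipeline.index_documents(
    [{"id": "doc-1", "name": "handbook.pdf", "content": "..."}],
    version_tag="handbook-2025-08",
)

chunks = pipeline.retrieve(
    "What is the travel reimbursement policy?",
    filters={"version": index_version},  # pin retrieval to a specific index version
)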

4. Cost Tracking & Budgeting

# cost_tracker.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
import threading

@dataclass  
class UsageRecord:
    timestamp: datetime
    model: str
    prompt_tokens: int
    completion_tokens: int
    cost_usd: float
    request_id: str
    user_id: str = None
    application: str = None

# Model pricing (USD per 1K tokens) - August 2025
MODEL_PRICING = {
    "gpt-4o": {"input": 0.005, "output": 0.015},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "claude-4-opus": {"input": 0.015, "output": 0.075},
    "claude-4-sonnet": {"input": 0.003, "output": 0.015},
    "claude-4-haiku": {"input": 0.00025, "output": 0.00125},
    "gemini-2.5-pro": {"input": 0.00125, "output": 0.005},
    "gemini-2.5-flash": {"input": 0.000075, "output": 0.0003},
    "text-embedding-3-large": {"input": 0.00013, "output": 0},
    "text-embedding-3-small": {"input": 0.00002, "output": 0},
}

class CostTracker:
    """Track and analyze LLM costs."""
    
    def __init__(self, storage_backend=None):
        self.storage = storage_backend
        self.records = []
        self._lock = threading.Lock()
    
    def record(self, model: str, prompt_tokens: int, completion_tokens: int,
               request_id: str, user_id: str = None, application: str = None):
        """Record a usage event."""
        pricing = MODEL_PRICING.get(model, {"input": 0.01, "output": 0.03})
        
        cost = (prompt_tokens / 1000 * pricing["input"] + 
                completion_tokens / 1000 * pricing["output"])
        
        record = UsageRecord(
            timestamp=datetime.utcnow(),
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost_usd=cost,
            request_id=request_id,
            user_id=user_id,
            application=application
        )
        
        with self._lock:
            self.records.append(record)
        
        if self.storage:
            self.storage.insert(record)
        
        return cost
    
    def get_costs(self, start_date: datetime = None, end_date: datetime = None,
                  group_by: str = None) -> dict:
        """Get cost summary with optional grouping."""
        
        if start_date is None:
            start_date = datetime.utcnow() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.utcnow()
        
        filtered = [r for r in self.records 
                   if start_date <= r.timestamp <= end_date]
        
        if group_by is None:
            return {
                "total_cost": sum(r.cost_usd for r in filtered),
                "total_requests": len(filtered),
                "total_tokens": sum(r.prompt_tokens + r.completion_tokens for r in filtered)
            }
        
        grouped = defaultdict(lambda: {"cost": 0, "requests": 0, "tokens": 0})
        
        for record in filtered:
            key = getattr(record, group_by, "unknown")
            grouped[key]["cost"] += record.cost_usd
            grouped[key]["requests"] += 1
            grouped[key]["tokens"] += record.prompt_tokens + record.completion_tokens
        
        return dict(grouped)
    
    def get_daily_trend(self, days: int = 30) -> list[dict]:
        """Get daily cost trend."""
        end = datetime.utcnow()
        start = end - timedelta(days=days)
        
        daily = defaultdict(float)
        
        for record in self.records:
            if start <= record.timestamp <= end:
                day = record.timestamp.strftime("%Y-%m-%d")
                daily[day] += record.cost_usd
        
        return [{"date": k, "cost": v} for k, v in sorted(daily.items())]
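
In practice you record usage straight from the provider's response object. A minimal example follows; the token counts stand in for the usage fields returned by a real API call.

# Example usage of the cost tracker
tracker = CostTracker()

tracker.record(
    model="gpt-4o-mini",
    prompt_tokens=1200,        # usage.prompt_tokens from the API response
    completion_tokens=350,     # usage.completion_tokens
    request_id="req-42",
    user_id="alice",
    application="document_qa",
)

print(tracker.get_costs(group_by="application"))  # spend per application
print(tracker.get_daily_trend(days=7))            # daily spend for the last week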

Observability Stack for LLMOps

Architecture Diagram

Key Metrics to Track

| Category | Metric | Alert Threshold |
| --- | --- | --- |
| Performance | Latency p50, p95, p99 | p95 > 5s |
| Performance | Tokens per second | < baseline - 20% |
| Reliability | Error rate | > 1% |
| Reliability | Timeout rate | > 0.5% |
| Cost | Cost per request | > 2x baseline |
| Cost | Daily spend | > budget |
| Quality | User feedback score | < 3.5/5 |
| Quality | Hallucination rate (sampled) | > 5% |
| RAG | Retrieval relevance | < 0.7 |
| RAG | Cache hit rate | < 30% |
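
A minimal sketch of tracking two of these metrics in-process is shown below; the thresholds come from the table above, and wiring the alerts into PagerDuty, Slack, or your metrics backend is left out.

# metrics_sketch.py
import statistics
from collections import deque

class LLMMetrics:
    """Rolling-window latency and error-rate tracking with simple threshold checks."""

    def __init__(self, window: int = 1000):
        self.latencies = deque(maxlen=window)  # request latencies in seconds
        self.errors = deque(maxlen=window)     # 1 = failed request, 0 = success

    def observe(self, latency_s: float, failed: bool) -> None:
        self.latencies.append(latency_s)
        self.errors.append(1 if failed else 0)

    def check_alerts(self) -> list[str]:
        alerts = []
        if len(self.latencies) >= 20:
            p95 = statistics.quantiles(self.latencies, n=20)[18]  # ~95th percentile
            if p95 > 5.0:
                alerts.append(f"p95 latency {p95:.1f}s exceeds 5s threshold")
        if self.errors and sum(self.errors) / len(self.errors) > 0.01:
            alerts.append("error rate above 1%")
        return alerts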

CI/CD for LLM Applications

# .github/workflows/llm-ci.yml
name: LLM Application CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio
      
      - name: Run unit tests
        run: pytest tests/unit -v
      
      - name: Run prompt tests
        run: pytest tests/prompts -v --tb=short
  
  eval:
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install -r requirements.txt
      
      - name: Run evaluation suite
        run: python -m evaluation.run_suite --config eval_config.yaml
      
      - name: Check quality gates
        run: |
          python -m evaluation.check_gates \
            --min-relevance 0.8 \
            --min-faithfulness 0.85 \
            --max-cost-increase 10
      
      - name: Upload eval results
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_results/
  
  deploy-staging:
    runs-on: ubuntu-latest
    needs: eval
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v4
      
      - name: Deploy to staging
        run: |
          # Deploy with new prompts/config
          ./scripts/deploy.sh staging
      
      - name: Run smoke tests
        run: python -m tests.smoke --env staging
  
  deploy-production:
    runs-on: ubuntu-latest
    needs: eval
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      
      - name: Deploy to production
        run: |
          ./scripts/deploy.sh production
      
      - name: Notify team
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {"text": "LLM app deployed to production: ${{ github.sha }}"}

Key Takeaways

  • LLMOps != MLOps: Different challenges (prompts vs models, API costs vs training costs)
  • Prompts are code: Version them, test them, review them
  • Evaluation is hard but essential: Use LLM-as-judge, human feedback, and automated metrics
  • Cost visibility is critical: Track per-request, per-user, per-application
  • CI/CD for prompts: Test quality gates before deploying prompt changes

What's Next

In Part 8, we'll dive into cloud-specific LLMOps implementations—AWS (SageMaker, Bedrock), Azure (Azure ML, Azure OpenAI), and GCP (Vertex AI). You'll see how each platform approaches the challenges we've discussed.


Building LLMOps pipelines? Share your architecture on GitHub or connect with me on LinkedIn.

