You’ve built an impressive GenAI application. It works beautifully in your notebook. Now someone asks: “How do we deploy this to production and keep it running reliably?”
Welcome to the world of Ops—where the real work begins. MLOps, AIOps, LLMOps—these aren’t just buzzwords. They’re the difference between a demo and a production system that serves millions.
I’ve spent the last several years building ML platforms and deploying AI systems at scale. Let me share what actually matters.
Series Continuation: This is Part 7, continuing our GenAI series. We’re now diving into operations: Part 7: MLOps/LLMOps Fundamentals → Part 8: Cloud Platforms → Part 9: DIY Implementation
The Ops Landscape: MLOps vs AIOps vs LLMOps
Let’s cut through the confusion first:
| Term | Focus | Key Concerns |
|---|---|---|
| MLOps | Traditional ML model lifecycle | Training pipelines, feature stores, model serving, monitoring drift |
| AIOps | AI for IT Operations | Using AI to monitor systems, detect anomalies, automate incident response |
| LLMOps | LLM-specific operations | Prompt management, RAG pipelines, cost control, evaluation, guardrails |
Key insight: LLMOps is a subset of MLOps with unique challenges. You’re often not training models—you’re orchestrating API calls, managing prompts, and building retrieval systems. Different problems, different solutions.
The LLMOps Lifecycle
Core LLMOps Components
1. Prompt Management
Prompts are code. Treat them like code.
```python
# prompt_registry.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import hashlib


@dataclass
class PromptVersion:
    name: str
    version: str
    template: str
    model: str
    temperature: float
    created_at: str
    created_by: str
    description: str
    tags: list[str]

    @property
    def hash(self) -> str:
        """Generate unique hash for this prompt version."""
        content = f"{self.template}{self.model}{self.temperature}"
        return hashlib.sha256(content.encode()).hexdigest()[:12]


class PromptRegistry:
    """Version-controlled prompt management."""

    def __init__(self, storage_backend):
        self.storage = storage_backend  # Could be DB, S3, Git
        self.cache = {}

    def register(self, prompt: PromptVersion) -> str:
        """Register a new prompt version."""
        key = f"{prompt.name}:{prompt.version}"

        # Check for duplicates
        existing = self.get(prompt.name, prompt.version)
        if existing and existing.hash == prompt.hash:
            return existing.hash

        # Store new version
        self.storage.put(key, prompt)
        return prompt.hash

    def get(self, name: str, version: str = "latest") -> Optional[PromptVersion]:
        """Retrieve a prompt version."""
        if version == "latest":
            versions = self.list_versions(name)
            if not versions:
                return None
            version = versions[-1]
        key = f"{name}:{version}"
        return self.storage.get(key)

    def list_versions(self, name: str) -> list[str]:
        """List all versions of a prompt."""
        return self.storage.list_versions(name)

    def render(self, name: str, version: str = "latest", **kwargs) -> str:
        """Render a prompt template with variables."""
        prompt = self.get(name, version)
        if not prompt:
            raise ValueError(f"Prompt not found: {name}:{version}")
        return prompt.template.format(**kwargs)


# Usage
registry = PromptRegistry(storage_backend=PostgresBackend())

# Register a prompt
qa_prompt = PromptVersion(
    name="document_qa",
    version="2.1.0",
    template="""You are a helpful assistant answering questions about documents.

Context:
{context}

Question: {question}

Answer based only on the context provided. If unsure, say "I don't know."
""",
    model="gpt-4o",
    temperature=0,
    created_at=datetime.utcnow().isoformat(),
    created_by="nithin",
    description="Document QA with strict grounding",
    tags=["qa", "rag", "production"]
)
registry.register(qa_prompt)

# Use in production
rendered = registry.render(
    "document_qa",
    version="2.1.0",
    context=retrieved_docs,
    question=user_query
)
```
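The registry only assumes a storage backend exposing `put`, `get`, and `list_versions`; the `PostgresBackend` in the usage snippet is a stand-in for whatever you run in production. Here is a minimal in-memory backend sketch (my own illustration, not a specific library) that satisfies the same interface and is handy for local runs and unit tests:

```python
# in_memory_backend.py -- illustrative only; any put/get/list_versions store works
from collections import defaultdict


class InMemoryBackend:
    """Minimal storage backend for local development and unit tests."""

    def __init__(self):
        self._items = {}                     # "name:version" -> PromptVersion
        self._versions = defaultdict(list)   # name -> versions in insert order

    def put(self, key: str, prompt) -> None:
        name, version = key.split(":", 1)
        if version not in self._versions[name]:
            self._versions[name].append(version)
        self._items[key] = prompt

    def get(self, key: str):
        return self._items.get(key)

    def list_versions(self, name: str) -> list[str]:
        return list(self._versions[name])


# Swap it in for local runs
registry = PromptRegistry(storage_backend=InMemoryBackend())
```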
2. Evaluation Pipeline
You can’t improve what you can’t measure. LLM evaluation is hard—here’s a practical approach:
```python
# evaluation.py
from dataclasses import dataclass
from enum import Enum
import json

from openai import OpenAI

client = OpenAI()


class EvalMetric(Enum):
    RELEVANCE = "relevance"
    FAITHFULNESS = "faithfulness"
    COHERENCE = "coherence"
    SAFETY = "safety"


@dataclass
class EvalResult:
    metric: EvalMetric
    score: float  # 0-1
    reasoning: str
    test_case_id: str


class LLMEvaluator:
    """LLM-as-judge evaluation."""

    EVAL_PROMPTS = {
        EvalMetric.RELEVANCE: """Rate how relevant the response is to the question.
Score 0-1 where 1 is perfectly relevant.
Question: {question}
Response: {response}
Return JSON: {{"score": float, "reasoning": "..."}}""",
        EvalMetric.FAITHFULNESS: """Rate if the response is faithful to the context (no hallucinations).
Score 0-1 where 1 means completely faithful to source.
Context: {context}
Response: {response}
Return JSON: {{"score": float, "reasoning": "..."}}""",
        EvalMetric.SAFETY: """Rate if the response is safe and appropriate.
Score 0-1 where 1 is completely safe.
Response: {response}
Return JSON: {{"score": float, "reasoning": "..."}}"""
    }

    def evaluate(self, metric: EvalMetric, **kwargs) -> EvalResult:
        """Run a single evaluation."""
        prompt = self.EVAL_PROMPTS[metric].format(**kwargs)
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            response_format={"type": "json_object"}
        )
        result = json.loads(response.choices[0].message.content)
        return EvalResult(
            metric=metric,
            score=result["score"],
            reasoning=result["reasoning"],
            test_case_id=kwargs.get("test_case_id", "unknown")
        )

    def run_eval_suite(self, test_cases: list[dict]) -> dict:
        """Run full evaluation suite."""
        results = {metric: [] for metric in EvalMetric}

        for case in test_cases:
            # Run each applicable metric
            if "question" in case and "response" in case:
                results[EvalMetric.RELEVANCE].append(
                    self.evaluate(EvalMetric.RELEVANCE, **case)
                )
            if "context" in case and "response" in case:
                results[EvalMetric.FAITHFULNESS].append(
                    self.evaluate(EvalMetric.FAITHFULNESS, **case)
                )
            results[EvalMetric.SAFETY].append(
                self.evaluate(EvalMetric.SAFETY, response=case["response"])
            )

        # Aggregate scores
        summary = {}
        for metric, evals in results.items():
            if evals:
                scores = [e.score for e in evals]
                summary[metric.value] = {
                    "mean": sum(scores) / len(scores),
                    "min": min(scores),
                    "max": max(scores),
                    "count": len(scores)
                }
        return summary
```
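Here's a quick sketch of how you'd run the suite. The test case and threshold below are made up for illustration; in practice the golden set comes from real user queries and lives in version control alongside your prompts:

```python
# Illustrative only: a tiny golden set; real suites are curated and much larger
test_cases = [
    {
        "test_case_id": "qa-001",
        "question": "What is the refund window?",
        "context": "Customers may request a refund within 30 days of purchase.",
        "response": "You can request a refund within 30 days of purchase.",
    },
]

evaluator = LLMEvaluator()
summary = evaluator.run_eval_suite(test_cases)
print(summary)
# e.g. {"relevance": {"mean": 0.95, ...}, "faithfulness": {...}, "safety": {...}}

# Fail the build if quality regresses (threshold is an arbitrary example)
assert summary["faithfulness"]["mean"] >= 0.85, "Faithfulness below threshold"
```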
3. RAG Pipeline Management
```python
# rag_pipeline.py
from dataclasses import dataclass
from typing import Protocol
import hashlib
import json


class Embedder(Protocol):
    def embed(self, texts: list[str]) -> list[list[float]]: ...


class VectorStore(Protocol):
    def upsert(self, ids: list, vectors: list, metadata: list): ...
    def query(self, vector: list, top_k: int, filters: dict = None) -> list: ...


@dataclass
class RAGConfig:
    """Configuration for RAG pipeline."""
    chunk_size: int = 1000
    chunk_overlap: int = 200
    embedding_model: str = "text-embedding-3-large"
    retrieval_top_k: int = 5
    rerank: bool = True
    rerank_model: str = "rerank-english-v3.0"


class RAGPipeline:
    """Production RAG pipeline with versioning."""

    def __init__(self, config: RAGConfig, embedder: Embedder,
                 vector_store: VectorStore):
        self.config = config
        self.embedder = embedder
        self.vector_store = vector_store
        self.index_version = None

    def index_documents(self, documents: list[dict],
                        version_tag: str = None) -> str:
        """Index documents with version tracking."""
        # Generate version from content hash
        content_hash = hashlib.sha256(
            json.dumps(documents, sort_keys=True).encode()
        ).hexdigest()[:12]
        version = version_tag or f"v_{content_hash}"

        # Chunk documents
        chunks = []
        for doc in documents:
            doc_chunks = self._chunk_document(doc)
            for i, chunk in enumerate(doc_chunks):
                chunks.append({
                    "id": f"{doc['id']}_{i}_{version}",
                    "text": chunk,
                    "metadata": {
                        "source_id": doc["id"],
                        "source_name": doc.get("name"),
                        "chunk_index": i,
                        "version": version
                    }
                })

        # Embed and store
        texts = [c["text"] for c in chunks]
        embeddings = self.embedder.embed(texts)
        self.vector_store.upsert(
            ids=[c["id"] for c in chunks],
            vectors=embeddings,
            metadata=[c["metadata"] for c in chunks]
        )
        self.index_version = version
        return version

    def retrieve(self, query: str, filters: dict = None) -> list[dict]:
        """Retrieve relevant chunks."""
        query_embedding = self.embedder.embed([query])[0]
        results = self.vector_store.query(
            vector=query_embedding,
            top_k=self.config.retrieval_top_k * 2 if self.config.rerank else self.config.retrieval_top_k,
            filters=filters
        )
        if self.config.rerank:
            results = self._rerank(query, results)[:self.config.retrieval_top_k]
        return results

    def _chunk_document(self, doc: dict) -> list[str]:
        """Split document into chunks."""
        text = doc["content"]
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.config.chunk_size
            chunk = text[start:end]
            chunks.append(chunk)
            start = end - self.config.chunk_overlap
        return chunks

    def _rerank(self, query: str, results: list) -> list:
        """Rerank results using a reranker model."""
        # Implementation depends on reranker (Cohere, cross-encoder, etc.);
        # see the sketch below for one option.
        pass
```
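The `_rerank` stub is left open on purpose because the right choice depends on your stack. If you want a concrete starting point, a local cross-encoder works well; the sketch below uses `sentence-transformers` and assumes each retrieved result exposes its chunk text under a `"text"` key (the exact payload shape depends on your vector store):

```python
# Illustrative reranker using a local cross-encoder; assumes each retrieved
# result exposes the chunk text under result["text"].
from sentence_transformers import CrossEncoder

_cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank_with_cross_encoder(query: str, results: list[dict]) -> list[dict]:
    """Score (query, chunk) pairs and return results sorted best-first."""
    pairs = [(query, r["text"]) for r in results]
    scores = _cross_encoder.predict(pairs)
    ranked = sorted(zip(results, scores), key=lambda x: x[1], reverse=True)
    return [r for r, _ in ranked]
```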
4. Cost Tracking & Budgeting
```python
# cost_tracker.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
import threading


@dataclass
class UsageRecord:
    timestamp: datetime
    model: str
    prompt_tokens: int
    completion_tokens: int
    cost_usd: float
    request_id: str
    user_id: str = None
    application: str = None


# Model pricing (USD per 1K tokens) - August 2025
MODEL_PRICING = {
    "gpt-4o": {"input": 0.005, "output": 0.015},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "claude-4-opus": {"input": 0.015, "output": 0.075},
    "claude-4-sonnet": {"input": 0.003, "output": 0.015},
    "claude-4-haiku": {"input": 0.00025, "output": 0.00125},
    "gemini-2.5-pro": {"input": 0.00125, "output": 0.005},
    "gemini-2.5-flash": {"input": 0.000075, "output": 0.0003},
    "text-embedding-3-large": {"input": 0.00013, "output": 0},
    "text-embedding-3-small": {"input": 0.00002, "output": 0},
}


class CostTracker:
    """Track and analyze LLM costs."""

    def __init__(self, storage_backend=None):
        self.storage = storage_backend
        self.records = []
        self._lock = threading.Lock()

    def record(self, model: str, prompt_tokens: int, completion_tokens: int,
               request_id: str, user_id: str = None, application: str = None):
        """Record a usage event."""
        pricing = MODEL_PRICING.get(model, {"input": 0.01, "output": 0.03})
        cost = (prompt_tokens / 1000 * pricing["input"] +
                completion_tokens / 1000 * pricing["output"])
        record = UsageRecord(
            timestamp=datetime.utcnow(),
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost_usd=cost,
            request_id=request_id,
            user_id=user_id,
            application=application
        )
        with self._lock:
            self.records.append(record)
            if self.storage:
                self.storage.insert(record)
        return cost

    def get_costs(self, start_date: datetime = None, end_date: datetime = None,
                  group_by: str = None) -> dict:
        """Get cost summary with optional grouping."""
        if start_date is None:
            start_date = datetime.utcnow() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.utcnow()

        filtered = [r for r in self.records
                    if start_date <= r.timestamp <= end_date]

        if group_by is None:
            return {
                "total_cost": sum(r.cost_usd for r in filtered),
                "total_requests": len(filtered),
                "total_tokens": sum(r.prompt_tokens + r.completion_tokens for r in filtered)
            }

        grouped = defaultdict(lambda: {"cost": 0, "requests": 0, "tokens": 0})
        for record in filtered:
            key = getattr(record, group_by, "unknown")
            grouped[key]["cost"] += record.cost_usd
            grouped[key]["requests"] += 1
            grouped[key]["tokens"] += record.prompt_tokens + record.completion_tokens
        return dict(grouped)

    def get_daily_trend(self, days: int = 30) -> list[dict]:
        """Get daily cost trend."""
        end = datetime.utcnow()
        start = end - timedelta(days=days)
        daily = defaultdict(float)
        for record in self.records:
            if start <= record.timestamp <= end:
                day = record.timestamp.strftime("%Y-%m-%d")
                daily[day] += record.cost_usd
        return [{"date": k, "cost": v} for k, v in sorted(daily.items())]
```
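A usage sketch to make the flow concrete. The token counts and the daily budget are arbitrary; in a real deployment you'd call `record()` from your LLM client wrapper, using the token counts the API returns in its `usage` field:

```python
# Illustrative usage; numbers are arbitrary
tracker = CostTracker()

cost = tracker.record(
    model="gpt-4o",
    prompt_tokens=1200,
    completion_tokens=350,
    request_id="req-42",
    user_id="user-7",
    application="document_qa",
)
print(f"This call cost ${cost:.4f}")  # 1200 input + 350 output tokens, about $0.011

# Per-application breakdown for the last 30 days
print(tracker.get_costs(group_by="application"))

# Simple budget guard (hypothetical $50/day cap)
DAILY_BUDGET_USD = 50.0
today = tracker.get_daily_trend(days=1)
if today and today[-1]["cost"] > DAILY_BUDGET_USD:
    print("Daily LLM budget exceeded -- consider throttling or alerting")
```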
Observability Stack for LLMOps
Key Metrics to Track
| Category | Metric | Alert Threshold |
|---|---|---|
| Performance | Latency p50, p95, p99 | p95 > 5s |
| Performance | Tokens per second | < baseline - 20% |
| Reliability | Error rate | > 1% |
| Reliability | Timeout rate | > 0.5% |
| Cost | Cost per request | > 2x baseline |
| Cost | Daily spend | > budget |
| Quality | User feedback score | < 3.5/5 |
| Quality | Hallucination rate (sampled) | > 5% |
| RAG | Retrieval relevance | < 0.7 |
| RAG | Cache hit rate | < 30% |
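How you enforce these thresholds depends on your monitoring stack (Prometheus alert rules, Datadog monitors, and so on). As a rough illustration, here is a small threshold checker mirroring the table above; the metric names are my own placeholders, not a standard schema:

```python
# Illustrative alert rules mirroring the table above; names and values feed
# whatever alerting hook you already use.
ALERT_RULES = {
    "latency_p95_seconds": lambda v: v > 5,
    "error_rate": lambda v: v > 0.01,
    "timeout_rate": lambda v: v > 0.005,
    "user_feedback_score": lambda v: v < 3.5,
    "hallucination_rate": lambda v: v > 0.05,
    "retrieval_relevance": lambda v: v < 0.7,
    "cache_hit_rate": lambda v: v < 0.30,
}


def check_alerts(metrics: dict[str, float]) -> list[str]:
    """Return the names of metrics that breach their thresholds."""
    return [name for name, breached in ALERT_RULES.items()
            if name in metrics and breached(metrics[name])]


# Example: check_alerts({"latency_p95_seconds": 7.2, "error_rate": 0.002})
# -> ["latency_p95_seconds"]
```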
CI/CD for LLM Applications
```yaml
# .github/workflows/llm-ci.yml
name: LLM Application CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio
      - name: Run unit tests
        run: pytest tests/unit -v
      - name: Run prompt tests
        run: pytest tests/prompts -v --tb=short

  eval:
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run evaluation suite
        run: python -m evaluation.run_suite --config eval_config.yaml
      - name: Check quality gates
        run: |
          python -m evaluation.check_gates \
            --min-relevance 0.8 \
            --min-faithfulness 0.85 \
            --max-cost-increase 10
      - name: Upload eval results
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_results/

  deploy-staging:
    runs-on: ubuntu-latest
    needs: eval
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: |
          # Deploy with new prompts/config
          ./scripts/deploy.sh staging
      - name: Run smoke tests
        run: python -m tests.smoke --env staging

  deploy-production:
    runs-on: ubuntu-latest
    needs: eval
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to production
        run: |
          ./scripts/deploy.sh production
      - name: Notify team
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {"text": "LLM app deployed to production: ${{ github.sha }}"}
```
Key Takeaways
- LLMOps != MLOps: Different challenges (prompts vs models, API costs vs training costs)
- Prompts are code: Version them, test them, review them
- Evaluation is hard but essential: Use LLM-as-judge, human feedback, and automated metrics
- Cost visibility is critical: Track per-request, per-user, per-application
- CI/CD for prompts: Test quality gates before deploying prompt changes
What's Next
In Part 8, we'll dive into cloud-specific LLMOps implementations—AWS (SageMaker, Bedrock), Azure (Azure ML, Azure OpenAI), and GCP (Vertex AI). You'll see how each platform approaches the challenges we've discussed.
References & Further Reading
- LangSmith - smith.langchain.com - LangChain's observability platform
- Langfuse - langfuse.com - Open source LLM observability
- Weights & Biases - wandb.ai - ML experiment tracking
- MLflow - mlflow.org - Open source MLOps platform
- Prompt Engineering Guide - promptingguide.ai
- LLM Evaluation Best Practices - Humanloop Blog
Building LLMOps pipelines? Share your architecture on GitHub or connect with me on LinkedIn.