Introduction: Production LLM applications need guardrails to ensure safe, appropriate outputs. Without proper safeguards, models can generate harmful content, leak sensitive information, or produce responses that violate business policies. Guardrails provide defense-in-depth: input validation catches problematic requests before they reach the model, output filtering ensures responses meet safety standards, and content moderation prevents harmful generations. This guide covers practical guardrail patterns: input sanitization, topic restriction, PII detection, toxicity filtering, and production-ready safety pipelines that protect both users and your organization.

Input Validation
from dataclasses import dataclass
from typing import Optional
from enum import Enum
import re
class ValidationResult(Enum):
PASS = "pass"
BLOCK = "block"
WARN = "warn"
@dataclass
class ValidationResponse:
"""Result of input validation."""
result: ValidationResult
reason: Optional[str] = None
sanitized_input: Optional[str] = None
class InputValidator:
"""Validate and sanitize user inputs."""
def __init__(
self,
max_length: int = 10000,
blocked_patterns: list[str] = None,
allowed_topics: list[str] = None
):
self.max_length = max_length
self.blocked_patterns = blocked_patterns or []
self.allowed_topics = allowed_topics
def validate(self, text: str) -> ValidationResponse:
"""Validate input text."""
# Length check
if len(text) > self.max_length:
return ValidationResponse(
result=ValidationResult.BLOCK,
reason=f"Input exceeds maximum length of {self.max_length}"
)
# Empty check
if not text.strip():
return ValidationResponse(
result=ValidationResult.BLOCK,
reason="Input is empty"
)
# Blocked patterns
for pattern in self.blocked_patterns:
if re.search(pattern, text, re.IGNORECASE):
return ValidationResponse(
result=ValidationResult.BLOCK,
reason=f"Input contains blocked pattern"
)
return ValidationResponse(
result=ValidationResult.PASS,
sanitized_input=text.strip()
)
class PromptInjectionDetector:
"""Detect prompt injection attempts."""
INJECTION_PATTERNS = [
r"ignore\s+(previous|all|above)\s+instructions",
r"disregard\s+(previous|all|above)",
r"forget\s+(everything|all|previous)",
r"you\s+are\s+now\s+",
r"new\s+instructions:",
r"system\s*:\s*",
r"<\|.*\|>",
r"\[INST\]",
r"```system",
]
def __init__(self, custom_patterns: list[str] = None):
self.patterns = self.INJECTION_PATTERNS + (custom_patterns or [])
def detect(self, text: str) -> ValidationResponse:
"""Detect prompt injection attempts."""
text_lower = text.lower()
for pattern in self.patterns:
if re.search(pattern, text_lower):
return ValidationResponse(
result=ValidationResult.BLOCK,
reason="Potential prompt injection detected"
)
return ValidationResponse(result=ValidationResult.PASS)
class TopicRestrictor:
"""Restrict inputs to allowed topics."""
def __init__(
self,
allowed_topics: list[str],
blocked_topics: list[str] = None
):
self.allowed_topics = allowed_topics
self.blocked_topics = blocked_topics or []
async def check_topic(
self,
text: str,
classifier: callable
) -> ValidationResponse:
"""Check if input is on-topic."""
# Use classifier to determine topic
topic = await classifier(text)
# Check blocked topics first
if topic in self.blocked_topics:
return ValidationResponse(
result=ValidationResult.BLOCK,
reason=f"Topic '{topic}' is not allowed"
)
# Check allowed topics
if self.allowed_topics and topic not in self.allowed_topics:
return ValidationResponse(
result=ValidationResult.BLOCK,
reason=f"Topic '{topic}' is outside allowed scope"
)
return ValidationResponse(result=ValidationResult.PASS)
# Combined input guardrail
class InputGuardrail:
"""Combined input validation guardrail."""
def __init__(
self,
max_length: int = 10000,
blocked_patterns: list[str] = None,
detect_injection: bool = True
):
self.validator = InputValidator(
max_length=max_length,
blocked_patterns=blocked_patterns
)
self.injection_detector = PromptInjectionDetector() if detect_injection else None
def validate(self, text: str) -> ValidationResponse:
"""Run all input validations."""
# Basic validation
result = self.validator.validate(text)
if result.result != ValidationResult.PASS:
return result
# Injection detection
if self.injection_detector:
result = self.injection_detector.detect(text)
if result.result != ValidationResult.PASS:
return result
return ValidationResponse(
result=ValidationResult.PASS,
sanitized_input=text.strip()
)
PII Detection and Masking
from dataclasses import dataclass
from typing import Optional
import re
@dataclass
class PIIMatch:
"""A detected PII match."""
type: str
value: str
start: int
end: int
masked: str
class PIIDetector:
"""Detect and mask personally identifiable information."""
PATTERNS = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone_us": r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
"credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
"ip_address": r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
"date_of_birth": r'\b(?:0?[1-9]|1[0-2])[/-](?:0?[1-9]|[12]\d|3[01])[/-](?:19|20)\d{2}\b',
}
MASKS = {
"email": "[EMAIL]",
"phone_us": "[PHONE]",
"ssn": "[SSN]",
"credit_card": "[CREDIT_CARD]",
"ip_address": "[IP_ADDRESS]",
"date_of_birth": "[DOB]",
}
def __init__(self, custom_patterns: dict = None):
self.patterns = {**self.PATTERNS, **(custom_patterns or {})}
def detect(self, text: str) -> list[PIIMatch]:
"""Detect all PII in text."""
matches = []
for pii_type, pattern in self.patterns.items():
for match in re.finditer(pattern, text):
matches.append(PIIMatch(
type=pii_type,
value=match.group(),
start=match.start(),
end=match.end(),
masked=self.MASKS.get(pii_type, "[REDACTED]")
))
return matches
def mask(self, text: str) -> tuple[str, list[PIIMatch]]:
"""Mask all PII in text."""
matches = self.detect(text)
# Sort by position (reverse) to replace from end
matches.sort(key=lambda m: m.start, reverse=True)
masked_text = text
for match in matches:
masked_text = (
masked_text[:match.start] +
match.masked +
masked_text[match.end:]
)
return masked_text, matches
def has_pii(self, text: str) -> bool:
"""Check if text contains PII."""
return len(self.detect(text)) > 0
class PIIGuardrail:
"""Guardrail for PII handling."""
def __init__(
self,
action: str = "mask", # "mask", "block", "warn"
detector: PIIDetector = None
):
self.action = action
self.detector = detector or PIIDetector()
def process_input(self, text: str) -> ValidationResponse:
"""Process input for PII."""
matches = self.detector.detect(text)
if not matches:
return ValidationResponse(
result=ValidationResult.PASS,
sanitized_input=text
)
if self.action == "block":
return ValidationResponse(
result=ValidationResult.BLOCK,
reason=f"Input contains PII: {[m.type for m in matches]}"
)
elif self.action == "warn":
return ValidationResponse(
result=ValidationResult.WARN,
reason=f"Input contains PII: {[m.type for m in matches]}",
sanitized_input=text
)
else: # mask
masked_text, _ = self.detector.mask(text)
return ValidationResponse(
result=ValidationResult.PASS,
sanitized_input=masked_text
)
def process_output(self, text: str) -> str:
"""Mask PII in output."""
masked_text, _ = self.detector.mask(text)
return masked_text
Output Filtering
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum
class ContentCategory(Enum):
SAFE = "safe"
TOXIC = "toxic"
HARMFUL = "harmful"
INAPPROPRIATE = "inappropriate"
OFF_TOPIC = "off_topic"
@dataclass
class FilterResult:
"""Result of content filtering."""
allowed: bool
category: ContentCategory
confidence: float
reason: Optional[str] = None
filtered_content: Optional[str] = None
class ToxicityFilter:
"""Filter toxic content from outputs."""
TOXIC_PATTERNS = [
r'\b(hate|kill|murder|attack)\b.*\b(people|group|race)\b',
r'\b(stupid|idiot|moron)\b',
r'profanity_pattern_here',
]
def __init__(
self,
threshold: float = 0.7,
custom_patterns: list[str] = None
):
self.threshold = threshold
self.patterns = self.TOXIC_PATTERNS + (custom_patterns or [])
def filter(self, text: str) -> FilterResult:
"""Filter text for toxicity."""
text_lower = text.lower()
for pattern in self.patterns:
if re.search(pattern, text_lower):
return FilterResult(
allowed=False,
category=ContentCategory.TOXIC,
confidence=0.9,
reason="Toxic content detected"
)
return FilterResult(
allowed=True,
category=ContentCategory.SAFE,
confidence=0.95
)
class HarmfulContentFilter:
"""Filter harmful content like dangerous instructions."""
HARMFUL_CATEGORIES = [
"weapons_instructions",
"drug_synthesis",
"hacking_instructions",
"self_harm",
"illegal_activities"
]
HARMFUL_PATTERNS = {
"weapons_instructions": [
r'how\s+to\s+(make|build|create)\s+(bomb|explosive|weapon)',
],
"drug_synthesis": [
r'how\s+to\s+(make|synthesize|cook)\s+(meth|drugs|cocaine)',
],
"hacking_instructions": [
r'how\s+to\s+hack\s+(into|password|account)',
],
}
def filter(self, text: str) -> FilterResult:
"""Filter harmful content."""
text_lower = text.lower()
for category, patterns in self.HARMFUL_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, text_lower):
return FilterResult(
allowed=False,
category=ContentCategory.HARMFUL,
confidence=0.95,
reason=f"Harmful content detected: {category}"
)
return FilterResult(
allowed=True,
category=ContentCategory.SAFE,
confidence=0.9
)
class OutputGuardrail:
"""Combined output filtering guardrail."""
def __init__(
self,
filters: list = None,
fallback_response: str = "I cannot provide that information."
):
self.filters = filters or [
ToxicityFilter(),
HarmfulContentFilter()
]
self.fallback_response = fallback_response
def filter(self, text: str) -> tuple[str, FilterResult]:
"""Filter output through all filters."""
for filter_obj in self.filters:
result = filter_obj.filter(text)
if not result.allowed:
return self.fallback_response, result
return text, FilterResult(
allowed=True,
category=ContentCategory.SAFE,
confidence=0.95
)
Content Moderation
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
@dataclass
class ModerationResult:
"""Result from content moderation."""
flagged: bool
categories: dict[str, bool]
scores: dict[str, float]
def get_flagged_categories(self) -> list[str]:
return [cat for cat, flagged in self.categories.items() if flagged]
class OpenAIModerator:
"""Use OpenAI's moderation API."""
def __init__(self, client: Any):
self.client = client
async def moderate(self, text: str) -> ModerationResult:
"""Moderate text using OpenAI API."""
response = await self.client.moderations.create(input=text)
result = response.results[0]
return ModerationResult(
flagged=result.flagged,
categories={
"hate": result.categories.hate,
"hate_threatening": result.categories.hate_threatening,
"harassment": result.categories.harassment,
"self_harm": result.categories.self_harm,
"sexual": result.categories.sexual,
"violence": result.categories.violence,
},
scores={
"hate": result.category_scores.hate,
"hate_threatening": result.category_scores.hate_threatening,
"harassment": result.category_scores.harassment,
"self_harm": result.category_scores.self_harm,
"sexual": result.category_scores.sexual,
"violence": result.category_scores.violence,
}
)
class LLMBasedModerator:
"""Use an LLM for content moderation."""
MODERATION_PROMPT = """Analyze the following content for safety issues.
Categories to check:
- harmful: Instructions for dangerous activities
- toxic: Hate speech, harassment, discrimination
- inappropriate: Adult content, profanity
- off_topic: Content unrelated to allowed topics
Content to analyze:
{content}
Respond with JSON:
{{"flagged": true/false, "category": "category_name", "reason": "explanation"}}"""
def __init__(self, client: Any, model: str = "gpt-4o-mini"):
self.client = client
self.model = model
async def moderate(self, text: str) -> ModerationResult:
"""Moderate using LLM."""
import json
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "user", "content": self.MODERATION_PROMPT.format(content=text)}
],
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
category = result.get("category", "safe")
return ModerationResult(
flagged=result.get("flagged", False),
categories={category: result.get("flagged", False)},
scores={category: 1.0 if result.get("flagged") else 0.0}
)
class ContentModerationPipeline:
"""Pipeline combining multiple moderation approaches."""
def __init__(
self,
moderators: list,
threshold: float = 0.5
):
self.moderators = moderators
self.threshold = threshold
async def moderate(self, text: str) -> ModerationResult:
"""Run all moderators and combine results."""
results = await asyncio.gather(*[
mod.moderate(text) for mod in self.moderators
])
# Combine results - flag if any moderator flags
combined_flagged = any(r.flagged for r in results)
# Merge categories and scores
combined_categories = {}
combined_scores = {}
for result in results:
for cat, flagged in result.categories.items():
if cat not in combined_categories or flagged:
combined_categories[cat] = flagged
for cat, score in result.scores.items():
if cat not in combined_scores or score > combined_scores[cat]:
combined_scores[cat] = score
return ModerationResult(
flagged=combined_flagged,
categories=combined_categories,
scores=combined_scores
)
Production Guardrails Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Initialize components
input_guardrail = InputGuardrail()
pii_guardrail = PIIGuardrail(action="mask")
output_guardrail = OutputGuardrail()
class ValidateRequest(BaseModel):
text: str
check_pii: bool = True
check_injection: bool = True
class FilterRequest(BaseModel):
text: str
mask_pii: bool = True
class GuardedCompletionRequest(BaseModel):
messages: list[dict]
model: str = "gpt-4o-mini"
max_tokens: Optional[int] = 1000
@app.post("/v1/validate/input")
async def validate_input(request: ValidateRequest):
"""Validate input text."""
# Input validation
result = input_guardrail.validate(request.text)
if result.result == ValidationResult.BLOCK:
return {
"valid": False,
"reason": result.reason
}
# PII check
if request.check_pii:
pii_result = pii_guardrail.process_input(request.text)
if pii_result.result == ValidationResult.BLOCK:
return {
"valid": False,
"reason": pii_result.reason
}
return {
"valid": True,
"sanitized": pii_result.sanitized_input
}
return {
"valid": True,
"sanitized": result.sanitized_input
}
@app.post("/v1/filter/output")
async def filter_output(request: FilterRequest):
"""Filter output text."""
# Content filtering
filtered_text, result = output_guardrail.filter(request.text)
# PII masking
if request.mask_pii:
filtered_text = pii_guardrail.process_output(filtered_text)
return {
"filtered": filtered_text,
"was_filtered": not result.allowed,
"category": result.category.value if not result.allowed else None
}
@app.post("/v1/completions/guarded")
async def guarded_completion(request: GuardedCompletionRequest):
"""Create completion with full guardrails."""
# Validate input
user_message = request.messages[-1].get("content", "")
input_result = input_guardrail.validate(user_message)
if input_result.result == ValidationResult.BLOCK:
raise HTTPException(400, f"Input blocked: {input_result.reason}")
# Mask PII in input
pii_result = pii_guardrail.process_input(user_message)
sanitized_messages = request.messages.copy()
sanitized_messages[-1]["content"] = pii_result.sanitized_input
# Call LLM (placeholder)
# response = await client.chat.completions.create(...)
response_text = "This is a placeholder response"
# Filter output
filtered_text, filter_result = output_guardrail.filter(response_text)
# Mask PII in output
final_text = pii_guardrail.process_output(filtered_text)
return {
"content": final_text,
"guardrails": {
"input_sanitized": pii_result.sanitized_input != user_message,
"output_filtered": not filter_result.allowed
}
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- NVIDIA NeMo Guardrails: https://github.com/NVIDIA/NeMo-Guardrails
- Guardrails AI: https://www.guardrailsai.com/
- OpenAI Moderation: https://platform.openai.com/docs/guides/moderation
- LlamaGuard: https://ai.meta.com/research/publications/llama-guard-llm-based-input-output-safeguard-for-human-ai-conversations/
Conclusion
Guardrails are essential for production LLM applications. Input validation catches problematic requests before they reach the model—detect prompt injection attempts, enforce length limits, and restrict to allowed topics. PII detection protects user privacy by masking sensitive information in both inputs and outputs. Output filtering ensures responses meet safety standards—filter toxic content, harmful instructions, and inappropriate material. Use multiple layers of defense: pattern-based filters for known issues, LLM-based moderation for nuanced content, and external moderation APIs for comprehensive coverage. Monitor guardrail triggers to identify attack patterns and improve defenses. The goal is building AI systems that are helpful while remaining safe and appropriate for all users.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.

Leave a Reply