Introduction
LLMs can generate harmful, biased, or inappropriate content. They can be manipulated through prompt injection, jailbreaks, and adversarial inputs. Production applications need guardrails—safety mechanisms that validate inputs, moderate content, and filter outputs before they reach users. This guide covers practical guardrail implementations: input validation to catch malicious prompts, content moderation using classifiers and LLM-based detection, output filtering to remove sensitive information, and comprehensive safety pipelines that combine multiple layers of protection. Whether you’re building a customer-facing chatbot or an internal tool, guardrails are essential for responsible AI deployment.

Input Validation
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
from abc import ABC, abstractmethod
import re
class ValidationResult(Enum):
"""Validation result status."""
PASS = "pass"
WARN = "warn"
BLOCK = "block"
@dataclass
class ValidationOutput:
"""Output from validation."""
result: ValidationResult
reason: str = ""
    modified_input: Optional[str] = None
confidence: float = 1.0
details: dict = field(default_factory=dict)
class InputValidator(ABC):
"""Abstract input validator."""
@abstractmethod
async def validate(self, text: str) -> ValidationOutput:
"""Validate input text."""
pass
class LengthValidator(InputValidator):
"""Validate input length."""
def __init__(
self,
min_length: int = 1,
max_length: int = 10000,
        max_tokens: Optional[int] = None
):
self.min_length = min_length
self.max_length = max_length
self.max_tokens = max_tokens
async def validate(self, text: str) -> ValidationOutput:
"""Check length constraints."""
if len(text) < self.min_length:
return ValidationOutput(
result=ValidationResult.BLOCK,
reason=f"Input too short (min: {self.min_length})"
)
if len(text) > self.max_length:
return ValidationOutput(
result=ValidationResult.BLOCK,
reason=f"Input too long (max: {self.max_length})"
)
if self.max_tokens:
            # Rough estimate: roughly 4 characters per token for English text
            token_count = len(text) // 4
if token_count > self.max_tokens:
return ValidationOutput(
result=ValidationResult.BLOCK,
reason=f"Input exceeds token limit ({self.max_tokens})"
)
return ValidationOutput(result=ValidationResult.PASS)
class PatternValidator(InputValidator):
"""Validate against regex patterns."""
def __init__(self):
self.blocked_patterns = [
# Prompt injection attempts
r"ignore\s+(previous|all|above)\s+(instructions?|prompts?)",
r"disregard\s+(previous|all|above)",
r"forget\s+(everything|all|previous)",
r"you\s+are\s+now\s+",
r"new\s+instructions?:",
r"system\s*:\s*",
r"\[INST\]",
r"<\|im_start\|>",
# Jailbreak patterns
r"DAN\s+mode",
r"developer\s+mode",
r"pretend\s+you\s+(are|can)",
r"act\s+as\s+if\s+you\s+have\s+no",
r"bypass\s+(your|the)\s+(restrictions?|filters?)",
# Code injection
r"```\s*(python|bash|sh|cmd).*exec\s*\(",
r"os\.system\s*\(",
r"subprocess\.",
r"eval\s*\(",
]
self._compiled = [
re.compile(p, re.IGNORECASE)
for p in self.blocked_patterns
]
async def validate(self, text: str) -> ValidationOutput:
"""Check for blocked patterns."""
for pattern in self._compiled:
match = pattern.search(text)
if match:
return ValidationOutput(
result=ValidationResult.BLOCK,
reason="Potentially malicious pattern detected",
details={"matched": match.group(0)}
)
return ValidationOutput(result=ValidationResult.PASS)
class PIIValidator(InputValidator):
"""Detect and optionally redact PII."""
def __init__(self, redact: bool = False):
self.redact = redact
self.pii_patterns = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
"ssn": r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
"credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
"ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
}
self._compiled = {
name: re.compile(pattern)
for name, pattern in self.pii_patterns.items()
}
async def validate(self, text: str) -> ValidationOutput:
"""Detect PII in text."""
found_pii = {}
modified = text
for pii_type, pattern in self._compiled.items():
matches = pattern.findall(text)
if matches:
found_pii[pii_type] = len(matches)
if self.redact:
modified = pattern.sub(f"[REDACTED_{pii_type.upper()}]", modified)
if found_pii:
if self.redact:
return ValidationOutput(
result=ValidationResult.WARN,
reason="PII detected and redacted",
modified_input=modified,
details={"pii_found": found_pii}
)
else:
return ValidationOutput(
result=ValidationResult.WARN,
reason="PII detected in input",
details={"pii_found": found_pii}
)
return ValidationOutput(result=ValidationResult.PASS)
class LanguageValidator(InputValidator):
"""Validate input language."""
def __init__(
self,
        allowed_languages: Optional[list[str]] = None,
detector: Any = None
):
self.allowed_languages = allowed_languages or ["en"]
self.detector = detector
async def validate(self, text: str) -> ValidationOutput:
"""Check input language."""
if self.detector:
detected = self.detector.detect(text)
language = detected.lang
confidence = detected.prob
else:
            # No detector provided: assume English (plug in a language-detection library for real checks)
language = "en"
confidence = 0.8
if language not in self.allowed_languages:
return ValidationOutput(
result=ValidationResult.BLOCK,
reason=f"Language '{language}' not allowed",
confidence=confidence,
details={"detected_language": language}
)
return ValidationOutput(
result=ValidationResult.PASS,
confidence=confidence,
details={"detected_language": language}
)
class CompositeValidator(InputValidator):
"""Combine multiple validators."""
def __init__(self, validators: list[InputValidator]):
self.validators = validators
async def validate(self, text: str) -> ValidationOutput:
"""Run all validators."""
current_text = text
all_details = {}
for validator in self.validators:
result = await validator.validate(current_text)
# Collect details
all_details[type(validator).__name__] = {
"result": result.result.value,
"reason": result.reason,
"details": result.details
}
# Block immediately if any validator blocks
if result.result == ValidationResult.BLOCK:
return ValidationOutput(
result=ValidationResult.BLOCK,
reason=result.reason,
details=all_details
)
# Use modified input if provided
if result.modified_input:
current_text = result.modified_input
# Check for any warnings
has_warnings = any(
d["result"] == "warn"
for d in all_details.values()
)
return ValidationOutput(
result=ValidationResult.WARN if has_warnings else ValidationResult.PASS,
modified_input=current_text if current_text != text else None,
details=all_details
)
Content Moderation
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
from abc import ABC, abstractmethod
class ContentCategory(Enum):
"""Content moderation categories."""
SAFE = "safe"
HATE = "hate"
VIOLENCE = "violence"
SEXUAL = "sexual"
SELF_HARM = "self_harm"
HARASSMENT = "harassment"
ILLEGAL = "illegal"
MISINFORMATION = "misinformation"
@dataclass
class ModerationResult:
"""Result of content moderation."""
is_safe: bool
categories: dict[ContentCategory, float] # category -> score
flagged_categories: list[ContentCategory]
explanation: str = ""
class ContentModerator(ABC):
"""Abstract content moderator."""
@abstractmethod
async def moderate(self, text: str) -> ModerationResult:
"""Moderate content."""
pass
class OpenAIModerator(ContentModerator):
"""Use OpenAI moderation API."""
def __init__(self, client: Any):
self.client = client
# Map OpenAI categories to our categories
self.category_map = {
"hate": ContentCategory.HATE,
"hate/threatening": ContentCategory.HATE,
"harassment": ContentCategory.HARASSMENT,
"harassment/threatening": ContentCategory.HARASSMENT,
"self-harm": ContentCategory.SELF_HARM,
"self-harm/intent": ContentCategory.SELF_HARM,
"self-harm/instructions": ContentCategory.SELF_HARM,
"sexual": ContentCategory.SEXUAL,
"sexual/minors": ContentCategory.SEXUAL,
"violence": ContentCategory.VIOLENCE,
"violence/graphic": ContentCategory.VIOLENCE,
}
async def moderate(self, text: str) -> ModerationResult:
"""Call OpenAI moderation API."""
response = await self.client.moderations.create(input=text)
result = response.results[0]
        categories = {}
        flagged = []
        # With the official openai>=1.x SDK these results are pydantic models, not dicts;
        # dump them by alias (e.g. "self-harm/intent") so the keys match category_map.
        scores = result.category_scores.model_dump(by_alias=True)
        flags = result.categories.model_dump(by_alias=True)
        for oai_cat, score in scores.items():
            our_cat = self.category_map.get(oai_cat)
            if our_cat:
                # Take max score for each category
                if our_cat not in categories or score > categories[our_cat]:
                    categories[our_cat] = score
                if flags.get(oai_cat, False):
                    if our_cat not in flagged:
                        flagged.append(our_cat)
return ModerationResult(
is_safe=not result.flagged,
categories=categories,
flagged_categories=flagged
)
class LLMModerator(ContentModerator):
"""Use LLM for content moderation."""
def __init__(
self,
client: Any,
model: str = "gpt-4o-mini",
threshold: float = 0.7
):
self.client = client
self.model = model
self.threshold = threshold
async def moderate(self, text: str) -> ModerationResult:
"""Use LLM to moderate content."""
prompt = f"""Analyze this text for harmful content.
Text: "{text[:2000]}"
Rate each category from 0.0 (none) to 1.0 (severe):
- hate: Hate speech, discrimination
- violence: Violent content, threats
- sexual: Sexual content
- self_harm: Self-harm, suicide content
- harassment: Bullying, personal attacks
- illegal: Illegal activities
- misinformation: False information
Return JSON:
{{"hate": 0.0, "violence": 0.0, "sexual": 0.0, "self_harm": 0.0, "harassment": 0.0, "illegal": 0.0, "misinformation": 0.0, "explanation": "..."}}"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0
)
import json
import re
content = response.choices[0].message.content
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
data = json.loads(json_match.group(0))
categories = {}
flagged = []
for cat_name, score in data.items():
if cat_name == "explanation":
continue
try:
cat = ContentCategory(cat_name)
categories[cat] = float(score)
if float(score) >= self.threshold:
flagged.append(cat)
except (ValueError, TypeError):
pass
return ModerationResult(
is_safe=len(flagged) == 0,
categories=categories,
flagged_categories=flagged,
explanation=data.get("explanation", "")
)
        # Fail open (treat as safe) if parsing fails; stricter deployments may prefer to fail closed
return ModerationResult(
is_safe=True,
categories={},
flagged_categories=[]
)
class ClassifierModerator(ContentModerator):
"""Use classifier model for moderation."""
def __init__(
self,
        model_path: Optional[str] = None,
threshold: float = 0.5
):
self.threshold = threshold
self.model = None
self.tokenizer = None
if model_path:
self._load_model(model_path)
def _load_model(self, model_path: str) -> None:
"""Load classifier model."""
# Would load actual model here
# from transformers import AutoModelForSequenceClassification, AutoTokenizer
# self.model = AutoModelForSequenceClassification.from_pretrained(model_path)
# self.tokenizer = AutoTokenizer.from_pretrained(model_path)
pass
async def moderate(self, text: str) -> ModerationResult:
"""Classify content."""
if not self.model:
return ModerationResult(
is_safe=True,
categories={},
flagged_categories=[]
)
# Would run actual inference
# inputs = self.tokenizer(text, return_tensors="pt", truncation=True)
# outputs = self.model(**inputs)
# scores = outputs.logits.softmax(dim=-1)
# Placeholder
return ModerationResult(
is_safe=True,
categories={ContentCategory.SAFE: 0.95},
flagged_categories=[]
)
class EnsembleModerator(ContentModerator):
"""Combine multiple moderators."""
def __init__(
self,
moderators: list[ContentModerator],
strategy: str = "any" # any, all, majority
):
self.moderators = moderators
self.strategy = strategy
async def moderate(self, text: str) -> ModerationResult:
"""Run all moderators and combine results."""
import asyncio
results = await asyncio.gather(*[
m.moderate(text) for m in self.moderators
])
# Combine categories (max score)
combined_categories = {}
all_flagged = set()
for result in results:
for cat, score in result.categories.items():
if cat not in combined_categories or score > combined_categories[cat]:
combined_categories[cat] = score
all_flagged.update(result.flagged_categories)
        # Determine safety based on strategy
        if self.strategy == "any":
            # "any": unsafe if any single moderator flags the text
            is_safe = all(r.is_safe for r in results)
        elif self.strategy == "all":
            # "all": unsafe only if every moderator flags the text
            is_safe = any(r.is_safe for r in results)
        else:  # majority vote
            safe_count = sum(1 for r in results if r.is_safe)
            is_safe = safe_count > len(results) / 2
return ModerationResult(
is_safe=is_safe,
categories=combined_categories,
flagged_categories=list(all_flagged)
)
Output Filtering
from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import re
@dataclass
class FilterResult:
"""Result of output filtering."""
original: str
filtered: str
was_modified: bool
    modifications: Optional[list[str]] = None
class OutputFilter(ABC):
"""Abstract output filter."""
@abstractmethod
async def filter(self, text: str) -> FilterResult:
"""Filter output text."""
pass
class PIIFilter(OutputFilter):
"""Filter PII from output."""
def __init__(self):
self.patterns = {
"email": (
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"[EMAIL]"
),
"phone": (
r"\b(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
"[PHONE]"
),
"ssn": (
r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
"[SSN]"
),
"credit_card": (
r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
"[CREDIT_CARD]"
),
}
async def filter(self, text: str) -> FilterResult:
"""Redact PII from output."""
filtered = text
modifications = []
for pii_type, (pattern, replacement) in self.patterns.items():
matches = re.findall(pattern, filtered)
if matches:
filtered = re.sub(pattern, replacement, filtered)
modifications.append(f"Redacted {len(matches)} {pii_type}(s)")
return FilterResult(
original=text,
filtered=filtered,
was_modified=len(modifications) > 0,
modifications=modifications
)
class TopicFilter(OutputFilter):
"""Filter content about specific topics."""
def __init__(
self,
        blocked_topics: Optional[list[str]] = None,
replacement: str = "[Content filtered]"
):
self.blocked_topics = blocked_topics or []
self.replacement = replacement
async def filter(self, text: str) -> FilterResult:
"""Filter blocked topics."""
text_lower = text.lower()
for topic in self.blocked_topics:
if topic.lower() in text_lower:
return FilterResult(
original=text,
filtered=self.replacement,
was_modified=True,
modifications=[f"Blocked topic: {topic}"]
)
return FilterResult(
original=text,
filtered=text,
was_modified=False
)
class CodeFilter(OutputFilter):
"""Filter or sanitize code in output."""
def __init__(
self,
allow_code: bool = True,
        dangerous_patterns: Optional[list[str]] = None
):
self.allow_code = allow_code
self.dangerous_patterns = dangerous_patterns or [
r"os\.system\s*\(",
r"subprocess\.",
r"eval\s*\(",
r"exec\s*\(",
r"__import__\s*\(",
r"open\s*\([^)]*,\s*['\"]w",
r"rm\s+-rf",
r"DROP\s+TABLE",
r"DELETE\s+FROM",
]
self._compiled = [
re.compile(p, re.IGNORECASE)
for p in self.dangerous_patterns
]
async def filter(self, text: str) -> FilterResult:
"""Filter dangerous code patterns."""
if not self.allow_code:
# Remove all code blocks
filtered = re.sub(r'```[\s\S]*?```', '[Code removed]', text)
filtered = re.sub(r'`[^`]+`', '[Code removed]', filtered)
return FilterResult(
original=text,
filtered=filtered,
was_modified=filtered != text,
modifications=["Removed code blocks"]
)
# Check for dangerous patterns
modifications = []
filtered = text
for pattern in self._compiled:
if pattern.search(filtered):
filtered = pattern.sub('[DANGEROUS_CODE_REMOVED]', filtered)
modifications.append(f"Removed dangerous pattern")
return FilterResult(
original=text,
filtered=filtered,
was_modified=len(modifications) > 0,
modifications=modifications
)
class LengthFilter(OutputFilter):
"""Truncate output to max length."""
def __init__(
self,
max_length: int = 4000,
truncation_message: str = "\n\n[Response truncated]"
):
self.max_length = max_length
self.truncation_message = truncation_message
async def filter(self, text: str) -> FilterResult:
"""Truncate if too long."""
if len(text) <= self.max_length:
return FilterResult(
original=text,
filtered=text,
was_modified=False
)
# Truncate at word boundary
truncated = text[:self.max_length]
last_space = truncated.rfind(' ')
if last_space > self.max_length * 0.8:
truncated = truncated[:last_space]
truncated += self.truncation_message
return FilterResult(
original=text,
filtered=truncated,
was_modified=True,
modifications=[f"Truncated from {len(text)} to {len(truncated)} chars"]
)
class FactualityFilter(OutputFilter):
"""Filter potentially false claims."""
def __init__(
self,
llm_client: Any,
model: str = "gpt-4o-mini"
):
self.llm_client = llm_client
self.model = model
async def filter(self, text: str) -> FilterResult:
"""Check for potentially false claims."""
prompt = f"""Analyze this text for potentially false or unverifiable claims:
"{text[:2000]}"
Identify any:
1. Specific statistics without sources
2. Medical or legal advice
3. Predictions stated as facts
4. Historical claims that may be inaccurate
Return JSON:
{{"has_issues": true/false, "issues": ["issue1", "issue2"], "suggestion": "..."}}"""
response = await self.llm_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0
)
import json
content = response.choices[0].message.content
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
data = json.loads(json_match.group(0))
if data.get("has_issues"):
# Add disclaimer
disclaimer = "\n\n*Note: This response may contain claims that should be independently verified.*"
return FilterResult(
original=text,
filtered=text + disclaimer,
was_modified=True,
modifications=data.get("issues", [])
)
return FilterResult(
original=text,
filtered=text,
was_modified=False
)
class CompositeFilter(OutputFilter):
"""Combine multiple filters."""
def __init__(self, filters: list[OutputFilter]):
self.filters = filters
async def filter(self, text: str) -> FilterResult:
"""Apply all filters in sequence."""
current = text
all_modifications = []
for f in self.filters:
result = await f.filter(current)
current = result.filtered
if result.modifications:
all_modifications.extend(result.modifications)
return FilterResult(
original=text,
filtered=current,
was_modified=current != text,
modifications=all_modifications
)
Safety Pipeline
from dataclasses import dataclass, field
from typing import Any, Optional, Callable, Awaitable
from datetime import datetime, timezone
from enum import Enum
class SafetyAction(Enum):
"""Actions to take on safety violations."""
ALLOW = "allow"
WARN = "warn"
BLOCK = "block"
MODIFY = "modify"
@dataclass
class SafetyConfig:
"""Safety pipeline configuration."""
# Input validation
max_input_length: int = 10000
block_prompt_injection: bool = True
redact_input_pii: bool = True
# Content moderation
moderation_threshold: float = 0.7
    blocked_categories: Optional[list[ContentCategory]] = None
# Output filtering
max_output_length: int = 8000
redact_output_pii: bool = True
filter_dangerous_code: bool = True
# Actions
on_input_violation: SafetyAction = SafetyAction.BLOCK
on_moderation_violation: SafetyAction = SafetyAction.BLOCK
on_output_violation: SafetyAction = SafetyAction.MODIFY
@dataclass
class SafetyResult:
"""Result of safety pipeline."""
allowed: bool
input_result: ValidationOutput = None
moderation_result: ModerationResult = None
output_result: FilterResult = None
action_taken: SafetyAction = None
final_output: str = None
audit_log: dict = field(default_factory=dict)
class SafetyPipeline:
"""Complete safety pipeline."""
def __init__(
self,
config: SafetyConfig,
llm_client: Any = None,
moderator: ContentModerator = None
):
self.config = config
self.llm_client = llm_client
        # Setup validators (include the optional ones only when enabled)
        validators: list[InputValidator] = [
            LengthValidator(max_length=config.max_input_length)
        ]
        if config.block_prompt_injection:
            validators.append(PatternValidator())
        validators.append(PIIValidator(redact=config.redact_input_pii))
        self.input_validator = CompositeValidator(validators)
# Setup moderator
self.moderator = moderator
if not self.moderator and llm_client:
self.moderator = LLMModerator(
llm_client,
threshold=config.moderation_threshold
)
        # Setup output filters (include the optional ones only when enabled)
        filters: list[OutputFilter] = [
            LengthFilter(max_length=config.max_output_length)
        ]
        if config.redact_output_pii:
            filters.append(PIIFilter())
        if config.filter_dangerous_code:
            filters.append(CodeFilter())
        self.output_filter = CompositeFilter(filters)
self._audit_log: list[dict] = []
async def check_input(self, text: str) -> tuple[bool, str, ValidationOutput]:
"""Check input safety."""
result = await self.input_validator.validate(text)
if result.result == ValidationResult.BLOCK:
return False, text, result
# Use modified input if available
safe_input = result.modified_input or text
return True, safe_input, result
async def check_content(self, text: str) -> tuple[bool, ModerationResult]:
"""Check content moderation."""
if not self.moderator:
return True, None
result = await self.moderator.moderate(text)
# Check against blocked categories
if self.config.blocked_categories:
for cat in result.flagged_categories:
if cat in self.config.blocked_categories:
return False, result
return result.is_safe, result
async def filter_output(self, text: str) -> tuple[str, FilterResult]:
"""Filter output."""
result = await self.output_filter.filter(text)
return result.filtered, result
async def process_request(
self,
input_text: str,
        generate_fn: Callable[[str], Awaitable[str]]
) -> SafetyResult:
"""Process complete request through safety pipeline."""
audit = {
"timestamp": datetime.utcnow().isoformat(),
"input_length": len(input_text)
}
# 1. Validate input
input_allowed, safe_input, input_result = await self.check_input(input_text)
audit["input_validation"] = input_result.result.value
if not input_allowed:
audit["action"] = "blocked_input"
self._audit_log.append(audit)
return SafetyResult(
allowed=False,
input_result=input_result,
action_taken=self.config.on_input_violation,
audit_log=audit
)
# 2. Check input content moderation
content_safe, moderation_result = await self.check_content(safe_input)
if moderation_result:
audit["input_moderation"] = "safe" if content_safe else "flagged"
if not content_safe:
audit["action"] = "blocked_moderation"
self._audit_log.append(audit)
return SafetyResult(
allowed=False,
input_result=input_result,
moderation_result=moderation_result,
action_taken=self.config.on_moderation_violation,
audit_log=audit
)
# 3. Generate response
try:
raw_output = await generate_fn(safe_input)
audit["generation"] = "success"
except Exception as e:
audit["generation"] = f"error: {str(e)}"
self._audit_log.append(audit)
raise
# 4. Check output content moderation
output_safe, output_moderation = await self.check_content(raw_output)
if output_moderation:
audit["output_moderation"] = "safe" if output_safe else "flagged"
if not output_safe:
audit["action"] = "blocked_output_moderation"
self._audit_log.append(audit)
return SafetyResult(
allowed=False,
input_result=input_result,
moderation_result=output_moderation,
action_taken=SafetyAction.BLOCK,
audit_log=audit
)
# 5. Filter output
filtered_output, filter_result = await self.filter_output(raw_output)
audit["output_filtered"] = filter_result.was_modified
audit["action"] = "allowed"
self._audit_log.append(audit)
return SafetyResult(
allowed=True,
input_result=input_result,
moderation_result=moderation_result,
output_result=filter_result,
action_taken=SafetyAction.MODIFY if filter_result.was_modified else SafetyAction.ALLOW,
final_output=filtered_output,
audit_log=audit
)
def get_audit_log(self) -> list[dict]:
"""Get audit log."""
return list(self._audit_log)
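End to end, the pipeline might be wired up as in the sketch below. It assumes the official openai AsyncOpenAI client for both moderation and generation; any async client works as long as generate_fn is awaitable:

import asyncio
from openai import AsyncOpenAI  # assumed client; swap in your own provider

async def main() -> None:
    client = AsyncOpenAI()
    pipeline = SafetyPipeline(
        config=SafetyConfig(max_input_length=4000, moderation_threshold=0.7),
        llm_client=client,  # enables the LLM-based moderator when none is supplied
    )

    async def generate(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

    result = await pipeline.process_request("Summarize our refund policy", generate)
    if result.allowed:
        print(result.final_output)
    else:
        print("Blocked:", result.action_taken, result.audit_log)

asyncio.run(main())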
Production Safety Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Initialize safety pipeline
config = SafetyConfig(
max_input_length=10000,
block_prompt_injection=True,
redact_input_pii=True,
moderation_threshold=0.7,
max_output_length=8000
)
# pipeline = SafetyPipeline(config, llm_client)
class ValidateRequest(BaseModel):
text: str
check_pii: bool = True
check_injection: bool = True
class ModerateRequest(BaseModel):
text: str
threshold: float = 0.7
class FilterRequest(BaseModel):
text: str
redact_pii: bool = True
filter_code: bool = True
max_length: int = 8000
class SafetyCheckRequest(BaseModel):
input_text: str
output_text: Optional[str] = None
@app.post("/v1/validate")
async def validate_input(request: ValidateRequest):
"""Validate input text."""
validators = [LengthValidator(max_length=10000)]
if request.check_injection:
validators.append(PatternValidator())
if request.check_pii:
validators.append(PIIValidator(redact=True))
validator = CompositeValidator(validators)
result = await validator.validate(request.text)
return {
"result": result.result.value,
"reason": result.reason,
"modified_text": result.modified_input,
"details": result.details
}
@app.post("/v1/moderate")
async def moderate_content(request: ModerateRequest):
"""Moderate content."""
# Would use actual moderator
# result = await moderator.moderate(request.text)
return {
"is_safe": True,
"categories": {},
"flagged_categories": [],
"explanation": "Content appears safe"
}
@app.post("/v1/filter")
async def filter_output(request: FilterRequest):
"""Filter output text."""
filters = [LengthFilter(max_length=request.max_length)]
if request.redact_pii:
filters.append(PIIFilter())
if request.filter_code:
filters.append(CodeFilter())
filter_chain = CompositeFilter(filters)
result = await filter_chain.filter(request.text)
return {
"original_length": len(result.original),
"filtered_length": len(result.filtered),
"was_modified": result.was_modified,
"modifications": result.modifications,
"filtered_text": result.filtered
}
@app.post("/v1/safety-check")
async def safety_check(request: SafetyCheckRequest):
"""Complete safety check."""
# Validate input
validator = CompositeValidator([
LengthValidator(max_length=10000),
PatternValidator(),
PIIValidator(redact=True)
])
input_result = await validator.validate(request.input_text)
response = {
"input": {
"result": input_result.result.value,
"reason": input_result.reason,
"safe_text": input_result.modified_input or request.input_text
}
}
# Filter output if provided
if request.output_text:
filter_chain = CompositeFilter([
LengthFilter(max_length=8000),
PIIFilter(),
CodeFilter()
])
output_result = await filter_chain.filter(request.output_text)
response["output"] = {
"was_modified": output_result.was_modified,
"modifications": output_result.modifications,
"filtered_text": output_result.filtered
}
return response
@app.get("/v1/audit")
async def get_audit_log(limit: int = 100):
"""Get audit log."""
# Would return actual audit log
return {
"entries": [],
"total": 0
}
@app.get("/v1/stats")
async def get_stats():
"""Get safety statistics."""
    # Placeholder numbers for illustration; back these with real counters in production
    return {
"total_requests": 1000,
"blocked_input": 15,
"blocked_moderation": 8,
"modified_output": 120,
"block_rate": 0.023,
"modification_rate": 0.12
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- Guardrails AI: https://github.com/guardrails-ai/guardrails
- NeMo Guardrails: https://github.com/NVIDIA/NeMo-Guardrails
- OpenAI Moderation: https://platform.openai.com/docs/guides/moderation
- LlamaGuard: https://ai.meta.com/research/publications/llama-guard-llm-based-input-output-safeguard-for-human-ai-conversations/
Conclusion
Guardrails are essential for responsible LLM deployment. Start with input validation—check length, detect prompt injection attempts, and optionally redact PII before processing. Add content moderation using the OpenAI moderation API, LLM-based classification, or specialized models like LlamaGuard to catch harmful content. Filter outputs to remove sensitive information, dangerous code patterns, and potentially false claims. Combine these into a comprehensive safety pipeline that validates inputs, moderates content, and filters outputs in sequence. Log all safety decisions for auditing and compliance. The key insight is that safety is defense in depth—no single check catches everything, so layer multiple mechanisms. Monitor your block rates and false positive rates, tuning thresholds based on your use case. A well-designed safety system protects users from harmful content while minimizing friction for legitimate requests, making the difference between a trustworthy AI application and one that poses risks to users and your organization.
