Guardrails and Safety Filters: Protecting LLM Applications from Harmful Content

Introduction: LLMs can generate harmful, biased, or inappropriate content. They can be manipulated through prompt injection, jailbreaks, and adversarial inputs. Production applications need guardrails—safety mechanisms that validate inputs, moderate content, and filter outputs before they reach users. This guide covers practical guardrail implementations: input validation to catch malicious prompts, content moderation using classifiers and LLM-based detection, output filtering to remove sensitive information, and comprehensive safety pipelines that combine multiple layers of protection. Whether you’re building a customer-facing chatbot or an internal tool, guardrails are essential for responsible AI deployment.

Guardrails
Guardrails: Input Validation, Content Moderation, Output Filtering

Input Validation

from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
from abc import ABC, abstractmethod
import re

class ValidationResult(Enum):
    """Validation result status."""
    
    PASS = "pass"
    WARN = "warn"
    BLOCK = "block"

@dataclass
class ValidationOutput:
    """Output from validation."""
    
    result: ValidationResult
    reason: str = ""
    modified_input: str = None
    confidence: float = 1.0
    details: dict = field(default_factory=dict)

class InputValidator(ABC):
    """Abstract input validator."""
    
    @abstractmethod
    async def validate(self, text: str) -> ValidationOutput:
        """Validate input text."""
        pass

class LengthValidator(InputValidator):
    """Validate input length."""
    
    def __init__(
        self,
        min_length: int = 1,
        max_length: int = 10000,
        max_tokens: int = None
    ):
        self.min_length = min_length
        self.max_length = max_length
        self.max_tokens = max_tokens
    
    async def validate(self, text: str) -> ValidationOutput:
        """Check length constraints."""
        
        if len(text) < self.min_length:
            return ValidationOutput(
                result=ValidationResult.BLOCK,
                reason=f"Input too short (min: {self.min_length})"
            )
        
        if len(text) > self.max_length:
            return ValidationOutput(
                result=ValidationResult.BLOCK,
                reason=f"Input too long (max: {self.max_length})"
            )
        
        if self.max_tokens:
            # Rough token estimate
            token_count = len(text) // 4
            if token_count > self.max_tokens:
                return ValidationOutput(
                    result=ValidationResult.BLOCK,
                    reason=f"Input exceeds token limit ({self.max_tokens})"
                )
        
        return ValidationOutput(result=ValidationResult.PASS)

class PatternValidator(InputValidator):
    """Validate against regex patterns."""
    
    def __init__(self):
        self.blocked_patterns = [
            # Prompt injection attempts
            r"ignore\s+(previous|all|above)\s+(instructions?|prompts?)",
            r"disregard\s+(previous|all|above)",
            r"forget\s+(everything|all|previous)",
            r"you\s+are\s+now\s+",
            r"new\s+instructions?:",
            r"system\s*:\s*",
            r"\[INST\]",
            r"<\|im_start\|>",
            
            # Jailbreak patterns
            r"DAN\s+mode",
            r"developer\s+mode",
            r"pretend\s+you\s+(are|can)",
            r"act\s+as\s+if\s+you\s+have\s+no",
            r"bypass\s+(your|the)\s+(restrictions?|filters?)",
            
            # Code injection
            r"```\s*(python|bash|sh|cmd).*exec\s*\(",
            r"os\.system\s*\(",
            r"subprocess\.",
            r"eval\s*\(",
        ]
        
        self._compiled = [
            re.compile(p, re.IGNORECASE)
            for p in self.blocked_patterns
        ]
    
    async def validate(self, text: str) -> ValidationOutput:
        """Check for blocked patterns."""
        
        for pattern in self._compiled:
            match = pattern.search(text)
            if match:
                return ValidationOutput(
                    result=ValidationResult.BLOCK,
                    reason="Potentially malicious pattern detected",
                    details={"matched": match.group(0)}
                )
        
        return ValidationOutput(result=ValidationResult.PASS)

class PIIValidator(InputValidator):
    """Detect and optionally redact PII."""
    
    def __init__(self, redact: bool = False):
        self.redact = redact
        
        self.pii_patterns = {
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "phone": r"\b(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
            "ssn": r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
            "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
            "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
        }
        
        self._compiled = {
            name: re.compile(pattern)
            for name, pattern in self.pii_patterns.items()
        }
    
    async def validate(self, text: str) -> ValidationOutput:
        """Detect PII in text."""
        
        found_pii = {}
        modified = text
        
        for pii_type, pattern in self._compiled.items():
            matches = pattern.findall(text)
            if matches:
                found_pii[pii_type] = len(matches)
                
                if self.redact:
                    modified = pattern.sub(f"[REDACTED_{pii_type.upper()}]", modified)
        
        if found_pii:
            if self.redact:
                return ValidationOutput(
                    result=ValidationResult.WARN,
                    reason="PII detected and redacted",
                    modified_input=modified,
                    details={"pii_found": found_pii}
                )
            else:
                return ValidationOutput(
                    result=ValidationResult.WARN,
                    reason="PII detected in input",
                    details={"pii_found": found_pii}
                )
        
        return ValidationOutput(result=ValidationResult.PASS)

class LanguageValidator(InputValidator):
    """Validate input language."""
    
    def __init__(
        self,
        allowed_languages: list[str] = None,
        detector: Any = None
    ):
        self.allowed_languages = allowed_languages or ["en"]
        self.detector = detector
    
    async def validate(self, text: str) -> ValidationOutput:
        """Check input language."""
        
        if self.detector:
            detected = self.detector.detect(text)
            language = detected.lang
            confidence = detected.prob
        else:
            # Simple heuristic
            language = "en"
            confidence = 0.8
        
        if language not in self.allowed_languages:
            return ValidationOutput(
                result=ValidationResult.BLOCK,
                reason=f"Language '{language}' not allowed",
                confidence=confidence,
                details={"detected_language": language}
            )
        
        return ValidationOutput(
            result=ValidationResult.PASS,
            confidence=confidence,
            details={"detected_language": language}
        )

class CompositeValidator(InputValidator):
    """Combine multiple validators."""
    
    def __init__(self, validators: list[InputValidator]):
        self.validators = validators
    
    async def validate(self, text: str) -> ValidationOutput:
        """Run all validators."""
        
        current_text = text
        all_details = {}
        
        for validator in self.validators:
            result = await validator.validate(current_text)
            
            # Collect details
            all_details[type(validator).__name__] = {
                "result": result.result.value,
                "reason": result.reason,
                "details": result.details
            }
            
            # Block immediately if any validator blocks
            if result.result == ValidationResult.BLOCK:
                return ValidationOutput(
                    result=ValidationResult.BLOCK,
                    reason=result.reason,
                    details=all_details
                )
            
            # Use modified input if provided
            if result.modified_input:
                current_text = result.modified_input
        
        # Check for any warnings
        has_warnings = any(
            d["result"] == "warn"
            for d in all_details.values()
        )
        
        return ValidationOutput(
            result=ValidationResult.WARN if has_warnings else ValidationResult.PASS,
            modified_input=current_text if current_text != text else None,
            details=all_details
        )

Content Moderation

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class ContentCategory(Enum):
    """Content moderation categories."""
    
    SAFE = "safe"
    HATE = "hate"
    VIOLENCE = "violence"
    SEXUAL = "sexual"
    SELF_HARM = "self_harm"
    HARASSMENT = "harassment"
    ILLEGAL = "illegal"
    MISINFORMATION = "misinformation"

@dataclass
class ModerationResult:
    """Result of content moderation."""
    
    is_safe: bool
    categories: dict[ContentCategory, float]  # category -> score
    flagged_categories: list[ContentCategory]
    explanation: str = ""

class ContentModerator(ABC):
    """Abstract content moderator."""
    
    @abstractmethod
    async def moderate(self, text: str) -> ModerationResult:
        """Moderate content."""
        pass

class OpenAIModerator(ContentModerator):
    """Use OpenAI moderation API."""
    
    def __init__(self, client: Any):
        self.client = client
        
        # Map OpenAI categories to our categories
        self.category_map = {
            "hate": ContentCategory.HATE,
            "hate/threatening": ContentCategory.HATE,
            "harassment": ContentCategory.HARASSMENT,
            "harassment/threatening": ContentCategory.HARASSMENT,
            "self-harm": ContentCategory.SELF_HARM,
            "self-harm/intent": ContentCategory.SELF_HARM,
            "self-harm/instructions": ContentCategory.SELF_HARM,
            "sexual": ContentCategory.SEXUAL,
            "sexual/minors": ContentCategory.SEXUAL,
            "violence": ContentCategory.VIOLENCE,
            "violence/graphic": ContentCategory.VIOLENCE,
        }
    
    async def moderate(self, text: str) -> ModerationResult:
        """Call OpenAI moderation API."""
        
        response = await self.client.moderations.create(input=text)
        result = response.results[0]
        
        categories = {}
        flagged = []
        
        for oai_cat, score in result.category_scores.items():
            our_cat = self.category_map.get(oai_cat)
            if our_cat:
                # Take max score for each category
                if our_cat not in categories or score > categories[our_cat]:
                    categories[our_cat] = score
                
                if result.categories.get(oai_cat, False):
                    if our_cat not in flagged:
                        flagged.append(our_cat)
        
        return ModerationResult(
            is_safe=not result.flagged,
            categories=categories,
            flagged_categories=flagged
        )

class LLMModerator(ContentModerator):
    """Use LLM for content moderation."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        threshold: float = 0.7
    ):
        self.client = client
        self.model = model
        self.threshold = threshold
    
    async def moderate(self, text: str) -> ModerationResult:
        """Use LLM to moderate content."""
        
        prompt = f"""Analyze this text for harmful content.

Text: "{text[:2000]}"

Rate each category from 0.0 (none) to 1.0 (severe):
- hate: Hate speech, discrimination
- violence: Violent content, threats
- sexual: Sexual content
- self_harm: Self-harm, suicide content
- harassment: Bullying, personal attacks
- illegal: Illegal activities
- misinformation: False information

Return JSON:
{{"hate": 0.0, "violence": 0.0, "sexual": 0.0, "self_harm": 0.0, "harassment": 0.0, "illegal": 0.0, "misinformation": 0.0, "explanation": "..."}}"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        import json
        import re
        
        content = response.choices[0].message.content
        json_match = re.search(r'\{[\s\S]*\}', content)
        
        if json_match:
            data = json.loads(json_match.group(0))
            
            categories = {}
            flagged = []
            
            for cat_name, score in data.items():
                if cat_name == "explanation":
                    continue
                
                try:
                    cat = ContentCategory(cat_name)
                    categories[cat] = float(score)
                    
                    if float(score) >= self.threshold:
                        flagged.append(cat)
                except (ValueError, TypeError):
                    pass
            
            return ModerationResult(
                is_safe=len(flagged) == 0,
                categories=categories,
                flagged_categories=flagged,
                explanation=data.get("explanation", "")
            )
        
        # Default to safe if parsing fails
        return ModerationResult(
            is_safe=True,
            categories={},
            flagged_categories=[]
        )

class ClassifierModerator(ContentModerator):
    """Use classifier model for moderation."""
    
    def __init__(
        self,
        model_path: str = None,
        threshold: float = 0.5
    ):
        self.threshold = threshold
        self.model = None
        self.tokenizer = None
        
        if model_path:
            self._load_model(model_path)
    
    def _load_model(self, model_path: str) -> None:
        """Load classifier model."""
        
        # Would load actual model here
        # from transformers import AutoModelForSequenceClassification, AutoTokenizer
        # self.model = AutoModelForSequenceClassification.from_pretrained(model_path)
        # self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        pass
    
    async def moderate(self, text: str) -> ModerationResult:
        """Classify content."""
        
        if not self.model:
            return ModerationResult(
                is_safe=True,
                categories={},
                flagged_categories=[]
            )
        
        # Would run actual inference
        # inputs = self.tokenizer(text, return_tensors="pt", truncation=True)
        # outputs = self.model(**inputs)
        # scores = outputs.logits.softmax(dim=-1)
        
        # Placeholder
        return ModerationResult(
            is_safe=True,
            categories={ContentCategory.SAFE: 0.95},
            flagged_categories=[]
        )

class EnsembleModerator(ContentModerator):
    """Combine multiple moderators."""
    
    def __init__(
        self,
        moderators: list[ContentModerator],
        strategy: str = "any"  # any, all, majority
    ):
        self.moderators = moderators
        self.strategy = strategy
    
    async def moderate(self, text: str) -> ModerationResult:
        """Run all moderators and combine results."""
        
        import asyncio
        
        results = await asyncio.gather(*[
            m.moderate(text) for m in self.moderators
        ])
        
        # Combine categories (max score)
        combined_categories = {}
        all_flagged = set()
        
        for result in results:
            for cat, score in result.categories.items():
                if cat not in combined_categories or score > combined_categories[cat]:
                    combined_categories[cat] = score
            
            all_flagged.update(result.flagged_categories)
        
        # Determine safety based on strategy
        if self.strategy == "any":
            is_safe = all(r.is_safe for r in results)
        elif self.strategy == "all":
            is_safe = any(r.is_safe for r in results)
        else:  # majority
            safe_count = sum(1 for r in results if r.is_safe)
            is_safe = safe_count > len(results) / 2
        
        return ModerationResult(
            is_safe=is_safe,
            categories=combined_categories,
            flagged_categories=list(all_flagged)
        )

Output Filtering

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import re

@dataclass
class FilterResult:
    """Result of output filtering."""
    
    original: str
    filtered: str
    was_modified: bool
    modifications: list[str] = None

class OutputFilter(ABC):
    """Abstract output filter."""
    
    @abstractmethod
    async def filter(self, text: str) -> FilterResult:
        """Filter output text."""
        pass

class PIIFilter(OutputFilter):
    """Filter PII from output."""
    
    def __init__(self):
        self.patterns = {
            "email": (
                r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
                "[EMAIL]"
            ),
            "phone": (
                r"\b(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
                "[PHONE]"
            ),
            "ssn": (
                r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
                "[SSN]"
            ),
            "credit_card": (
                r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
                "[CREDIT_CARD]"
            ),
        }
    
    async def filter(self, text: str) -> FilterResult:
        """Redact PII from output."""
        
        filtered = text
        modifications = []
        
        for pii_type, (pattern, replacement) in self.patterns.items():
            matches = re.findall(pattern, filtered)
            if matches:
                filtered = re.sub(pattern, replacement, filtered)
                modifications.append(f"Redacted {len(matches)} {pii_type}(s)")
        
        return FilterResult(
            original=text,
            filtered=filtered,
            was_modified=len(modifications) > 0,
            modifications=modifications
        )

class TopicFilter(OutputFilter):
    """Filter content about specific topics."""
    
    def __init__(
        self,
        blocked_topics: list[str] = None,
        replacement: str = "[Content filtered]"
    ):
        self.blocked_topics = blocked_topics or []
        self.replacement = replacement
    
    async def filter(self, text: str) -> FilterResult:
        """Filter blocked topics."""
        
        text_lower = text.lower()
        
        for topic in self.blocked_topics:
            if topic.lower() in text_lower:
                return FilterResult(
                    original=text,
                    filtered=self.replacement,
                    was_modified=True,
                    modifications=[f"Blocked topic: {topic}"]
                )
        
        return FilterResult(
            original=text,
            filtered=text,
            was_modified=False
        )

class CodeFilter(OutputFilter):
    """Filter or sanitize code in output."""
    
    def __init__(
        self,
        allow_code: bool = True,
        dangerous_patterns: list[str] = None
    ):
        self.allow_code = allow_code
        self.dangerous_patterns = dangerous_patterns or [
            r"os\.system\s*\(",
            r"subprocess\.",
            r"eval\s*\(",
            r"exec\s*\(",
            r"__import__\s*\(",
            r"open\s*\([^)]*,\s*['\"]w",
            r"rm\s+-rf",
            r"DROP\s+TABLE",
            r"DELETE\s+FROM",
        ]
        
        self._compiled = [
            re.compile(p, re.IGNORECASE)
            for p in self.dangerous_patterns
        ]
    
    async def filter(self, text: str) -> FilterResult:
        """Filter dangerous code patterns."""
        
        if not self.allow_code:
            # Remove all code blocks
            filtered = re.sub(r'```[\s\S]*?```', '[Code removed]', text)
            filtered = re.sub(r'`[^`]+`', '[Code removed]', filtered)
            
            return FilterResult(
                original=text,
                filtered=filtered,
                was_modified=filtered != text,
                modifications=["Removed code blocks"]
            )
        
        # Check for dangerous patterns
        modifications = []
        filtered = text
        
        for pattern in self._compiled:
            if pattern.search(filtered):
                filtered = pattern.sub('[DANGEROUS_CODE_REMOVED]', filtered)
                modifications.append(f"Removed dangerous pattern")
        
        return FilterResult(
            original=text,
            filtered=filtered,
            was_modified=len(modifications) > 0,
            modifications=modifications
        )

class LengthFilter(OutputFilter):
    """Truncate output to max length."""
    
    def __init__(
        self,
        max_length: int = 4000,
        truncation_message: str = "\n\n[Response truncated]"
    ):
        self.max_length = max_length
        self.truncation_message = truncation_message
    
    async def filter(self, text: str) -> FilterResult:
        """Truncate if too long."""
        
        if len(text) <= self.max_length:
            return FilterResult(
                original=text,
                filtered=text,
                was_modified=False
            )
        
        # Truncate at word boundary
        truncated = text[:self.max_length]
        last_space = truncated.rfind(' ')
        if last_space > self.max_length * 0.8:
            truncated = truncated[:last_space]
        
        truncated += self.truncation_message
        
        return FilterResult(
            original=text,
            filtered=truncated,
            was_modified=True,
            modifications=[f"Truncated from {len(text)} to {len(truncated)} chars"]
        )

class FactualityFilter(OutputFilter):
    """Filter potentially false claims."""
    
    def __init__(
        self,
        llm_client: Any,
        model: str = "gpt-4o-mini"
    ):
        self.llm_client = llm_client
        self.model = model
    
    async def filter(self, text: str) -> FilterResult:
        """Check for potentially false claims."""
        
        prompt = f"""Analyze this text for potentially false or unverifiable claims:

"{text[:2000]}"

Identify any:
1. Specific statistics without sources
2. Medical or legal advice
3. Predictions stated as facts
4. Historical claims that may be inaccurate

Return JSON:
{{"has_issues": true/false, "issues": ["issue1", "issue2"], "suggestion": "..."}}"""
        
        response = await self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        import json
        
        content = response.choices[0].message.content
        json_match = re.search(r'\{[\s\S]*\}', content)
        
        if json_match:
            data = json.loads(json_match.group(0))
            
            if data.get("has_issues"):
                # Add disclaimer
                disclaimer = "\n\n*Note: This response may contain claims that should be independently verified.*"
                
                return FilterResult(
                    original=text,
                    filtered=text + disclaimer,
                    was_modified=True,
                    modifications=data.get("issues", [])
                )
        
        return FilterResult(
            original=text,
            filtered=text,
            was_modified=False
        )

class CompositeFilter(OutputFilter):
    """Combine multiple filters."""
    
    def __init__(self, filters: list[OutputFilter]):
        self.filters = filters
    
    async def filter(self, text: str) -> FilterResult:
        """Apply all filters in sequence."""
        
        current = text
        all_modifications = []
        
        for f in self.filters:
            result = await f.filter(current)
            current = result.filtered
            
            if result.modifications:
                all_modifications.extend(result.modifications)
        
        return FilterResult(
            original=text,
            filtered=current,
            was_modified=current != text,
            modifications=all_modifications
        )

Safety Pipeline

from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from datetime import datetime
from enum import Enum

class SafetyAction(Enum):
    """Actions to take on safety violations."""
    
    ALLOW = "allow"
    WARN = "warn"
    BLOCK = "block"
    MODIFY = "modify"

@dataclass
class SafetyConfig:
    """Safety pipeline configuration."""
    
    # Input validation
    max_input_length: int = 10000
    block_prompt_injection: bool = True
    redact_input_pii: bool = True
    
    # Content moderation
    moderation_threshold: float = 0.7
    blocked_categories: list[ContentCategory] = None
    
    # Output filtering
    max_output_length: int = 8000
    redact_output_pii: bool = True
    filter_dangerous_code: bool = True
    
    # Actions
    on_input_violation: SafetyAction = SafetyAction.BLOCK
    on_moderation_violation: SafetyAction = SafetyAction.BLOCK
    on_output_violation: SafetyAction = SafetyAction.MODIFY

@dataclass
class SafetyResult:
    """Result of safety pipeline."""
    
    allowed: bool
    input_result: ValidationOutput = None
    moderation_result: ModerationResult = None
    output_result: FilterResult = None
    action_taken: SafetyAction = None
    final_output: str = None
    audit_log: dict = field(default_factory=dict)

class SafetyPipeline:
    """Complete safety pipeline."""
    
    def __init__(
        self,
        config: SafetyConfig,
        llm_client: Any = None,
        moderator: ContentModerator = None
    ):
        self.config = config
        self.llm_client = llm_client
        
        # Setup validators
        self.input_validator = CompositeValidator([
            LengthValidator(max_length=config.max_input_length),
            PatternValidator() if config.block_prompt_injection else None,
            PIIValidator(redact=config.redact_input_pii),
        ])
        self.input_validator.validators = [
            v for v in self.input_validator.validators if v
        ]
        
        # Setup moderator
        self.moderator = moderator
        if not self.moderator and llm_client:
            self.moderator = LLMModerator(
                llm_client,
                threshold=config.moderation_threshold
            )
        
        # Setup output filters
        self.output_filter = CompositeFilter([
            LengthFilter(max_length=config.max_output_length),
            PIIFilter() if config.redact_output_pii else None,
            CodeFilter(dangerous_patterns=None) if config.filter_dangerous_code else None,
        ])
        self.output_filter.filters = [
            f for f in self.output_filter.filters if f
        ]
        
        self._audit_log: list[dict] = []
    
    async def check_input(self, text: str) -> tuple[bool, str, ValidationOutput]:
        """Check input safety."""
        
        result = await self.input_validator.validate(text)
        
        if result.result == ValidationResult.BLOCK:
            return False, text, result
        
        # Use modified input if available
        safe_input = result.modified_input or text
        
        return True, safe_input, result
    
    async def check_content(self, text: str) -> tuple[bool, ModerationResult]:
        """Check content moderation."""
        
        if not self.moderator:
            return True, None
        
        result = await self.moderator.moderate(text)
        
        # Check against blocked categories
        if self.config.blocked_categories:
            for cat in result.flagged_categories:
                if cat in self.config.blocked_categories:
                    return False, result
        
        return result.is_safe, result
    
    async def filter_output(self, text: str) -> tuple[str, FilterResult]:
        """Filter output."""
        
        result = await self.output_filter.filter(text)
        return result.filtered, result
    
    async def process_request(
        self,
        input_text: str,
        generate_fn: Callable[[str], str]
    ) -> SafetyResult:
        """Process complete request through safety pipeline."""
        
        audit = {
            "timestamp": datetime.utcnow().isoformat(),
            "input_length": len(input_text)
        }
        
        # 1. Validate input
        input_allowed, safe_input, input_result = await self.check_input(input_text)
        audit["input_validation"] = input_result.result.value
        
        if not input_allowed:
            audit["action"] = "blocked_input"
            self._audit_log.append(audit)
            
            return SafetyResult(
                allowed=False,
                input_result=input_result,
                action_taken=self.config.on_input_violation,
                audit_log=audit
            )
        
        # 2. Check input content moderation
        content_safe, moderation_result = await self.check_content(safe_input)
        if moderation_result:
            audit["input_moderation"] = "safe" if content_safe else "flagged"
        
        if not content_safe:
            audit["action"] = "blocked_moderation"
            self._audit_log.append(audit)
            
            return SafetyResult(
                allowed=False,
                input_result=input_result,
                moderation_result=moderation_result,
                action_taken=self.config.on_moderation_violation,
                audit_log=audit
            )
        
        # 3. Generate response
        try:
            raw_output = await generate_fn(safe_input)
            audit["generation"] = "success"
        except Exception as e:
            audit["generation"] = f"error: {str(e)}"
            self._audit_log.append(audit)
            raise
        
        # 4. Check output content moderation
        output_safe, output_moderation = await self.check_content(raw_output)
        if output_moderation:
            audit["output_moderation"] = "safe" if output_safe else "flagged"
        
        if not output_safe:
            audit["action"] = "blocked_output_moderation"
            self._audit_log.append(audit)
            
            return SafetyResult(
                allowed=False,
                input_result=input_result,
                moderation_result=output_moderation,
                action_taken=SafetyAction.BLOCK,
                audit_log=audit
            )
        
        # 5. Filter output
        filtered_output, filter_result = await self.filter_output(raw_output)
        audit["output_filtered"] = filter_result.was_modified
        audit["action"] = "allowed"
        
        self._audit_log.append(audit)
        
        return SafetyResult(
            allowed=True,
            input_result=input_result,
            moderation_result=moderation_result,
            output_result=filter_result,
            action_taken=SafetyAction.MODIFY if filter_result.was_modified else SafetyAction.ALLOW,
            final_output=filtered_output,
            audit_log=audit
        )
    
    def get_audit_log(self) -> list[dict]:
        """Get audit log."""
        return list(self._audit_log)

Production Safety Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize safety pipeline
config = SafetyConfig(
    max_input_length=10000,
    block_prompt_injection=True,
    redact_input_pii=True,
    moderation_threshold=0.7,
    max_output_length=8000
)
# pipeline = SafetyPipeline(config, llm_client)

class ValidateRequest(BaseModel):
    text: str
    check_pii: bool = True
    check_injection: bool = True

class ModerateRequest(BaseModel):
    text: str
    threshold: float = 0.7

class FilterRequest(BaseModel):
    text: str
    redact_pii: bool = True
    filter_code: bool = True
    max_length: int = 8000

class SafetyCheckRequest(BaseModel):
    input_text: str
    output_text: Optional[str] = None

@app.post("/v1/validate")
async def validate_input(request: ValidateRequest):
    """Validate input text."""
    
    validators = [LengthValidator(max_length=10000)]
    
    if request.check_injection:
        validators.append(PatternValidator())
    
    if request.check_pii:
        validators.append(PIIValidator(redact=True))
    
    validator = CompositeValidator(validators)
    result = await validator.validate(request.text)
    
    return {
        "result": result.result.value,
        "reason": result.reason,
        "modified_text": result.modified_input,
        "details": result.details
    }

@app.post("/v1/moderate")
async def moderate_content(request: ModerateRequest):
    """Moderate content."""
    
    # Would use actual moderator
    # result = await moderator.moderate(request.text)
    
    return {
        "is_safe": True,
        "categories": {},
        "flagged_categories": [],
        "explanation": "Content appears safe"
    }

@app.post("/v1/filter")
async def filter_output(request: FilterRequest):
    """Filter output text."""
    
    filters = [LengthFilter(max_length=request.max_length)]
    
    if request.redact_pii:
        filters.append(PIIFilter())
    
    if request.filter_code:
        filters.append(CodeFilter())
    
    filter_chain = CompositeFilter(filters)
    result = await filter_chain.filter(request.text)
    
    return {
        "original_length": len(result.original),
        "filtered_length": len(result.filtered),
        "was_modified": result.was_modified,
        "modifications": result.modifications,
        "filtered_text": result.filtered
    }

@app.post("/v1/safety-check")
async def safety_check(request: SafetyCheckRequest):
    """Complete safety check."""
    
    # Validate input
    validator = CompositeValidator([
        LengthValidator(max_length=10000),
        PatternValidator(),
        PIIValidator(redact=True)
    ])
    
    input_result = await validator.validate(request.input_text)
    
    response = {
        "input": {
            "result": input_result.result.value,
            "reason": input_result.reason,
            "safe_text": input_result.modified_input or request.input_text
        }
    }
    
    # Filter output if provided
    if request.output_text:
        filter_chain = CompositeFilter([
            LengthFilter(max_length=8000),
            PIIFilter(),
            CodeFilter()
        ])
        
        output_result = await filter_chain.filter(request.output_text)
        
        response["output"] = {
            "was_modified": output_result.was_modified,
            "modifications": output_result.modifications,
            "filtered_text": output_result.filtered
        }
    
    return response

@app.get("/v1/audit")
async def get_audit_log(limit: int = 100):
    """Get audit log."""
    
    # Would return actual audit log
    return {
        "entries": [],
        "total": 0
    }

@app.get("/v1/stats")
async def get_stats():
    """Get safety statistics."""
    
    return {
        "total_requests": 1000,
        "blocked_input": 15,
        "blocked_moderation": 8,
        "modified_output": 120,
        "block_rate": 0.023,
        "modification_rate": 0.12
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Guardrails are essential for responsible LLM deployment. Start with input validation—check length, detect prompt injection attempts, and optionally redact PII before processing. Add content moderation using the OpenAI moderation API, LLM-based classification, or specialized models like LlamaGuard to catch harmful content. Filter outputs to remove sensitive information, dangerous code patterns, and potentially false claims. Combine these into a comprehensive safety pipeline that validates inputs, moderates content, and filters outputs in sequence. Log all safety decisions for auditing and compliance. The key insight is that safety is defense in depth—no single check catches everything, so layer multiple mechanisms. Monitor your block rates and false positive rates, tuning thresholds based on your use case. A well-designed safety system protects users from harmful content while minimizing friction for legitimate requests, making the difference between a trustworthy AI application and one that poses risks to users and your organization.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.