Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

Guardrails and Safety for LLMs: Building Secure AI Applications with Input Validation and Output Filtering

Introduction: Production LLM applications need guardrails to ensure safe, appropriate outputs. Without proper safeguards, models can generate harmful content, leak sensitive information, or produce responses that violate business policies. Guardrails provide defense-in-depth: input validation catches problematic requests before they reach the model, output filtering ensures responses meet safety standards, and content moderation prevents harmful generations. This guide covers practical guardrail patterns: input sanitization, topic restriction, PII detection, toxicity filtering, and production-ready safety pipelines that protect both users and your organization.

Guardrails
Guardrails System: Input Validation, LLM Processing, Output Filtering

Input Validation

from dataclasses import dataclass
from typing import Optional
from enum import Enum
import re

class ValidationResult(Enum):
    PASS = "pass"
    BLOCK = "block"
    WARN = "warn"

@dataclass
class ValidationResponse:
    """Result of input validation."""
    
    result: ValidationResult
    reason: Optional[str] = None
    sanitized_input: Optional[str] = None

class InputValidator:
    """Validate and sanitize user inputs."""
    
    def __init__(
        self,
        max_length: int = 10000,
        blocked_patterns: list[str] = None,
        allowed_topics: list[str] = None
    ):
        self.max_length = max_length
        self.blocked_patterns = blocked_patterns or []
        self.allowed_topics = allowed_topics
    
    def validate(self, text: str) -> ValidationResponse:
        """Validate input text."""
        
        # Length check
        if len(text) > self.max_length:
            return ValidationResponse(
                result=ValidationResult.BLOCK,
                reason=f"Input exceeds maximum length of {self.max_length}"
            )
        
        # Empty check
        if not text.strip():
            return ValidationResponse(
                result=ValidationResult.BLOCK,
                reason="Input is empty"
            )
        
        # Blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return ValidationResponse(
                    result=ValidationResult.BLOCK,
                    reason=f"Input contains blocked pattern"
                )
        
        return ValidationResponse(
            result=ValidationResult.PASS,
            sanitized_input=text.strip()
        )

class PromptInjectionDetector:
    """Detect prompt injection attempts."""
    
    INJECTION_PATTERNS = [
        r"ignore\s+(previous|all|above)\s+instructions",
        r"disregard\s+(previous|all|above)",
        r"forget\s+(everything|all|previous)",
        r"you\s+are\s+now\s+",
        r"new\s+instructions:",
        r"system\s*:\s*",
        r"<\|.*\|>",
        r"\[INST\]",
        r"```system",
    ]
    
    def __init__(self, custom_patterns: list[str] = None):
        self.patterns = self.INJECTION_PATTERNS + (custom_patterns or [])
    
    def detect(self, text: str) -> ValidationResponse:
        """Detect prompt injection attempts."""
        
        text_lower = text.lower()
        
        for pattern in self.patterns:
            if re.search(pattern, text_lower):
                return ValidationResponse(
                    result=ValidationResult.BLOCK,
                    reason="Potential prompt injection detected"
                )
        
        return ValidationResponse(result=ValidationResult.PASS)

class TopicRestrictor:
    """Restrict inputs to allowed topics."""
    
    def __init__(
        self,
        allowed_topics: list[str],
        blocked_topics: list[str] = None
    ):
        self.allowed_topics = allowed_topics
        self.blocked_topics = blocked_topics or []
    
    async def check_topic(
        self,
        text: str,
        classifier: callable
    ) -> ValidationResponse:
        """Check if input is on-topic."""
        
        # Use classifier to determine topic
        topic = await classifier(text)
        
        # Check blocked topics first
        if topic in self.blocked_topics:
            return ValidationResponse(
                result=ValidationResult.BLOCK,
                reason=f"Topic '{topic}' is not allowed"
            )
        
        # Check allowed topics
        if self.allowed_topics and topic not in self.allowed_topics:
            return ValidationResponse(
                result=ValidationResult.BLOCK,
                reason=f"Topic '{topic}' is outside allowed scope"
            )
        
        return ValidationResponse(result=ValidationResult.PASS)

# Combined input guardrail
class InputGuardrail:
    """Combined input validation guardrail."""
    
    def __init__(
        self,
        max_length: int = 10000,
        blocked_patterns: list[str] = None,
        detect_injection: bool = True
    ):
        self.validator = InputValidator(
            max_length=max_length,
            blocked_patterns=blocked_patterns
        )
        self.injection_detector = PromptInjectionDetector() if detect_injection else None
    
    def validate(self, text: str) -> ValidationResponse:
        """Run all input validations."""
        
        # Basic validation
        result = self.validator.validate(text)
        if result.result != ValidationResult.PASS:
            return result
        
        # Injection detection
        if self.injection_detector:
            result = self.injection_detector.detect(text)
            if result.result != ValidationResult.PASS:
                return result
        
        return ValidationResponse(
            result=ValidationResult.PASS,
            sanitized_input=text.strip()
        )

PII Detection and Masking

from dataclasses import dataclass
from typing import Optional
import re

@dataclass
class PIIMatch:
    """A detected PII match."""
    
    type: str
    value: str
    start: int
    end: int
    masked: str

class PIIDetector:
    """Detect and mask personally identifiable information."""
    
    PATTERNS = {
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        "phone_us": r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
        "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
        "credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        "ip_address": r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        "date_of_birth": r'\b(?:0?[1-9]|1[0-2])[/-](?:0?[1-9]|[12]\d|3[01])[/-](?:19|20)\d{2}\b',
    }
    
    MASKS = {
        "email": "[EMAIL]",
        "phone_us": "[PHONE]",
        "ssn": "[SSN]",
        "credit_card": "[CREDIT_CARD]",
        "ip_address": "[IP_ADDRESS]",
        "date_of_birth": "[DOB]",
    }
    
    def __init__(self, custom_patterns: dict = None):
        self.patterns = {**self.PATTERNS, **(custom_patterns or {})}
    
    def detect(self, text: str) -> list[PIIMatch]:
        """Detect all PII in text."""
        
        matches = []
        
        for pii_type, pattern in self.patterns.items():
            for match in re.finditer(pattern, text):
                matches.append(PIIMatch(
                    type=pii_type,
                    value=match.group(),
                    start=match.start(),
                    end=match.end(),
                    masked=self.MASKS.get(pii_type, "[REDACTED]")
                ))
        
        return matches
    
    def mask(self, text: str) -> tuple[str, list[PIIMatch]]:
        """Mask all PII in text."""
        
        matches = self.detect(text)
        
        # Sort by position (reverse) to replace from end
        matches.sort(key=lambda m: m.start, reverse=True)
        
        masked_text = text
        for match in matches:
            masked_text = (
                masked_text[:match.start] +
                match.masked +
                masked_text[match.end:]
            )
        
        return masked_text, matches
    
    def has_pii(self, text: str) -> bool:
        """Check if text contains PII."""
        return len(self.detect(text)) > 0

class PIIGuardrail:
    """Guardrail for PII handling."""
    
    def __init__(
        self,
        action: str = "mask",  # "mask", "block", "warn"
        detector: PIIDetector = None
    ):
        self.action = action
        self.detector = detector or PIIDetector()
    
    def process_input(self, text: str) -> ValidationResponse:
        """Process input for PII."""
        
        matches = self.detector.detect(text)
        
        if not matches:
            return ValidationResponse(
                result=ValidationResult.PASS,
                sanitized_input=text
            )
        
        if self.action == "block":
            return ValidationResponse(
                result=ValidationResult.BLOCK,
                reason=f"Input contains PII: {[m.type for m in matches]}"
            )
        
        elif self.action == "warn":
            return ValidationResponse(
                result=ValidationResult.WARN,
                reason=f"Input contains PII: {[m.type for m in matches]}",
                sanitized_input=text
            )
        
        else:  # mask
            masked_text, _ = self.detector.mask(text)
            return ValidationResponse(
                result=ValidationResult.PASS,
                sanitized_input=masked_text
            )
    
    def process_output(self, text: str) -> str:
        """Mask PII in output."""
        
        masked_text, _ = self.detector.mask(text)
        return masked_text

Output Filtering

from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum

class ContentCategory(Enum):
    SAFE = "safe"
    TOXIC = "toxic"
    HARMFUL = "harmful"
    INAPPROPRIATE = "inappropriate"
    OFF_TOPIC = "off_topic"

@dataclass
class FilterResult:
    """Result of content filtering."""
    
    allowed: bool
    category: ContentCategory
    confidence: float
    reason: Optional[str] = None
    filtered_content: Optional[str] = None

class ToxicityFilter:
    """Filter toxic content from outputs."""
    
    TOXIC_PATTERNS = [
        r'\b(hate|kill|murder|attack)\b.*\b(people|group|race)\b',
        r'\b(stupid|idiot|moron)\b',
        r'profanity_pattern_here',
    ]
    
    def __init__(
        self,
        threshold: float = 0.7,
        custom_patterns: list[str] = None
    ):
        self.threshold = threshold
        self.patterns = self.TOXIC_PATTERNS + (custom_patterns or [])
    
    def filter(self, text: str) -> FilterResult:
        """Filter text for toxicity."""
        
        text_lower = text.lower()
        
        for pattern in self.patterns:
            if re.search(pattern, text_lower):
                return FilterResult(
                    allowed=False,
                    category=ContentCategory.TOXIC,
                    confidence=0.9,
                    reason="Toxic content detected"
                )
        
        return FilterResult(
            allowed=True,
            category=ContentCategory.SAFE,
            confidence=0.95
        )

class HarmfulContentFilter:
    """Filter harmful content like dangerous instructions."""
    
    HARMFUL_CATEGORIES = [
        "weapons_instructions",
        "drug_synthesis",
        "hacking_instructions",
        "self_harm",
        "illegal_activities"
    ]
    
    HARMFUL_PATTERNS = {
        "weapons_instructions": [
            r'how\s+to\s+(make|build|create)\s+(bomb|explosive|weapon)',
        ],
        "drug_synthesis": [
            r'how\s+to\s+(make|synthesize|cook)\s+(meth|drugs|cocaine)',
        ],
        "hacking_instructions": [
            r'how\s+to\s+hack\s+(into|password|account)',
        ],
    }
    
    def filter(self, text: str) -> FilterResult:
        """Filter harmful content."""
        
        text_lower = text.lower()
        
        for category, patterns in self.HARMFUL_PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, text_lower):
                    return FilterResult(
                        allowed=False,
                        category=ContentCategory.HARMFUL,
                        confidence=0.95,
                        reason=f"Harmful content detected: {category}"
                    )
        
        return FilterResult(
            allowed=True,
            category=ContentCategory.SAFE,
            confidence=0.9
        )

class OutputGuardrail:
    """Combined output filtering guardrail."""
    
    def __init__(
        self,
        filters: list = None,
        fallback_response: str = "I cannot provide that information."
    ):
        self.filters = filters or [
            ToxicityFilter(),
            HarmfulContentFilter()
        ]
        self.fallback_response = fallback_response
    
    def filter(self, text: str) -> tuple[str, FilterResult]:
        """Filter output through all filters."""
        
        for filter_obj in self.filters:
            result = filter_obj.filter(text)
            
            if not result.allowed:
                return self.fallback_response, result
        
        return text, FilterResult(
            allowed=True,
            category=ContentCategory.SAFE,
            confidence=0.95
        )

Content Moderation

from dataclasses import dataclass
from typing import Any, Optional
import asyncio

@dataclass
class ModerationResult:
    """Result from content moderation."""
    
    flagged: bool
    categories: dict[str, bool]
    scores: dict[str, float]
    
    def get_flagged_categories(self) -> list[str]:
        return [cat for cat, flagged in self.categories.items() if flagged]

class OpenAIModerator:
    """Use OpenAI's moderation API."""
    
    def __init__(self, client: Any):
        self.client = client
    
    async def moderate(self, text: str) -> ModerationResult:
        """Moderate text using OpenAI API."""
        
        response = await self.client.moderations.create(input=text)
        
        result = response.results[0]
        
        return ModerationResult(
            flagged=result.flagged,
            categories={
                "hate": result.categories.hate,
                "hate_threatening": result.categories.hate_threatening,
                "harassment": result.categories.harassment,
                "self_harm": result.categories.self_harm,
                "sexual": result.categories.sexual,
                "violence": result.categories.violence,
            },
            scores={
                "hate": result.category_scores.hate,
                "hate_threatening": result.category_scores.hate_threatening,
                "harassment": result.category_scores.harassment,
                "self_harm": result.category_scores.self_harm,
                "sexual": result.category_scores.sexual,
                "violence": result.category_scores.violence,
            }
        )

class LLMBasedModerator:
    """Use an LLM for content moderation."""
    
    MODERATION_PROMPT = """Analyze the following content for safety issues.
    
Categories to check:
- harmful: Instructions for dangerous activities
- toxic: Hate speech, harassment, discrimination
- inappropriate: Adult content, profanity
- off_topic: Content unrelated to allowed topics

Content to analyze:
{content}

Respond with JSON:
{{"flagged": true/false, "category": "category_name", "reason": "explanation"}}"""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def moderate(self, text: str) -> ModerationResult:
        """Moderate using LLM."""
        
        import json
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "user", "content": self.MODERATION_PROMPT.format(content=text)}
            ],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        
        category = result.get("category", "safe")
        
        return ModerationResult(
            flagged=result.get("flagged", False),
            categories={category: result.get("flagged", False)},
            scores={category: 1.0 if result.get("flagged") else 0.0}
        )

class ContentModerationPipeline:
    """Pipeline combining multiple moderation approaches."""
    
    def __init__(
        self,
        moderators: list,
        threshold: float = 0.5
    ):
        self.moderators = moderators
        self.threshold = threshold
    
    async def moderate(self, text: str) -> ModerationResult:
        """Run all moderators and combine results."""
        
        results = await asyncio.gather(*[
            mod.moderate(text) for mod in self.moderators
        ])
        
        # Combine results - flag if any moderator flags
        combined_flagged = any(r.flagged for r in results)
        
        # Merge categories and scores
        combined_categories = {}
        combined_scores = {}
        
        for result in results:
            for cat, flagged in result.categories.items():
                if cat not in combined_categories or flagged:
                    combined_categories[cat] = flagged
            
            for cat, score in result.scores.items():
                if cat not in combined_scores or score > combined_scores[cat]:
                    combined_scores[cat] = score
        
        return ModerationResult(
            flagged=combined_flagged,
            categories=combined_categories,
            scores=combined_scores
        )

Production Guardrails Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
input_guardrail = InputGuardrail()
pii_guardrail = PIIGuardrail(action="mask")
output_guardrail = OutputGuardrail()

class ValidateRequest(BaseModel):
    text: str
    check_pii: bool = True
    check_injection: bool = True

class FilterRequest(BaseModel):
    text: str
    mask_pii: bool = True

class GuardedCompletionRequest(BaseModel):
    messages: list[dict]
    model: str = "gpt-4o-mini"
    max_tokens: Optional[int] = 1000

@app.post("/v1/validate/input")
async def validate_input(request: ValidateRequest):
    """Validate input text."""
    
    # Input validation
    result = input_guardrail.validate(request.text)
    
    if result.result == ValidationResult.BLOCK:
        return {
            "valid": False,
            "reason": result.reason
        }
    
    # PII check
    if request.check_pii:
        pii_result = pii_guardrail.process_input(request.text)
        
        if pii_result.result == ValidationResult.BLOCK:
            return {
                "valid": False,
                "reason": pii_result.reason
            }
        
        return {
            "valid": True,
            "sanitized": pii_result.sanitized_input
        }
    
    return {
        "valid": True,
        "sanitized": result.sanitized_input
    }

@app.post("/v1/filter/output")
async def filter_output(request: FilterRequest):
    """Filter output text."""
    
    # Content filtering
    filtered_text, result = output_guardrail.filter(request.text)
    
    # PII masking
    if request.mask_pii:
        filtered_text = pii_guardrail.process_output(filtered_text)
    
    return {
        "filtered": filtered_text,
        "was_filtered": not result.allowed,
        "category": result.category.value if not result.allowed else None
    }

@app.post("/v1/completions/guarded")
async def guarded_completion(request: GuardedCompletionRequest):
    """Create completion with full guardrails."""
    
    # Validate input
    user_message = request.messages[-1].get("content", "")
    
    input_result = input_guardrail.validate(user_message)
    if input_result.result == ValidationResult.BLOCK:
        raise HTTPException(400, f"Input blocked: {input_result.reason}")
    
    # Mask PII in input
    pii_result = pii_guardrail.process_input(user_message)
    sanitized_messages = request.messages.copy()
    sanitized_messages[-1]["content"] = pii_result.sanitized_input
    
    # Call LLM (placeholder)
    # response = await client.chat.completions.create(...)
    response_text = "This is a placeholder response"
    
    # Filter output
    filtered_text, filter_result = output_guardrail.filter(response_text)
    
    # Mask PII in output
    final_text = pii_guardrail.process_output(filtered_text)
    
    return {
        "content": final_text,
        "guardrails": {
            "input_sanitized": pii_result.sanitized_input != user_message,
            "output_filtered": not filter_result.allowed
        }
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Guardrails are essential for production LLM applications. Input validation catches problematic requests before they reach the model—detect prompt injection attempts, enforce length limits, and restrict to allowed topics. PII detection protects user privacy by masking sensitive information in both inputs and outputs. Output filtering ensures responses meet safety standards—filter toxic content, harmful instructions, and inappropriate material. Use multiple layers of defense: pattern-based filters for known issues, LLM-based moderation for nuanced content, and external moderation APIs for comprehensive coverage. Monitor guardrail triggers to identify attack patterns and improve defenses. The goal is building AI systems that are helpful while remaining safe and appropriate for all users.