Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

LLM Output Formatting: Getting Structured Data from Language Models

Introduction: Getting LLMs to produce consistently formatted output is one of the most practical challenges in production AI systems. You need JSON for your API, but the model sometimes wraps it in markdown code blocks. You need a specific schema, but the model invents extra fields or omits required ones. You need clean text, but you get explanatory preambles and postambles. Output formatting techniques solve these problems through a combination of prompt engineering, parsing strategies, and validation pipelines. The key insight is that LLMs are probabilistic—they’ll usually follow your format instructions, but not always. Robust systems assume formatting failures will happen and build in recovery mechanisms. This guide covers practical techniques for getting structured output from LLMs: JSON mode, function calling, schema validation, parsing strategies, and error recovery patterns that make your LLM integrations reliable.

LLM Output Formatting
Output Formatting: Parse, Transform, Serialize

JSON Output Strategies

from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict, Type, TypeVar
from enum import Enum
import json
import re

T = TypeVar('T')

class OutputFormat(Enum):
    """Supported output formats."""
    
    JSON = "json"
    MARKDOWN = "markdown"
    TEXT = "text"
    XML = "xml"
    CSV = "csv"

@dataclass
class FormatConfig:
    """Configuration for output formatting."""
    
    format: OutputFormat
    schema: dict = None
    strict: bool = True
    retry_on_failure: bool = True
    max_retries: int = 3

class JSONExtractor:
    """Extract JSON from LLM output."""
    
    def extract(self, text: str) -> dict:
        """Extract JSON from text."""
        
        # Try direct parsing first
        try:
            return json.loads(text.strip())
        except json.JSONDecodeError:
            pass
        
        # Try extracting from code blocks
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
        if json_match:
            try:
                return json.loads(json_match.group(1).strip())
            except json.JSONDecodeError:
                pass
        
        # Try finding JSON object
        json_match = re.search(r'\{[\s\S]*\}', text)
        if json_match:
            try:
                return json.loads(json_match.group(0))
            except json.JSONDecodeError:
                pass
        
        # Try finding JSON array
        json_match = re.search(r'\[[\s\S]*\]', text)
        if json_match:
            try:
                return json.loads(json_match.group(0))
            except json.JSONDecodeError:
                pass
        
        raise ValueError(f"Could not extract JSON from: {text[:200]}...")
    
    def extract_multiple(self, text: str) -> list[dict]:
        """Extract multiple JSON objects."""
        
        results = []
        
        # Find all JSON objects
        pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
        matches = re.findall(pattern, text)
        
        for match in matches:
            try:
                results.append(json.loads(match))
            except json.JSONDecodeError:
                continue
        
        return results

class SchemaValidator:
    """Validate output against schema."""
    
    def __init__(self, schema: dict):
        self.schema = schema
    
    def validate(self, data: dict) -> tuple[bool, list[str]]:
        """Validate data against schema."""
        
        errors = []
        
        # Check required fields
        required = self.schema.get("required", [])
        for field in required:
            if field not in data:
                errors.append(f"Missing required field: {field}")
        
        # Check field types
        properties = self.schema.get("properties", {})
        for field, value in data.items():
            if field in properties:
                expected_type = properties[field].get("type")
                if not self._check_type(value, expected_type):
                    errors.append(f"Field '{field}' has wrong type: expected {expected_type}")
        
        return len(errors) == 0, errors
    
    def _check_type(self, value: Any, expected: str) -> bool:
        """Check if value matches expected type."""
        
        type_map = {
            "string": str,
            "integer": int,
            "number": (int, float),
            "boolean": bool,
            "array": list,
            "object": dict,
            "null": type(None)
        }
        
        if expected not in type_map:
            return True
        
        return isinstance(value, type_map[expected])
    
    def coerce(self, data: dict) -> dict:
        """Coerce data to match schema types."""
        
        properties = self.schema.get("properties", {})
        result = {}
        
        for field, spec in properties.items():
            if field in data:
                result[field] = self._coerce_value(data[field], spec.get("type"))
            elif "default" in spec:
                result[field] = spec["default"]
        
        return result
    
    def _coerce_value(self, value: Any, target_type: str) -> Any:
        """Coerce value to target type."""
        
        if target_type == "string":
            return str(value)
        elif target_type == "integer":
            return int(float(value)) if value else 0
        elif target_type == "number":
            return float(value) if value else 0.0
        elif target_type == "boolean":
            if isinstance(value, str):
                return value.lower() in ("true", "yes", "1")
            return bool(value)
        elif target_type == "array":
            if isinstance(value, str):
                return [value]
            return list(value) if value else []
        elif target_type == "object":
            return dict(value) if value else {}
        
        return value

class PydanticFormatter:
    """Format output using Pydantic models."""
    
    def __init__(self, model_class: Type[T]):
        self.model_class = model_class
    
    def parse(self, text: str) -> T:
        """Parse text into Pydantic model."""
        
        extractor = JSONExtractor()
        data = extractor.extract(text)
        
        return self.model_class(**data)
    
    def parse_list(self, text: str) -> list[T]:
        """Parse text into list of models."""
        
        extractor = JSONExtractor()
        data = extractor.extract(text)
        
        if isinstance(data, list):
            return [self.model_class(**item) for item in data]
        else:
            return [self.model_class(**data)]
    
    def get_schema(self) -> dict:
        """Get JSON schema for model."""
        
        return self.model_class.model_json_schema()
    
    def get_prompt_schema(self) -> str:
        """Get schema formatted for prompt."""
        
        schema = self.get_schema()
        return json.dumps(schema, indent=2)

Structured Output with Function Calling

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import json

@dataclass
class FunctionDefinition:
    """Definition of a function for structured output."""
    
    name: str
    description: str
    parameters: dict
    
    def to_dict(self) -> dict:
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters
        }

class FunctionCallFormatter:
    """Use function calling for structured output."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def get_structured(
        self,
        prompt: str,
        output_schema: dict,
        function_name: str = "extract_data"
    ) -> dict:
        """Get structured output using function calling."""
        
        function_def = FunctionDefinition(
            name=function_name,
            description="Extract structured data from the input",
            parameters=output_schema
        )
        
        response = await self.llm.generate(
            prompt,
            functions=[function_def.to_dict()],
            function_call={"name": function_name}
        )
        
        # Parse function call response
        if hasattr(response, 'function_call'):
            return json.loads(response.function_call.arguments)
        
        return response

class ToolCallFormatter:
    """Use tool calling for structured output."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def get_structured(
        self,
        prompt: str,
        output_schema: dict,
        tool_name: str = "output"
    ) -> dict:
        """Get structured output using tool calling."""
        
        tool = {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": "Output the structured result",
                "parameters": output_schema
            }
        }
        
        response = await self.llm.generate(
            prompt,
            tools=[tool],
            tool_choice={"type": "function", "function": {"name": tool_name}}
        )
        
        # Parse tool call response
        if hasattr(response, 'tool_calls') and response.tool_calls:
            return json.loads(response.tool_calls[0].function.arguments)
        
        return response

class ResponseFormatFormatter:
    """Use response_format for JSON output."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def get_json(
        self,
        prompt: str,
        schema: dict = None
    ) -> dict:
        """Get JSON output using response_format."""
        
        if schema:
            # Use structured outputs (OpenAI)
            response = await self.llm.generate(
                prompt,
                response_format={
                    "type": "json_schema",
                    "json_schema": {
                        "name": "response",
                        "schema": schema
                    }
                }
            )
        else:
            # Use basic JSON mode
            response = await self.llm.generate(
                prompt,
                response_format={"type": "json_object"}
            )
        
        return json.loads(response)

class InstructorFormatter:
    """Use Instructor library for structured output."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def get_structured(
        self,
        prompt: str,
        response_model: type
    ) -> Any:
        """Get structured output using Instructor."""
        
        import instructor
        
        # Patch client with instructor
        client = instructor.patch(self.llm)
        
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            response_model=response_model
        )
        
        return response

class OutputSchemaBuilder:
    """Build output schemas programmatically."""
    
    def __init__(self):
        self.properties = {}
        self.required = []
    
    def add_string(
        self,
        name: str,
        description: str = "",
        required: bool = True,
        enum: list[str] = None
    ) -> 'OutputSchemaBuilder':
        """Add string field."""
        
        prop = {"type": "string", "description": description}
        if enum:
            prop["enum"] = enum
        
        self.properties[name] = prop
        
        if required:
            self.required.append(name)
        
        return self
    
    def add_integer(
        self,
        name: str,
        description: str = "",
        required: bool = True,
        minimum: int = None,
        maximum: int = None
    ) -> 'OutputSchemaBuilder':
        """Add integer field."""
        
        prop = {"type": "integer", "description": description}
        if minimum is not None:
            prop["minimum"] = minimum
        if maximum is not None:
            prop["maximum"] = maximum
        
        self.properties[name] = prop
        
        if required:
            self.required.append(name)
        
        return self
    
    def add_boolean(
        self,
        name: str,
        description: str = "",
        required: bool = True
    ) -> 'OutputSchemaBuilder':
        """Add boolean field."""
        
        self.properties[name] = {
            "type": "boolean",
            "description": description
        }
        
        if required:
            self.required.append(name)
        
        return self
    
    def add_array(
        self,
        name: str,
        item_type: str = "string",
        description: str = "",
        required: bool = True
    ) -> 'OutputSchemaBuilder':
        """Add array field."""
        
        self.properties[name] = {
            "type": "array",
            "items": {"type": item_type},
            "description": description
        }
        
        if required:
            self.required.append(name)
        
        return self
    
    def add_object(
        self,
        name: str,
        properties: dict,
        description: str = "",
        required: bool = True
    ) -> 'OutputSchemaBuilder':
        """Add nested object field."""
        
        self.properties[name] = {
            "type": "object",
            "properties": properties,
            "description": description
        }
        
        if required:
            self.required.append(name)
        
        return self
    
    def build(self) -> dict:
        """Build the schema."""
        
        return {
            "type": "object",
            "properties": self.properties,
            "required": self.required
        }

Parsing and Recovery

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import re
import json

@dataclass
class ParseResult:
    """Result of parsing attempt."""
    
    success: bool
    data: Any = None
    error: str = None
    raw_output: str = ""

class RobustParser:
    """Parse LLM output with multiple fallback strategies."""
    
    def __init__(self):
        self.strategies: list[Callable] = []
    
    def add_strategy(self, strategy: Callable):
        """Add parsing strategy."""
        
        self.strategies.append(strategy)
    
    def parse(self, text: str) -> ParseResult:
        """Try all strategies until one succeeds."""
        
        for strategy in self.strategies:
            try:
                result = strategy(text)
                return ParseResult(
                    success=True,
                    data=result,
                    raw_output=text
                )
            except Exception as e:
                continue
        
        return ParseResult(
            success=False,
            error="All parsing strategies failed",
            raw_output=text
        )

class JSONRepairParser:
    """Parse and repair malformed JSON."""
    
    def parse(self, text: str) -> dict:
        """Parse with repair attempts."""
        
        # Try direct parse
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass
        
        # Try repairs
        repaired = self._repair(text)
        return json.loads(repaired)
    
    def _repair(self, text: str) -> str:
        """Attempt to repair JSON."""
        
        # Extract JSON portion
        text = self._extract_json(text)
        
        # Fix common issues
        text = self._fix_trailing_commas(text)
        text = self._fix_single_quotes(text)
        text = self._fix_unquoted_keys(text)
        text = self._fix_missing_quotes(text)
        text = self._fix_newlines(text)
        
        return text
    
    def _extract_json(self, text: str) -> str:
        """Extract JSON from surrounding text."""
        
        # Remove markdown code blocks
        text = re.sub(r'```(?:json)?\s*', '', text)
        text = re.sub(r'```\s*$', '', text)
        
        # Find JSON boundaries
        start = text.find('{')
        if start == -1:
            start = text.find('[')
        
        if start == -1:
            return text
        
        # Find matching end
        if text[start] == '{':
            end = text.rfind('}')
        else:
            end = text.rfind(']')
        
        if end == -1:
            return text[start:]
        
        return text[start:end + 1]
    
    def _fix_trailing_commas(self, text: str) -> str:
        """Remove trailing commas."""
        
        text = re.sub(r',\s*}', '}', text)
        text = re.sub(r',\s*]', ']', text)
        return text
    
    def _fix_single_quotes(self, text: str) -> str:
        """Replace single quotes with double quotes."""
        
        # Only replace quotes around keys and string values
        result = []
        in_string = False
        string_char = None
        
        i = 0
        while i < len(text):
            char = text[i]
            
            if char in '"\'':
                if not in_string:
                    in_string = True
                    string_char = char
                    result.append('"')
                elif char == string_char:
                    in_string = False
                    string_char = None
                    result.append('"')
                else:
                    result.append(char)
            else:
                result.append(char)
            
            i += 1
        
        return ''.join(result)
    
    def _fix_unquoted_keys(self, text: str) -> str:
        """Add quotes to unquoted keys."""
        
        # Match unquoted keys
        pattern = r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)(\s*:)'
        return re.sub(pattern, r'\1"\2"\3', text)
    
    def _fix_missing_quotes(self, text: str) -> str:
        """Add missing quotes to string values."""
        
        # This is tricky - only fix obvious cases
        # Fix values that look like unquoted strings after colons
        pattern = r':\s*([a-zA-Z][a-zA-Z0-9_\s]*[a-zA-Z0-9])(\s*[,}])'
        return re.sub(pattern, r': "\1"\2', text)
    
    def _fix_newlines(self, text: str) -> str:
        """Fix newlines in strings."""
        
        # Replace literal newlines in strings with \n
        return text.replace('\n', '\\n').replace('\\n\\n', '\n')

class MarkdownParser:
    """Parse structured data from markdown."""
    
    def parse_table(self, text: str) -> list[dict]:
        """Parse markdown table to list of dicts."""
        
        lines = text.strip().split('\n')
        
        # Find table lines
        table_lines = [l for l in lines if '|' in l]
        
        if len(table_lines) < 2:
            return []
        
        # Parse header
        header = self._parse_row(table_lines[0])
        
        # Skip separator line
        data_lines = table_lines[2:] if len(table_lines) > 2 else []
        
        # Parse data rows
        results = []
        for line in data_lines:
            values = self._parse_row(line)
            if len(values) == len(header):
                results.append(dict(zip(header, values)))
        
        return results
    
    def _parse_row(self, line: str) -> list[str]:
        """Parse a table row."""
        
        cells = line.split('|')
        return [c.strip() for c in cells if c.strip()]
    
    def parse_list(self, text: str) -> list[str]:
        """Parse markdown list."""
        
        items = []
        
        for line in text.split('\n'):
            # Match list items
            match = re.match(r'^[\-\*\d\.]+\s+(.+)$', line.strip())
            if match:
                items.append(match.group(1))
        
        return items
    
    def parse_code_blocks(self, text: str) -> list[dict]:
        """Extract code blocks with language."""
        
        blocks = []
        
        pattern = r'```(\w*)\s*([\s\S]*?)```'
        matches = re.findall(pattern, text)
        
        for lang, code in matches:
            blocks.append({
                "language": lang or "text",
                "code": code.strip()
            })
        
        return blocks

class RetryParser:
    """Parse with retry and LLM correction."""
    
    def __init__(self, llm_client: Any, parser: Any):
        self.llm = llm_client
        self.parser = parser
        self.max_retries = 3
    
    async def parse(
        self,
        text: str,
        schema: dict = None
    ) -> ParseResult:
        """Parse with retry on failure."""
        
        # First attempt
        try:
            data = self.parser.parse(text)
            return ParseResult(success=True, data=data, raw_output=text)
        except Exception as e:
            pass
        
        # Retry with LLM correction
        for attempt in range(self.max_retries):
            corrected = await self._correct_with_llm(text, schema)
            
            try:
                data = self.parser.parse(corrected)
                return ParseResult(success=True, data=data, raw_output=corrected)
            except Exception as e:
                text = corrected
        
        return ParseResult(
            success=False,
            error="Failed after retries",
            raw_output=text
        )
    
    async def _correct_with_llm(
        self,
        text: str,
        schema: dict = None
    ) -> str:
        """Use LLM to correct malformed output."""
        
        schema_str = json.dumps(schema, indent=2) if schema else "valid JSON"
        
        prompt = f"""The following text should be valid JSON but has formatting errors.
Please fix the JSON and return only the corrected JSON, nothing else.

Expected schema:
{schema_str}

Malformed text:
{text}

Corrected JSON:"""
        
        return await self.llm.generate(prompt)

Output Transformation

from dataclasses import dataclass
from typing import Any, Optional, List, Callable, Dict
import re

class OutputTransformer:
    """Transform LLM output to desired format."""
    
    def __init__(self):
        self.transformers: list[Callable] = []
    
    def add_transformer(self, transformer: Callable):
        """Add transformation step."""
        
        self.transformers.append(transformer)
    
    def transform(self, data: Any) -> Any:
        """Apply all transformations."""
        
        result = data
        
        for transformer in self.transformers:
            result = transformer(result)
        
        return result

class FieldMapper:
    """Map fields between schemas."""
    
    def __init__(self, mapping: dict[str, str]):
        self.mapping = mapping
    
    def map(self, data: dict) -> dict:
        """Map fields according to mapping."""
        
        result = {}
        
        for source, target in self.mapping.items():
            if source in data:
                result[target] = data[source]
        
        # Include unmapped fields
        for key, value in data.items():
            if key not in self.mapping:
                result[key] = value
        
        return result

class TypeCoercer:
    """Coerce field types."""
    
    def __init__(self, type_map: dict[str, type]):
        self.type_map = type_map
    
    def coerce(self, data: dict) -> dict:
        """Coerce types according to map."""
        
        result = {}
        
        for key, value in data.items():
            if key in self.type_map:
                result[key] = self._coerce_value(value, self.type_map[key])
            else:
                result[key] = value
        
        return result
    
    def _coerce_value(self, value: Any, target_type: type) -> Any:
        """Coerce single value."""
        
        if value is None:
            return None
        
        try:
            if target_type == bool:
                if isinstance(value, str):
                    return value.lower() in ('true', 'yes', '1')
                return bool(value)
            elif target_type == int:
                return int(float(value))
            elif target_type == float:
                return float(value)
            elif target_type == str:
                return str(value)
            elif target_type == list:
                if isinstance(value, str):
                    return [value]
                return list(value)
            else:
                return target_type(value)
        except (ValueError, TypeError):
            return value

class DefaultFiller:
    """Fill missing fields with defaults."""
    
    def __init__(self, defaults: dict[str, Any]):
        self.defaults = defaults
    
    def fill(self, data: dict) -> dict:
        """Fill missing fields."""
        
        result = dict(data)
        
        for key, default in self.defaults.items():
            if key not in result or result[key] is None:
                result[key] = default
        
        return result

class FieldFilter:
    """Filter fields from output."""
    
    def __init__(
        self,
        include: list[str] = None,
        exclude: list[str] = None
    ):
        self.include = set(include) if include else None
        self.exclude = set(exclude) if exclude else set()
    
    def filter(self, data: dict) -> dict:
        """Filter fields."""
        
        result = {}
        
        for key, value in data.items():
            if self.include and key not in self.include:
                continue
            if key in self.exclude:
                continue
            result[key] = value
        
        return result

class NestedFlattener:
    """Flatten nested structures."""
    
    def __init__(self, separator: str = "."):
        self.separator = separator
    
    def flatten(self, data: dict, prefix: str = "") -> dict:
        """Flatten nested dict."""
        
        result = {}
        
        for key, value in data.items():
            new_key = f"{prefix}{self.separator}{key}" if prefix else key
            
            if isinstance(value, dict):
                result.update(self.flatten(value, new_key))
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        result.update(self.flatten(item, f"{new_key}[{i}]"))
                    else:
                        result[f"{new_key}[{i}]"] = item
            else:
                result[new_key] = value
        
        return result
    
    def unflatten(self, data: dict) -> dict:
        """Unflatten to nested dict."""
        
        result = {}
        
        for key, value in data.items():
            parts = key.split(self.separator)
            current = result
            
            for i, part in enumerate(parts[:-1]):
                # Handle array indices
                match = re.match(r'(.+)\[(\d+)\]', part)
                if match:
                    name, idx = match.groups()
                    idx = int(idx)
                    
                    if name not in current:
                        current[name] = []
                    
                    while len(current[name]) <= idx:
                        current[name].append({})
                    
                    current = current[name][idx]
                else:
                    if part not in current:
                        current[part] = {}
                    current = current[part]
            
            current[parts[-1]] = value
        
        return result

class OutputPipeline:
    """Pipeline for output processing."""
    
    def __init__(self):
        self.steps: list[tuple[str, Callable]] = []
    
    def add_step(self, name: str, processor: Callable):
        """Add processing step."""
        
        self.steps.append((name, processor))
    
    def process(self, data: Any) -> dict:
        """Process through pipeline."""
        
        result = data
        metadata = {"steps": []}
        
        for name, processor in self.steps:
            try:
                result = processor(result)
                metadata["steps"].append({"name": name, "success": True})
            except Exception as e:
                metadata["steps"].append({
                    "name": name,
                    "success": False,
                    "error": str(e)
                })
                raise
        
        return {"data": result, "metadata": metadata}

Production Formatting Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Any
import json

app = FastAPI()

class FormatRequest(BaseModel):
    text: str
    format: str = "json"
    schema: Optional[dict] = None
    repair: bool = True

class FormatResponse(BaseModel):
    success: bool
    data: Any = None
    error: str = None

class TransformRequest(BaseModel):
    data: dict
    mapping: Optional[dict] = None
    defaults: Optional[dict] = None
    include_fields: Optional[List[str]] = None
    exclude_fields: Optional[List[str]] = None

# Initialize components
json_extractor = JSONExtractor()
json_repair_parser = JSONRepairParser()
markdown_parser = MarkdownParser()

@app.post("/v1/format/json", response_model=FormatResponse)
async def format_json(request: FormatRequest) -> FormatResponse:
    """Extract and format JSON from text."""
    
    try:
        if request.repair:
            data = json_repair_parser.parse(request.text)
        else:
            data = json_extractor.extract(request.text)
        
        # Validate against schema if provided
        if request.schema:
            validator = SchemaValidator(request.schema)
            valid, errors = validator.validate(data)
            
            if not valid:
                return FormatResponse(
                    success=False,
                    error=f"Schema validation failed: {errors}"
                )
        
        return FormatResponse(success=True, data=data)
    
    except Exception as e:
        return FormatResponse(success=False, error=str(e))

@app.post("/v1/format/markdown")
async def format_markdown(request: FormatRequest) -> FormatResponse:
    """Extract structured data from markdown."""
    
    try:
        if "table" in request.format:
            data = markdown_parser.parse_table(request.text)
        elif "list" in request.format:
            data = markdown_parser.parse_list(request.text)
        elif "code" in request.format:
            data = markdown_parser.parse_code_blocks(request.text)
        else:
            data = {
                "tables": markdown_parser.parse_table(request.text),
                "lists": markdown_parser.parse_list(request.text),
                "code_blocks": markdown_parser.parse_code_blocks(request.text)
            }
        
        return FormatResponse(success=True, data=data)
    
    except Exception as e:
        return FormatResponse(success=False, error=str(e))

@app.post("/v1/transform")
async def transform_data(request: TransformRequest) -> FormatResponse:
    """Transform structured data."""
    
    try:
        data = request.data
        
        # Apply mapping
        if request.mapping:
            mapper = FieldMapper(request.mapping)
            data = mapper.map(data)
        
        # Apply defaults
        if request.defaults:
            filler = DefaultFiller(request.defaults)
            data = filler.fill(data)
        
        # Apply field filter
        if request.include_fields or request.exclude_fields:
            filter = FieldFilter(
                include=request.include_fields,
                exclude=request.exclude_fields
            )
            data = filter.filter(data)
        
        return FormatResponse(success=True, data=data)
    
    except Exception as e:
        return FormatResponse(success=False, error=str(e))

@app.post("/v1/validate")
async def validate_data(data: dict, schema: dict) -> dict:
    """Validate data against schema."""
    
    validator = SchemaValidator(schema)
    valid, errors = validator.validate(data)
    
    return {
        "valid": valid,
        "errors": errors
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Reliable output formatting requires defense in depth. Start with the strongest guarantees your model supports—structured outputs with JSON schema enforcement, function calling, or response_format JSON mode. When these aren’t available or fail, fall back to robust parsing: extract JSON from code blocks, repair common malformations like trailing commas and single quotes, and validate against your expected schema. Build transformation pipelines that map fields, coerce types, fill defaults, and filter to exactly the structure your application needs. For production systems, implement retry logic that uses the LLM itself to correct malformed output—often the model can fix its own mistakes when shown the error. The key insight is that formatting is a spectrum of reliability: native structured outputs are most reliable, followed by function calling, then JSON mode, then prompt engineering with parsing. Use the most reliable method available, but always have fallbacks. Monitor parsing failures in production—they often indicate prompt issues or model behavior changes that need attention.