LLM Output Parsing: Transforming Unstructured Text into Reliable Data Structures

Introduction: LLM outputs are inherently unstructured—models generate text, not data structures. Yet most applications need structured data: JSON for APIs, typed objects for business logic, specific formats for downstream processing. Output parsing bridges this gap, transforming free-form text into reliable, validated data structures. This guide covers the techniques that make parsing robust: format specification in prompts, regex and pattern-based extraction, JSON and XML parsing with error recovery, Pydantic validation for type safety, and retry strategies for malformed outputs. Whether you’re extracting entities from text, generating structured reports, or building data pipelines powered by LLMs, these patterns will help you get consistent, reliable structured outputs from any model.

LLM Output Parsing
Output Parsing: Format Extraction, Schema Validation, Type Transformation

Format Specification

from dataclasses import dataclass, field
from typing import Any, Optional, Type
from abc import ABC, abstractmethod
import json

class OutputFormat(ABC):
    """Abstract output format specification."""
    
    @abstractmethod
    def get_format_instructions(self) -> str:
        """Get instructions for the LLM."""
        pass
    
    @abstractmethod
    def parse(self, output: str) -> Any:
        """Parse the output."""
        pass

class JSONOutputFormat(OutputFormat):
    """JSON output format."""
    
    def __init__(
        self,
        schema: dict = None,
        example: dict = None,
        strict: bool = True
    ):
        self.schema = schema
        self.example = example
        self.strict = strict
    
    def get_format_instructions(self) -> str:
        """Get JSON format instructions."""
        
        instructions = [
            "Output your response as valid JSON.",
            "Do not include any text before or after the JSON.",
            "Ensure all strings are properly quoted.",
            "Do not use trailing commas."
        ]
        
        if self.schema:
            instructions.append(f"\nJSON Schema:\n```json\n{json.dumps(self.schema, indent=2)}\n```")
        
        if self.example:
            instructions.append(f"\nExample output:\n```json\n{json.dumps(self.example, indent=2)}\n```")
        
        return "\n".join(instructions)
    
    def parse(self, output: str) -> dict:
        """Parse JSON output."""
        
        # Try to extract JSON from markdown code blocks
        import re
        
        # Pattern for JSON in code blocks
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
        
        if json_match:
            json_str = json_match.group(1)
        else:
            # Try to find JSON object or array
            json_match = re.search(r'(\{[\s\S]*\}|\[[\s\S]*\])', output)
            json_str = json_match.group(1) if json_match else output
        
        return json.loads(json_str)

class XMLOutputFormat(OutputFormat):
    """XML output format."""
    
    def __init__(self, root_element: str, elements: list[str] = None):
        self.root = root_element
        self.elements = elements or []
    
    def get_format_instructions(self) -> str:
        """Get XML format instructions."""
        
        instructions = [
            f"Output your response as valid XML with root element <{self.root}>.",
            "Ensure all tags are properly closed.",
            "Use proper XML escaping for special characters."
        ]
        
        if self.elements:
            instructions.append("\nRequired elements:")
            for elem in self.elements:
                instructions.append(f"  <{elem}>...")
        
        return "\n".join(instructions)
    
    def parse(self, output: str) -> dict:
        """Parse XML output."""
        
        import xml.etree.ElementTree as ET
        import re
        
        # Extract XML from code blocks if present
        xml_match = re.search(r'```(?:xml)?\s*([\s\S]*?)\s*```', output)
        xml_str = xml_match.group(1) if xml_match else output
        
        # Parse XML
        root = ET.fromstring(xml_str)
        
        # Convert to dict
        result = {}
        for child in root:
            if len(child) == 0:
                result[child.tag] = child.text
            else:
                result[child.tag] = self._element_to_dict(child)
        
        return result
    
    def _element_to_dict(self, element) -> dict:
        """Convert XML element to dict."""
        
        result = {}
        for child in element:
            if len(child) == 0:
                result[child.tag] = child.text
            else:
                result[child.tag] = self._element_to_dict(child)
        return result

class MarkdownOutputFormat(OutputFormat):
    """Markdown output format with sections."""
    
    def __init__(self, sections: list[str], heading_level: int = 2):
        self.sections = sections
        self.heading_level = heading_level
    
    def get_format_instructions(self) -> str:
        """Get markdown format instructions."""
        
        heading = "#" * self.heading_level
        
        instructions = [
            "Format your response in Markdown with the following sections:",
            ""
        ]
        
        for section in self.sections:
            instructions.append(f"{heading} {section}")
            instructions.append("[Your content here]")
            instructions.append("")
        
        return "\n".join(instructions)
    
    def parse(self, output: str) -> dict:
        """Parse markdown into sections."""
        
        import re
        
        heading_pattern = r'^#{1,6}\s+(.+)$'
        
        sections = {}
        current_section = "intro"
        current_content = []
        
        for line in output.split('\n'):
            match = re.match(heading_pattern, line)
            
            if match:
                # Save previous section
                if current_content:
                    sections[current_section] = '\n'.join(current_content).strip()
                
                current_section = match.group(1).strip()
                current_content = []
            else:
                current_content.append(line)
        
        # Save last section
        if current_content:
            sections[current_section] = '\n'.join(current_content).strip()
        
        return sections

class ListOutputFormat(OutputFormat):
    """List output format."""
    
    def __init__(
        self,
        item_format: str = "numbered",  # "numbered", "bulleted", "plain"
        min_items: int = None,
        max_items: int = None
    ):
        self.item_format = item_format
        self.min_items = min_items
        self.max_items = max_items
    
    def get_format_instructions(self) -> str:
        """Get list format instructions."""
        
        if self.item_format == "numbered":
            format_desc = "a numbered list (1. item, 2. item, etc.)"
        elif self.item_format == "bulleted":
            format_desc = "a bulleted list (- item or * item)"
        else:
            format_desc = "one item per line"
        
        instructions = [f"Output your response as {format_desc}."]
        
        if self.min_items:
            instructions.append(f"Include at least {self.min_items} items.")
        if self.max_items:
            instructions.append(f"Include at most {self.max_items} items.")
        
        return " ".join(instructions)
    
    def parse(self, output: str) -> list[str]:
        """Parse list output."""
        
        import re
        
        items = []
        
        for line in output.split('\n'):
            line = line.strip()
            
            if not line:
                continue
            
            # Remove list markers
            cleaned = re.sub(r'^[\d]+[.)]\s*', '', line)  # Numbered
            cleaned = re.sub(r'^[-*•]\s*', '', cleaned)   # Bulleted
            
            if cleaned:
                items.append(cleaned)
        
        return items

Pattern-Based Extraction

from dataclasses import dataclass
from typing import Any, Optional, Pattern
import re

@dataclass
class ExtractionPattern:
    """Pattern for extracting data."""
    
    name: str
    pattern: str
    group: int = 1
    transform: callable = None

class PatternExtractor:
    """Extract data using regex patterns."""
    
    def __init__(self):
        self.patterns: list[ExtractionPattern] = []
    
    def add_pattern(
        self,
        name: str,
        pattern: str,
        group: int = 1,
        transform: callable = None
    ):
        """Add extraction pattern."""
        
        self.patterns.append(ExtractionPattern(
            name=name,
            pattern=pattern,
            group=group,
            transform=transform
        ))
    
    def extract(self, text: str) -> dict:
        """Extract all patterns from text."""
        
        results = {}
        
        for pattern in self.patterns:
            match = re.search(pattern.pattern, text, re.IGNORECASE | re.DOTALL)
            
            if match:
                value = match.group(pattern.group)
                
                if pattern.transform:
                    value = pattern.transform(value)
                
                results[pattern.name] = value
        
        return results
    
    def extract_all(self, text: str) -> dict[str, list]:
        """Extract all occurrences of patterns."""
        
        results = {}
        
        for pattern in self.patterns:
            matches = re.findall(pattern.pattern, text, re.IGNORECASE | re.DOTALL)
            
            if pattern.transform:
                matches = [pattern.transform(m) for m in matches]
            
            results[pattern.name] = matches
        
        return results

class StructuredExtractor:
    """Extract structured data from text."""
    
    def __init__(self):
        self.field_patterns: dict[str, str] = {}
        self.section_pattern: str = None
    
    def set_field_pattern(self, field_name: str, pattern: str):
        """Set pattern for a field."""
        self.field_patterns[field_name] = pattern
    
    def set_section_pattern(self, pattern: str):
        """Set pattern for sections."""
        self.section_pattern = pattern
    
    def extract(self, text: str) -> dict:
        """Extract structured data."""
        
        result = {}
        
        # Extract individual fields
        for field, pattern in self.field_patterns.items():
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                result[field] = match.group(1).strip()
        
        # Extract sections if pattern set
        if self.section_pattern:
            sections = re.findall(self.section_pattern, text, re.IGNORECASE | re.DOTALL)
            result['sections'] = sections
        
        return result

class TagExtractor:
    """Extract content between tags."""
    
    def __init__(self, tags: list[str] = None):
        self.tags = tags or []
    
    def add_tag(self, tag: str):
        """Add tag to extract."""
        self.tags.append(tag)
    
    def extract(self, text: str) -> dict:
        """Extract content between tags."""
        
        results = {}
        
        for tag in self.tags:
            # Match both XML-style and custom tags
            patterns = [
                f'<{tag}>(.*?)',
                f'\\[{tag}\\](.*?)\\[/{tag}\\]',
                f'{{{tag}}}(.*?){{{tag}}}',
            ]
            
            for pattern in patterns:
                match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
                if match:
                    results[tag] = match.group(1).strip()
                    break
        
        return results

class KeyValueExtractor:
    """Extract key-value pairs."""
    
    def __init__(
        self,
        separator: str = ":",
        line_based: bool = True
    ):
        self.separator = separator
        self.line_based = line_based
    
    def extract(self, text: str) -> dict:
        """Extract key-value pairs."""
        
        results = {}
        
        if self.line_based:
            for line in text.split('\n'):
                if self.separator in line:
                    parts = line.split(self.separator, 1)
                    if len(parts) == 2:
                        key = parts[0].strip().lower().replace(' ', '_')
                        value = parts[1].strip()
                        results[key] = value
        else:
            # Find all key: value patterns
            pattern = r'(\w+(?:\s+\w+)*)\s*' + re.escape(self.separator) + r'\s*([^\n,]+)'
            matches = re.findall(pattern, text)
            
            for key, value in matches:
                key = key.strip().lower().replace(' ', '_')
                results[key] = value.strip()
        
        return results

class TableExtractor:
    """Extract tabular data."""
    
    def __init__(self, has_header: bool = True):
        self.has_header = has_header
    
    def extract(self, text: str) -> list[dict]:
        """Extract table data."""
        
        lines = text.strip().split('\n')
        
        # Find table lines (contain |)
        table_lines = [l for l in lines if '|' in l]
        
        if not table_lines:
            return []
        
        # Parse rows
        rows = []
        for line in table_lines:
            # Skip separator lines
            if re.match(r'^[\s|:-]+$', line):
                continue
            
            cells = [c.strip() for c in line.split('|')]
            cells = [c for c in cells if c]  # Remove empty
            
            if cells:
                rows.append(cells)
        
        if not rows:
            return []
        
        # Convert to dicts if has header
        if self.has_header and len(rows) > 1:
            headers = rows[0]
            return [
                dict(zip(headers, row))
                for row in rows[1:]
            ]
        
        return rows

Pydantic Validation

from pydantic import BaseModel, Field, validator, root_validator
from typing import Any, Optional, Type, TypeVar, Generic
from enum import Enum
import json

T = TypeVar('T', bound=BaseModel)

class PydanticOutputParser(Generic[T]):
    """Parse and validate output using Pydantic."""
    
    def __init__(self, model_class: Type[T]):
        self.model_class = model_class
    
    def get_format_instructions(self) -> str:
        """Get format instructions from Pydantic model."""
        
        schema = self.model_class.model_json_schema()
        
        return f"""Output your response as JSON matching this schema:

```json
{json.dumps(schema, indent=2)}
```

Ensure all required fields are present and types are correct."""
    
    def parse(self, output: str) -> T:
        """Parse and validate output."""
        
        import re
        
        # Extract JSON
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
        
        if json_match:
            json_str = json_match.group(1)
        else:
            json_match = re.search(r'(\{[\s\S]*\})', output)
            json_str = json_match.group(1) if json_match else output
        
        # Parse JSON
        data = json.loads(json_str)
        
        # Validate with Pydantic
        return self.model_class.model_validate(data)
    
    def parse_safe(self, output: str) -> tuple[Optional[T], Optional[str]]:
        """Parse with error handling."""
        
        try:
            result = self.parse(output)
            return result, None
        except json.JSONDecodeError as e:
            return None, f"JSON parse error: {e}"
        except Exception as e:
            return None, f"Validation error: {e}"

# Example Pydantic models
class Sentiment(str, Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"

class SentimentAnalysis(BaseModel):
    """Sentiment analysis result."""
    
    sentiment: Sentiment
    confidence: float = Field(ge=0, le=1)
    reasoning: str
    
    @validator('confidence')
    def round_confidence(cls, v):
        return round(v, 3)

class Entity(BaseModel):
    """Extracted entity."""
    
    name: str
    type: str
    start_index: Optional[int] = None
    end_index: Optional[int] = None

class EntityExtraction(BaseModel):
    """Entity extraction result."""
    
    entities: list[Entity]
    text_length: int
    
    @root_validator
    def validate_indices(cls, values):
        entities = values.get('entities', [])
        text_length = values.get('text_length', 0)
        
        for entity in entities:
            if entity.start_index is not None:
                if entity.start_index < 0 or entity.start_index >= text_length:
                    raise ValueError(f"Invalid start_index for entity {entity.name}")
        
        return values

class Summary(BaseModel):
    """Text summary result."""
    
    summary: str = Field(min_length=10, max_length=500)
    key_points: list[str] = Field(min_items=1, max_items=10)
    word_count: int = Field(ge=1)

class Classification(BaseModel):
    """Classification result."""
    
    label: str
    confidence: float = Field(ge=0, le=1)
    alternatives: list[dict] = Field(default_factory=list)

class StructuredResponse(BaseModel):
    """Generic structured response."""
    
    success: bool
    data: Optional[dict] = None
    error: Optional[str] = None
    metadata: dict = Field(default_factory=dict)

class OutputParserFactory:
    """Factory for creating output parsers."""
    
    @staticmethod
    def for_model(model_class: Type[BaseModel]) -> PydanticOutputParser:
        """Create parser for Pydantic model."""
        return PydanticOutputParser(model_class)
    
    @staticmethod
    def for_json(schema: dict = None) -> JSONOutputFormat:
        """Create JSON parser."""
        return JSONOutputFormat(schema=schema)
    
    @staticmethod
    def for_list(min_items: int = None, max_items: int = None) -> ListOutputFormat:
        """Create list parser."""
        return ListOutputFormat(min_items=min_items, max_items=max_items)

Error Recovery and Retries

from dataclasses import dataclass
from typing import Any, Optional, Type
import json

@dataclass
class ParseResult:
    """Result of parsing attempt."""
    
    success: bool
    data: Any = None
    error: str = None
    raw_output: str = None
    attempts: int = 1

class OutputFixer:
    """Fix malformed LLM outputs."""
    
    def __init__(self, llm_client: Any = None):
        self.llm = llm_client
    
    def fix_json(self, malformed: str) -> str:
        """Attempt to fix malformed JSON."""
        
        import re
        
        fixed = malformed
        
        # Remove markdown code blocks
        fixed = re.sub(r'```(?:json)?\s*', '', fixed)
        fixed = re.sub(r'```\s*$', '', fixed)
        
        # Fix common issues
        # Trailing commas
        fixed = re.sub(r',\s*}', '}', fixed)
        fixed = re.sub(r',\s*]', ']', fixed)
        
        # Single quotes to double quotes
        fixed = re.sub(r"'([^']*)':", r'"\1":', fixed)
        
        # Unquoted keys
        fixed = re.sub(r'(\{|,)\s*(\w+)\s*:', r'\1"\2":', fixed)
        
        # Missing quotes around string values
        # This is tricky - only do for obvious cases
        fixed = re.sub(r':\s*([a-zA-Z][a-zA-Z0-9_]*)\s*([,}])', r': "\1"\2', fixed)
        
        # Try to extract just the JSON part
        json_match = re.search(r'(\{[\s\S]*\}|\[[\s\S]*\])', fixed)
        if json_match:
            fixed = json_match.group(1)
        
        return fixed.strip()
    
    async def fix_with_llm(
        self,
        malformed: str,
        expected_format: str
    ) -> str:
        """Use LLM to fix malformed output."""
        
        if not self.llm:
            raise ValueError("LLM client required for LLM-based fixing")
        
        prompt = f"""The following output is malformed. Fix it to match the expected format.

Malformed output:
{malformed}

Expected format:
{expected_format}

Output only the fixed version, nothing else:"""
        
        response = await self.llm.complete(prompt)
        return response.content

class RetryingParser:
    """Parser with automatic retries."""
    
    def __init__(
        self,
        parser: Any,
        llm_client: Any,
        max_retries: int = 3,
        fixer: OutputFixer = None
    ):
        self.parser = parser
        self.llm = llm_client
        self.max_retries = max_retries
        self.fixer = fixer or OutputFixer(llm_client)
    
    async def parse(
        self,
        prompt: str,
        **llm_kwargs
    ) -> ParseResult:
        """Parse with retries."""
        
        last_error = None
        last_output = None
        
        for attempt in range(self.max_retries):
            # Get LLM response
            if attempt == 0:
                full_prompt = f"{prompt}\n\n{self.parser.get_format_instructions()}"
            else:
                # Include error feedback
                full_prompt = f"""{prompt}

{self.parser.get_format_instructions()}

Previous attempt failed with error: {last_error}
Previous output: {last_output}

Please fix the output format."""
            
            response = await self.llm.complete(full_prompt, **llm_kwargs)
            last_output = response.content
            
            # Try to parse
            try:
                # First try direct parsing
                data = self.parser.parse(last_output)
                return ParseResult(
                    success=True,
                    data=data,
                    raw_output=last_output,
                    attempts=attempt + 1
                )
            except Exception as e:
                last_error = str(e)
                
                # Try fixing
                if hasattr(self.parser, 'parse') and 'json' in str(type(self.parser)).lower():
                    fixed = self.fixer.fix_json(last_output)
                    try:
                        data = self.parser.parse(fixed)
                        return ParseResult(
                            success=True,
                            data=data,
                            raw_output=last_output,
                            attempts=attempt + 1
                        )
                    except Exception:
                        pass
        
        return ParseResult(
            success=False,
            error=last_error,
            raw_output=last_output,
            attempts=self.max_retries
        )

class FallbackParser:
    """Parser with fallback strategies."""
    
    def __init__(self, parsers: list[Any]):
        self.parsers = parsers
    
    def parse(self, output: str) -> tuple[Any, int]:
        """Try parsers in order until one succeeds."""
        
        errors = []
        
        for i, parser in enumerate(self.parsers):
            try:
                result = parser.parse(output)
                return result, i
            except Exception as e:
                errors.append(f"Parser {i}: {e}")
        
        raise ValueError(f"All parsers failed: {'; '.join(errors)}")

class PartialParser:
    """Extract partial results from malformed output."""
    
    def __init__(self, required_fields: list[str], optional_fields: list[str] = None):
        self.required = set(required_fields)
        self.optional = set(optional_fields or [])
    
    def parse(self, output: str) -> tuple[dict, list[str]]:
        """Parse and return partial results with missing fields."""
        
        import re
        
        result = {}
        missing = []
        
        # Try JSON parsing first
        try:
            json_match = re.search(r'(\{[\s\S]*\})', output)
            if json_match:
                data = json.loads(json_match.group(1))
                result.update(data)
        except json.JSONDecodeError:
            pass
        
        # Try key-value extraction as fallback
        for field in self.required | self.optional:
            if field not in result:
                # Try to find field in text
                patterns = [
                    f'"{field}"\\s*:\\s*"([^"]*)"',
                    f'"{field}"\\s*:\\s*([\\d.]+)',
                    f'{field}\\s*:\\s*([^\\n,}}]+)',
                ]
                
                for pattern in patterns:
                    match = re.search(pattern, output, re.IGNORECASE)
                    if match:
                        result[field] = match.group(1).strip()
                        break
        
        # Check for missing required fields
        for field in self.required:
            if field not in result:
                missing.append(field)
        
        return result, missing

class StreamingParser:
    """Parse streaming LLM output."""
    
    def __init__(self, parser: Any):
        self.parser = parser
        self.buffer = ""
    
    def add_chunk(self, chunk: str) -> Optional[Any]:
        """Add chunk and try to parse."""
        
        self.buffer += chunk
        
        # Try to parse current buffer
        try:
            result = self.parser.parse(self.buffer)
            return result
        except Exception:
            return None
    
    def finalize(self) -> Any:
        """Finalize and parse complete output."""
        
        return self.parser.parse(self.buffer)
    
    def reset(self):
        """Reset buffer."""
        self.buffer = ""

Type Transformation

from dataclasses import dataclass
from typing import Any, Optional, Type, Union, get_type_hints, get_origin, get_args
from datetime import datetime, date
from enum import Enum

class TypeTransformer:
    """Transform parsed values to correct types."""
    
    def __init__(self):
        self.transformers: dict[type, callable] = {
            int: self._to_int,
            float: self._to_float,
            bool: self._to_bool,
            str: self._to_str,
            datetime: self._to_datetime,
            date: self._to_date,
            list: self._to_list,
            dict: self._to_dict,
        }
    
    def transform(self, value: Any, target_type: type) -> Any:
        """Transform value to target type."""
        
        if value is None:
            return None
        
        # Handle Optional types
        origin = get_origin(target_type)
        
        if origin is Union:
            args = get_args(target_type)
            # Try each type in union
            for arg in args:
                if arg is type(None):
                    continue
                try:
                    return self.transform(value, arg)
                except Exception:
                    continue
            raise ValueError(f"Cannot transform {value} to {target_type}")
        
        # Handle list types
        if origin is list:
            item_type = get_args(target_type)[0] if get_args(target_type) else str
            return [self.transform(item, item_type) for item in value]
        
        # Handle dict types
        if origin is dict:
            args = get_args(target_type)
            key_type = args[0] if args else str
            value_type = args[1] if len(args) > 1 else Any
            return {
                self.transform(k, key_type): self.transform(v, value_type)
                for k, v in value.items()
            }
        
        # Handle Enum types
        if isinstance(target_type, type) and issubclass(target_type, Enum):
            return self._to_enum(value, target_type)
        
        # Use registered transformer
        transformer = self.transformers.get(target_type)
        
        if transformer:
            return transformer(value)
        
        # Default: return as-is
        return value
    
    def _to_int(self, value: Any) -> int:
        """Convert to int."""
        
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value)
        if isinstance(value, str):
            # Handle common formats
            value = value.strip().replace(',', '')
            return int(float(value))
        raise ValueError(f"Cannot convert {value} to int")
    
    def _to_float(self, value: Any) -> float:
        """Convert to float."""
        
        if isinstance(value, (int, float)):
            return float(value)
        if isinstance(value, str):
            value = value.strip().replace(',', '')
            # Handle percentages
            if value.endswith('%'):
                return float(value[:-1]) / 100
            return float(value)
        raise ValueError(f"Cannot convert {value} to float")
    
    def _to_bool(self, value: Any) -> bool:
        """Convert to bool."""
        
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lower = value.lower().strip()
            if lower in ('true', 'yes', '1', 'on'):
                return True
            if lower in ('false', 'no', '0', 'off'):
                return False
        if isinstance(value, (int, float)):
            return bool(value)
        raise ValueError(f"Cannot convert {value} to bool")
    
    def _to_str(self, value: Any) -> str:
        """Convert to string."""
        return str(value)
    
    def _to_datetime(self, value: Any) -> datetime:
        """Convert to datetime."""
        
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            # Try common formats
            formats = [
                '%Y-%m-%dT%H:%M:%S',
                '%Y-%m-%dT%H:%M:%SZ',
                '%Y-%m-%d %H:%M:%S',
                '%Y-%m-%d',
                '%m/%d/%Y',
                '%d/%m/%Y',
            ]
            
            for fmt in formats:
                try:
                    return datetime.strptime(value, fmt)
                except ValueError:
                    continue
            
            # Try ISO format
            return datetime.fromisoformat(value.replace('Z', '+00:00'))
        
        raise ValueError(f"Cannot convert {value} to datetime")
    
    def _to_date(self, value: Any) -> date:
        """Convert to date."""
        
        if isinstance(value, date):
            return value
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, str):
            dt = self._to_datetime(value)
            return dt.date()
        raise ValueError(f"Cannot convert {value} to date")
    
    def _to_list(self, value: Any) -> list:
        """Convert to list."""
        
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            # Try JSON
            try:
                import json
                return json.loads(value)
            except json.JSONDecodeError:
                pass
            # Split by common delimiters
            if ',' in value:
                return [v.strip() for v in value.split(',')]
            if '\n' in value:
                return [v.strip() for v in value.split('\n') if v.strip()]
        return [value]
    
    def _to_dict(self, value: Any) -> dict:
        """Convert to dict."""
        
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            import json
            return json.loads(value)
        raise ValueError(f"Cannot convert {value} to dict")
    
    def _to_enum(self, value: Any, enum_class: Type[Enum]) -> Enum:
        """Convert to enum."""
        
        if isinstance(value, enum_class):
            return value
        
        # Try by value
        try:
            return enum_class(value)
        except ValueError:
            pass
        
        # Try by name
        if isinstance(value, str):
            try:
                return enum_class[value.upper()]
            except KeyError:
                pass
        
        raise ValueError(f"Cannot convert {value} to {enum_class}")

class SchemaTransformer:
    """Transform data according to schema."""
    
    def __init__(self, schema: dict):
        self.schema = schema
        self.transformer = TypeTransformer()
    
    def transform(self, data: dict) -> dict:
        """Transform data according to schema."""
        
        result = {}
        properties = self.schema.get('properties', {})
        
        for key, prop_schema in properties.items():
            if key in data:
                value = data[key]
                target_type = self._schema_to_type(prop_schema)
                result[key] = self.transformer.transform(value, target_type)
        
        return result
    
    def _schema_to_type(self, schema: dict) -> type:
        """Convert JSON schema type to Python type."""
        
        type_map = {
            'string': str,
            'integer': int,
            'number': float,
            'boolean': bool,
            'array': list,
            'object': dict,
        }
        
        schema_type = schema.get('type', 'string')
        return type_map.get(schema_type, str)
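
To make the schema-driven flow concrete, here is a minimal, self-contained sketch of the same idea. It does not use the `TypeTransformer` class above: `coerce` and `transform_by_schema` are hypothetical stand-ins that handle only the six JSON Schema types in the mapping, and the sample schema and data are invented for illustration.

```python
import json
from typing import Any

# Same JSON Schema -> Python mapping as SchemaTransformer._schema_to_type.
TYPE_MAP = {
    "string": str,
    "integer": int,
    "number": float,
    "boolean": bool,
    "array": list,
    "object": dict,
}

def coerce(value: Any, target: type) -> Any:
    """Hypothetical stand-in for TypeTransformer.transform (assumption)."""
    if type(value) is target:
        return value
    if target is bool:
        # bool("false") is True, so strings need explicit handling.
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "yes", "1"):
                return True
            if lowered in ("false", "no", "0"):
                return False
        raise ValueError(f"Cannot convert {value!r} to bool")
    if target in (list, dict):
        parsed = json.loads(value) if isinstance(value, str) else value
        if not isinstance(parsed, target):
            raise ValueError(f"Cannot convert {value!r} to {target.__name__}")
        return parsed
    return target(value)  # str, int, float

def transform_by_schema(data: dict, schema: dict) -> dict:
    """Coerce each key listed in the schema; keys not in the schema are dropped."""
    result = {}
    for key, prop in schema.get("properties", {}).items():
        if key in data:
            target = TYPE_MAP.get(prop.get("type", "string"), str)
            result[key] = coerce(data[key], target)
    return result

schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "quantity": {"type": "integer"},
        "price": {"type": "number"},
        "in_stock": {"type": "boolean"},
        "tags": {"type": "array"},
    },
}
# LLMs often return every value as a string, even when asked for numbers.
raw = {
    "name": "Widget",
    "quantity": "3",
    "price": "19.99",
    "in_stock": "yes",
    "tags": '["a", "b"]',
    "note": "not in schema",
}
typed = transform_by_schema(raw, schema)
print(typed)
```

As in `SchemaTransformer.transform`, keys that are absent from the schema's `properties` (here `note`) are silently dropped, which is worth knowing before relying on the output as a complete copy of the input.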

Production Parsing Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import json

app = FastAPI()

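class ParseRequest(BaseModel):
    output: str
    format: str = "json"  # json, xml, markdown, list
    # Caution: "schema" shadows a BaseModel attribute. Pydantic v1 rejects the
    # field name and v2 emits a warning; an aliased field avoids both.
    schema: Optional[dict] = None

class ExtractRequest(BaseModel):
    text: str
    patterns: dict[str, str]

class ValidateRequest(BaseModel):
    data: dict
    schema: dict  # same shadowing caveat as ParseRequest.schema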

# Initialize components
json_parser = JSONOutputFormat()
xml_parser = XMLOutputFormat("root")
list_parser = ListOutputFormat()
transformer = TypeTransformer()
fixer = OutputFixer()

@app.post("/v1/parse")
async def parse_output(request: ParseRequest) -> dict:
    """Parse LLM output."""
    
    try:
        if request.format == "json":
            parser = JSONOutputFormat(schema=request.schema)
        elif request.format == "xml":
            parser = XMLOutputFormat("root")
        elif request.format == "markdown":
            parser = MarkdownOutputFormat([])
        elif request.format == "list":
            parser = ListOutputFormat()
        else:
            raise HTTPException(status_code=400, detail=f"Unknown format: {request.format}")
        
        result = parser.parse(request.output)
        
        return {
            "success": True,
            "data": result,
            "format": request.format
        }
        
    except HTTPException:
        # Re-raise so the 400 for an unknown format is not swallowed below
        raise
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "format": request.format
        }

@app.post("/v1/parse/fix")
async def fix_and_parse(request: ParseRequest) -> dict:
    """Fix malformed output and parse."""
    
    try:
        # Try direct parsing first
        parser = JSONOutputFormat(schema=request.schema)
        result = parser.parse(request.output)
        
        return {
            "success": True,
            "data": result,
            "fixed": False
        }
        
    except Exception:
        # Try fixing
        try:
            fixed = fixer.fix_json(request.output)
            result = json.loads(fixed)
            
            return {
                "success": True,
                "data": result,
                "fixed": True,
                "fixed_output": fixed
            }
            
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

@app.post("/v1/extract")
async def extract_patterns(request: ExtractRequest) -> dict:
    """Extract data using patterns."""
    
    extractor = PatternExtractor()
    
    for name, pattern in request.patterns.items():
        extractor.add_pattern(name, pattern)
    
    results = extractor.extract(request.text)
    
    return {
        "success": True,
        "extracted": results,
        "patterns_matched": len(results)
    }

@app.post("/v1/validate")
async def validate_data(request: ValidateRequest) -> dict:
    """Validate data against schema."""
    
    from jsonschema import validate, ValidationError
    
    try:
        validate(instance=request.data, schema=request.schema)
        
        return {
            "valid": True,
            "errors": []
        }
        
    except ValidationError as e:
        return {
            "valid": False,
            "errors": [e.message]
        }

@app.post("/v1/transform")
async def transform_types(data: dict, schema: dict) -> dict:
    """Transform data types according to schema."""
    
    schema_transformer = SchemaTransformer(schema)
    
    try:
        result = schema_transformer.transform(data)
        
        return {
            "success": True,
            "data": result
        }
        
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

@app.get("/health")
async def health():
    return {"status": "healthy"}
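
The strict-then-fix flow behind `/v1/parse/fix` can be tried without running the service. The sketch below is an assumption-laden stand-in: `fix_json` here is a hypothetical, much simpler fixer than the `OutputFixer` used above, and it only handles Markdown code fences, surrounding chatter, and trailing commas.

```python
import json
import re

FENCE = "`" * 3  # Markdown code fence marker, built indirectly

def fix_json(text: str) -> str:
    """Hypothetical fixer (assumption): not the OutputFixer from this guide."""
    # Drop Markdown fence markers, with or without a "json" language tag.
    text = re.sub(re.escape(FENCE) + r"(?:json)?", "", text)
    # Keep only the span from the first "{" to the last "}".
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        text = text[start:end + 1]
    # Remove trailing commas before "}" or "]". This is a naive regex and
    # would also alter such sequences inside string values.
    text = re.sub(r",\s*([}\]])", r"\1", text)
    return text.strip()

def parse_with_fallback(output: str) -> dict:
    """Strict parse first; on failure, fix the text and parse again."""
    try:
        return {"success": True, "data": json.loads(output), "fixed": False}
    except json.JSONDecodeError:
        pass
    try:
        fixed = fix_json(output)
        return {
            "success": True,
            "data": json.loads(fixed),
            "fixed": True,
            "fixed_output": fixed,
        }
    except json.JSONDecodeError as e:
        return {"success": False, "error": str(e)}

# A typical messy response: chatter, a code fence, and trailing commas.
messy = (
    "Here is the result:\n"
    + FENCE + "json\n"
    + '{"name": "Ada", "tags": ["x", "y",],}\n'
    + FENCE
)
result = parse_with_fallback(messy)
print(result["fixed"], result["data"])
```

Returning the `fixed` flag alongside the data, as the endpoint does, lets callers log how often repair was needed, which is a useful signal that a prompt's format instructions need tightening.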

Conclusion

Output parsing is the bridge between LLM text generation and structured application data. Start with clear format instructions in your prompts: models follow instructions better when the expected format is explicit and includes examples. Use Pydantic for type-safe parsing; it catches errors early and provides clear validation messages. Implement multiple extraction strategies: JSON parsing for structured data, regex patterns for specific fields, and key-value extraction as a fallback.

Always plan for malformed outputs, because even the best models occasionally produce invalid JSON or miss required fields. Build retry logic that includes the error message in subsequent attempts; models can often self-correct when told what went wrong. Consider partial parsing when complete parsing fails, since extracting some fields is often better than failing entirely. For production systems, log parsing failures to identify patterns and improve your prompts.

The most important insight is that parsing reliability comes from defense in depth: clear instructions, multiple extraction methods, automatic fixing, and graceful degradation. A robust parsing pipeline built this way can achieve 99%+ success rates even with imperfect model outputs.
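
The retry advice can be sketched in a few lines. This is a minimal illustration rather than a definitive implementation: `parse_with_retries`, `call_model`, and the fake model below are hypothetical names, and the fake model simply replays canned responses so the loop can run without an API.

```python
import json
from typing import Callable

def parse_with_retries(
    call_model: Callable[[str], str],
    prompt: str,
    required_fields: list[str],
    max_attempts: int = 3,
) -> dict:
    """Parse model output as JSON, feeding any error into the next prompt."""
    current_prompt = prompt
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        output = call_model(current_prompt)
        try:
            data = json.loads(output)
            if not isinstance(data, dict):
                raise ValueError("Expected a JSON object")
            missing = [f for f in required_fields if f not in data]
            if missing:
                raise ValueError(f"Missing required fields: {missing}")
            return {"success": True, "data": data, "attempts": attempt}
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            last_error = str(e)
            # Tell the model exactly what went wrong on the next attempt.
            current_prompt = (
                f"{prompt}\n\n"
                f"Your previous response was invalid: {last_error}\n"
                f"Previous response:\n{output}\n"
                "Return only corrected JSON."
            )
    return {"success": False, "error": last_error, "attempts": max_attempts}

def make_fake_model(responses: list[str]):
    """Stand-in for a real LLM call (assumption): replays canned responses."""
    remaining = iter(responses)
    prompts_seen: list[str] = []

    def call(prompt: str) -> str:
        prompts_seen.append(prompt)
        return next(remaining)

    return call, prompts_seen

fake_model, prompts_seen = make_fake_model([
    '{"name": "Ada"',              # truncated JSON
    '{"name": "Ada"}',             # valid JSON, missing "age"
    '{"name": "Ada", "age": 36}',  # valid and complete
])
result = parse_with_retries(
    fake_model,
    "Describe Ada as JSON with name and age.",
    required_fields=["name", "age"],
)
print(result)
```

Capping the attempts and returning the last error keeps the failure path explicit, so the caller can log it or fall back to partial parsing instead of looping indefinitely.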

