LLM Output Parsing: Extracting Structured Data from Free-Form Text

Introduction: LLMs generate text, but applications need structured data such as JSON objects, lists, and other specific formats. Output parsing bridges the gap between free-form text and usable data structures. Naive approaches built on a single regex or string split break whenever the model varies its output, for example by wrapping JSON in a code block or adding a sentence of explanation. Robust parsing combines several strategies: format instructions that guide the model, extraction patterns that tolerate variations, validation that catches malformed output, and repair mechanisms that fix common issues. This guide covers practical output parsing techniques for production use.

Figure: Output parsing pipeline (pattern extraction, schema validation, type transform)

Format Instructions

from dataclasses import dataclass
from typing import Any, Optional, Type, TypeVar
from pydantic import BaseModel
import json

T = TypeVar('T', bound=BaseModel)

class FormatInstructor:
    """Generate format instructions for LLM output."""
    
    def json_instructions(self, schema: dict = None) -> str:
        """Generate JSON format instructions."""
        
        base = "Respond with valid JSON only. No markdown, no explanation, just the JSON object."
        
        if schema:
            return f"""{base}

Your response must match this schema:
```json
{json.dumps(schema, indent=2)}
```"""
        
        return base
    
    def pydantic_instructions(self, model: Type[T]) -> str:
        """Generate instructions from Pydantic model."""
        
        schema = model.model_json_schema()
        
        # Build field descriptions
        fields_desc = []
        for name, prop in schema.get("properties", {}).items():
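            # Optional fields carry no top-level "type"; Pydantic lists the
            # alternatives under "anyOf" (e.g. integer | null)
            field_type = prop.get("type") or " | ".join(
                alt["type"] for alt in prop.get("anyOf", []) if "type" in alt
            ) or "string"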
            description = prop.get("description", "")
            required = name in schema.get("required", [])
            
            req_str = "(required)" if required else "(optional)"
            fields_desc.append(f"- {name} ({field_type}) {req_str}: {description}")
        
        return f"""Respond with a JSON object containing these fields:
{chr(10).join(fields_desc)}

Example format:
```json
{json.dumps(self._generate_example(schema), indent=2)}
```"""
    
    def _generate_example(self, schema: dict) -> dict:
        """Generate example from schema."""
        
        example = {}
        for name, prop in schema.get("properties", {}).items():
            field_type = prop.get("type", "string")
            
            if field_type == "string":
                if "enum" in prop:
                    example[name] = prop["enum"][0]
                else:
                    example[name] = f"<{name}>"
            elif field_type == "integer":
                example[name] = 0
            elif field_type == "number":
                example[name] = 0.0
            elif field_type == "boolean":
                example[name] = True
            elif field_type == "array":
                example[name] = []
            elif field_type == "object":
                example[name] = {}
        
        return example
    
    def list_instructions(self, item_description: str = None) -> str:
        """Generate list format instructions."""
        
        base = "Respond with a JSON array."
        
        if item_description:
            return f"""{base}
Each item should be: {item_description}

Example: ["item1", "item2", "item3"]"""
        
        return base
    
    def enum_instructions(self, choices: list[str]) -> str:
        """Generate enum/choice instructions."""
        
        choices_str = ", ".join(f'"{c}"' for c in choices)
        
        return f"""Respond with exactly one of these values: {choices_str}
No explanation, just the value."""

class PromptWithFormat:
    """Build prompts with format instructions."""
    
    def __init__(self, instructor: FormatInstructor = None):
        self.instructor = instructor or FormatInstructor()
    
    def build(
        self,
        task: str,
        response_format: str = "json",
        schema: dict = None,
        pydantic_model: Type[T] = None,
        choices: list[str] = None
    ) -> str:
        """Build prompt with format instructions."""
        
        # Get format instructions
        if pydantic_model:
            format_inst = self.instructor.pydantic_instructions(pydantic_model)
        elif choices:
            format_inst = self.instructor.enum_instructions(choices)
        elif response_format == "json":
            format_inst = self.instructor.json_instructions(schema)
        elif response_format == "list":
            format_inst = self.instructor.list_instructions()
        else:
            format_inst = ""
        
        return f"""{task}

{format_inst}"""
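
The `_generate_example` helper above fills arrays and nested objects with empty placeholders, which gives the model little to imitate when the schema is nested. A minimal sketch of a recursive variant follows. The function name `example_from_schema` is hypothetical, and it assumes a plain JSON Schema dict that uses only `type`, `properties`, `items`, and `enum` (no `$ref` or `anyOf`, which would need extra handling).

```python
import json


def example_from_schema(schema: dict, name: str = "value"):
    """Build a placeholder example for a simple JSON Schema (hypothetical helper)."""
    if "enum" in schema:
        return schema["enum"][0]

    field_type = schema.get("type", "string")
    if field_type == "object":
        return {
            key: example_from_schema(prop, key)
            for key, prop in schema.get("properties", {}).items()
        }
    if field_type == "array":
        # One sample element shows the model the expected item shape
        return [example_from_schema(schema.get("items", {}), name)]
    if field_type == "integer":
        return 0
    if field_type == "number":
        return 0.0
    if field_type == "boolean":
        return True
    # Strings and anything unrecognized fall back to a named placeholder
    return f"<{name}>"


schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "tags": {"type": "array", "items": {"type": "string"}},
        "author": {
            "type": "object",
            "properties": {"name": {"type": "string"}, "age": {"type": "integer"}},
        },
    },
}

example = example_from_schema(schema)
print(json.dumps(example, indent=2))
```

The result could be passed to `json.dumps` in place of the `_generate_example` output when building the "Example format" section of the prompt.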

Pattern Extraction

from dataclasses import dataclass
from typing import Any, Optional
import re
import json

@dataclass
class ExtractionResult:
    """Result of pattern extraction."""
    
    success: bool
    data: Any = None
    raw_match: str = None
    error: str = None

class PatternExtractor:
    """Extract structured data from LLM output."""
    
    def extract_json(self, text: str) -> ExtractionResult:
        """Extract JSON from text."""
        
        # Try direct parse first
        try:
            data = json.loads(text.strip())
            return ExtractionResult(success=True, data=data, raw_match=text)
        except json.JSONDecodeError:
            pass
        
        # Try to find JSON in code blocks
        code_block = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
        if code_block:
            try:
                data = json.loads(code_block.group(1))
                return ExtractionResult(success=True, data=data, raw_match=code_block.group(1))
            except json.JSONDecodeError:
                pass
        
        # Try to find JSON object
        obj_match = re.search(r'\{[\s\S]*\}', text)
        if obj_match:
            try:
                data = json.loads(obj_match.group(0))
                return ExtractionResult(success=True, data=data, raw_match=obj_match.group(0))
            except json.JSONDecodeError:
                pass
        
        # Try to find JSON array
        arr_match = re.search(r'\[[\s\S]*\]', text)
        if arr_match:
            try:
                data = json.loads(arr_match.group(0))
                return ExtractionResult(success=True, data=data, raw_match=arr_match.group(0))
            except json.JSONDecodeError:
                pass
        
        return ExtractionResult(success=False, error="No valid JSON found")
    
    def extract_list(self, text: str) -> ExtractionResult:
        """Extract list from text."""
        
        # Try JSON array first
        json_result = self.extract_json(text)
        if json_result.success and isinstance(json_result.data, list):
            return json_result
        
        # Try numbered list
        numbered = re.findall(r'^\d+\.\s*(.+)$', text, re.MULTILINE)
        if numbered:
            return ExtractionResult(success=True, data=numbered)
        
        # Try bullet list
        bullets = re.findall(r'^[-*]\s*(.+)$', text, re.MULTILINE)
        if bullets:
            return ExtractionResult(success=True, data=bullets)
        
        # Try newline-separated
        lines = [l.strip() for l in text.split('\n') if l.strip()]
        if len(lines) > 1:
            return ExtractionResult(success=True, data=lines)
        
        return ExtractionResult(success=False, error="No list found")
    
    def extract_choice(self, text: str, choices: list[str]) -> ExtractionResult:
        """Extract choice from text."""
        
        text_lower = text.lower().strip()
        
        # Exact match
        for choice in choices:
            if choice.lower() == text_lower:
                return ExtractionResult(success=True, data=choice)
        
        # Partial match
        for choice in choices:
            if choice.lower() in text_lower:
                return ExtractionResult(success=True, data=choice)
        
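        # Reverse partial match: the response is a fragment of a choice.
        # Skip empty text, which would otherwise match every choice.
        if text_lower:
            for choice in choices:
                if text_lower in choice.lower():
                    return ExtractionResult(success=True, data=choice)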
        
        return ExtractionResult(
            success=False,
            error=f"No matching choice found. Expected one of: {choices}"
        )
    
    def extract_number(self, text: str) -> ExtractionResult:
        """Extract number from text."""
        
        # Try float
        float_match = re.search(r'-?\d+\.?\d*', text)
        if float_match:
            try:
                num = float(float_match.group(0))
                if num == int(num):
                    num = int(num)
                return ExtractionResult(success=True, data=num, raw_match=float_match.group(0))
            except ValueError:
                pass
        
        return ExtractionResult(success=False, error="No number found")
    
    def extract_boolean(self, text: str) -> ExtractionResult:
        """Extract boolean from text."""
        
        text_lower = text.lower().strip()
        
        true_values = ["true", "yes", "1", "correct", "affirmative"]
        false_values = ["false", "no", "0", "incorrect", "negative"]
        
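        # Match whole words only, so "incorrect" is not read as "correct"
        # and "unknown" is not read as "no"
        for val in true_values:
            if re.search(rf'\b{re.escape(val)}\b', text_lower):
                return ExtractionResult(success=True, data=True)
        
        for val in false_values:
            if re.search(rf'\b{re.escape(val)}\b', text_lower):
                return ExtractionResult(success=True, data=False)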
        
        return ExtractionResult(success=False, error="No boolean found")

class StructuredExtractor:
    """Extract structured data with field mapping."""
    
    def __init__(self):
        self.pattern_extractor = PatternExtractor()
    
    def extract_fields(
        self,
        text: str,
        field_patterns: dict[str, str]
    ) -> ExtractionResult:
        """Extract fields using regex patterns."""
        
        result = {}
        
        for field, pattern in field_patterns.items():
            match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
            if match:
                result[field] = match.group(1).strip() if match.groups() else match.group(0)
        
        if result:
            return ExtractionResult(success=True, data=result)
        
        return ExtractionResult(success=False, error="No fields matched")
    
    def extract_key_value(self, text: str) -> ExtractionResult:
        """Extract key-value pairs from text."""
        
        # Try JSON first
        json_result = self.pattern_extractor.extract_json(text)
        if json_result.success and isinstance(json_result.data, dict):
            return json_result
        
        # Try key: value format
        pairs = re.findall(r'^([^:\n]+):\s*(.+)$', text, re.MULTILINE)
        if pairs:
            result = {k.strip(): v.strip() for k, v in pairs}
            return ExtractionResult(success=True, data=result)
        
        return ExtractionResult(success=False, error="No key-value pairs found")
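
The greedy patterns in `extract_json` span from the first `{` to the last `}`, so they fail when a response contains braces in prose or more than one JSON value. One alternative, sketched here under the assumption that the first decodable object or array is the one you want, is to let the standard library's `json.JSONDecoder.raw_decode` try each candidate start position. The function name `find_first_json` is hypothetical.

```python
import json
from typing import Any, Optional, Tuple


def find_first_json(text: str) -> Optional[Tuple[Any, str]]:
    """Return (value, raw_text) for the first decodable JSON object/array, or None."""
    decoder = json.JSONDecoder()
    for start, ch in enumerate(text):
        if ch not in "{[":
            continue
        try:
            # raw_decode parses one JSON value beginning at `start` and
            # reports where it ended, ignoring whatever follows.
            value, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            continue
        return value, text[start:end]
    return None


reply = 'Sure! Use {"name": "Ada", "tags": ["x", "y"]} and then {"other": 1}.'
found = find_first_json(reply)
print(found)
```

Because every `{` or `[` is tried in turn, a stray bracket in the surrounding prose is skipped rather than poisoning the whole match. The trade-off is that a bracketed fragment such as a citation marker could be returned if it happens to be valid JSON.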

Schema Validation

from dataclasses import dataclass
from typing import Any, Optional, Type, TypeVar
from pydantic import BaseModel, ValidationError
import json

T = TypeVar('T', bound=BaseModel)

@dataclass
class ValidationResult:
    """Result of schema validation."""
    
    valid: bool
    data: Any = None
    errors: list[str] = None
    coerced: bool = False

class SchemaValidator:
    """Validate extracted data against schemas."""
    
    def validate_json_schema(self, data: Any, schema: dict) -> ValidationResult:
        """Validate against JSON schema."""
        
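        # Import outside the validation try-block: if the import itself failed,
        # the except clause below could not reference jsonschema.ValidationError
        try:
            import jsonschema
        except ImportError:
            return ValidationResult(valid=False, errors=["jsonschema package is not installed"])
        
        try:
            jsonschema.validate(data, schema)
            return ValidationResult(valid=True, data=data)
        except jsonschema.ValidationError as e:
            return ValidationResult(valid=False, errors=[str(e.message)])
        except Exception as e:
            return ValidationResult(valid=False, errors=[str(e)])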
    
    def validate_pydantic(
        self,
        data: dict,
        model: Type[T]
    ) -> ValidationResult:
        """Validate against Pydantic model."""
        
        try:
            validated = model.model_validate(data)
            return ValidationResult(valid=True, data=validated)
        except ValidationError as e:
            errors = [f"{err['loc']}: {err['msg']}" for err in e.errors()]
            return ValidationResult(valid=False, errors=errors)
    
    def validate_with_coercion(
        self,
        data: dict,
        model: Type[T]
    ) -> ValidationResult:
        """Validate with type coercion."""
        
        # First try direct validation
        result = self.validate_pydantic(data, model)
        if result.valid:
            return result
        
        # Try coercing types
        coerced_data = self._coerce_types(data, model)
        
        try:
            validated = model.model_validate(coerced_data)
            return ValidationResult(valid=True, data=validated, coerced=True)
        except ValidationError as e:
            errors = [f"{err['loc']}: {err['msg']}" for err in e.errors()]
            return ValidationResult(valid=False, errors=errors)
    
    def _coerce_types(self, data: dict, model: Type[T]) -> dict:
        """Attempt to coerce types to match schema."""
        
        schema = model.model_json_schema()
        coerced = {}
        
        for field, value in data.items():
            if field not in schema.get("properties", {}):
                coerced[field] = value
                continue
            
            expected_type = schema["properties"][field].get("type")
            
            if expected_type == "integer" and isinstance(value, str):
                try:
                    coerced[field] = int(value)
                    continue
                except ValueError:
                    pass
            
            if expected_type == "number" and isinstance(value, str):
                try:
                    coerced[field] = float(value)
                    continue
                except ValueError:
                    pass
            
            if expected_type == "boolean" and isinstance(value, str):
                if value.lower() in ["true", "yes", "1"]:
                    coerced[field] = True
                    continue
                if value.lower() in ["false", "no", "0"]:
                    coerced[field] = False
                    continue
            
            if expected_type == "string" and not isinstance(value, str):
                coerced[field] = str(value)
                continue
            
            if expected_type == "array" and isinstance(value, str):
                try:
                    coerced[field] = json.loads(value)
                    continue
                except json.JSONDecodeError:
                    pass
            
            coerced[field] = value
        
        return coerced

class OutputValidator:
    """Complete output validation pipeline."""
    
    def __init__(self):
        self.extractor = PatternExtractor()
        self.validator = SchemaValidator()
    
    def validate_json_output(
        self,
        text: str,
        schema: dict = None
    ) -> ValidationResult:
        """Extract and validate JSON output."""
        
        # Extract JSON
        extraction = self.extractor.extract_json(text)
        if not extraction.success:
            return ValidationResult(valid=False, errors=[extraction.error])
        
        # Validate if schema provided
        if schema:
            return self.validator.validate_json_schema(extraction.data, schema)
        
        return ValidationResult(valid=True, data=extraction.data)
    
    def validate_typed_output(
        self,
        text: str,
        model: Type[T]
    ) -> ValidationResult:
        """Extract and validate typed output."""
        
        # Extract JSON
        extraction = self.extractor.extract_json(text)
        if not extraction.success:
            return ValidationResult(valid=False, errors=[extraction.error])
        
        # Validate with coercion
        return self.validator.validate_with_coercion(extraction.data, model)
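
`validate_pydantic` formats each error with `err['loc']`, which is a tuple such as `('items', 0, 'price')`, so the resulting messages read like Python tuples. Below is a small sketch of rendering those locations as paths, which tends to be easier to log or to hand to a repair step as feedback. The helper names are hypothetical, and the sample error dicts are hand-written to mimic the `loc` and `msg` keys returned by Pydantic's `ValidationError.errors()`.

```python
from typing import Iterable


def format_location(loc: Iterable) -> str:
    """Render an error location like ('items', 0, 'price') as 'items[0].price'."""
    path = ""
    for part in loc:
        if isinstance(part, int):
            path += f"[{part}]"
        else:
            path += f".{part}" if path else str(part)
    return path or "<root>"


def format_errors(errors: list[dict]) -> list[str]:
    """Turn error dicts with 'loc' and 'msg' keys into readable lines."""
    return [f"{format_location(err['loc'])}: {err['msg']}" for err in errors]


# Hand-written sample shaped like ValidationError.errors() output (illustrative only)
sample_errors = [
    {"loc": ("items", 0, "price"), "msg": "Input should be a valid number"},
    {"loc": ("name",), "msg": "Field required"},
    {"loc": (), "msg": "Input should be a valid dictionary"},
]

lines = format_errors(sample_errors)
print("\n".join(lines))
```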

Output Repair

from dataclasses import dataclass
from typing import Any, Optional
import re
import json

@dataclass
class RepairResult:
    """Result of output repair."""
    
    success: bool
    data: Any = None
    repairs_made: list[str] = None
    error: str = None

class OutputRepairer:
    """Repair malformed LLM output."""
    
    def repair_json(self, text: str) -> RepairResult:
        """Attempt to repair malformed JSON."""
        
        repairs = []
        fixed = text
        
        # Remove markdown code blocks
        if "```" in fixed:
            fixed = re.sub(r'```(?:json)?\s*', '', fixed)
            fixed = re.sub(r'\s*```', '', fixed)
            repairs.append("Removed markdown code blocks")
        
        # Fix trailing commas
        if re.search(r',\s*[}\]]', fixed):
            fixed = re.sub(r',(\s*[}\]])', r'\1', fixed)
            repairs.append("Removed trailing commas")
        
        # Fix single quotes to double quotes
        if "'" in fixed and '"' not in fixed:
            fixed = fixed.replace("'", '"')
            repairs.append("Converted single quotes to double quotes")
        
        # Fix unquoted keys
        unquoted_key = re.search(r'{\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:', fixed)
        if unquoted_key:
            fixed = re.sub(r'([{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', fixed)
            repairs.append("Added quotes to unquoted keys")
        
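        # Fix missing commas between elements on consecutive lines,
        # and record the repair so repairs_made stays accurate
        with_commas = re.sub(r'"\s*\n\s*"', '",\n"', fixed)
        with_commas = re.sub(r'}\s*\n\s*{', '},\n{', with_commas)
        if with_commas != fixed:
            fixed = with_commas
            repairs.append("Inserted missing commas between elements")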
        
        # Try to parse
        try:
            data = json.loads(fixed)
            return RepairResult(success=True, data=data, repairs_made=repairs)
        except json.JSONDecodeError as e:
            return RepairResult(success=False, error=str(e), repairs_made=repairs)
    
    def repair_truncated_json(self, text: str) -> RepairResult:
        """Repair truncated JSON by closing brackets."""
        
        repairs = []
        fixed = text.strip()
        
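        # Track unclosed brackets in nesting order, ignoring string contents.
        # Plain counting would close '[{"a": 1' as ']}' instead of '}]'.
        stack = []
        in_string = False
        escaped = False
        for ch in fixed:
            if in_string:
                if escaped:
                    escaped = False
                elif ch == '\\':
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch in '{[':
                stack.append('}' if ch == '{' else ']')
            elif ch in '}]' and stack and stack[-1] == ch:
                stack.pop()
        
        # Close a string that was cut off mid-value
        if in_string:
            fixed += '"'
            repairs.append("Closed unterminated string")
        
        # Close unclosed brackets, innermost first
        if stack:
            # Remove trailing comma if present
            fixed = fixed.rstrip(',')
            
            closers = ''.join(reversed(stack))
            fixed += closers
            repairs.append(f"Added closing brackets: {closers}")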
        
        try:
            data = json.loads(fixed)
            return RepairResult(success=True, data=data, repairs_made=repairs)
        except json.JSONDecodeError as e:
            return RepairResult(success=False, error=str(e), repairs_made=repairs)

class LLMRepairer:
    """Use LLM to repair malformed output."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def repair(
        self,
        malformed: str,
        expected_schema: dict = None,
        original_prompt: str = None
    ) -> RepairResult:
        """Use LLM to repair malformed output."""
        
        repair_prompt = f"""The following output is malformed or invalid. Please fix it and return valid JSON.

Malformed output:
{malformed[:2000]}

{f"Expected schema: {json.dumps(expected_schema, indent=2)}" if expected_schema else ""}

Return only the corrected JSON, no explanation."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": repair_prompt}],
            response_format={"type": "json_object"}
        )
        
        try:
            data = json.loads(response.choices[0].message.content)
            return RepairResult(
                success=True,
                data=data,
                repairs_made=["LLM-assisted repair"]
            )
        except json.JSONDecodeError as e:
            return RepairResult(success=False, error=str(e))

class RobustParser:
    """Robust parser with multiple repair strategies."""
    
    def __init__(self, client: Any = None):
        self.extractor = PatternExtractor()
        self.repairer = OutputRepairer()
        self.llm_repairer = LLMRepairer(client) if client else None
    
    async def parse_json(
        self,
        text: str,
        schema: dict = None
    ) -> dict:
        """Parse JSON with multiple fallback strategies."""
        
        # Try direct extraction
        result = self.extractor.extract_json(text)
        if result.success:
            return result.data
        
        # Try repair
        repair_result = self.repairer.repair_json(text)
        if repair_result.success:
            return repair_result.data
        
        # Try truncation repair
        truncation_result = self.repairer.repair_truncated_json(text)
        if truncation_result.success:
            return truncation_result.data
        
        # Try LLM repair as last resort
        if self.llm_repairer:
            llm_result = await self.llm_repairer.repair(text, schema)
            if llm_result.success:
                return llm_result.data
        
        raise ValueError(f"Could not parse output: {result.error}")
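
The quote-swapping step in `repair_json` only runs when the text contains no double quotes at all, and a blanket replace would also corrupt apostrophes inside values. When a model emits a Python-style literal (single quotes, `True`, `None`), one option is to parse it with the standard library's `ast.literal_eval`, which evaluates literals only and does not execute code. This is a minimal sketch; `parse_python_literal` is a hypothetical helper, not part of the classes above.

```python
import ast
import json
from typing import Any, Optional


def parse_python_literal(text: str) -> Optional[Any]:
    """Parse a Python-style dict/list literal and confirm it is JSON-serializable."""
    try:
        value = ast.literal_eval(text.strip())
    except (ValueError, SyntaxError):
        return None
    if not isinstance(value, (dict, list)):
        return None
    try:
        # Round-trip through json to reject sets, bytes, tuple keys and similar
        return json.loads(json.dumps(value))
    except (TypeError, ValueError):
        return None


raw = "{'title': \"It's fine\", 'ok': True, 'score': None, 'tags': ['a', 'b']}"
parsed = parse_python_literal(raw)
print(parsed)
```

A step like this could sit between `repair_json` and the LLM fallback in `RobustParser.parse_json`, since it is cheap and makes no network call.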

Production Parsing Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, create_model
from typing import Optional, Any

app = FastAPI()

# Initialize components
extractor = PatternExtractor()
validator = OutputValidator()
repairer = OutputRepairer()
# Works without a client; pass an LLM client to enable LLM-assisted repair
robust_parser = RobustParser()

class ParseRequest(BaseModel):
    text: str
    format: str = "json"
    schema: Optional[dict] = None

class ValidateRequest(BaseModel):
    text: str
    schema: dict

class RepairRequest(BaseModel):
    text: str
    schema: Optional[dict] = None

class ExtractChoiceRequest(BaseModel):
    text: str
    choices: list[str]

@app.post("/v1/parse")
async def parse_output(request: ParseRequest):
    """Parse LLM output."""
    
    if request.format == "json":
        result = extractor.extract_json(request.text)
    elif request.format == "list":
        result = extractor.extract_list(request.text)
    elif request.format == "number":
        result = extractor.extract_number(request.text)
    elif request.format == "boolean":
        result = extractor.extract_boolean(request.text)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown format: {request.format}")
    
    if result.success:
        return {"success": True, "data": result.data}
    else:
        return {"success": False, "error": result.error}

@app.post("/v1/parse/robust")
async def robust_parse(request: ParseRequest):
    """Parse with multiple fallback strategies."""
    
    try:
        data = await robust_parser.parse_json(request.text, request.schema)
        return {"success": True, "data": data}
    except ValueError as e:
        return {"success": False, "error": str(e)}

@app.post("/v1/validate")
async def validate_output(request: ValidateRequest):
    """Validate output against schema."""
    
    result = validator.validate_json_output(request.text, request.schema)
    
    return {
        "valid": result.valid,
        "data": result.data if result.valid else None,
        "errors": result.errors
    }

@app.post("/v1/repair")
async def repair_output(request: RepairRequest):
    """Repair malformed output."""
    
    result = repairer.repair_json(request.text)
    
    if result.success:
        return {
            "success": True,
            "data": result.data,
            "repairs": result.repairs_made
        }
    
    # Try truncation repair
    truncation_result = repairer.repair_truncated_json(request.text)
    
    if truncation_result.success:
        return {
            "success": True,
            "data": truncation_result.data,
            "repairs": truncation_result.repairs_made
        }
    
    return {
        "success": False,
        "error": result.error,
        "attempted_repairs": result.repairs_made
    }

@app.post("/v1/extract/choice")
async def extract_choice(request: ExtractChoiceRequest):
    """Extract choice from text."""
    
    result = extractor.extract_choice(request.text, request.choices)
    
    return {
        "success": result.success,
        "choice": result.data if result.success else None,
        "error": result.error if not result.success else None
    }

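class ExtractFieldsRequest(BaseModel):
    text: str
    patterns: dict[str, str]

@app.post("/v1/extract/fields")
async def extract_fields(request: ExtractFieldsRequest):
    """Extract fields using patterns."""
    
    # Take both values from the JSON body, like the other endpoints
    structured = StructuredExtractor()
    result = structured.extract_fields(request.text, request.patterns)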
    
    return {
        "success": result.success,
        "fields": result.data if result.success else None,
        "error": result.error if not result.success else None
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Reliable output parsing is essential for building production LLM applications. Start with clear format instructions that guide the model toward the structure you need; Pydantic models double as schema documentation. Use pattern extraction that handles common variations: JSON in code blocks, numbered lists, key-value pairs. Validate extracted data against schemas to catch structural issues early, with type coercion to handle minor mismatches. Implement repair mechanisms for common issues: trailing commas, unquoted keys, truncated output. For critical applications, use LLM-assisted repair as a last resort. The key point is never to trust raw LLM output: always extract, validate, and be prepared to repair. Build parsing pipelines with multiple fallback strategies, and log parsing failures to identify patterns you can address with better prompts or additional repair logic.
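
The advice to chain fallback strategies and log failures can be sketched as a small driver that tries parsers in order and records which ones failed. The names `parse_with_fallbacks`, `strip_fences`, `drop_trailing_commas`, and the logger name are hypothetical, and the strategies shown are simplified stand-ins for the extractor and repairer classes above.

```python
import json
import logging
import re
from typing import Any, Callable

logger = logging.getLogger("output_parsing")  # hypothetical logger name

FENCE = "`" * 3  # Markdown code-fence marker, built here to keep the example readable


def strip_fences(text: str) -> Any:
    """Parse JSON after removing Markdown code-fence markers."""
    cleaned = re.sub(re.escape(FENCE) + r"(?:json)?", "", text).strip()
    return json.loads(cleaned)


def drop_trailing_commas(text: str) -> Any:
    """Parse JSON after removing commas that precede a closing bracket."""
    return json.loads(re.sub(r",(\s*[}\]])", r"\1", text))


def parse_with_fallbacks(text: str, strategies: list[tuple[str, Callable[[str], Any]]]):
    """Try each (name, parser) in order; return (name, data) for the first success."""
    failures = []
    for name, parser in strategies:
        try:
            return name, parser(text)
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            failures.append(name)
            logger.warning("strategy %s failed: %s", name, exc)
    raise ValueError(f"All strategies failed: {failures}")


strategies = [
    ("direct", json.loads),
    ("strip_fences", strip_fences),
    ("drop_trailing_commas", drop_trailing_commas),
]

used, data = parse_with_fallbacks(FENCE + 'json\n{"a": 1}\n' + FENCE, strategies)
print(used, data)
```

Returning the name of the strategy that succeeded makes it possible to count how often each fallback fires, which is one way to spot prompts that need tighter format instructions.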