Introduction: LLM outputs are inherently unstructured—models generate text, not data structures. Yet most applications need structured data: JSON for APIs, typed objects for business logic, specific formats for downstream processing. Output parsing bridges this gap, transforming free-form text into reliable, validated data structures. This guide covers the techniques that make parsing robust: format specification in prompts, regex and pattern-based extraction, JSON and XML parsing with error recovery, Pydantic validation for type safety, and retry strategies for malformed outputs. Whether you’re extracting entities from text, generating structured reports, or building data pipelines powered by LLMs, these patterns will help you get consistent, reliable structured outputs from any model.

Format Specification
from dataclasses import dataclass, field
from typing import Any, Optional, Type
from abc import ABC, abstractmethod
import json
class OutputFormat(ABC):
    """Contract shared by every output format: describe it, then parse it."""

    @abstractmethod
    def get_format_instructions(self) -> str:
        """Return the prompt text that tells the LLM which format to emit."""
        ...

    @abstractmethod
    def parse(self, output: str) -> Any:
        """Convert raw LLM text into a Python value for this format."""
        ...
class JSONOutputFormat(OutputFormat):
    """JSON output format.

    Builds prompt instructions asking for JSON and extracts/decodes JSON from
    the model's reply, tolerating markdown code fences and surrounding prose.
    """

    def __init__(
        self,
        schema: dict = None,
        example: dict = None,
        strict: bool = True
    ):
        """Store the optional schema and example shown to the model.

        Args:
            schema: JSON Schema embedded in the instructions when truthy.
            example: Example object embedded in the instructions when truthy.
            strict: Stored on the instance; not consulted by this class.
        """
        self.schema = schema
        self.example = example
        self.strict = strict

    def get_format_instructions(self) -> str:
        """Get JSON format instructions (one rule per line, then schema/example)."""
        instructions = [
            "Output your response as valid JSON.",
            "Do not include any text before or after the JSON.",
            "Ensure all strings are properly quoted.",
            "Do not use trailing commas."
        ]
        if self.schema:
            instructions.append(f"\nJSON Schema:\n```json\n{json.dumps(self.schema, indent=2)}\n```")
        if self.example:
            instructions.append(f"\nExample output:\n```json\n{json.dumps(self.example, indent=2)}\n```")
        return "\n".join(instructions)

    def parse(self, output: str) -> Any:
        """Parse JSON output.

        The candidate text is, in order of preference: the body of the first
        markdown code fence, the widest ``{...}`` / ``[...]`` span, or the
        whole output. If that candidate does not decode (typically because
        trailing prose containing brackets was swept into the greedy span),
        one more attempt decodes a single JSON value starting at the first
        opening bracket and ignores whatever follows it.

        Returns:
            The decoded JSON value (usually a dict or list).

        Raises:
            json.JSONDecodeError: If no attempt yields valid JSON; the error
                from the first attempt is the one raised.
        """
        import re

        fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
        if fenced:
            candidate = fenced.group(1)
        else:
            span = re.search(r'(\{[\s\S]*\}|\[[\s\S]*\])', output)
            candidate = span.group(1) if span else output

        try:
            return json.loads(candidate)
        except json.JSONDecodeError as first_error:
            opener = re.search(r'[\[{]', candidate)
            if opener is None:
                raise
            try:
                # raw_decode stops at the end of the first complete value, so
                # text after the JSON no longer breaks decoding.
                value, _ = json.JSONDecoder().raw_decode(candidate, opener.start())
            except json.JSONDecodeError:
                raise first_error from None
            return value
class XMLOutputFormat(OutputFormat):
    """XML output format.

    Asks the model for an XML document with a given root element and converts
    the reply into nested dictionaries keyed by tag name.
    """

    def __init__(self, root_element: str, elements: list[str] = None):
        """Store the root element name and the optional required child tags."""
        self.root = root_element
        self.elements = elements or []

    def get_format_instructions(self) -> str:
        """Get XML format instructions, listing required elements if any."""
        instructions = [
            f"Output your response as valid XML with root element <{self.root}>.",
            "Ensure all tags are properly closed.",
            "Use proper XML escaping for special characters."
        ]
        if self.elements:
            instructions.append("\nRequired elements:")
            for elem in self.elements:
                # Show a complete open/close pair so the model sees valid XML.
                instructions.append(f" <{elem}>...</{elem}>")
        return "\n".join(instructions)

    def parse(self, output: str) -> dict:
        """Parse XML output into a dict of the root element's children.

        The body of the first markdown code fence is used when present,
        otherwise the whole output. If that text is not well-formed XML
        (for example the model wrapped the document in prose), a second
        attempt parses only the ``<root>...</root>`` span.

        Raises:
            xml.etree.ElementTree.ParseError: If no attempt is well-formed.
        """
        # NOTE(review): ElementTree is not hardened against maliciously
        # crafted XML; confirm inputs are trusted or switch to a hardened parser.
        import xml.etree.ElementTree as ET
        import re

        fenced = re.search(r'```(?:xml)?\s*([\s\S]*?)\s*```', output)
        xml_str = fenced.group(1) if fenced else output

        try:
            root = ET.fromstring(xml_str)
        except ET.ParseError:
            tag = re.escape(self.root)
            span = re.search(rf'<{tag}(?:\s[^>]*)?>[\s\S]*</{tag}\s*>', xml_str)
            if span is None:
                raise
            root = ET.fromstring(span.group(0))

        return self._element_to_dict(root)

    def _element_to_dict(self, element) -> dict:
        """Convert an element's children to a dict.

        Leaf children map to their text (``None`` when empty); children with
        sub-elements map to nested dicts. When the same tag occurs more than
        once under one parent, its values are collected into a list in
        document order instead of the last one overwriting the others.
        """
        result = {}
        repeated = set()
        for child in element:
            value = child.text if len(child) == 0 else self._element_to_dict(child)
            tag = child.tag
            if tag not in result:
                result[tag] = value
            elif tag in repeated:
                result[tag].append(value)
            else:
                result[tag] = [result[tag], value]
                repeated.add(tag)
        return result
class MarkdownOutputFormat(OutputFormat):
    """Markdown format made of headed sections."""

    def __init__(self, sections: list[str], heading_level: int = 2):
        """Remember the section titles and the heading depth used in prompts."""
        self.sections = sections
        self.heading_level = heading_level

    def get_format_instructions(self) -> str:
        """Build a Markdown skeleton with one placeholder per section."""
        marker = "#" * self.heading_level
        lines = ["Format your response in Markdown with the following sections:", ""]
        for title in self.sections:
            lines += [f"{marker} {title}", "[Your content here]", ""]
        return "\n".join(lines)

    def parse(self, output: str) -> dict:
        """Split Markdown text into ``{heading: body}``.

        Lines before the first heading are collected under ``"intro"``. Any
        heading of level 1-6 starts a new section. A section is stored only
        if at least one line was collected for it.
        """
        import re

        heading_re = re.compile(r'^#{1,6}\s+(.+)$')
        parsed = {}
        title = "intro"
        body = []

        for raw_line in output.split('\n'):
            heading = heading_re.match(raw_line)
            if heading is None:
                body.append(raw_line)
                continue
            # A heading closes the section that was being collected.
            if body:
                parsed[title] = '\n'.join(body).strip()
            title = heading.group(1).strip()
            body = []

        # Flush whatever followed the final heading.
        if body:
            parsed[title] = '\n'.join(body).strip()
        return parsed
class ListOutputFormat(OutputFormat):
    """List output format.

    Produces instructions for a numbered, bulleted or plain list and parses a
    reply into a list of item strings.
    """

    def __init__(
        self,
        item_format: str = "numbered",  # "numbered", "bulleted", "plain"
        min_items: int = None,
        max_items: int = None
    ):
        """Store the list style and the optional item-count hints.

        ``min_items`` / ``max_items`` are only mentioned in the instructions;
        ``parse`` does not enforce them.
        """
        self.item_format = item_format
        self.min_items = min_items
        self.max_items = max_items

    def get_format_instructions(self) -> str:
        """Get list format instructions as a single space-joined sentence run."""
        if self.item_format == "numbered":
            format_desc = "a numbered list (1. item, 2. item, etc.)"
        elif self.item_format == "bulleted":
            format_desc = "a bulleted list (- item or * item)"
        else:
            format_desc = "one item per line"
        instructions = [f"Output your response as {format_desc}."]
        if self.min_items:
            instructions.append(f"Include at least {self.min_items} items.")
        if self.max_items:
            instructions.append(f"Include at most {self.max_items} items.")
        return " ".join(instructions)

    def parse(self, output: str) -> list[str]:
        """Parse list output into item strings.

        Blank lines are skipped. A leading ``1.`` / ``1)`` marker and a
        leading ``-`` / ``*`` / ``•`` bullet are removed, but only when the
        marker is followed by whitespace or ends the line. That keeps item
        text such as ``3.14 is pi``, ``-5 degrees`` or ``**bold**`` intact.
        Lines consisting of a marker alone are dropped.
        """
        import re

        numbered = re.compile(r'^\d+[.)](?:\s+|$)')
        bulleted = re.compile(r'^[-*•](?:\s+|$)')

        items = []
        for line in output.split('\n'):
            line = line.strip()
            if not line:
                continue
            cleaned = bulleted.sub('', numbered.sub('', line))
            if cleaned:
                items.append(cleaned)
        return items
Pattern-Based Extraction
from dataclasses import dataclass
from typing import Any, Optional, Pattern
import re
@dataclass
class ExtractionPattern:
    """One named regex rule used when pulling values out of text."""

    # Key under which the extracted value is reported.
    name: str
    # Regular expression source.
    pattern: str
    # Capture group whose text is the extracted value.
    group: int = 1
    # Optional callable applied to the captured value.
    transform: callable = None
class PatternExtractor:
    """Extract data using regex patterns.

    Patterns are searched case-insensitively with DOTALL enabled.
    """

    def __init__(self):
        """Start with no registered patterns."""
        self.patterns: list[ExtractionPattern] = []

    def add_pattern(
        self,
        name: str,
        pattern: str,
        group: int = 1,
        transform: callable = None
    ):
        """Add extraction pattern.

        Args:
            name: Key used for this pattern in the result dicts.
            pattern: Regular expression source.
            group: Capture group to return.
            transform: Optional callable applied to each captured value.
        """
        self.patterns.append(ExtractionPattern(
            name=name,
            pattern=pattern,
            group=group,
            transform=transform
        ))

    @staticmethod
    def _pick_group(match, group):
        """Return the configured group, or the whole match if the pattern has no groups."""
        if match.re.groups == 0:
            return match.group(0)
        return match.group(group)

    def extract(self, text: str) -> dict:
        """Extract the first occurrence of every pattern.

        Patterns that do not match are omitted from the result.
        """
        results = {}
        for pattern in self.patterns:
            match = re.search(pattern.pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                value = self._pick_group(match, pattern.group)
                if pattern.transform:
                    value = pattern.transform(value)
                results[pattern.name] = value
        return results

    def extract_all(self, text: str) -> dict[str, list]:
        """Extract every occurrence of every pattern.

        Uses the same group selection as ``extract``, so a pattern with
        several capture groups yields the configured group for each match
        rather than tuples of all groups. Patterns without matches map to
        an empty list.
        """
        results = {}
        for pattern in self.patterns:
            found = [
                self._pick_group(match, pattern.group)
                for match in re.finditer(pattern.pattern, text, re.IGNORECASE | re.DOTALL)
            ]
            if pattern.transform:
                found = [pattern.transform(value) for value in found]
            results[pattern.name] = found
        return results
class StructuredExtractor:
    """Pull named fields, and optionally repeated sections, out of text."""

    def __init__(self):
        """Start with no field patterns and no section pattern."""
        self.field_patterns: dict[str, str] = {}
        self.section_pattern: str = None

    def set_field_pattern(self, field_name: str, pattern: str):
        """Register (or replace) the regex used for ``field_name``."""
        self.field_patterns[field_name] = pattern

    def set_section_pattern(self, pattern: str):
        """Set the regex whose matches are reported under ``'sections'``."""
        self.section_pattern = pattern

    def extract(self, text: str) -> dict:
        """Run every configured pattern against ``text``.

        Each field maps to the stripped text of capture group 1 of its first
        match; fields without a match are left out. When a section pattern is
        set, ``'sections'`` holds the ``re.findall`` result for it.
        """
        flags = re.IGNORECASE | re.DOTALL
        extracted = {}

        for name, regex in self.field_patterns.items():
            hit = re.search(regex, text, flags)
            if not hit:
                continue
            extracted[name] = hit.group(1).strip()

        if self.section_pattern:
            extracted['sections'] = re.findall(self.section_pattern, text, flags)

        return extracted
class TagExtractor:
    """Extract content between tags.

    Three delimiter styles are recognised per tag name: ``<tag>...</tag>``,
    ``[tag]...[/tag]`` and ``{tag}...{/tag}`` (a repeated ``{tag}`` is also
    accepted as the closing delimiter).
    """

    def __init__(self, tags: list[str] = None):
        """Store the tag names to look for.

        The list is copied so later ``add_tag`` calls do not modify the
        caller's list.
        """
        self.tags = list(tags) if tags else []

    def add_tag(self, tag: str):
        """Add tag to extract."""
        self.tags.append(tag)

    def extract(self, text: str) -> dict:
        """Extract content between tags.

        For each tag the styles are tried in the order listed on the class
        and the first one that matches wins. Matching is case-insensitive and
        spans newlines; the captured content is stripped. Tags with no match
        are omitted from the result.
        """
        results = {}
        for tag in self.tags:
            # Tag names are literal text, so neutralise regex metacharacters.
            name = re.escape(tag)
            patterns = [
                f'<{name}>(.*?)</{name}>',
                f'\\[{name}\\](.*?)\\[/{name}\\]',
                '\\{' + name + '\\}(.*?)\\{/?' + name + '\\}',
            ]
            for pattern in patterns:
                match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
                if match:
                    results[tag] = match.group(1).strip()
                    break
        return results
class KeyValueExtractor:
    """Collect ``key<separator>value`` pairs from text."""

    def __init__(
        self,
        separator: str = ":",
        line_based: bool = True
    ):
        """Choose the separator and whether to scan line by line."""
        self.separator = separator
        self.line_based = line_based

    @staticmethod
    def _normalise_key(raw: str) -> str:
        """Trim, lower-case and replace spaces with underscores."""
        return raw.strip().lower().replace(' ', '_')

    def extract(self, text: str) -> dict:
        """Return the pairs found in ``text`` keyed by normalised key.

        In line mode each line is split at its first separator; lines without
        one are ignored. Otherwise a regex finds ``words <sep> value`` runs,
        where a value ends at a comma or newline. Later duplicates overwrite
        earlier ones.
        """
        pairs = {}

        if self.line_based:
            for line in text.split('\n'):
                raw_key, found, raw_value = line.partition(self.separator)
                if not found:
                    continue
                pairs[self._normalise_key(raw_key)] = raw_value.strip()
            return pairs

        # Find all key: value patterns
        finder = r'(\w+(?:\s+\w+)*)\s*' + re.escape(self.separator) + r'\s*([^\n,]+)'
        for raw_key, raw_value in re.findall(finder, text):
            pairs[self._normalise_key(raw_key)] = raw_value.strip()
        return pairs
class TableExtractor:
    """Extract tabular data from pipe-delimited (Markdown-style) tables."""

    def __init__(self, has_header: bool = True):
        """Record whether the first table row holds the column names."""
        self.has_header = has_header

    def extract(self, text: str) -> list[dict]:
        """Extract table data.

        Every line containing ``|`` is treated as a table row, except
        separator lines made only of ``|``, ``:``, ``-`` and whitespace.
        Only the optional outer pipes are discarded; empty cells inside a
        row are kept as ``""`` so values stay aligned with their columns.
        Rows whose cells are all empty are skipped.

        Returns:
            With ``has_header``: one dict per data row keyed by the header
            cells (an empty list when only the header row exists).
            Without: a list of rows, each a list of cell strings.
        """
        rows = []
        for line in text.strip().split('\n'):
            if '|' not in line:
                continue
            # Skip separator lines
            if re.match(r'^[\s|:-]+$', line):
                continue
            body = line.strip()
            if body.startswith('|'):
                body = body[1:]
            if body.endswith('|'):
                body = body[:-1]
            cells = [cell.strip() for cell in body.split('|')]
            if any(cells):
                rows.append(cells)

        if not rows:
            return []

        if self.has_header:
            headers = rows[0]
            return [dict(zip(headers, row)) for row in rows[1:]]
        return rows
Pydantic Validation
import json
from enum import Enum
from typing import Any, Generic, Optional, Type, TypeVar

from pydantic import (
    BaseModel,
    Field,
    field_validator,
    model_validator,
    root_validator,
    validator,
)
T = TypeVar('T', bound=BaseModel)


class PydanticOutputParser(Generic[T]):
    """Parse and validate output using Pydantic.

    The model class supplies both the JSON Schema shown to the LLM and the
    validation applied to the decoded reply.
    """

    def __init__(self, model_class: Type[T]):
        """Remember the Pydantic model class used for schema and validation."""
        self.model_class = model_class

    def get_format_instructions(self) -> str:
        """Get format instructions embedding the model's JSON Schema."""
        schema = self.model_class.model_json_schema()
        return (
            "Output your response as JSON matching this schema:\n"
            "```json\n"
            f"{json.dumps(schema, indent=2)}\n"
            "```\n"
            "Ensure all required fields are present and types are correct."
        )

    def parse(self, output: str) -> T:
        """Parse and validate output.

        The JSON text is the body of the first markdown code fence, else the
        widest ``{...}`` span, else the whole output. If that text does not
        decode (e.g. prose containing braces follows the object), a single
        JSON value is decoded starting at the first ``{`` and the rest is
        ignored.

        Raises:
            json.JSONDecodeError: If no JSON could be decoded.
            Exception: Whatever ``model_validate`` raises for invalid data.
        """
        import re

        fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
        if fenced:
            json_str = fenced.group(1)
        else:
            span = re.search(r'(\{[\s\S]*\})', output)
            json_str = span.group(1) if span else output

        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as first_error:
            start = json_str.find('{')
            if start == -1:
                raise
            try:
                data, _ = json.JSONDecoder().raw_decode(json_str, start)
            except json.JSONDecodeError:
                raise first_error from None

        # Validate with Pydantic
        return self.model_class.model_validate(data)

    def parse_safe(self, output: str) -> tuple[Optional[T], Optional[str]]:
        """Parse without raising.

        Returns:
            ``(model, None)`` on success, otherwise ``(None, message)`` where
            the message starts with ``"JSON parse error"`` for decoding
            failures and ``"Validation error"`` for anything else.
        """
        try:
            result = self.parse(output)
            return result, None
        except json.JSONDecodeError as e:
            return None, f"JSON parse error: {e}"
        except Exception as e:
            return None, f"Validation error: {e}"
# Example Pydantic models
class Sentiment(str, Enum):
    """Sentiment labels; members are also plain strings."""

    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"
class SentimentAnalysis(BaseModel):
    """Sentiment analysis result."""

    sentiment: Sentiment
    # Must lie in [0, 1]; rounded to three decimals by the validator below.
    confidence: float = Field(ge=0, le=1)
    reasoning: str

    # Pydantic v2 validator style, matching the v2 API (model_validate /
    # model_json_schema) used elsewhere in this file.
    @field_validator('confidence')
    @classmethod
    def round_confidence(cls, v: float) -> float:
        """Round the confidence to three decimal places."""
        return round(v, 3)
class Entity(BaseModel):
    """A single entity found in a text."""

    name: str
    type: str
    # Optional character offsets of the entity within the source text.
    start_index: Optional[int] = None
    end_index: Optional[int] = None
class EntityExtraction(BaseModel):
    """Entity extraction result."""

    entities: list[Entity]
    text_length: int

    # Pydantic v2 rejects a bare ``@root_validator`` at class-definition time,
    # so the cross-field check is an "after" model validator instead.
    @model_validator(mode='after')
    def validate_indices(self) -> 'EntityExtraction':
        """Check that entity offsets fit inside the text.

        ``start_index`` must satisfy ``0 <= start < text_length``.
        ``end_index`` must satisfy ``0 <= end <= text_length`` (the upper
        bound is inclusive so both inclusive and exclusive end conventions
        pass) and must not precede ``start_index``. Offsets that are
        ``None`` are not checked.

        Raises:
            ValueError: If an offset is out of range.
        """
        for entity in self.entities:
            start, end = entity.start_index, entity.end_index
            if start is not None and not 0 <= start < self.text_length:
                raise ValueError(f"Invalid start_index for entity {entity.name}")
            if end is not None:
                out_of_range = not 0 <= end <= self.text_length
                before_start = start is not None and end < start
                if out_of_range or before_start:
                    raise ValueError(f"Invalid end_index for entity {entity.name}")
        return self
class Summary(BaseModel):
    """Text summary result."""

    # 10 to 500 characters.
    summary: str = Field(min_length=10, max_length=500)
    # 1 to 10 entries; Pydantic v2 uses min_length/max_length for list size
    # (min_items/max_items are the deprecated v1 names).
    key_points: list[str] = Field(min_length=1, max_length=10)
    word_count: int = Field(ge=1)
class Classification(BaseModel):
    """Outcome of a classification request."""

    label: str
    # Constrained to the closed interval [0, 1].
    confidence: float = Field(ge=0, le=1)
    # Other candidate results; defaults to a fresh empty list per instance.
    alternatives: list[dict] = Field(default_factory=list)
class StructuredResponse(BaseModel):
    """General-purpose response envelope."""

    success: bool
    # Optional payload and error text; both default to None.
    data: Optional[dict] = None
    error: Optional[str] = None
    # Free-form extra information; a new dict is created per instance.
    metadata: dict = Field(default_factory=dict)
class OutputParserFactory:
    """Convenience constructors for the parsers defined in this file."""

    @staticmethod
    def for_model(model_class: Type[BaseModel]) -> PydanticOutputParser:
        """Build a Pydantic-backed parser for ``model_class``."""
        parser = PydanticOutputParser(model_class)
        return parser

    @staticmethod
    def for_json(schema: dict = None) -> JSONOutputFormat:
        """Build a JSON format, optionally carrying a JSON Schema."""
        json_format = JSONOutputFormat(schema=schema)
        return json_format

    @staticmethod
    def for_list(min_items: int = None, max_items: int = None) -> ListOutputFormat:
        """Build a list format with optional item-count hints."""
        list_format = ListOutputFormat(min_items=min_items, max_items=max_items)
        return list_format
Error Recovery and Retries
from dataclasses import dataclass
from typing import Any, Optional, Type
import json
@dataclass
class ParseResult:
    """Result of parsing attempt."""

    # True when parsing produced data.
    success: bool
    # Parsed value on success.
    data: Any = None
    # Error message on failure.
    error: Optional[str] = None
    # The LLM text the result was derived from.
    raw_output: Optional[str] = None
    # Number of LLM calls made.
    attempts: int = 1
class OutputFixer:
    """Fix malformed LLM outputs."""

    # Bare words that are JSON literals already, or Python spellings of them.
    _JSON_LITERALS = {
        'true': 'true', 'false': 'false', 'null': 'null',
        'True': 'true', 'False': 'false', 'None': 'null',
    }

    def __init__(self, llm_client: Any = None):
        """Store the optional LLM client used by ``fix_with_llm``."""
        self.llm = llm_client

    @classmethod
    def _quote_bare_word(cls, match) -> str:
        """Quote an unquoted word value, keeping JSON literals unquoted."""
        word, closer = match.group(1), match.group(2)
        literal = cls._JSON_LITERALS.get(word)
        if literal is not None:
            return f': {literal}{closer}'
        return f': "{word}"{closer}'

    def fix_json(self, malformed: str) -> str:
        """Attempt to fix malformed JSON with text heuristics.

        Steps: strip markdown code fences, narrow to the widest ``{...}`` /
        ``[...]`` span, and return immediately if that already decodes (so
        valid JSON is never rewritten). Otherwise remove trailing commas,
        convert single-quoted keys and values to double quotes, quote bare
        keys, and quote bare word values. ``true`` / ``false`` / ``null``
        stay literals and Python's ``True`` / ``False`` / ``None`` are mapped
        to them.

        The rewrites are regex based and not string-aware, so the result is
        a best effort and may still be invalid JSON.

        Returns:
            The repaired text, stripped of surrounding whitespace.
        """
        import re

        # Remove markdown code blocks (the pattern matches closing fences too).
        fixed = re.sub(r'```(?:json)?\s*', '', malformed)

        # Work only on the JSON part so surrounding prose is not rewritten.
        json_match = re.search(r'(\{[\s\S]*\}|\[[\s\S]*\])', fixed)
        if json_match:
            fixed = json_match.group(1)
        fixed = fixed.strip()

        try:
            json.loads(fixed)
        except json.JSONDecodeError:
            pass
        else:
            return fixed

        # Trailing commas
        fixed = re.sub(r',\s*}', '}', fixed)
        fixed = re.sub(r',\s*]', ']', fixed)
        # Single-quoted keys, then single-quoted values
        fixed = re.sub(r"'([^']*)':", r'"\1":', fixed)
        fixed = re.sub(r""":\s*'([^'"]*)'(\s*[,}\]])""", r': "\1"\2', fixed)
        # Unquoted keys
        fixed = re.sub(r'(\{|,)\s*(\w+)\s*:', r'\1"\2":', fixed)
        # Unquoted word values (only the obvious single-word case)
        fixed = re.sub(
            r':\s*([a-zA-Z][a-zA-Z0-9_]*)\s*([,}])',
            self._quote_bare_word,
            fixed,
        )
        return fixed.strip()

    async def fix_with_llm(
        self,
        malformed: str,
        expected_format: str
    ) -> str:
        """Use LLM to fix malformed output.

        Awaits ``self.llm.complete(prompt)`` and returns the response's
        ``content`` attribute.

        Raises:
            ValueError: If no LLM client was supplied.
        """
        if not self.llm:
            raise ValueError("LLM client required for LLM-based fixing")
        prompt = "\n".join([
            "The following output is malformed. Fix it to match the expected format.",
            "Malformed output:",
            f"{malformed}",
            "Expected format:",
            f"{expected_format}",
            "Output only the fixed version, nothing else:",
        ])
        response = await self.llm.complete(prompt)
        return response.content
class RetryingParser:
    """Parser with automatic retries.

    Each attempt asks the LLM, tries to parse the reply, and on failure
    feeds the error and the failed output back into the next prompt.
    """

    def __init__(
        self,
        parser: Any,
        llm_client: Any,
        max_retries: int = 3,
        fixer: OutputFixer = None
    ):
        """Wire up the parser, the LLM client and the fixer.

        Args:
            parser: Object providing ``get_format_instructions()`` and
                ``parse(text)``.
            llm_client: Object providing an awaitable
                ``complete(prompt, **kwargs)`` whose result has ``content``.
            max_retries: Maximum number of LLM calls.
            fixer: Fixer to use; a new ``OutputFixer`` is created if omitted.
        """
        self.parser = parser
        self.llm = llm_client
        self.max_retries = max_retries
        self.fixer = fixer or OutputFixer(llm_client)

    def _build_prompt(self, prompt: str, attempt: int, last_error, last_output) -> str:
        """Compose the prompt; retries append the previous error and output."""
        instructions = self.parser.get_format_instructions()
        if attempt == 0:
            return f"{prompt}\n\n{instructions}"
        return "\n".join([
            f"{prompt}",
            f"{instructions}",
            f"Previous attempt failed with error: {last_error}",
            f"Previous output: {last_output}",
            "Please fix the output format.",
        ])

    def _parser_emits_json(self) -> bool:
        """Guess from the parser's type name whether its input is JSON.

        Matches type strings containing ``json`` or ``pydantic``; the
        Pydantic parser in this file decodes JSON before validating, so it
        benefits from the same local repair.
        """
        type_text = str(type(self.parser)).lower()
        return 'json' in type_text or 'pydantic' in type_text

    def _try_parse(self, output: str):
        """Parse ``output``; on failure optionally retry on a repaired copy.

        Returns:
            ``(True, data, None)`` on success or ``(False, None, error)``
            where ``error`` is the message from the direct parse attempt.
        """
        try:
            return True, self.parser.parse(output), None
        except Exception as e:
            error = str(e)

        if self._parser_emits_json():
            try:
                return True, self.parser.parse(self.fixer.fix_json(output)), None
            except Exception:
                # Best-effort repair: fall through and let the LLM retry.
                pass
        return False, None, error

    async def parse(
        self,
        prompt: str,
        **llm_kwargs
    ) -> ParseResult:
        """Parse with retries.

        Args:
            prompt: Task prompt; format instructions are appended to it.
            **llm_kwargs: Forwarded to ``llm.complete``.

        Returns:
            A successful ``ParseResult`` as soon as one attempt parses
            (``attempts`` is the 1-based attempt number); otherwise a failed
            result carrying the last error and output with
            ``attempts == max_retries``.
        """
        last_error = None
        last_output = None

        for attempt in range(self.max_retries):
            full_prompt = self._build_prompt(prompt, attempt, last_error, last_output)
            response = await self.llm.complete(full_prompt, **llm_kwargs)
            last_output = response.content

            ok, data, error = self._try_parse(last_output)
            if ok:
                return ParseResult(
                    success=True,
                    data=data,
                    raw_output=last_output,
                    attempts=attempt + 1
                )
            last_error = error

        return ParseResult(
            success=False,
            error=last_error,
            raw_output=last_output,
            attempts=self.max_retries
        )
class FallbackParser:
    """Run several parsers in sequence and keep the first success."""

    def __init__(self, parsers: list[Any]):
        """Store the parsers in the order they should be attempted."""
        self.parsers = parsers

    def parse(self, output: str) -> tuple[Any, int]:
        """Return ``(result, index)`` from the first parser that succeeds.

        Raises:
            ValueError: When every parser raised; the message lists each
                parser's index and error.
        """
        failures = []
        for index, candidate in enumerate(self.parsers):
            try:
                return candidate.parse(output), index
            except Exception as exc:
                failures.append(f"Parser {index}: {exc}")
        raise ValueError(f"All parsers failed: {'; '.join(failures)}")
class PartialParser:
    """Extract partial results from malformed output."""

    def __init__(self, required_fields: list[str], optional_fields: list[str] = None):
        """Store the required and optional field names as sets."""
        self.required = set(required_fields)
        self.optional = set(optional_fields or [])

    def parse(self, output: str) -> tuple[dict, list[str]]:
        """Parse and return partial results with missing fields.

        First the widest ``{...}`` span is decoded as JSON and merged into
        the result (decoding failures are ignored). Each known field still
        absent is then searched for as ``"field": "text"``,
        ``"field": number`` or ``field: text``; these fallback values are
        returned as stripped strings.

        Field names are matched literally, and the unquoted form must not
        be preceded by a word character, so ``name`` does not match inside
        ``username``.

        Returns:
            ``(result, missing)`` where ``missing`` lists, in sorted order,
            the required fields that were not found.
        """
        import re

        result = {}

        # Try JSON parsing first
        try:
            json_match = re.search(r'(\{[\s\S]*\})', output)
            if json_match:
                data = json.loads(json_match.group(1))
                result.update(data)
        except json.JSONDecodeError:
            pass

        # Try key-value extraction as fallback (sorted for stable ordering)
        for field in sorted(self.required | self.optional):
            if field in result:
                continue
            name = re.escape(field)
            patterns = [
                f'"{name}"\\s*:\\s*"([^"]*)"',
                f'"{name}"\\s*:\\s*([\\d.]+)',
                f'(?<!\\w){name}\\s*:\\s*([^\\n,}}]+)',
            ]
            for pattern in patterns:
                match = re.search(pattern, output, re.IGNORECASE)
                if match:
                    result[field] = match.group(1).strip()
                    break

        # Check for missing required fields
        missing = [field for field in sorted(self.required) if field not in result]
        return result, missing
class StreamingParser:
    """Accumulate streamed chunks and parse opportunistically."""

    def __init__(self, parser: Any):
        """Wrap ``parser`` and start with an empty buffer."""
        self.parser = parser
        self.buffer = ""

    def add_chunk(self, chunk: str) -> Optional[Any]:
        """Append ``chunk`` and attempt to parse everything received so far.

        Returns the parsed value, or ``None`` while the buffer does not
        parse yet.
        """
        self.buffer = self.buffer + chunk
        try:
            parsed = self.parser.parse(self.buffer)
        except Exception:
            return None
        return parsed

    def finalize(self) -> Any:
        """Parse the full buffer; parser errors propagate to the caller."""
        complete_text = self.buffer
        return self.parser.parse(complete_text)

    def reset(self):
        """Discard everything buffered so far."""
        self.buffer = ""
Type Transformation
from dataclasses import dataclass
from typing import Any, Optional, Type, Union, get_type_hints, get_origin, get_args
from datetime import datetime, date
from enum import Enum
class TypeTransformer:
    """Transform parsed values to correct types.

    Handles ``Union`` / ``Optional`` (including ``X | None``),
    parameterised ``list[...]`` and ``dict[...]``, ``Enum`` subclasses and
    the plain types registered in ``self.transformers``.
    """

    def __init__(self):
        """Register the converter used for each supported plain type."""
        self.transformers: dict[type, callable] = {
            int: self._to_int,
            float: self._to_float,
            bool: self._to_bool,
            str: self._to_str,
            datetime: self._to_datetime,
            date: self._to_date,
            list: self._to_list,
            dict: self._to_dict,
        }

    def transform(self, value: Any, target_type: type) -> Any:
        """Transform value to target type.

        ``None`` is always returned unchanged. Types with no registered
        converter return the value as-is.

        Raises:
            ValueError: If no member of a union accepts the value, or a
                converter rejects it.
        """
        import types

        if value is None:
            return None

        origin = get_origin(target_type)

        # Optional / Union, written either as typing.Union or as ``X | Y``
        # (types.UnionType exists on Python 3.10+ only).
        pep604_union = getattr(types, 'UnionType', None)
        if origin is Union or (pep604_union is not None and origin is pep604_union):
            for arg in get_args(target_type):
                if arg is type(None):
                    continue
                try:
                    return self.transform(value, arg)
                except Exception:
                    continue
            raise ValueError(f"Cannot transform {value} to {target_type}")

        # list[T]: a string is first turned into a list (JSON or delimiter
        # split) so it is not iterated character by character.
        if origin is list:
            args = get_args(target_type)
            item_type = args[0] if args else str
            items = self._to_list(value) if isinstance(value, str) else value
            return [self.transform(item, item_type) for item in items]

        # dict[K, V]: a string is first decoded as a JSON object.
        if origin is dict:
            args = get_args(target_type)
            key_type = args[0] if args else str
            value_type = args[1] if len(args) > 1 else Any
            mapping = self._to_dict(value) if isinstance(value, str) else value
            return {
                self.transform(k, key_type): self.transform(v, value_type)
                for k, v in mapping.items()
            }

        # Handle Enum types
        if isinstance(target_type, type) and issubclass(target_type, Enum):
            return self._to_enum(value, target_type)

        # Use registered transformer
        transformer = self.transformers.get(target_type)
        if transformer:
            return transformer(value)

        # Default: return as-is
        return value

    def _to_int(self, value: Any) -> int:
        """Convert to int.

        Strings may contain thousands separators (``,``). Exact integer
        parsing is tried first so large integers keep full precision;
        decimal or exponent strings fall back to float and are truncated.
        """
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value)
        if isinstance(value, str):
            text = value.strip().replace(',', '')
            try:
                return int(text)
            except ValueError:
                return int(float(text))
        raise ValueError(f"Cannot convert {value} to int")

    def _to_float(self, value: Any) -> float:
        """Convert to float; a trailing ``%`` divides the number by 100."""
        if isinstance(value, (int, float)):
            return float(value)
        if isinstance(value, str):
            text = value.strip().replace(',', '')
            # Handle percentages
            if text.endswith('%'):
                return float(text[:-1]) / 100
            return float(text)
        raise ValueError(f"Cannot convert {value} to float")

    def _to_bool(self, value: Any) -> bool:
        """Convert to bool.

        Accepts bools, the words true/yes/1/on and false/no/0/off (any
        case), and numbers via ``bool()``. Other values raise ValueError.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lower = value.lower().strip()
            if lower in ('true', 'yes', '1', 'on'):
                return True
            if lower in ('false', 'no', '0', 'off'):
                return False
        if isinstance(value, (int, float)):
            return bool(value)
        raise ValueError(f"Cannot convert {value} to bool")

    def _to_str(self, value: Any) -> str:
        """Convert to string."""
        return str(value)

    def _to_datetime(self, value: Any) -> datetime:
        """Convert to datetime.

        Strings are tried against the formats below in order (so an
        ambiguous ``01/02/2024`` is read month-first), then against
        ``datetime.fromisoformat`` with ``Z`` replaced by ``+00:00``.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            # Try common formats
            formats = [
                '%Y-%m-%dT%H:%M:%S',
                '%Y-%m-%dT%H:%M:%SZ',
                '%Y-%m-%d %H:%M:%S',
                '%Y-%m-%d',
                '%m/%d/%Y',
                '%d/%m/%Y',
            ]
            for fmt in formats:
                try:
                    return datetime.strptime(value, fmt)
                except ValueError:
                    continue
            # Try ISO format
            return datetime.fromisoformat(value.replace('Z', '+00:00'))
        raise ValueError(f"Cannot convert {value} to datetime")

    def _to_date(self, value: Any) -> date:
        """Convert to date.

        ``datetime`` is a subclass of ``date``, so it is checked first;
        otherwise a datetime would be returned unchanged.
        """
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        if isinstance(value, str):
            return self._to_datetime(value).date()
        raise ValueError(f"Cannot convert {value} to date")

    def _to_list(self, value: Any) -> list:
        """Convert to list.

        Lists pass through. A string is used as JSON if it decodes to a
        list; otherwise it is split on commas, else on newlines. Anything
        else, including a string with no delimiter, is wrapped in a
        one-element list.
        """
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            import json
            try:
                decoded = json.loads(value)
            except json.JSONDecodeError:
                decoded = None
            # Only accept JSON that really is an array.
            if isinstance(decoded, list):
                return decoded
            # Split by common delimiters
            if ',' in value:
                return [v.strip() for v in value.split(',')]
            if '\n' in value:
                return [v.strip() for v in value.split('\n') if v.strip()]
        return [value]

    def _to_dict(self, value: Any) -> dict:
        """Convert to dict.

        Dicts pass through; strings must decode to a JSON object.

        Raises:
            ValueError: For other inputs, invalid JSON
                (``json.JSONDecodeError`` is a ``ValueError``), or JSON
                that is not an object.
        """
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            import json
            decoded = json.loads(value)
            if isinstance(decoded, dict):
                return decoded
        raise ValueError(f"Cannot convert {value} to dict")

    def _to_enum(self, value: Any, enum_class: Type[Enum]) -> Enum:
        """Convert to enum, by member value first and then by upper-cased name."""
        if isinstance(value, enum_class):
            return value
        # Try by value
        try:
            return enum_class(value)
        except ValueError:
            pass
        # Try by name
        if isinstance(value, str):
            try:
                return enum_class[value.upper()]
            except KeyError:
                pass
        raise ValueError(f"Cannot convert {value} to {enum_class}")
class SchemaTransformer:
    """Transform data according to schema."""

    def __init__(self, schema: dict):
        """Store the JSON Schema and create the value transformer."""
        self.schema = schema
        self.transformer = TypeTransformer()

    def transform(self, data: dict) -> dict:
        """Transform data according to schema.

        Only keys listed under the schema's ``properties`` and present in
        ``data`` are returned; each value is converted to the Python type
        mapped from its property's ``type``.
        """
        result = {}
        properties = self.schema.get('properties', {})
        for key, prop_schema in properties.items():
            if key in data:
                target_type = self._schema_to_type(prop_schema)
                result[key] = self.transformer.transform(data[key], target_type)
        return result

    def _schema_to_type(self, schema: dict) -> type:
        """Convert JSON schema type to Python type.

        ``type`` may be a single name or a list of names such as
        ``["integer", "null"]``; for a list the first non-``null`` entry is
        used. Missing or unrecognised types map to ``str``.
        """
        type_map = {
            'string': str,
            'integer': int,
            'number': float,
            'boolean': bool,
            'array': list,
            'object': dict,
        }
        schema_type = schema.get('type', 'string')
        if isinstance(schema_type, (list, tuple)):
            # A list is unhashable and cannot be looked up directly.
            schema_type = next(
                (name for name in schema_type if name != 'null'),
                'string',
            )
        if not isinstance(schema_type, str):
            return str
        return type_map.get(schema_type, str)
Production Parsing Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import json
# FastAPI application instance; the endpoint decorators below register routes on it.
app = FastAPI()
class ParseRequest(BaseModel):
    """Request body for the parse endpoints."""

    # Raw LLM text to parse.
    output: str
    # Requested format: json, xml, markdown, list.
    format: str = "json"
    # Optional JSON Schema handed to the JSON parser.
    # NOTE(review): the name "schema" may shadow a BaseModel attribute - confirm.
    schema: Optional[dict] = None
class ExtractRequest(BaseModel):
    """Request body for regex extraction."""

    # Text to search.
    text: str
    # Mapping of result name to regular expression source.
    patterns: dict[str, str]
class ValidateRequest(BaseModel):
    """Request body for JSON Schema validation."""

    # Instance to validate.
    data: dict
    # JSON Schema the instance is checked against.
    schema: dict
# Initialize components
# Module-level instances created once at import time. In the handlers visible
# in this file only `fixer` is referenced (by fix_and_parse); parse_output
# builds its own parser objects per request.
json_parser = JSONOutputFormat()
xml_parser = XMLOutputFormat("root")
list_parser = ListOutputFormat()
transformer = TypeTransformer()
fixer = OutputFixer()
@app.post("/v1/parse")
async def parse_output(request: ParseRequest) -> dict:
    """Parse LLM output.

    Selects a parser from ``request.format`` (json, xml, markdown, list) and
    applies it to ``request.output``.

    Returns:
        ``{"success": True, "data": ..., "format": ...}`` on success, or
        ``{"success": False, "error": ..., "format": ...}`` when the parser
        raises.

    Raises:
        HTTPException: 400 for an unknown format. The check happens before
            the ``try`` block so the error reaches the client instead of
            being converted into a 200 response by the generic handler.
    """
    factories = {
        "json": lambda: JSONOutputFormat(schema=request.schema),
        "xml": lambda: XMLOutputFormat("root"),
        "markdown": lambda: MarkdownOutputFormat([]),
        "list": lambda: ListOutputFormat(),
    }
    factory = factories.get(request.format)
    if factory is None:
        raise HTTPException(status_code=400, detail=f"Unknown format: {request.format}")

    try:
        result = factory().parse(request.output)
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "format": request.format
        }
    return {
        "success": True,
        "data": result,
        "format": request.format
    }
@app.post("/v1/parse/fix")
async def fix_and_parse(request: ParseRequest) -> dict:
    """Parse as JSON, repairing the text first if the direct parse fails.

    The response has ``fixed: False`` when the output parsed as-is, and
    ``fixed: True`` plus ``fixed_output`` when the repaired text parsed.
    If the repair attempt also fails, ``success`` is False and ``error``
    carries that failure's message.
    """
    # Try direct parsing first
    try:
        parsed = JSONOutputFormat(schema=request.schema).parse(request.output)
    except Exception:
        parsed_directly = False
    else:
        parsed_directly = True

    if parsed_directly:
        return {
            "success": True,
            "data": parsed,
            "fixed": False
        }

    # Try fixing
    try:
        repaired = fixer.fix_json(request.output)
        parsed = json.loads(repaired)
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    return {
        "success": True,
        "data": parsed,
        "fixed": True,
        "fixed_output": repaired
    }
@app.post("/v1/extract")
async def extract_patterns(request: ExtractRequest) -> dict:
    """Extract data using patterns.

    Each entry of ``request.patterns`` is registered with the extractor's
    default group and searched in ``request.text``.

    Returns:
        ``{"success": True, "extracted": {...}, "patterns_matched": n}``.
        If a supplied pattern is not a valid regular expression, or refers
        to a capture group it does not define, the response is
        ``{"success": False, "error": ..., "extracted": {},
        "patterns_matched": 0}`` instead of an unhandled server error.
    """
    # NOTE(review): patterns come from the client; a pathological regex can
    # be slow to evaluate - consider limits if this endpoint is exposed.
    import re

    extractor = PatternExtractor()
    for name, pattern in request.patterns.items():
        extractor.add_pattern(name, pattern)

    try:
        results = extractor.extract(request.text)
    except (re.error, IndexError) as e:
        return {
            "success": False,
            "error": str(e),
            "extracted": {},
            "patterns_matched": 0
        }
    return {
        "success": True,
        "extracted": results,
        "patterns_matched": len(results)
    }
@app.post("/v1/validate")
async def validate_data(request: ValidateRequest) -> dict:
    """Validate data against schema.

    Returns:
        ``{"valid": True, "errors": []}`` when the instance conforms.
        Otherwise ``{"valid": False, "errors": [message]}``, where the
        message comes from the validation failure or, when the supplied
        schema is itself invalid, is prefixed with ``"Invalid schema: "``.
    """
    from jsonschema import validate, ValidationError, SchemaError

    try:
        validate(instance=request.data, schema=request.schema)
    except ValidationError as e:
        return {
            "valid": False,
            "errors": [e.message]
        }
    except SchemaError as e:
        # A malformed schema is a client error too, not a server fault.
        return {
            "valid": False,
            "errors": [f"Invalid schema: {e.message}"]
        }
    return {
        "valid": True,
        "errors": []
    }
@app.post("/v1/transform")
async def transform_types(data: dict, schema: dict) -> dict:
    """Coerce ``data`` to the types declared in ``schema``.

    Responds with ``success`` and ``data`` when the conversion works, or
    ``success: False`` and ``error`` when it raises.
    """
    schema_transformer = SchemaTransformer(schema)
    try:
        converted = schema_transformer.transform(data)
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    return {
        "success": True,
        "data": converted
    }
@app.get("/health")
async def health():
    """Report a fixed healthy status."""
    status_payload = {"status": "healthy"}
    return status_payload
References
- Pydantic: https://docs.pydantic.dev/
- JSON Schema: https://json-schema.org/
- LangChain Output Parsers: https://python.langchain.com/docs/modules/model_io/output_parsers/
- Instructor: https://github.com/jxnl/instructor
Conclusion
Output parsing is the bridge between LLM text generation and structured application data. Start with clear format instructions in your prompts—models follow instructions better when the expected format is explicit and includes examples. Use Pydantic for type-safe parsing; it catches errors early and provides clear validation messages. Implement multiple extraction strategies: JSON parsing for structured data, regex patterns for specific fields, and key-value extraction as a fallback. Always plan for malformed outputs—even the best models occasionally produce invalid JSON or miss required fields. Build retry logic that includes the error message in subsequent attempts; models can often self-correct when told what went wrong. Consider partial parsing when complete parsing fails; extracting some fields is often better than failing entirely. For production systems, log parsing failures to identify patterns and improve your prompts. The most important insight is that parsing reliability comes from defense in depth: clear instructions, multiple extraction methods, automatic fixing, and graceful degradation. A robust parsing pipeline can achieve 99%+ success rates even with imperfect model outputs.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.