Introduction

Getting LLMs to produce consistently formatted output is one of the most practical challenges in production AI systems. You need JSON for your API, but the model sometimes wraps it in markdown code blocks. You need a specific schema, but the model invents extra fields or omits required ones. You need clean text, but you get explanatory preambles and postambles.

Output formatting techniques solve these problems through a combination of prompt engineering, parsing strategies, and validation pipelines. The key insight is that LLMs are probabilistic: they usually follow your format instructions, but not always. Robust systems assume formatting failures will happen and build in recovery mechanisms.

This guide covers practical techniques for getting structured output from LLMs: JSON mode, function calling, schema validation, parsing strategies, and error recovery patterns that make your LLM integrations reliable.
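Before diving into the toolkit, here is a minimal sketch of that "assume failure, build in recovery" loop, assuming a hypothetical `llm.generate(prompt)` client that returns raw text:

```python
import json

def get_json(llm, prompt: str, max_retries: int = 2) -> dict:
    """Ask for JSON; on failure, feed the parse error back to the model."""
    request = prompt + "\n\nRespond with valid JSON only."
    for _ in range(max_retries + 1):
        raw = llm.generate(request)  # hypothetical client returning raw text
        try:
            return json.loads(raw)
        except json.JSONDecodeError as e:
            # Show the model its own error so it can self-correct
            request = (f"{prompt}\n\nYour previous reply was not valid JSON "
                       f"({e}). Respond with valid JSON only.")
    raise ValueError("Model never produced valid JSON")
```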

JSON Output Strategies
```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional, Type, TypeVar
import json
import re

T = TypeVar('T')


class OutputFormat(Enum):
    """Supported output formats."""
    JSON = "json"
    MARKDOWN = "markdown"
    TEXT = "text"
    XML = "xml"
    CSV = "csv"


@dataclass
class FormatConfig:
    """Configuration for output formatting."""
    format: OutputFormat
    schema: Optional[dict] = None
    strict: bool = True
    retry_on_failure: bool = True
    max_retries: int = 3
class JSONExtractor:
    """Extract JSON from LLM output."""

    def extract(self, text: str) -> Any:
        """Extract a JSON object or array from text."""
        # Try direct parsing first
        try:
            return json.loads(text.strip())
        except json.JSONDecodeError:
            pass
        # Try extracting from markdown code blocks
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
        if json_match:
            try:
                return json.loads(json_match.group(1).strip())
            except json.JSONDecodeError:
                pass
        # Try finding a JSON object
        json_match = re.search(r'\{[\s\S]*\}', text)
        if json_match:
            try:
                return json.loads(json_match.group(0))
            except json.JSONDecodeError:
                pass
        # Try finding a JSON array
        json_match = re.search(r'\[[\s\S]*\]', text)
        if json_match:
            try:
                return json.loads(json_match.group(0))
            except json.JSONDecodeError:
                pass
        raise ValueError(f"Could not extract JSON from: {text[:200]}...")

    def extract_multiple(self, text: str) -> list[dict]:
        """Extract multiple JSON objects (handles one level of nesting)."""
        results = []
        pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
        matches = re.findall(pattern, text)
        for match in matches:
            try:
                results.append(json.loads(match))
            except json.JSONDecodeError:
                continue
        return results
class SchemaValidator:
    """Validate output against a JSON-schema-style dict."""

    def __init__(self, schema: dict):
        self.schema = schema

    def validate(self, data: dict) -> tuple[bool, list[str]]:
        """Validate data against the schema."""
        errors = []
        # Check required fields
        required = self.schema.get("required", [])
        for field in required:
            if field not in data:
                errors.append(f"Missing required field: {field}")
        # Check field types
        properties = self.schema.get("properties", {})
        for field, value in data.items():
            if field in properties:
                expected_type = properties[field].get("type")
                if not self._check_type(value, expected_type):
                    errors.append(f"Field '{field}' has wrong type: expected {expected_type}")
        return len(errors) == 0, errors

    def _check_type(self, value: Any, expected: str) -> bool:
        """Check if a value matches the expected JSON type."""
        # Note: bool is a subclass of int in Python, so True passes "integer"
        type_map = {
            "string": str,
            "integer": int,
            "number": (int, float),
            "boolean": bool,
            "array": list,
            "object": dict,
            "null": type(None)
        }
        if expected not in type_map:
            return True
        return isinstance(value, type_map[expected])

    def coerce(self, data: dict) -> dict:
        """Coerce data to match schema types."""
        properties = self.schema.get("properties", {})
        result = {}
        for field, spec in properties.items():
            if field in data:
                result[field] = self._coerce_value(data[field], spec.get("type"))
            elif "default" in spec:
                result[field] = spec["default"]
        return result

    def _coerce_value(self, value: Any, target_type: str) -> Any:
        """Coerce a single value to the target JSON type."""
        if target_type == "string":
            return str(value)
        elif target_type == "integer":
            return int(float(value)) if value else 0
        elif target_type == "number":
            return float(value) if value else 0.0
        elif target_type == "boolean":
            if isinstance(value, str):
                return value.lower() in ("true", "yes", "1")
            return bool(value)
        elif target_type == "array":
            if isinstance(value, str):
                return [value]
            return list(value) if value else []
        elif target_type == "object":
            return dict(value) if value else {}
        return value
class PydanticFormatter:
    """Format output using Pydantic models."""

    def __init__(self, model_class: Type[T]):
        self.model_class = model_class

    def parse(self, text: str) -> T:
        """Parse text into a Pydantic model instance."""
        extractor = JSONExtractor()
        data = extractor.extract(text)
        return self.model_class(**data)

    def parse_list(self, text: str) -> list[T]:
        """Parse text into a list of model instances."""
        extractor = JSONExtractor()
        data = extractor.extract(text)
        if isinstance(data, list):
            return [self.model_class(**item) for item in data]
        else:
            return [self.model_class(**data)]

    def get_schema(self) -> dict:
        """Get the JSON schema for the model (Pydantic v2)."""
        return self.model_class.model_json_schema()

    def get_prompt_schema(self) -> str:
        """Get the schema formatted for inclusion in a prompt."""
        schema = self.get_schema()
        return json.dumps(schema, indent=2)
```
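A quick usage sketch for the pieces above, assuming Pydantic v2 is installed and the model replied with `raw`:

```python
from pydantic import BaseModel

class Ticket(BaseModel):
    title: str
    priority: int

raw = 'Here you go:\n```json\n{"title": "Login broken", "priority": 1}\n```'

formatter = PydanticFormatter(Ticket)
ticket = formatter.parse(raw)         # Ticket(title='Login broken', priority=1)
print(formatter.get_prompt_schema())  # JSON schema to paste into your prompt
```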
Structured Output with Function Calling
```python
from dataclasses import dataclass
from typing import Any, Optional
import json


@dataclass
class FunctionDefinition:
    """Definition of a function for structured output."""
    name: str
    description: str
    parameters: dict

    def to_dict(self) -> dict:
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters
        }


class FunctionCallFormatter:
    """Use function calling (legacy OpenAI-style `functions` API) for structured output."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def get_structured(
        self,
        prompt: str,
        output_schema: dict,
        function_name: str = "extract_data"
    ) -> dict:
        """Get structured output by forcing a function call."""
        function_def = FunctionDefinition(
            name=function_name,
            description="Extract structured data from the input",
            parameters=output_schema
        )
        response = await self.llm.generate(
            prompt,
            functions=[function_def.to_dict()],
            function_call={"name": function_name}
        )
        # Parse the function call arguments
        if hasattr(response, 'function_call'):
            return json.loads(response.function_call.arguments)
        return response
class ToolCallFormatter:
    """Use tool calling for structured output."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def get_structured(
        self,
        prompt: str,
        output_schema: dict,
        tool_name: str = "output"
    ) -> dict:
        """Get structured output by forcing a tool call."""
        tool = {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": "Output the structured result",
                "parameters": output_schema
            }
        }
        response = await self.llm.generate(
            prompt,
            tools=[tool],
            tool_choice={"type": "function", "function": {"name": tool_name}}
        )
        # Parse the first tool call's arguments
        if hasattr(response, 'tool_calls') and response.tool_calls:
            return json.loads(response.tool_calls[0].function.arguments)
        return response


class ResponseFormatFormatter:
    """Use response_format for JSON output."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def get_json(
        self,
        prompt: str,
        schema: Optional[dict] = None
    ) -> dict:
        """Get JSON output using response_format."""
        if schema:
            # Structured outputs with schema enforcement (OpenAI-style)
            response = await self.llm.generate(
                prompt,
                response_format={
                    "type": "json_schema",
                    "json_schema": {
                        "name": "response",
                        "schema": schema
                    }
                }
            )
        else:
            # Basic JSON mode: valid JSON, but no schema guarantee
            response = await self.llm.generate(
                prompt,
                response_format={"type": "json_object"}
            )
        # Assumes the client returns the raw response text
        return json.loads(response)
class InstructorFormatter:
    """Use the Instructor library for structured output."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def get_structured(
        self,
        prompt: str,
        response_model: type
    ) -> Any:
        """Get structured output using Instructor."""
        import instructor
        # Patch the client so it accepts response_model
        client = instructor.patch(self.llm)
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            response_model=response_model
        )
        return response
class OutputSchemaBuilder:
    """Build output schemas programmatically."""

    def __init__(self):
        self.properties = {}
        self.required = []

    def add_string(
        self,
        name: str,
        description: str = "",
        required: bool = True,
        enum: Optional[list[str]] = None
    ) -> 'OutputSchemaBuilder':
        """Add a string field."""
        prop = {"type": "string", "description": description}
        if enum:
            prop["enum"] = enum
        self.properties[name] = prop
        if required:
            self.required.append(name)
        return self

    def add_integer(
        self,
        name: str,
        description: str = "",
        required: bool = True,
        minimum: Optional[int] = None,
        maximum: Optional[int] = None
    ) -> 'OutputSchemaBuilder':
        """Add an integer field."""
        prop = {"type": "integer", "description": description}
        if minimum is not None:
            prop["minimum"] = minimum
        if maximum is not None:
            prop["maximum"] = maximum
        self.properties[name] = prop
        if required:
            self.required.append(name)
        return self

    def add_boolean(
        self,
        name: str,
        description: str = "",
        required: bool = True
    ) -> 'OutputSchemaBuilder':
        """Add a boolean field."""
        self.properties[name] = {
            "type": "boolean",
            "description": description
        }
        if required:
            self.required.append(name)
        return self

    def add_array(
        self,
        name: str,
        item_type: str = "string",
        description: str = "",
        required: bool = True
    ) -> 'OutputSchemaBuilder':
        """Add an array field."""
        self.properties[name] = {
            "type": "array",
            "items": {"type": item_type},
            "description": description
        }
        if required:
            self.required.append(name)
        return self

    def add_object(
        self,
        name: str,
        properties: dict,
        description: str = "",
        required: bool = True
    ) -> 'OutputSchemaBuilder':
        """Add a nested object field."""
        self.properties[name] = {
            "type": "object",
            "properties": properties,
            "description": description
        }
        if required:
            self.required.append(name)
        return self

    def build(self) -> dict:
        """Build the schema."""
        return {
            "type": "object",
            "properties": self.properties,
            "required": self.required
        }
```
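Because every `add_*` method returns `self`, schemas compose as a chain. A small sketch building a schema you could hand to `ToolCallFormatter` or `ResponseFormatFormatter`:

```python
schema = (
    OutputSchemaBuilder()
    .add_string("sentiment", "Overall sentiment",
                enum=["positive", "negative", "neutral"])
    .add_integer("confidence", "Confidence from 0 to 100", minimum=0, maximum=100)
    .add_array("key_phrases", item_type="string",
               description="Phrases supporting the sentiment")
    .build()
)
# schema["required"] == ["sentiment", "confidence", "key_phrases"]
```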
Parsing and Recovery
```python
from dataclasses import dataclass
from typing import Any, Callable, Optional
import re
import json


@dataclass
class ParseResult:
    """Result of a parsing attempt."""
    success: bool
    data: Any = None
    error: Optional[str] = None
    raw_output: str = ""


class RobustParser:
    """Parse LLM output with multiple fallback strategies."""

    def __init__(self):
        self.strategies: list[Callable] = []

    def add_strategy(self, strategy: Callable):
        """Add a parsing strategy (tried in insertion order)."""
        self.strategies.append(strategy)

    def parse(self, text: str) -> ParseResult:
        """Try all strategies until one succeeds."""
        for strategy in self.strategies:
            try:
                result = strategy(text)
                return ParseResult(
                    success=True,
                    data=result,
                    raw_output=text
                )
            except Exception:
                continue
        return ParseResult(
            success=False,
            error="All parsing strategies failed",
            raw_output=text
        )
class JSONRepairParser:
    """Parse and repair malformed JSON."""

    def parse(self, text: str) -> dict:
        """Parse with repair attempts."""
        # Try a direct parse first
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass
        # Fall back to repairs
        repaired = self._repair(text)
        return json.loads(repaired)

    def _repair(self, text: str) -> str:
        """Attempt to repair JSON."""
        # Extract the JSON portion
        text = self._extract_json(text)
        # Fix common issues
        text = self._fix_trailing_commas(text)
        text = self._fix_single_quotes(text)
        text = self._fix_unquoted_keys(text)
        text = self._fix_missing_quotes(text)
        text = self._fix_newlines(text)
        return text

    def _extract_json(self, text: str) -> str:
        """Extract JSON from surrounding text."""
        # Remove markdown code fences
        text = re.sub(r'```(?:json)?\s*', '', text)
        text = re.sub(r'```\s*$', '', text)
        # Find JSON boundaries
        start = text.find('{')
        if start == -1:
            start = text.find('[')
        if start == -1:
            return text
        # Find the matching end (naive: last closing bracket)
        if text[start] == '{':
            end = text.rfind('}')
        else:
            end = text.rfind(']')
        if end == -1:
            return text[start:]
        return text[start:end + 1]

    def _fix_trailing_commas(self, text: str) -> str:
        """Remove trailing commas before closing brackets."""
        text = re.sub(r',\s*}', '}', text)
        text = re.sub(r',\s*]', ']', text)
        return text
    def _fix_single_quotes(self, text: str) -> str:
        """Replace single quotes with double quotes.

        Heuristic: tracks string boundaries, but does not handle
        escaped quotes or apostrophes inside strings.
        """
        result = []
        in_string = False
        string_char = None
        for char in text:
            if char in '"\'':
                if not in_string:
                    in_string = True
                    string_char = char
                    result.append('"')
                elif char == string_char:
                    in_string = False
                    string_char = None
                    result.append('"')
                else:
                    result.append(char)
            else:
                result.append(char)
        return ''.join(result)

    def _fix_unquoted_keys(self, text: str) -> str:
        """Add quotes to unquoted keys."""
        pattern = r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)(\s*:)'
        return re.sub(pattern, r'\1"\2"\3', text)

    def _fix_missing_quotes(self, text: str) -> str:
        """Add missing quotes to bareword string values.

        Tricky in general; only fixes obvious bareword values after colons,
        and skips the JSON literals true/false/null so they stay unquoted.
        """
        pattern = r':\s*(?!(?:true|false|null)\b)([a-zA-Z][a-zA-Z0-9_\s]*[a-zA-Z0-9])(\s*[,}])'
        return re.sub(pattern, r': "\1"\2', text)

    def _fix_newlines(self, text: str) -> str:
        """Escape literal newlines that occur inside string literals (heuristic)."""
        result = []
        in_string = False
        prev = ''
        for char in text:
            if char == '"' and prev != '\\':
                in_string = not in_string
            if char == '\n' and in_string:
                result.append('\\n')
            else:
                result.append(char)
            prev = char
        return ''.join(result)
class MarkdownParser:
    """Parse structured data from markdown."""

    def parse_table(self, text: str) -> list[dict]:
        """Parse a markdown table into a list of dicts."""
        lines = text.strip().split('\n')
        # Keep only table lines
        table_lines = [l for l in lines if '|' in l]
        if len(table_lines) < 2:
            return []
        # Parse the header row
        header = self._parse_row(table_lines[0])
        # Skip the separator line
        data_lines = table_lines[2:] if len(table_lines) > 2 else []
        # Parse data rows
        results = []
        for line in data_lines:
            values = self._parse_row(line)
            if len(values) == len(header):
                results.append(dict(zip(header, values)))
        return results

    def _parse_row(self, line: str) -> list[str]:
        """Parse a table row. Note: empty cells are dropped, which can
        misalign columns in sparse tables."""
        cells = line.split('|')
        return [c.strip() for c in cells if c.strip()]

    def parse_list(self, text: str) -> list[str]:
        """Parse markdown list items (bullets or numbers)."""
        items = []
        for line in text.split('\n'):
            match = re.match(r'^[\-\*\d\.]+\s+(.+)$', line.strip())
            if match:
                items.append(match.group(1))
        return items

    def parse_code_blocks(self, text: str) -> list[dict]:
        """Extract fenced code blocks with their language tags."""
        blocks = []
        pattern = r'```(\w*)\s*([\s\S]*?)```'
        matches = re.findall(pattern, text)
        for lang, code in matches:
            blocks.append({
                "language": lang or "text",
                "code": code.strip()
            })
        return blocks
class RetryParser:
    """Parse with retry and LLM-based correction."""

    def __init__(self, llm_client: Any, parser: Any):
        self.llm = llm_client
        self.parser = parser
        self.max_retries = 3

    async def parse(
        self,
        text: str,
        schema: Optional[dict] = None
    ) -> ParseResult:
        """Parse with retry on failure."""
        # First attempt
        try:
            data = self.parser.parse(text)
            return ParseResult(success=True, data=data, raw_output=text)
        except Exception:
            pass
        # Retry, asking the LLM to correct its own output
        for _ in range(self.max_retries):
            corrected = await self._correct_with_llm(text, schema)
            try:
                data = self.parser.parse(corrected)
                return ParseResult(success=True, data=data, raw_output=corrected)
            except Exception:
                text = corrected
        return ParseResult(
            success=False,
            error="Failed after retries",
            raw_output=text
        )

    async def _correct_with_llm(
        self,
        text: str,
        schema: Optional[dict] = None
    ) -> str:
        """Use the LLM to correct malformed output."""
        schema_str = json.dumps(schema, indent=2) if schema else "valid JSON"
        prompt = f"""The following text should be valid JSON but has formatting errors.
Please fix the JSON and return only the corrected JSON, nothing else.
Expected schema:
{schema_str}
Malformed text:
{text}
Corrected JSON:"""
        return await self.llm.generate(prompt)
```
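Stacking strategies is the point of `RobustParser`. A sketch that reuses `JSONExtractor` from the first section, ordered from strictest to most permissive:

```python
import json

parser = RobustParser()
parser.add_strategy(json.loads)                # strict parse first
parser.add_strategy(JSONExtractor().extract)   # then extraction from prose/fences
parser.add_strategy(JSONRepairParser().parse)  # then repair of malformed JSON

result = parser.parse("Sure! {'status': 'ok', 'count': 3,}")
if result.success:
    print(result.data)  # {'status': 'ok', 'count': 3}
```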
Output Transformation
```python
from typing import Any, Callable, Optional
import re


class OutputTransformer:
    """Transform LLM output to the desired shape."""

    def __init__(self):
        self.transformers: list[Callable] = []

    def add_transformer(self, transformer: Callable):
        """Add a transformation step."""
        self.transformers.append(transformer)

    def transform(self, data: Any) -> Any:
        """Apply all transformations in order."""
        result = data
        for transformer in self.transformers:
            result = transformer(result)
        return result


class FieldMapper:
    """Map field names between schemas."""

    def __init__(self, mapping: dict[str, str]):
        self.mapping = mapping

    def map(self, data: dict) -> dict:
        """Rename fields according to the mapping."""
        result = {}
        for source, target in self.mapping.items():
            if source in data:
                result[target] = data[source]
        # Pass unmapped fields through unchanged
        for key, value in data.items():
            if key not in self.mapping:
                result[key] = value
        return result


class TypeCoercer:
    """Coerce field types."""

    def __init__(self, type_map: dict[str, type]):
        self.type_map = type_map

    def coerce(self, data: dict) -> dict:
        """Coerce types according to the map."""
        result = {}
        for key, value in data.items():
            if key in self.type_map:
                result[key] = self._coerce_value(value, self.type_map[key])
            else:
                result[key] = value
        return result

    def _coerce_value(self, value: Any, target_type: type) -> Any:
        """Coerce a single value; returns it unchanged on failure."""
        if value is None:
            return None
        try:
            if target_type == bool:
                if isinstance(value, str):
                    return value.lower() in ('true', 'yes', '1')
                return bool(value)
            elif target_type == int:
                return int(float(value))
            elif target_type == float:
                return float(value)
            elif target_type == str:
                return str(value)
            elif target_type == list:
                if isinstance(value, str):
                    return [value]
                return list(value)
            else:
                return target_type(value)
        except (ValueError, TypeError):
            return value
class DefaultFiller:
    """Fill missing fields with defaults."""

    def __init__(self, defaults: dict[str, Any]):
        self.defaults = defaults

    def fill(self, data: dict) -> dict:
        """Fill missing or None fields."""
        result = dict(data)
        for key, default in self.defaults.items():
            if key not in result or result[key] is None:
                result[key] = default
        return result


class FieldFilter:
    """Filter fields from output."""

    def __init__(
        self,
        include: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None
    ):
        self.include = set(include) if include else None
        self.exclude = set(exclude) if exclude else set()

    def filter(self, data: dict) -> dict:
        """Keep only allowed fields."""
        result = {}
        for key, value in data.items():
            if self.include and key not in self.include:
                continue
            if key in self.exclude:
                continue
            result[key] = value
        return result


class NestedFlattener:
    """Flatten and unflatten nested structures."""

    def __init__(self, separator: str = "."):
        self.separator = separator

    def flatten(self, data: dict, prefix: str = "") -> dict:
        """Flatten a nested dict into dotted keys."""
        result = {}
        for key, value in data.items():
            new_key = f"{prefix}{self.separator}{key}" if prefix else key
            if isinstance(value, dict):
                result.update(self.flatten(value, new_key))
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        result.update(self.flatten(item, f"{new_key}[{i}]"))
                    else:
                        result[f"{new_key}[{i}]"] = item
            else:
                result[new_key] = value
        return result

    def unflatten(self, data: dict) -> dict:
        """Unflatten dotted keys back to a nested dict.

        Note: array indices like "items[0]" are only handled in
        intermediate path parts, not in the final part.
        """
        result = {}
        for key, value in data.items():
            parts = key.split(self.separator)
            current = result
            for part in parts[:-1]:
                # Handle array indices
                match = re.match(r'(.+)\[(\d+)\]', part)
                if match:
                    name, idx = match.groups()
                    idx = int(idx)
                    if name not in current:
                        current[name] = []
                    while len(current[name]) <= idx:
                        current[name].append({})
                    current = current[name][idx]
                else:
                    if part not in current:
                        current[part] = {}
                    current = current[part]
            current[parts[-1]] = value
        return result
class OutputPipeline:
    """Pipeline for output processing."""

    def __init__(self):
        self.steps: list[tuple[str, Callable]] = []

    def add_step(self, name: str, processor: Callable):
        """Add a processing step."""
        self.steps.append((name, processor))

    def process(self, data: Any) -> dict:
        """Process through the pipeline, recording per-step outcomes.

        Failures are recorded in metadata and then re-raised.
        """
        result = data
        metadata = {"steps": []}
        for name, processor in self.steps:
            try:
                result = processor(result)
                metadata["steps"].append({"name": name, "success": True})
            except Exception as e:
                metadata["steps"].append({
                    "name": name,
                    "success": False,
                    "error": str(e)
                })
                raise
        return {"data": result, "metadata": metadata}
```
Production Formatting Service
```python
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Any, List, Optional

app = FastAPI()


class FormatRequest(BaseModel):
    text: str
    format: str = "json"
    schema: Optional[dict] = None
    repair: bool = True


class FormatResponse(BaseModel):
    success: bool
    data: Any = None
    error: Optional[str] = None


class TransformRequest(BaseModel):
    data: dict
    mapping: Optional[dict] = None
    defaults: Optional[dict] = None
    include_fields: Optional[List[str]] = None
    exclude_fields: Optional[List[str]] = None


# Initialize components
json_extractor = JSONExtractor()
json_repair_parser = JSONRepairParser()
markdown_parser = MarkdownParser()


@app.post("/v1/format/json", response_model=FormatResponse)
async def format_json(request: FormatRequest) -> FormatResponse:
    """Extract and format JSON from text."""
    try:
        if request.repair:
            data = json_repair_parser.parse(request.text)
        else:
            data = json_extractor.extract(request.text)
        # Validate against the schema if one was provided
        if request.schema:
            validator = SchemaValidator(request.schema)
            valid, errors = validator.validate(data)
            if not valid:
                return FormatResponse(
                    success=False,
                    error=f"Schema validation failed: {errors}"
                )
        return FormatResponse(success=True, data=data)
    except Exception as e:
        return FormatResponse(success=False, error=str(e))
@app.post("/v1/format/markdown")
async def format_markdown(request: FormatRequest) -> FormatResponse:
"""Extract structured data from markdown."""
try:
if "table" in request.format:
data = markdown_parser.parse_table(request.text)
elif "list" in request.format:
data = markdown_parser.parse_list(request.text)
elif "code" in request.format:
data = markdown_parser.parse_code_blocks(request.text)
else:
data = {
"tables": markdown_parser.parse_table(request.text),
"lists": markdown_parser.parse_list(request.text),
"code_blocks": markdown_parser.parse_code_blocks(request.text)
}
return FormatResponse(success=True, data=data)
except Exception as e:
return FormatResponse(success=False, error=str(e))
@app.post("/v1/transform")
async def transform_data(request: TransformRequest) -> FormatResponse:
"""Transform structured data."""
try:
data = request.data
# Apply mapping
if request.mapping:
mapper = FieldMapper(request.mapping)
data = mapper.map(data)
# Apply defaults
if request.defaults:
filler = DefaultFiller(request.defaults)
data = filler.fill(data)
# Apply field filter
if request.include_fields or request.exclude_fields:
filter = FieldFilter(
include=request.include_fields,
exclude=request.exclude_fields
)
data = filter.filter(data)
return FormatResponse(success=True, data=data)
except Exception as e:
return FormatResponse(success=False, error=str(e))
@app.post("/v1/validate")
async def validate_data(data: dict, schema: dict) -> dict:
"""Validate data against schema."""
validator = SchemaValidator(schema)
valid, errors = validator.validate(data)
return {
"valid": valid,
"errors": errors
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenAI Structured Outputs: https://platform.openai.com/docs/guides/structured-outputs
- Instructor: https://python.useinstructor.com/
- Pydantic: https://docs.pydantic.dev/
- JSON Schema: https://json-schema.org/
Conclusion
Reliable output formatting requires defense in depth. Start with the strongest guarantees your model supports: structured outputs with JSON schema enforcement, function calling, or response_format JSON mode. When these aren't available or fail, fall back to robust parsing: extract JSON from code blocks, repair common malformations like trailing commas and single quotes, and validate against your expected schema. Build transformation pipelines that map fields, coerce types, fill defaults, and filter to exactly the structure your application needs.

For production systems, implement retry logic that uses the LLM itself to correct malformed output; the model can often fix its own mistakes when shown the error. The key insight is that formatting is a spectrum of reliability: native structured outputs are most reliable, followed by function calling, then JSON mode, then prompt engineering with parsing. Use the most reliable method available, but always have fallbacks. Monitor parsing failures in production; they often indicate prompt issues or model behavior changes that need attention.
