Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

Text-to-SQL with LLMs: Building Natural Language Database Interfaces

Introduction: Natural language to SQL is one of the most practical LLM applications. Business users can query databases without knowing SQL, analysts can explore data faster, and developers can prototype queries quickly. But naive implementations fail spectacularly—generating invalid SQL, hallucinating table names, or producing queries that return wrong results. This guide covers building robust text-to-SQL systems: providing schema context effectively, validating generated SQL, handling errors gracefully, and implementing safety guardrails to prevent destructive queries.

Text to SQL
Text-to-SQL: From Natural Language to Database Results

Basic Text-to-SQL

from openai import OpenAI
import sqlite3

client = OpenAI()

def get_schema_string(conn: sqlite3.Connection) -> str:
    """Extract schema from SQLite database."""
    
    cursor = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND sql IS NOT NULL"
    )
    
    schemas = []
    for row in cursor:
        schemas.append(row[0])
    
    return "\n\n".join(schemas)

def text_to_sql(question: str, schema: str) -> str:
    """Convert natural language to SQL."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": f"""You are a SQL expert. Convert natural language questions to SQL queries.

Database schema:
{schema}

Rules:
- Return ONLY the SQL query, no explanation
- Use only tables and columns that exist in the schema
- Use appropriate JOINs when needed
- Handle NULL values appropriately
- Use aliases for readability"""
            },
            {"role": "user", "content": question}
        ],
        temperature=0
    )
    
    sql = response.choices[0].message.content.strip()
    
    # Remove markdown code blocks if present
    if sql.startswith("```"):
        sql = sql.split("\n", 1)[1]
        sql = sql.rsplit("```", 1)[0]
    
    return sql.strip()

def execute_query(conn: sqlite3.Connection, sql: str) -> list[dict]:
    """Execute SQL and return results as list of dicts."""
    
    cursor = conn.execute(sql)
    columns = [desc[0] for desc in cursor.description]
    
    results = []
    for row in cursor:
        results.append(dict(zip(columns, row)))
    
    return results

def ask_database(conn: sqlite3.Connection, question: str) -> dict:
    """Full pipeline: question -> SQL -> results."""
    
    schema = get_schema_string(conn)
    sql = text_to_sql(question, schema)
    
    try:
        results = execute_query(conn, sql)
        return {
            "success": True,
            "sql": sql,
            "results": results,
            "row_count": len(results)
        }
    except Exception as e:
        return {
            "success": False,
            "sql": sql,
            "error": str(e)
        }

# Usage
conn = sqlite3.connect("sales.db")

result = ask_database(conn, "What were total sales by product category last month?")
print(f"SQL: {result['sql']}")
print(f"Results: {result['results']}")

Schema Context Optimization

from dataclasses import dataclass

@dataclass
class TableInfo:
    name: str
    columns: list[dict]  # {"name": str, "type": str, "description": str}
    sample_values: dict[str, list]  # column -> sample values
    row_count: int

def get_enhanced_schema(conn: sqlite3.Connection) -> list[TableInfo]:
    """Get schema with sample values and descriptions."""
    
    tables = []
    
    # Get table names
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    )
    table_names = [row[0] for row in cursor]
    
    for table_name in table_names:
        # Get columns
        cursor = conn.execute(f"PRAGMA table_info({table_name})")
        columns = []
        for row in cursor:
            columns.append({
                "name": row[1],
                "type": row[2],
                "nullable": not row[3],
                "primary_key": bool(row[5])
            })
        
        # Get sample values
        sample_values = {}
        for col in columns:
            cursor = conn.execute(
                f"SELECT DISTINCT {col['name']} FROM {table_name} LIMIT 5"
            )
            sample_values[col['name']] = [row[0] for row in cursor]
        
        # Get row count
        cursor = conn.execute(f"SELECT COUNT(*) FROM {table_name}")
        row_count = cursor.fetchone()[0]
        
        tables.append(TableInfo(
            name=table_name,
            columns=columns,
            sample_values=sample_values,
            row_count=row_count
        ))
    
    return tables

def format_schema_for_llm(tables: list[TableInfo]) -> str:
    """Format schema optimally for LLM context."""
    
    lines = []
    
    for table in tables:
        lines.append(f"Table: {table.name} ({table.row_count:,} rows)")
        lines.append("Columns:")
        
        for col in table.columns:
            pk = " [PK]" if col.get("primary_key") else ""
            samples = table.sample_values.get(col["name"], [])
            sample_str = f" (e.g., {', '.join(repr(s) for s in samples[:3])})" if samples else ""
            
            lines.append(f"  - {col['name']}: {col['type']}{pk}{sample_str}")
        
        lines.append("")
    
    return "\n".join(lines)

# Selective schema for large databases
def get_relevant_tables(question: str, all_tables: list[TableInfo]) -> list[TableInfo]:
    """Select only relevant tables for the question."""
    
    # Use LLM to identify relevant tables
    table_list = "\n".join(f"- {t.name}: columns {[c['name'] for c in t.columns]}" for t in all_tables)
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": f"""Given a question and list of tables, identify which tables are needed.

Tables:
{table_list}

Return only table names, comma-separated."""
            },
            {"role": "user", "content": question}
        ],
        temperature=0
    )
    
    relevant_names = [n.strip() for n in response.choices[0].message.content.split(",")]
    
    return [t for t in all_tables if t.name in relevant_names]

SQL Validation and Error Recovery

import sqlparse
from typing import Optional

class SQLValidator:
    """Validate and sanitize generated SQL."""
    
    DANGEROUS_KEYWORDS = [
        "DROP", "DELETE", "TRUNCATE", "UPDATE", "INSERT",
        "ALTER", "CREATE", "GRANT", "REVOKE", "EXEC"
    ]
    
    def __init__(self, allow_writes: bool = False):
        self.allow_writes = allow_writes
    
    def validate(self, sql: str) -> tuple[bool, Optional[str]]:
        """Validate SQL. Returns (is_valid, error_message)."""
        
        # Parse SQL
        try:
            parsed = sqlparse.parse(sql)
            if not parsed:
                return False, "Could not parse SQL"
        except Exception as e:
            return False, f"Parse error: {e}"
        
        # Check for dangerous operations
        if not self.allow_writes:
            sql_upper = sql.upper()
            for keyword in self.DANGEROUS_KEYWORDS:
                if keyword in sql_upper:
                    return False, f"Dangerous operation not allowed: {keyword}"
        
        # Check for multiple statements
        if len(parsed) > 1:
            return False, "Multiple statements not allowed"
        
        # Check statement type
        stmt = parsed[0]
        if stmt.get_type() not in ("SELECT", "UNKNOWN"):
            if not self.allow_writes:
                return False, f"Only SELECT queries allowed, got: {stmt.get_type()}"
        
        return True, None
    
    def sanitize(self, sql: str) -> str:
        """Sanitize SQL query."""
        
        # Remove comments
        sql = sqlparse.format(sql, strip_comments=True)
        
        # Normalize whitespace
        sql = sqlparse.format(sql, reindent=True)
        
        # Remove trailing semicolons
        sql = sql.rstrip(";")
        
        return sql

def text_to_sql_with_validation(
    question: str,
    schema: str,
    conn: sqlite3.Connection,
    max_retries: int = 3
) -> dict:
    """Generate SQL with validation and error recovery."""
    
    validator = SQLValidator(allow_writes=False)
    
    messages = [
        {
            "role": "system",
            "content": f"""You are a SQL expert. Convert questions to SQL.

Schema:
{schema}

Return ONLY valid SQL. No explanations."""
        },
        {"role": "user", "content": question}
    ]
    
    for attempt in range(max_retries):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            temperature=0
        )
        
        sql = response.choices[0].message.content.strip()
        
        # Remove code blocks
        if "```" in sql:
            sql = sql.split("```")[1]
            if sql.startswith("sql"):
                sql = sql[3:]
            sql = sql.strip()
        
        # Validate
        is_valid, error = validator.validate(sql)
        
        if not is_valid:
            messages.append({"role": "assistant", "content": sql})
            messages.append({
                "role": "user",
                "content": f"Error: {error}. Please fix the SQL."
            })
            continue
        
        # Try to execute
        try:
            sql = validator.sanitize(sql)
            cursor = conn.execute(sql)
            columns = [desc[0] for desc in cursor.description]
            results = [dict(zip(columns, row)) for row in cursor]
            
            return {
                "success": True,
                "sql": sql,
                "results": results,
                "attempts": attempt + 1
            }
        
        except Exception as e:
            messages.append({"role": "assistant", "content": sql})
            messages.append({
                "role": "user",
                "content": f"SQL execution error: {e}. Please fix."
            })
    
    return {
        "success": False,
        "sql": sql,
        "error": "Max retries exceeded"
    }

Natural Language Response

def generate_natural_response(
    question: str,
    sql: str,
    results: list[dict]
) -> str:
    """Convert SQL results to natural language."""
    
    # Truncate results for context
    results_str = str(results[:20])
    if len(results) > 20:
        results_str += f"\n... and {len(results) - 20} more rows"
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": """Convert SQL query results into a natural language answer.
Be concise but complete. Include specific numbers and data points.
If results are empty, say so clearly."""
            },
            {
                "role": "user",
                "content": f"""Question: {question}

SQL executed: {sql}

Results: {results_str}

Provide a natural language answer:"""
            }
        ]
    )
    
    return response.choices[0].message.content

def full_text_to_sql_pipeline(
    conn: sqlite3.Connection,
    question: str
) -> dict:
    """Complete pipeline with natural language response."""
    
    schema = get_schema_string(conn)
    
    # Generate and execute SQL
    result = text_to_sql_with_validation(question, schema, conn)
    
    if not result["success"]:
        return {
            "answer": f"I couldn't answer that question. Error: {result.get('error', 'Unknown')}",
            "sql": result.get("sql"),
            "success": False
        }
    
    # Generate natural response
    answer = generate_natural_response(
        question,
        result["sql"],
        result["results"]
    )
    
    return {
        "answer": answer,
        "sql": result["sql"],
        "results": result["results"],
        "success": True
    }

# Usage
result = full_text_to_sql_pipeline(
    conn,
    "Which products had the highest sales growth compared to last year?"
)

print(f"Answer: {result['answer']}")
print(f"SQL: {result['sql']}")

Production Text-to-SQL Service

from dataclasses import dataclass
from typing import Optional
import hashlib
import json

@dataclass
class QueryResult:
    question: str
    sql: str
    results: list[dict]
    answer: str
    execution_time_ms: float
    cached: bool

class TextToSQLService:
    """Production-ready text-to-SQL service."""
    
    def __init__(
        self,
        conn: sqlite3.Connection,
        cache_enabled: bool = True,
        max_results: int = 1000,
        timeout_seconds: int = 30
    ):
        self.conn = conn
        self.cache_enabled = cache_enabled
        self.max_results = max_results
        self.timeout = timeout_seconds
        self.cache: dict[str, QueryResult] = {}
        
        # Pre-load schema
        self.schema = get_schema_string(conn)
        self.tables = get_enhanced_schema(conn)
    
    def _cache_key(self, question: str) -> str:
        """Generate cache key for question."""
        return hashlib.md5(question.lower().strip().encode()).hexdigest()
    
    def query(self, question: str, use_cache: bool = True) -> QueryResult:
        """Execute text-to-SQL query."""
        import time
        
        start = time.time()
        
        # Check cache
        cache_key = self._cache_key(question)
        if use_cache and self.cache_enabled and cache_key in self.cache:
            cached = self.cache[cache_key]
            return QueryResult(
                question=question,
                sql=cached.sql,
                results=cached.results,
                answer=cached.answer,
                execution_time_ms=(time.time() - start) * 1000,
                cached=True
            )
        
        # Get relevant schema
        relevant_tables = get_relevant_tables(question, self.tables)
        schema = format_schema_for_llm(relevant_tables)
        
        # Generate SQL
        result = text_to_sql_with_validation(question, schema, self.conn)
        
        if not result["success"]:
            raise ValueError(f"Query failed: {result.get('error')}")
        
        # Limit results
        results = result["results"][:self.max_results]
        
        # Generate answer
        answer = generate_natural_response(question, result["sql"], results)
        
        query_result = QueryResult(
            question=question,
            sql=result["sql"],
            results=results,
            answer=answer,
            execution_time_ms=(time.time() - start) * 1000,
            cached=False
        )
        
        # Cache result
        if self.cache_enabled:
            self.cache[cache_key] = query_result
        
        return query_result
    
    def explain_sql(self, sql: str) -> str:
        """Explain what a SQL query does."""
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Explain SQL queries in simple terms for non-technical users."
                },
                {"role": "user", "content": f"Explain this SQL:\n{sql}"}
            ]
        )
        
        return response.choices[0].message.content

# FastAPI endpoint
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    question: str
    use_cache: bool = True

class QueryResponse(BaseModel):
    answer: str
    sql: str
    row_count: int
    execution_time_ms: float
    cached: bool

service = TextToSQLService(sqlite3.connect("database.db"))

@app.post("/query", response_model=QueryResponse)
async def query_database(request: QueryRequest):
    try:
        result = service.query(request.question, request.use_cache)
        
        return QueryResponse(
            answer=result.answer,
            sql=result.sql,
            row_count=len(result.results),
            execution_time_ms=result.execution_time_ms,
            cached=result.cached
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

References

Conclusion

Text-to-SQL democratizes data access, letting anyone query databases using natural language. The key to success is providing rich schema context—table names, column types, sample values, and relationships. Always validate generated SQL before execution, both for correctness and safety. Implement retry logic with error feedback to recover from generation mistakes. Add natural language response generation to make results accessible to non-technical users. For production systems, cache common queries, limit result sizes, and implement proper timeout handling. Start with a focused domain (one database, specific question types) and expand gradually. The technology is mature enough for production use, but requires careful engineering to handle edge cases and ensure reliability.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

You can use these HTML tags

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

  

  

  

This site uses Akismet to reduce spam. Learn how your comment data is processed.