Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

Knowledge Graphs with LLMs: Building Structured Knowledge from Text

Introduction: Knowledge graphs represent information as entities and relationships, enabling powerful reasoning and querying capabilities. LLMs excel at extracting structured knowledge from unstructured text—identifying entities, relationships, and attributes that can be stored in graph databases. This guide covers building knowledge graphs with LLMs: entity and relation extraction, graph schema design, populating Neo4j and other graph databases, combining graph queries with LLM reasoning, and building GraphRAG systems that leverage structured knowledge for better retrieval. These patterns create AI systems that truly understand the relationships in your data.

Knowledge Graphs
Knowledge Graphs: From Text to Structured Knowledge

Entity Extraction

from openai import OpenAI
from pydantic import BaseModel
from typing import Optional
import json

client = OpenAI()

class Entity(BaseModel):
    """One extracted entity; other records refer to it by ``name``."""
    # Name used as the lookup key by Relation.source / Relation.target.
    name: str
    type: str  # Person, Organization, Location, Product, etc.
    # Optional short description supplied by the extractor.
    description: Optional[str] = None
    # Free-form extra properties for this entity.
    attributes: dict = {}

class Relation(BaseModel):
    """A directed relationship from ``source`` to ``target``, both given by entity name."""
    source: str  # Entity name
    target: str  # Entity name
    relation_type: str  # works_at, located_in, owns, etc.
    # Free-form extra properties for this relationship.
    attributes: dict = {}

class KnowledgeExtraction(BaseModel):
    """Result of one extraction pass: the entities found and the relations between them."""
    entities: list[Entity]
    relations: list[Relation]

def extract_knowledge(text: str, entity_types: Optional[list[str]] = None) -> KnowledgeExtraction:
    """Extract entities and relations from text.

    Sends ``text`` to the chat-completions API in JSON mode and validates the
    reply against ``KnowledgeExtraction``.

    Args:
        text: Unstructured text to analyse.
        entity_types: Optional list of entity type names to focus on. When
            omitted or empty, no type hint is added to the prompt.

    Returns:
        The validated entities and relations parsed from the model's reply.

    Raises:
        ValueError: If the model returns no content, or content that is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        pydantic.ValidationError: If the JSON does not match the
            ``KnowledgeExtraction`` schema (e.g. a missing "relations" key).
    """

    types_hint = f"Focus on these entity types: {', '.join(entity_types)}" if entity_types else ""

    prompt = f"""Extract entities and relationships from this text.

{types_hint}

Text: {text}

Return JSON with:
{{
    "entities": [
        {{"name": "entity name", "type": "Person|Organization|Location|Product|Event|Concept", "description": "brief description", "attributes": {{}}}}
    ],
    "relations": [
        {{"source": "entity1 name", "target": "entity2 name", "relation_type": "relationship type", "attributes": {{}}}}
    ]
}}

Common relation types: works_at, located_in, owns, founded, part_of, related_to, created_by, reports_to"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )

    content = response.choices[0].message.content
    if not content:
        # Without this guard json.loads(None) fails with an opaque TypeError.
        raise ValueError("Model returned no content to parse")

    data = json.loads(content)
    # model_validate reports a non-object payload as a ValidationError,
    # whereas KnowledgeExtraction(**data) would raise a bare TypeError.
    return KnowledgeExtraction.model_validate(data)

# Usage: run one extraction over a short sample passage and print the result.
text = """
Apple Inc., founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in 1976, 
is headquartered in Cupertino, California. Tim Cook has been the CEO since 2011.
The company created the iPhone, which revolutionized the smartphone industry.
"""

# `knowledge` is reused further down when the graph is populated.
knowledge = extract_knowledge(text, entity_types=["Person", "Organization", "Product", "Location"])

# One line per entity: name and type.
print("Entities:")
for entity in knowledge.entities:
    print(f"  {entity.name} ({entity.type})")

# One line per relation, drawn as an arrow from source to target.
print("\nRelations:")
for rel in knowledge.relations:
    print(f"  {rel.source} --[{rel.relation_type}]--> {rel.target}")

Graph Schema Design

from dataclasses import dataclass, field
from typing import Set

@dataclass
class NodeType:
    """Schema entry describing one kind of node."""
    # Type name; GraphSchema keys its node types by this value.
    name: str
    # Property names declared for this node type.
    properties: list[str]
    # Properties that GraphSchema.validate_entity reports when missing.
    required_properties: list[str] = field(default_factory=list)

@dataclass
class RelationType:
    """Schema entry describing one kind of relationship."""
    # Type name; GraphSchema keys its relation types by this value.
    name: str
    # Entity types allowed at the source end of the relationship.
    source_types: list[str]
    # Entity types allowed at the target end of the relationship.
    target_types: list[str]
    # Property names declared for this relationship type.
    properties: list[str] = field(default_factory=list)

class GraphSchema:
    """Define and validate knowledge graph schema.

    Holds the allowed node types and relation types, keyed by name, and checks
    extracted entities and relations against them.
    """

    def __init__(self):
        self.node_types: dict[str, NodeType] = {}
        self.relation_types: dict[str, RelationType] = {}

    def add_node_type(self, node_type: NodeType):
        """Add a node type to schema, replacing any existing type of the same name."""
        self.node_types[node_type.name] = node_type

    def add_relation_type(self, relation_type: RelationType):
        """Add a relation type to schema, replacing any existing type of the same name."""
        self.relation_types[relation_type.name] = relation_type

    def validate_entity(self, entity: Entity) -> list[str]:
        """Validate an entity against schema.

        Args:
            entity: The extracted entity to check.

        Returns:
            A list of human-readable error messages; empty when the entity's
            type is known and every required property is present.
        """
        node_type = self.node_types.get(entity.type)
        if node_type is None:
            return [f"Unknown entity type: {entity.type}"]

        # `name` and `description` are dedicated fields on Entity rather than
        # entries in `attributes`. Looking only at `attributes` would reject
        # every entity whose type requires "name", so count the populated
        # top-level fields as present properties too.
        present = set(entity.attributes)
        if entity.name:
            present.add("name")
        if entity.description:
            present.add("description")

        return [
            f"Missing required property: {required}"
            for required in node_type.required_properties
            if required not in present
        ]

    def validate_relation(self, relation: Relation, entities: dict[str, Entity]) -> list[str]:
        """Validate a relation against schema.

        Args:
            relation: The extracted relation to check.
            entities: Known entities keyed by name, used to resolve the
                relation's source and target.

        Returns:
            A list of human-readable error messages; empty when the relation
            type is known and both endpoints exist with an allowed type.
        """
        errors = []

        if relation.relation_type not in self.relation_types:
            errors.append(f"Unknown relation type: {relation.relation_type}")
            return errors

        rel_type = self.relation_types[relation.relation_type]

        source_entity = entities.get(relation.source)
        target_entity = entities.get(relation.target)

        # Both endpoints are checked independently so that a single call
        # reports every problem with the relation.
        if not source_entity:
            errors.append(f"Source entity not found: {relation.source}")
        elif source_entity.type not in rel_type.source_types:
            errors.append(f"Invalid source type for {relation.relation_type}: {source_entity.type}")

        if not target_entity:
            errors.append(f"Target entity not found: {relation.target}")
        elif target_entity.type not in rel_type.target_types:
            errors.append(f"Invalid target type for {relation.relation_type}: {target_entity.type}")

        return errors

    def to_cypher_schema(self) -> str:
        """Generate Cypher schema constraints.

        Returns:
            One ``CREATE CONSTRAINT ... IS NOT NULL`` statement per required
            property of each node type, joined with ";\\n". Empty string when
            no node type declares required properties.
        """
        # NOTE(review): property-existence constraints are, as far as I know,
        # an Enterprise Edition feature in Neo4j - confirm before relying on
        # this output against a Community server.
        statements = [
            f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{name}) REQUIRE n.{prop} IS NOT NULL"
            for name, node_type in self.node_types.items()
            for prop in node_type.required_properties
        ]

        return ";\n".join(statements)

# Define schema: three node types and two relation types used by the examples below.
schema = GraphSchema()

# Every node type here lists "name" as its only required property.
schema.add_node_type(NodeType(
    name="Person",
    properties=["name", "title", "birth_date"],
    required_properties=["name"]
))

schema.add_node_type(NodeType(
    name="Organization",
    properties=["name", "founded", "industry", "headquarters"],
    required_properties=["name"]
))

schema.add_node_type(NodeType(
    name="Product",
    properties=["name", "category", "release_date"],
    required_properties=["name"]
))

# works_at: only Person -> Organization is accepted.
schema.add_relation_type(RelationType(
    name="works_at",
    source_types=["Person"],
    target_types=["Organization"],
    properties=["role", "start_date", "end_date"]
))

# created: a Person or an Organization may be the source; the target must be a Product.
schema.add_relation_type(RelationType(
    name="created",
    source_types=["Person", "Organization"],
    target_types=["Product"],
    properties=["year"]
))

Neo4j Integration

# pip install neo4j

from neo4j import GraphDatabase

class Neo4jKnowledgeGraph:
    """Knowledge graph backed by Neo4j.

    Wraps a Neo4j driver and exposes helpers to upsert entities and
    relationships and to run read queries. Each method opens its own session.
    """

    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    @staticmethod
    def _quote_identifier(identifier: str) -> str:
        """Return ``identifier`` backtick-quoted for use as a label or relationship type.

        Labels and relationship types cannot be passed as query parameters,
        so they are interpolated into the query text. Quoting (and doubling
        any embedded backtick) keeps LLM-derived values such as "works at" or
        "Person|Organization" from breaking the query or injecting Cypher.

        Raises:
            ValueError: If ``identifier`` is empty.
        """
        if not identifier:
            raise ValueError("Label / relationship type must be a non-empty string")
        # NOTE(review): older Cypher parsers reportedly expand the escape
        # sequence \u0060 to a backtick before parsing, so it is normalised
        # first - confirm against Neo4j's Cypher-injection guidance.
        sanitized = identifier.replace("\\u0060", "`").replace("`", "``")
        return f"`{sanitized}`"

    def add_entity(self, entity: Entity):
        """Add an entity node to the graph.

        The node is merged on ``(label, name)``; the description (when set)
        and all attributes are written as node properties.
        """
        label = self._quote_identifier(entity.type)

        # Build properties
        props = {}
        if entity.description:
            props["description"] = entity.description
        props.update(entity.attributes)
        # `name` is the MERGE key: an attribute called "name" must not be
        # allowed to rename the node, or later relation lookups would miss it.
        props["name"] = entity.name

        # Create node with dynamic label
        query = f"""
        MERGE (n:{label} {{name: $name}})
        SET n += $props
        RETURN n
        """

        with self.driver.session() as session:
            session.run(query, name=entity.name, props=props)

    def add_relation(self, relation: Relation):
        """Add a relationship between entities.

        Source and target are looked up by ``name`` across all labels; if
        either is absent the MATCH yields no rows and nothing is created.
        """
        rel_type = self._quote_identifier(relation.relation_type)

        # Create relationship with dynamic type
        query = f"""
        MATCH (source {{name: $source_name}})
        MATCH (target {{name: $target_name}})
        MERGE (source)-[r:{rel_type}]->(target)
        SET r += $props
        RETURN r
        """

        with self.driver.session() as session:
            session.run(
                query,
                source_name=relation.source,
                target_name=relation.target,
                props=relation.attributes
            )

    def add_knowledge(self, extraction: KnowledgeExtraction):
        """Add extracted knowledge to graph."""

        # Entities go first so that add_relation can MATCH both endpoints.
        for entity in extraction.entities:
            self.add_entity(entity)

        for relation in extraction.relations:
            self.add_relation(relation)

    def query(self, cypher: str, params: Optional[dict] = None) -> list[dict]:
        """Execute a Cypher query.

        Args:
            cypher: Query text.
            params: Query parameters; ``None`` is treated as no parameters.

        Returns:
            One dict per result record.
        """

        with self.driver.session() as session:
            result = session.run(cypher, params or {})
            return [record.data() for record in result]

    def find_related(self, entity_name: str, max_depth: int = 2) -> list[dict]:
        """Find entities related to a given entity.

        Args:
            entity_name: ``name`` property of the start node.
            max_depth: Maximum number of hops to traverse; must be an
                integer >= 1.

        Returns:
            Up to 20 dicts with keys ``name``, ``type`` and ``distance``,
            ordered by distance.

        Raises:
            ValueError: If ``max_depth`` is not an integer >= 1.
        """
        if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 1:
            raise ValueError(f"max_depth must be an integer >= 1, got {max_depth!r}")

        # Variable-length bounds cannot be supplied as a query parameter, so
        # the validated integer is embedded in the query text instead.
        query = f"""
        MATCH path = (start {{name: $name}})-[*1..{max_depth}]-(related)
        RETURN DISTINCT related.name as name, labels(related)[0] as type,
               length(path) as distance
        ORDER BY distance
        LIMIT 20
        """

        return self.query(query, {"name": entity_name})

    def get_path(self, source: str, target: str) -> list[dict]:
        """Find shortest path between two entities.

        Returns:
            Dicts with ``nodes`` (node names along the path) and
            ``relations`` (relationship types along the path).
        """

        query = """
        MATCH path = shortestPath((a {name: $source})-[*]-(b {name: $target}))
        RETURN [node in nodes(path) | node.name] as nodes,
               [rel in relationships(path) | type(rel)] as relations
        """

        return self.query(query, {"source": source, "target": target})

# Usage: connect to a local Neo4j instance.
# NOTE(review): credentials are hard-coded sample values; load them from
# configuration or the environment in real deployments.
kg = Neo4jKnowledgeGraph(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)

# Add extracted knowledge (the `knowledge` object produced by the extraction example)
kg.add_knowledge(knowledge)

# Query the graph: list what is reachable from "Apple Inc." within two hops.
related = kg.find_related("Apple Inc.", max_depth=2)
for item in related:
    print(f"{item['name']} ({item['type']}) - distance: {item['distance']}")

GraphRAG: Combining Graphs with LLMs

class GraphRAG:
    """RAG system using knowledge graph for retrieval.

    Pipeline: extract entity names from the question, collect the graph
    neighbourhood of each entity as plain text, then ask the model to answer
    using that text as context.
    """

    def __init__(self, kg: Neo4jKnowledgeGraph):
        self.kg = kg

    def extract_query_entities(self, query: str) -> list[str]:
        """Extract entity names from a query.

        Returns:
            The strings listed under "entities" in the model's JSON reply;
            an empty list when the reply is empty or has an unexpected shape.
        """

        prompt = f"""Extract entity names mentioned in this query.
Return JSON: {{"entities": ["entity1", "entity2"]}}

Query: {query}"""

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        content = response.choices[0].message.content
        data = json.loads(content) if content else {}
        entities = data.get("entities", []) if isinstance(data, dict) else []
        if not isinstance(entities, list):
            return []
        # The names are used as Cypher parameters below; drop anything that
        # is not a plain string.
        return [name for name in entities if isinstance(name, str)]

    def get_graph_context(self, entities: list[str], max_depth: int = 2) -> str:
        """Get relevant graph context for entities.

        Args:
            entities: Entity names to look up by their ``name`` property.
            max_depth: Hop limit forwarded to ``find_related``.

        Returns:
            Newline-joined text lines describing, for each entity, its type,
            up to 10 related entities and up to 10 direct relationships.
        """

        entity_query = """
        MATCH (n {name: $name})
        RETURN n, labels(n)[0] as type
        """

        # The `direction` column records which side of the relationship the
        # queried entity is on, so incoming edges are not rendered as if the
        # entity were their source.
        rel_query = """
        MATCH (n {name: $name})-[r]->(m)
        RETURN type(r) as relation, m.name as target, labels(m)[0] as target_type,
               'outgoing' as direction
        UNION
        MATCH (n {name: $name})<-[r]-(m)
        RETURN type(r) as relation, m.name as target, labels(m)[0] as target_type,
               'incoming' as direction
        """

        context_parts = []

        for entity_name in entities:
            # Get entity info
            results = self.kg.query(entity_query, {"name": entity_name})

            if results:
                entity_data = results[0]
                context_parts.append(f"Entity: {entity_name} (Type: {entity_data['type']})")

            # Get related entities
            related = self.kg.find_related(entity_name, max_depth=max_depth)

            if related:
                relations_text = ", ".join(
                    f"{r['name']} ({r['type']})"
                    for r in related[:10]
                )
                context_parts.append(f"Related to {entity_name}: {relations_text}")

            # Get direct relationships
            relationships = self.kg.query(rel_query, {"name": entity_name})

            for rel in relationships[:10]:
                if rel["direction"] == "incoming":
                    context_parts.append(
                        f"{rel['target']} ({rel['target_type']}) --[{rel['relation']}]--> {entity_name}"
                    )
                else:
                    context_parts.append(
                        f"{entity_name} --[{rel['relation']}]--> {rel['target']} ({rel['target_type']})"
                    )

        return "\n".join(context_parts)

    def query(self, question: str, max_depth: int = 2) -> str:
        """Answer a question using graph knowledge.

        Args:
            question: Natural-language question.
            max_depth: Hop limit used when collecting graph context.

        Returns:
            The model's answer text.
        """

        # Extract entities from question
        entities = self.extract_query_entities(question)

        # Get graph context
        graph_context = self.get_graph_context(entities, max_depth=max_depth)

        # Generate answer
        prompt = f"""Answer the question using the knowledge graph context.

Knowledge Graph Context:
{graph_context}

Question: {question}

If the context doesn't contain enough information, say so."""

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )

        return response.choices[0].message.content

# Usage: wrap the existing graph connection and ask one question.
graph_rag = GraphRAG(kg)

# The answer is generated from whatever context the graph yields for the
# entities detected in the question.
answer = graph_rag.query("Who founded Apple and what products did they create?")
print(answer)

Incremental Graph Building

class IncrementalKnowledgeBuilder:
    """Build knowledge graph incrementally from documents.

    Each document is extracted, validated against the schema and written to
    the graph once; document ids already handled by this instance are skipped.
    """

    def __init__(self, kg: Neo4jKnowledgeGraph, schema: GraphSchema):
        self.kg = kg
        self.schema = schema
        # Ids of documents already written; kept in memory only.
        self.processed_docs: set[str] = set()

    def process_document(self, doc_id: str, text: str) -> dict:
        """Process a document and add to graph.

        Args:
            doc_id: Identifier used to detect repeat submissions.
            text: Document text to extract knowledge from.

        Returns:
            ``{"status": "skipped", "reason": ...}`` for an already processed
            id, otherwise ``{"status": "processed", "entities_added": int,
            "relations_added": int, "validation_errors": list[str]}``.
        """

        if doc_id in self.processed_docs:
            return {"status": "skipped", "reason": "already processed"}

        # Extract knowledge
        extraction = extract_knowledge(
            text,
            entity_types=list(self.schema.node_types.keys())
        )

        errors = []

        # Validate entities against schema
        valid_entities = []
        for entity in extraction.entities:
            entity_errors = self.schema.validate_entity(entity)
            if entity_errors:
                errors.extend(entity_errors)
            else:
                valid_entities.append(entity)

        # Relations are resolved against the entities that passed validation
        # only. A relation whose endpoint was rejected would otherwise be
        # counted as added even though that node is never written here.
        entities_dict = {e.name: e for e in valid_entities}

        valid_relations = []
        for relation in extraction.relations:
            rel_errors = self.schema.validate_relation(relation, entities_dict)
            if rel_errors:
                errors.extend(rel_errors)
            else:
                valid_relations.append(relation)

        # Add valid knowledge to graph (entities first so relations can match them)
        for entity in valid_entities:
            self.kg.add_entity(entity)

        for relation in valid_relations:
            self.kg.add_relation(relation)

        # Marked as processed only after every write succeeded, so a failed
        # document can be retried.
        self.processed_docs.add(doc_id)

        return {
            "status": "processed",
            "entities_added": len(valid_entities),
            "relations_added": len(valid_relations),
            "validation_errors": errors
        }

    def process_batch(self, documents: list[dict]) -> dict:
        """Process multiple documents.

        Args:
            documents: Dicts with "id" and "text" keys.

        Returns:
            Totals across the batch: "total", "processed", "skipped",
            "entities", "relations" and the combined "errors" list.
        """

        results = {
            "total": len(documents),
            "processed": 0,
            "skipped": 0,
            "entities": 0,
            "relations": 0,
            "errors": []
        }

        for doc in documents:
            result = self.process_document(doc["id"], doc["text"])

            if result["status"] != "processed":
                results["skipped"] += 1
                continue

            results["processed"] += 1
            results["entities"] += result["entities_added"]
            results["relations"] += result["relations_added"]
            results["errors"].extend(result.get("validation_errors", []))

        return results

# Usage: feed two short documents through the schema-validated builder.
builder = IncrementalKnowledgeBuilder(kg, schema)

# Each document needs an "id" (used to skip repeats) and its "text".
documents = [
    {"id": "doc1", "text": "Microsoft was founded by Bill Gates and Paul Allen in 1975..."},
    {"id": "doc2", "text": "Google, founded by Larry Page and Sergey Brin, created Android..."},
]

results = builder.process_batch(documents)
print(f"Processed {results['processed']} documents")
print(f"Added {results['entities']} entities and {results['relations']} relations")

Production Knowledge Graph Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

# Application instance on which the route decorators below register their handlers.
app = FastAPI()

class ExtractRequest(BaseModel):
    """Request body for POST /extract."""
    # Text to extract knowledge from.
    text: str
    # Optional entity types, forwarded to extract_knowledge().
    entity_types: Optional[list[str]] = None

class QueryRequest(BaseModel):
    """Request body for POST /query."""
    # Natural-language question to answer.
    question: str
    # NOTE(review): the /query handler in this file does not read this field.
    max_depth: int = 2

class GraphQueryRequest(BaseModel):
    """Request body for POST /cypher."""
    # Raw Cypher text, passed to kg.query() as-is.
    cypher: str
    # Optional parameters for the Cypher query.
    params: Optional[dict] = None

@app.post("/extract")
async def extract_and_add(request: ExtractRequest):
    """Extract knowledge from text and add to graph.

    Returns:
        The extracted entities and relations as plain dicts.
    """
    import asyncio  # function-scope import keeps this handler self-contained

    # extract_knowledge() and kg.add_knowledge() are blocking calls; running
    # them directly in an async handler would stall the event loop, so they
    # are handed to a worker thread.
    extraction = await asyncio.to_thread(
        extract_knowledge, request.text, request.entity_types
    )
    await asyncio.to_thread(kg.add_knowledge, extraction)

    return {
        "entities": [e.model_dump() for e in extraction.entities],
        "relations": [r.model_dump() for r in extraction.relations]
    }

@app.post("/query")
async def query_graph(request: QueryRequest):
    """Query the knowledge graph with natural language.

    Returns:
        ``{"answer": str, "entities_found": list[str]}``.
    """
    import asyncio  # function-scope import keeps this handler self-contained

    # graph_rag performs blocking model and database calls; run them in a
    # worker thread so the event loop keeps serving other requests.
    answer = await asyncio.to_thread(graph_rag.query, request.question)

    # Also return the entities found.
    # NOTE(review): this repeats the extraction that graph_rag.query() already
    # ran, so it costs a second model call and may not list exactly the
    # entities the answer was based on.
    entities = await asyncio.to_thread(
        graph_rag.extract_query_entities, request.question
    )

    return {
        "answer": answer,
        "entities_found": entities
    }

@app.post("/cypher")
async def execute_cypher(request: GraphQueryRequest):
    """Execute raw Cypher query.

    Returns:
        ``{"results": list[dict]}`` with one dict per record.

    Raises:
        HTTPException: 400 with the underlying error text when the query fails.
    """
    import asyncio  # function-scope import keeps this handler self-contained

    # SECURITY(review): this runs caller-supplied Cypher verbatim, including
    # statements that modify or delete data. Restrict the endpoint
    # (authentication, read-only access) before exposing it.
    try:
        # kg.query() blocks on the database; keep it off the event loop.
        results = await asyncio.to_thread(kg.query, request.cypher, request.params)
    except Exception as e:
        # Boundary handler: surface any query failure as a client error and
        # keep the original exception chained for server-side diagnostics.
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {"results": results}

@app.get("/entity/{name}")
async def get_entity(name: str, depth: int = 2):
    """Get entity and its relationships.

    Args:
        name: Entity name taken from the URL path.
        depth: Maximum traversal depth; must be at least 1.

    Returns:
        ``{"entity": name, "related": list[dict]}``.

    Raises:
        HTTPException: 400 when ``depth`` is less than 1.
    """
    import asyncio  # function-scope import keeps this handler self-contained

    # A traversal needs at least one hop; reject bad client input up front
    # rather than letting it reach the database.
    if depth < 1:
        raise HTTPException(status_code=400, detail="depth must be >= 1")

    # kg.find_related() blocks on the database; keep it off the event loop.
    related = await asyncio.to_thread(kg.find_related, name, max_depth=depth)

    return {
        "entity": name,
        "related": related
    }

@app.get("/stats")
async def get_stats():
    """Get graph statistics.

    Returns:
        ``{"node_types": [...], "relation_types": [...]}`` where each entry
        holds a ``type`` and its ``count``, most frequent first.
    """
    import asyncio  # function-scope import keeps this handler self-contained

    # Node counts grouped by each node's first label.
    stats_query = """
    MATCH (n)
    WITH labels(n)[0] as type, count(*) as count
    RETURN type, count
    ORDER BY count DESC
    """

    # Relationship counts grouped by relationship type.
    rel_query = """
    MATCH ()-[r]->()
    WITH type(r) as type, count(*) as count
    RETURN type, count
    ORDER BY count DESC
    """

    # kg.query() blocks on the database; keep both calls off the event loop.
    node_stats = await asyncio.to_thread(kg.query, stats_query)
    rel_stats = await asyncio.to_thread(kg.query, rel_query)

    return {
        "node_types": node_stats,
        "relation_types": rel_stats
    }

References

Conclusion

Knowledge graphs combined with LLMs create powerful systems that understand relationships in your data. Use LLMs to extract entities and relations from unstructured text, then store them in graph databases like Neo4j for efficient querying. Define schemas to ensure data quality and consistency. Build GraphRAG systems that leverage graph structure for better retrieval than pure vector search. The combination of structured graph knowledge and LLM reasoning enables applications that can answer complex questions about relationships, trace connections, and reason over interconnected information in ways that traditional RAG cannot match.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

You can use these HTML tags

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

  

  

  

This site uses Akismet to reduce spam. Learn how your comment data is processed.