Introduction: Vector search finds semantically similar content, but it misses the structured relationships that make knowledge truly useful. Knowledge graphs capture entities and their relationships explicitly—who works where, what depends on what, how concepts connect. Combining knowledge graphs with LLMs creates systems that can reason over structured relationships while generating natural language responses. This guide covers practical knowledge graph integration: extracting entities and relationships from text, building and querying graph databases, using graph context to enhance LLM responses, and patterns for keeping graphs synchronized with source documents. Whether you’re building enterprise search, question answering, or research assistants, knowledge graphs add a dimension of understanding that pure vector search cannot provide.

Entity Extraction
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import json
class EntityType(Enum):
    """Closed set of entity categories recognised by the extractors.

    Each member's value is the lowercase label that is interpolated into
    extraction prompts and parsed back from the model's JSON reply.
    """

    # People, companies/institutions and places.
    PERSON = "person"
    ORGANIZATION = "organization"
    LOCATION = "location"

    # Things that are built, discussed or happen.
    PRODUCT = "product"
    CONCEPT = "concept"
    EVENT = "event"
    TECHNOLOGY = "technology"
@dataclass
class Entity:
    """An entity extracted from text.

    Attributes:
        id: Identifier used as the node key in graph stores.
        name: Primary (canonical) name of the entity.
        entity_type: Category of the entity.
        aliases: Alternative names for the same entity.
        properties: Free-form key/value attributes.
        source_text: Snippet of the text the entity was extracted from,
            or ``None`` when no provenance was recorded.
        confidence: Extraction confidence; defaults to ``1.0``.
    """

    id: str
    name: str
    entity_type: EntityType
    aliases: list[str] = field(default_factory=list)
    properties: dict[str, Any] = field(default_factory=dict)
    # Defaulting to None requires an explicit Optional annotation.
    source_text: Optional[str] = None
    confidence: float = 1.0
@dataclass
class Relation:
    """A directed relation between two entities.

    Attributes:
        source_id: ID of the entity the relation starts from.
        target_id: ID of the entity the relation points to.
        relation_type: Relationship label (e.g. ``"works_at"``).
        properties: Free-form key/value attributes of the relation.
        source_text: Snippet of the text the relation was extracted from,
            or ``None`` when no provenance was recorded.
        confidence: Extraction confidence; defaults to ``1.0``.
    """

    source_id: str
    target_id: str
    relation_type: str
    properties: dict[str, Any] = field(default_factory=dict)
    # Defaulting to None requires an explicit Optional annotation.
    source_text: Optional[str] = None
    confidence: float = 1.0
class LLMEntityExtractor:
    """Extract entities from free text by prompting a chat-completion LLM."""

    def __init__(self, client: Any, model: str = "gpt-4o"):
        """Store the LLM client and model name.

        Args:
            client: Object exposing an awaitable
                ``chat.completions.create(...)`` call.
            model: Model name sent with every request.
        """
        self.client = client
        self.model = model

    async def extract(
        self,
        text: str,
        entity_types: Optional[list[EntityType]] = None
    ) -> list[Entity]:
        """Extract entities from text.

        Args:
            text: Text to analyse.
            entity_types: Entity types to ask for; all ``EntityType``
                members when omitted or empty.

        Returns:
            The parsed entities. Returns an empty list when the reply has
            no JSON array or the array cannot be decoded. Items that are
            not objects, lack a string name, or carry an unknown type are
            skipped.
        """
        import hashlib
        import re

        types = entity_types or list(EntityType)
        type_list = ", ".join(t.value for t in types)
        prompt = f"""Extract entities from this text.
Entity types to extract: {type_list}
Text:
{text}
Return JSON array of entities:
[
{{
"name": "entity name",
"type": "entity_type",
"aliases": ["other names"],
"properties": {{"key": "value"}}
}}
]
Only extract entities that are clearly mentioned. Be precise."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        # The message content may be None; treat that as "nothing found".
        content = response.choices[0].message.content or ""

        # Grab the outermost [...] span so surrounding prose or code fences
        # in the reply do not break parsing.
        json_match = re.search(r'\[[\s\S]*\]', content)
        if not json_match:
            return []
        try:
            entities_data = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            # Same outcome as "no JSON found": nothing usable in the reply.
            return []

        entities = []
        for i, e in enumerate(entities_data):
            if not isinstance(e, dict):
                continue
            name = e.get("name")
            if not isinstance(name, str) or not name:
                continue
            try:
                entity_type = EntityType(e.get("type"))
            except ValueError:
                # The model invented a type outside the enum; skip the item
                # instead of failing the whole extraction.
                continue

            # Built-in hash() of a str is salted per process, so it cannot
            # produce IDs that are stable across runs; use a real digest.
            # NOTE: the ID still includes the item's position in the reply.
            digest = int(
                hashlib.sha256(name.encode("utf-8")).hexdigest(), 16
            ) % 10000
            entities.append(Entity(
                id=f"entity_{i}_{digest}",
                name=name,
                entity_type=entity_type,
                aliases=e.get("aliases") or [],
                properties=e.get("properties") or {},
                source_text=text[:200]
            ))
        return entities
class LLMRelationExtractor:
    """Extract relations between known entities by prompting an LLM."""

    def __init__(self, client: Any, model: str = "gpt-4o"):
        """Store the LLM client and model name.

        Args:
            client: Object exposing an awaitable
                ``chat.completions.create(...)`` call.
            model: Model name sent with every request.
        """
        self.client = client
        self.model = model

    async def extract(
        self,
        text: str,
        entities: list[Entity]
    ) -> list[Relation]:
        """Extract relations between entities.

        Args:
            text: Text to analyse.
            entities: Entities previously found in ``text``; only relations
                whose source and target resolve to one of these (by name or
                alias, case-insensitively) are returned.

        Returns:
            The resolved relations. Returns an empty list when there are no
            entities or the reply holds no decodable JSON array. Malformed
            rows are skipped.
        """
        import re

        # Without entities nothing can resolve, so skip the LLM call.
        if not entities:
            return []

        entity_list = "\n".join(
            f"- {e.name} ({e.entity_type.value})" for e in entities
        )
        prompt = f"""Extract relationships between these entities from the text.
Entities:
{entity_list}
Text:
{text}
Return JSON array of relationships:
[
{{
"source": "source entity name",
"target": "target entity name",
"relation": "relationship type",
"properties": {{"key": "value"}}
}}
]
Common relation types: works_at, located_in, part_of, created_by, depends_on, related_to, manages, owns, uses"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        content = response.choices[0].message.content or ""
        json_match = re.search(r'\[[\s\S]*\]', content)
        if not json_match:
            return []
        try:
            relations_data = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            return []

        # Map entity names to IDs. Canonical names are registered first and
        # aliases only fill gaps, so an alias never hijacks another
        # entity's canonical name.
        name_to_id = {e.name.lower(): e.id for e in entities}
        for e in entities:
            for alias in e.aliases:
                name_to_id.setdefault(alias.lower(), e.id)

        relations = []
        for r in relations_data:
            if not isinstance(r, dict):
                continue
            source = r.get("source")
            target = r.get("target")
            relation_type = r.get("relation")
            if not (
                isinstance(source, str)
                and isinstance(target, str)
                and isinstance(relation_type, str)
                and relation_type
            ):
                continue
            source_id = name_to_id.get(source.lower())
            target_id = name_to_id.get(target.lower())
            if source_id and target_id:
                relations.append(Relation(
                    source_id=source_id,
                    target_id=target_id,
                    relation_type=relation_type,
                    properties=r.get("properties") or {},
                    source_text=text[:200]
                ))
        return relations
class KnowledgeExtractor:
    """Combined entity and relation extraction."""

    def __init__(self, client: Any, model: str = "gpt-4o"):
        """Create the entity and relation extractors.

        Args:
            client: LLM client handed to both extractors.
            model: Model name handed to both extractors.
        """
        self.entity_extractor = LLMEntityExtractor(client, model)
        self.relation_extractor = LLMRelationExtractor(client, model)

    async def extract(
        self,
        text: str,
        entity_types: Optional[list[EntityType]] = None
    ) -> tuple[list[Entity], list[Relation]]:
        """Extract entities and then the relations between them.

        Args:
            text: Text to analyse.
            entity_types: Optional restriction forwarded to the entity
                extractor; ``None`` keeps the extractor's default.

        Returns:
            A ``(entities, relations)`` tuple.
        """
        # Relations are resolved against entity names, so entities go first.
        entities = await self.entity_extractor.extract(text, entity_types)
        relations = await self.relation_extractor.extract(text, entities)
        return entities, relations
Graph Database Integration
from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
class GraphStore(ABC):
    """Abstract graph store interface.

    Subclasses must implement the four abstract methods. ``get_neighbors``
    is optional and raises ``NotImplementedError`` unless overridden.
    """

    @abstractmethod
    async def add_entity(self, entity: Entity):
        """Add an entity to the graph."""

    @abstractmethod
    async def add_relation(self, relation: Relation):
        """Add a relation to the graph."""

    @abstractmethod
    async def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Get an entity by ID, or ``None`` if it does not exist."""

    @abstractmethod
    async def query(self, cypher: str) -> list[dict]:
        """Execute a graph query and return its result rows."""

    async def get_neighbors(self, entity_id: str, depth: int = 1) -> list[Any]:
        """Return entities within ``depth`` hops of ``entity_id``.

        Callers that hold a ``GraphStore`` use this method, so it is part
        of the interface. It is deliberately not abstract, so existing
        subclasses that lack it remain instantiable.

        Raises:
            NotImplementedError: If the concrete store does not override it.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not implement get_neighbors"
        )
class Neo4jGraphStore(GraphStore):
    """Neo4j graph database integration."""

    def __init__(self, uri: str, user: str, password: str):
        """Create an async Neo4j driver for ``uri`` with basic auth."""
        from neo4j import AsyncGraphDatabase
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def close(self) -> None:
        """Close the underlying driver and release its connections."""
        await self.driver.close()

    @staticmethod
    def _relationship_type(raw: str) -> str:
        """Turn a free-form relation label into a safe Cypher type.

        Relationship types cannot be passed as query parameters, so the
        label has to be interpolated. Every non-word character becomes
        ``_`` and the result is backtick-quoted, which leaves no character
        that could terminate the identifier.

        Raises:
            ValueError: If nothing usable remains after sanitising.
        """
        import re

        cleaned = re.sub(r"\W", "_", raw.strip().upper())
        if not cleaned.strip("_"):
            raise ValueError(f"Invalid relation type: {raw!r}")
        return f"`{cleaned}`"

    async def add_entity(self, entity: Entity):
        """Create or update the ``Entity`` node keyed by ``entity.id``.

        ``properties`` is stored as a JSON string on the node.
        """
        query = """
        MERGE (e:Entity {id: $id})
        SET e.name = $name,
            e.type = $type,
            e.aliases = $aliases,
            e.properties = $properties
        """
        async with self.driver.session() as session:
            await session.run(
                query,
                id=entity.id,
                name=entity.name,
                type=entity.entity_type.value,
                aliases=entity.aliases,
                properties=json.dumps(entity.properties)
            )

    async def add_relation(self, relation: Relation):
        """Create or update a relationship between two existing nodes.

        If either endpoint node is missing the ``MATCH`` finds nothing and
        no relationship is written.

        Raises:
            ValueError: If ``relation.relation_type`` cannot be turned into
                a valid relationship type.
        """
        rel_type = self._relationship_type(relation.relation_type)
        query = f"""
        MATCH (source:Entity {{id: $source_id}})
        MATCH (target:Entity {{id: $target_id}})
        MERGE (source)-[r:{rel_type}]->(target)
        SET r.properties = $properties
        """
        async with self.driver.session() as session:
            await session.run(
                query,
                source_id=relation.source_id,
                target_id=relation.target_id,
                properties=json.dumps(relation.properties)
            )

    async def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Load the node with ``entity_id``, or ``None`` if it is absent."""
        query = """
        MATCH (e:Entity {id: $id})
        RETURN e
        """
        async with self.driver.session() as session:
            result = await session.run(query, id=entity_id)
            record = await result.single()
            if not record:
                return None
            node = record["e"]
            return Entity(
                id=node["id"],
                name=node["name"],
                entity_type=EntityType(node["type"]),
                aliases=node.get("aliases") or [],
                properties=json.loads(node.get("properties") or "{}")
            )

    async def query(
        self,
        cypher: str,
        parameters: Optional[dict] = None
    ) -> list[dict]:
        """Execute a Cypher query and return the rows as dictionaries.

        Args:
            cypher: Query text.
            parameters: Optional query parameters. Prefer these over string
                interpolation for any externally supplied value.
        """
        async with self.driver.session() as session:
            result = await session.run(cypher, parameters or {})
            records = await result.data()
            return records

    async def get_neighbors(
        self,
        entity_id: str,
        depth: int = 1
    ) -> list[dict]:
        """Get entities within ``depth`` hops, ignoring edge direction.

        Returns:
            Rows with ``neighbor`` and ``r`` keys, or an empty list when
            ``depth`` is below 1.
        """
        # Variable-length bounds cannot be parameterised, so force an int
        # before interpolating. The ID goes in as a parameter: splicing it
        # into the text would allow Cypher injection.
        depth = int(depth)
        if depth < 1:
            return []
        query = f"""
        MATCH (e:Entity {{id: $id}})-[r*1..{depth}]-(neighbor:Entity)
        RETURN DISTINCT neighbor, r
        """
        return await self.query(query, {"id": entity_id})
class InMemoryGraphStore(GraphStore):
    """In-memory graph store for testing."""

    def __init__(self):
        """Start with an empty graph."""
        self._entities: dict[str, Entity] = {}
        self._relations: list[Relation] = []
        # source id -> target ids (outgoing edges)
        self._adjacency: dict[str, list[str]] = {}
        # target id -> source ids (incoming edges), for undirected traversal
        self._incoming: dict[str, list[str]] = {}

    async def add_entity(self, entity: Entity):
        """Insert the entity, replacing any existing one with the same ID."""
        self._entities[entity.id] = entity
        self._adjacency.setdefault(entity.id, [])

    async def add_relation(self, relation: Relation):
        """Record the relation and index it in both directions."""
        self._relations.append(relation)
        self._adjacency.setdefault(relation.source_id, []).append(
            relation.target_id
        )
        self._incoming.setdefault(relation.target_id, []).append(
            relation.source_id
        )

    async def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Return the stored entity, or ``None`` if the ID is unknown."""
        return self._entities.get(entity_id)

    async def query(self, cypher: str) -> list[dict]:
        """Simple query support: always returns an empty list."""
        # Simplified - would need proper Cypher parser
        return []

    async def get_neighbors(
        self,
        entity_id: str,
        depth: int = 1
    ) -> list[Entity]:
        """Get entities within ``depth`` hops of ``entity_id``.

        Edges are followed in both directions and each neighbour appears
        once, matching the undirected ``DISTINCT`` query of the Neo4j
        store. The start entity is never included. IDs with no stored
        entity are traversed but not returned.
        """
        seen = {entity_id}
        frontier = [entity_id]
        neighbors: list[Entity] = []
        for _ in range(depth):
            next_frontier = []
            for eid in frontier:
                linked = self._adjacency.get(eid, []) + self._incoming.get(eid, [])
                for neighbor_id in linked:
                    # Mark on discovery so a node reachable through several
                    # paths is reported only once.
                    if neighbor_id in seen:
                        continue
                    seen.add(neighbor_id)
                    next_frontier.append(neighbor_id)
                    entity = self._entities.get(neighbor_id)
                    if entity:
                        neighbors.append(entity)
            if not next_frontier:
                break
            frontier = next_frontier
        return neighbors
Graph-Enhanced RAG
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class GraphContext:
    """Context from knowledge graph.

    Attributes:
        entities: Entities to present to the LLM.
        relations: Relations to present to the LLM.
        paths: Entity-ID paths; carried along but not rendered by
            ``to_text``.
    """

    entities: list[Entity]
    relations: list[Relation]
    paths: list[list[str]]

    def to_text(self) -> str:
        """Convert to text for LLM context.

        Returns:
            A header, an ``Entities:`` section with one line per entity and
            a ``Relationships:`` section with one line per relation.
            Relation endpoints are shown by entity name when the ID is in
            ``entities`` and by raw ID otherwise.
        """
        # One id -> name map instead of a linear scan per relation endpoint.
        # setdefault keeps the first entity for a duplicated ID.
        names: dict[str, str] = {}
        for e in self.entities:
            names.setdefault(e.id, e.name)

        lines = ["Knowledge Graph Context:"]

        lines.append("\nEntities:")
        for e in self.entities:
            props = ", ".join(f"{k}: {v}" for k, v in e.properties.items())
            lines.append(f"- {e.name} ({e.entity_type.value}): {props}")

        lines.append("\nRelationships:")
        for r in self.relations:
            source = names.get(r.source_id, r.source_id)
            target = names.get(r.target_id, r.target_id)
            lines.append(f"- {source} --[{r.relation_type}]--> {target}")

        return "\n".join(lines)
class GraphRAG:
    """RAG enhanced with knowledge graph."""

    def __init__(
        self,
        client: Any,
        graph_store: GraphStore,
        vector_store: Any,
        extractor: KnowledgeExtractor,
        model: str = "gpt-4o"
    ):
        """Wire up the components.

        Args:
            client: LLM client exposing ``chat.completions.create``.
            graph_store: Store that receives entities and relations.
            vector_store: Store exposing awaitable ``add`` and ``search``.
            extractor: Entity/relation extractor.
            model: Model used to generate answers; defaults to the
                previously hard-coded ``"gpt-4o"``.
        """
        self.client = client
        self.graph_store = graph_store
        self.vector_store = vector_store
        self.extractor = extractor
        self.model = model

    async def index_document(self, doc_id: str, text: str):
        """Index document in both vector and graph stores.

        Every extracted entity and relation is tagged with ``doc_id`` in
        its properties before being written to the graph.
        """
        entities, relations = await self.extractor.extract(text)

        for entity in entities:
            entity.properties["doc_id"] = doc_id
            await self.graph_store.add_entity(entity)
        for relation in relations:
            relation.properties["doc_id"] = doc_id
            await self.graph_store.add_relation(relation)

        # Add to vector store (assuming it handles embedding)
        await self.vector_store.add(doc_id, text)

    async def query(
        self,
        question: str,
        k_vector: int = 5,
        k_graph: int = 10
    ) -> str:
        """Answer a question using both vector and graph retrieval.

        Args:
            question: Natural-language question.
            k_vector: Number of vector hits to retrieve.
            k_graph: Maximum number of graph entities put in the context.

        Returns:
            The model's answer text.
        """
        vector_results = await self.vector_store.search(question, k=k_vector)

        # Entities mentioned in the question seed the graph lookup.
        question_entities, _ = await self.extractor.extract(question)
        graph_context = await self._get_graph_context(
            question_entities,
            k_graph
        )

        vector_context = "\n\n".join([r["content"] for r in vector_results])
        graph_text = graph_context.to_text()

        prompt = f"""Answer the question using the provided context.
Vector Search Results:
{vector_context}
{graph_text}
Question: {question}
Answer:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        return response.choices[0].message.content

    async def _get_graph_context(
        self,
        entities: list[Entity],
        k: int
    ) -> GraphContext:
        """Collect up to ``k`` distinct neighbours of the given entities.

        Only ``Entity`` instances returned by the store are kept. The
        context carries no relations or paths, because none are fetched.
        """
        limit = max(k, 0)
        seen_ids = set()
        unique_entities = []
        for entity in entities:
            if len(unique_entities) >= limit:
                break
            # Find matching entities in graph
            # This would use entity resolution in practice
            neighbors = await self.graph_store.get_neighbors(entity.id, depth=2)
            if not isinstance(neighbors, list):
                continue
            # Deduplicate before applying the cap; slicing first could
            # return fewer than k entities when duplicates sit early in
            # the list.
            for candidate in neighbors:
                if isinstance(candidate, Entity) and candidate.id not in seen_ids:
                    seen_ids.add(candidate.id)
                    unique_entities.append(candidate)

        return GraphContext(
            entities=unique_entities[:limit],
            relations=[],
            paths=[]
        )
class GraphQueryGenerator:
    """Generate graph queries from natural language."""

    def __init__(self, client: Any, model: str = "gpt-4o"):
        """Store the LLM client and model name."""
        self.client = client
        self.model = model

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove a surrounding Markdown code fence, if there is one.

        Models often wrap the query in a fenced block even when asked for
        the bare query; the fence is not valid Cypher.
        """
        stripped = text.strip()
        fence = "`" * 3
        if not (
            len(stripped) >= 2 * len(fence)
            and stripped.startswith(fence)
            and stripped.endswith(fence)
        ):
            return stripped
        inner = stripped[len(fence):-len(fence)]
        first_line, newline, rest = inner.partition("\n")
        tag = first_line.strip()
        # Drop the opening line when it is empty or a language tag.
        if newline and (not tag or tag.isalpha()):
            inner = rest
        return inner.strip()

    async def generate_cypher(
        self,
        question: str,
        schema: str
    ) -> str:
        """Generate a Cypher query from a question.

        Args:
            question: Natural-language question.
            schema: Textual description of the graph schema.

        Returns:
            The query text with surrounding whitespace and Markdown code
            fences removed.
        """
        prompt = f"""Convert this question to a Cypher query.
Graph Schema:
{schema}
Question: {question}
Return only the Cypher query, no explanation."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return self._strip_code_fences(response.choices[0].message.content or "")

    async def answer_with_graph(
        self,
        question: str,
        graph_store: GraphStore,
        schema: str
    ) -> str:
        """Answer a question by generating and running a graph query.

        Returns:
            The model's natural-language answer.
        """
        cypher = await self.generate_cypher(question, schema)

        # SECURITY NOTE(review): this runs model-generated Cypher as-is.
        # Nothing here stops a write or delete query; use read-only
        # credentials or a read transaction if the question is untrusted.
        results = await graph_store.query(cypher)

        # default=str: result rows may hold values json cannot encode. The
        # text only feeds the prompt, so a string form is enough.
        results_json = json.dumps(results, indent=2, default=str)
        prompt = f"""Answer the question based on these graph query results.
Question: {question}
Query Results:
{results_json}
Provide a natural language answer."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        return response.choices[0].message.content
Graph Synchronization
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
from enum import Enum
class ChangeType(Enum):
    """Kind of modification recorded in a ``GraphChange``."""

    ADD = "add"        # something new was written to the graph
    UPDATE = "update"  # an existing item was modified
    DELETE = "delete"  # an item is no longer present
@dataclass
class GraphChange:
    """A change to the graph.

    Attributes:
        change_type: Whether something was added, updated or deleted.
        entity_id: ID of the affected entity, if the change concerns one.
        relation_id: ID of the affected relation, if the change concerns one.
        old_value: Value before the change, when known.
        new_value: Value after the change, when known.
        timestamp: When the change was recorded.
        source_doc: ID of the document that caused the change.
    """

    change_type: ChangeType
    # Every field below defaults to None, so each is annotated Optional.
    entity_id: Optional[str] = None
    relation_id: Optional[str] = None
    old_value: Any = None
    new_value: Any = None
    timestamp: Optional[datetime] = None
    source_doc: Optional[str] = None
class GraphSynchronizer:
    """Keep graph synchronized with source documents."""

    def __init__(
        self,
        graph_store: GraphStore,
        extractor: KnowledgeExtractor
    ):
        """Store collaborators and start with empty per-document tracking."""
        self.graph_store = graph_store
        self.extractor = extractor
        # doc id -> entity ids last extracted from that document
        self._doc_entities: dict[str, set[str]] = {}
        # doc id -> relation keys last extracted from that document
        self._doc_relations: dict[str, set[str]] = {}

    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # datetime.utcnow() returns a naive value and is deprecated.
        from datetime import timezone
        return datetime.now(timezone.utc)

    @staticmethod
    def _relation_key(relation: Relation) -> str:
        """Build the tracking key for a relation (it has no ID field)."""
        return f"{relation.source_id}|{relation.relation_type}|{relation.target_id}"

    async def sync_document(
        self,
        doc_id: str,
        text: str
    ) -> list[GraphChange]:
        """Re-extract a document and reconcile it with the tracked state.

        Entities not previously seen for ``doc_id`` are written to the
        store and reported as ``ADD``. Entities that are no longer
        extracted are reported as ``DELETE``, but the store is not
        modified because ``GraphStore`` has no delete operation. All
        extracted relations are written to the store.

        Returns:
            The entity-level changes detected.
        """
        changes = []
        new_entities, new_relations = await self.extractor.extract(text)

        old_entity_ids = self._doc_entities.get(doc_id, set())
        new_entity_ids = {e.id for e in new_entities}

        # Additions
        for entity in new_entities:
            if entity.id in old_entity_ids:
                continue
            await self.graph_store.add_entity(entity)
            changes.append(GraphChange(
                change_type=ChangeType.ADD,
                entity_id=entity.id,
                new_value=entity,
                timestamp=self._now(),
                source_doc=doc_id
            ))

        # Deletions, sorted so the change list has a stable order.
        for old_id in sorted(old_entity_ids - new_entity_ids):
            changes.append(GraphChange(
                change_type=ChangeType.DELETE,
                entity_id=old_id,
                timestamp=self._now(),
                source_doc=doc_id
            ))

        self._doc_entities[doc_id] = new_entity_ids

        # Write relations and record them; this map was previously never
        # filled.
        relation_keys = set()
        for relation in new_relations:
            await self.graph_store.add_relation(relation)
            relation_keys.add(self._relation_key(relation))
        self._doc_relations[doc_id] = relation_keys

        return changes

    async def delete_document(self, doc_id: str) -> list[GraphChange]:
        """Forget a document and report its entities as deleted.

        Only the tracking state is cleared; the store itself is not
        modified.

        Returns:
            One ``DELETE`` change per entity tracked for ``doc_id``.
        """
        entity_ids = self._doc_entities.pop(doc_id, set())
        self._doc_relations.pop(doc_id, None)
        return [
            GraphChange(
                change_type=ChangeType.DELETE,
                entity_id=entity_id,
                timestamp=self._now(),
                source_doc=doc_id
            )
            for entity_id in sorted(entity_ids)
        ]
class EntityResolver:
    """Resolve entities across documents."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        """Store the LLM client and model name."""
        self.client = client
        self.model = model
        self._canonical: dict[str, str] = {}  # alias -> canonical

    async def resolve(
        self,
        entity: Entity,
        candidates: list[Entity]
    ) -> Optional[Entity]:
        """Find the candidate that denotes the same thing as ``entity``.

        A case-insensitive match on a candidate's name or aliases wins
        immediately. Otherwise the LLM is asked to pick a candidate number.

        Returns:
            The matching candidate, or ``None`` when there are no
            candidates, the model answers 0, or its reply does not start
            with a valid candidate number.
        """
        import re

        if not candidates:
            return None

        # Check exact match first
        wanted = entity.name.lower()
        for candidate in candidates:
            if wanted == candidate.name.lower():
                return candidate
            if any(wanted == alias.lower() for alias in candidate.aliases):
                return candidate

        # Use LLM for fuzzy matching
        candidate_list = "\n".join(
            f"{i+1}. {c.name} ({c.entity_type.value})"
            for i, c in enumerate(candidates)
        )
        prompt = f"""Does this entity match any of the candidates?
Entity: {entity.name} ({entity.entity_type.value})
Candidates:
{candidate_list}
If there's a match, return the number. If no match, return 0."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        # Accept a leading integer even when trailing text follows it
        # ("2." or "2 - Acme"). Numbers elsewhere in a sentence are
        # ignored to avoid false matches.
        content = response.choices[0].message.content or ""
        number = re.match(r"\s*([+-]?\d+)", content)
        if number is None:
            return None
        match_idx = int(number.group(1)) - 1
        if 0 <= match_idx < len(candidates):
            return candidates[match_idx]
        return None

    def merge_entities(
        self,
        primary: Entity,
        secondary: Entity
    ) -> Entity:
        """Merge two entities, preferring ``primary`` on conflicts.

        Returns:
            A new entity with primary's id, name and type. Its aliases are
            the union of both alias lists plus secondary's name, minus
            primary's name, sorted. Properties are merged with primary's
            values winning. Confidence is the higher of the two, and the
            first available source text is kept.
        """
        all_aliases = set(primary.aliases + secondary.aliases)
        all_aliases.add(secondary.name)
        all_aliases.discard(primary.name)

        merged_props = {**secondary.properties, **primary.properties}
        return Entity(
            id=primary.id,
            name=primary.name,
            entity_type=primary.entity_type,
            # Sorted so the result does not depend on set iteration order.
            aliases=sorted(all_aliases),
            properties=merged_props,
            source_text=primary.source_text or secondary.source_text,
            confidence=max(primary.confidence, secondary.confidence)
        )
Production Knowledge Graph Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
# FastAPI application that the route handlers below are registered on.
app = FastAPI()

# Module-level singletons shared by the request handlers.
client = None # Initialize with OpenAI client
graph_store = InMemoryGraphStore()
extractor = KnowledgeExtractor(client=client)
synchronizer = GraphSynchronizer(graph_store=graph_store, extractor=extractor)
class ExtractRequest(BaseModel):
    """Request body for ``POST /v1/extract``."""

    # Text to run extraction on.
    text: str
    # Optional list of entity-type labels supplied by the client.
    entity_types: Optional[list[str]] = None
class IndexRequest(BaseModel):
    """Request body for ``POST /v1/index``."""

    # Caller-chosen identifier of the document.
    doc_id: str
    # Full document text to synchronize into the graph.
    text: str
class QueryRequest(BaseModel):
    """Request body describing a question to answer."""

    # Natural-language question.
    question: str
    # Flag for graph use; defaults to True.
    use_graph: bool = True
    # Result-count setting; defaults to 10.
    k: int = 10
class EntityRequest(BaseModel):
    """Request body for ``POST /v1/entities``."""

    # Primary name of the entity.
    name: str
    # Entity-type label; converted to ``EntityType`` by the handler.
    entity_type: str
    # Alternative names; the handler treats a missing value as empty.
    aliases: Optional[list[str]] = None
    # Free-form attributes; the handler treats a missing value as empty.
    properties: Optional[dict] = None
class RelationRequest(BaseModel):
    """Request body for ``POST /v1/relations``."""

    # ID of the entity the relation starts from.
    source_id: str
    # ID of the entity the relation points to.
    target_id: str
    # Relationship label.
    relation_type: str
    # Free-form attributes; the handler treats a missing value as empty.
    properties: Optional[dict] = None
@app.post("/v1/extract")
async def extract_knowledge(request: ExtractRequest):
    """Extract entities and relations from text.

    Honours ``request.entity_types`` when given.

    Raises:
        HTTPException: 400 if ``entity_types`` contains an unknown label.
    """
    entity_types = None
    if request.entity_types:
        try:
            entity_types = [EntityType(t) for t in request.entity_types]
        except ValueError as exc:
            # A bad label is a client error, not a server failure.
            raise HTTPException(
                status_code=400,
                detail=f"Unknown entity type: {exc}"
            ) from exc

    # Call the two stages directly so the requested types reach the entity
    # extractor. Previously they were parsed and then ignored.
    entities = await extractor.entity_extractor.extract(
        request.text, entity_types
    )
    relations = await extractor.relation_extractor.extract(
        request.text, entities
    )

    return {
        "entities": [
            {
                "id": e.id,
                "name": e.name,
                "type": e.entity_type.value,
                "aliases": e.aliases,
                "properties": e.properties
            }
            for e in entities
        ],
        "relations": [
            {
                "source_id": r.source_id,
                "target_id": r.target_id,
                "relation_type": r.relation_type,
                "properties": r.properties
            }
            for r in relations
        ]
    }
@app.post("/v1/index")
async def index_document(request: IndexRequest):
    """Synchronize a document into the graph and summarise the changes."""
    changes = await synchronizer.sync_document(request.doc_id, request.text)

    # Tally every change kind in a single pass over the list.
    tally = {
        ChangeType.ADD: 0,
        ChangeType.UPDATE: 0,
        ChangeType.DELETE: 0,
    }
    for change in changes:
        if change.change_type in tally:
            tally[change.change_type] += 1

    summary = {
        "add": tally[ChangeType.ADD],
        "update": tally[ChangeType.UPDATE],
        "delete": tally[ChangeType.DELETE]
    }
    return {
        "doc_id": request.doc_id,
        "changes": len(changes),
        "change_types": summary
    }
@app.post("/v1/entities")
async def add_entity(request: EntityRequest):
    """Add an entity to the graph.

    The ID is derived from the name, so posting the same name again
    targets the same entity.

    Raises:
        HTTPException: 400 if ``entity_type`` is not a known label.
    """
    import hashlib

    try:
        entity_type = EntityType(request.entity_type)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown entity type: {request.entity_type}"
        ) from exc

    # Built-in hash() of a str is salted per process, so the same name
    # would get a different ID after every restart. Use a stable digest
    # and keep the original "entity_<number>" shape.
    digest = int(
        hashlib.sha256(request.name.encode("utf-8")).hexdigest(), 16
    ) % 100000
    entity = Entity(
        id=f"entity_{digest}",
        name=request.name,
        entity_type=entity_type,
        aliases=request.aliases or [],
        properties=request.properties or {}
    )
    await graph_store.add_entity(entity)
    return {
        "id": entity.id,
        "name": entity.name,
        "type": entity.entity_type.value
    }
@app.get("/v1/entities/{entity_id}")
async def get_entity(entity_id: str):
    """Return the stored entity, or respond 404 if the ID is unknown."""
    found = await graph_store.get_entity(entity_id)
    if not found:
        raise HTTPException(status_code=404, detail="Entity not found")

    payload = {
        "id": found.id,
        "name": found.name,
        "type": found.entity_type.value,
        "aliases": found.aliases,
        "properties": found.properties
    }
    return payload
@app.get("/v1/entities/{entity_id}/neighbors")
async def get_neighbors(entity_id: str, depth: int = 1):
    """Get entities within ``depth`` hops of ``entity_id``.

    Accepts both ``Entity`` objects and dictionary rows from the store.

    Raises:
        HTTPException: 400 if ``depth`` is less than 1.
    """
    if depth < 1:
        raise HTTPException(status_code=400, detail="depth must be >= 1")

    neighbors = await graph_store.get_neighbors(entity_id, depth)

    items = []
    for n in neighbors or []:
        if isinstance(n, Entity):
            items.append({
                "id": n.id,
                "name": n.name,
                "type": n.entity_type.value
            })
            continue
        # The Neo4j store returns rows shaped {"neighbor": {...}, "r": ...}.
        # Unwrap the node; reading the row directly yields None for every
        # field.
        node = n.get("neighbor", n)
        items.append({
            "id": node.get("id"),
            "name": node.get("name"),
            "type": node.get("type")
        })

    return {
        "entity_id": entity_id,
        "depth": depth,
        "neighbors": items
    }
@app.post("/v1/relations")
async def add_relation(request: RelationRequest):
    """Add a relation between two existing entities.

    Raises:
        HTTPException: 404 if the source or target entity does not exist.
    """
    # Check both endpoints first. Otherwise the in-memory store records a
    # dangling edge and the Neo4j store writes nothing, yet both report
    # success.
    for role, entity_id in (
        ("Source", request.source_id),
        ("Target", request.target_id),
    ):
        if await graph_store.get_entity(entity_id) is None:
            raise HTTPException(
                status_code=404,
                detail=f"{role} entity not found: {entity_id}"
            )

    relation = Relation(
        source_id=request.source_id,
        target_id=request.target_id,
        relation_type=request.relation_type,
        properties=request.properties or {}
    )
    await graph_store.add_relation(relation)
    return {
        "source_id": relation.source_id,
        "target_id": relation.target_id,
        "relation_type": relation.relation_type
    }
@app.post("/v1/query/cypher")
async def query_cypher(cypher: str):
    """Execute a caller-supplied Cypher query and return its rows.

    Raises:
        HTTPException: 400 if the query is empty or whitespace only.
    """
    # SECURITY NOTE(review): this runs arbitrary Cypher from the request,
    # including writes and deletes. Put it behind authentication and
    # read-only database credentials before exposing it.
    if not cypher.strip():
        raise HTTPException(status_code=400, detail="cypher must not be empty")
    results = await graph_store.query(cypher)
    return {"results": results}
@app.delete("/v1/documents/{doc_id}")
async def delete_document(doc_id: str):
    """Drop a document from the synchronizer and report the removal count."""
    removed = await synchronizer.delete_document(doc_id)
    deleted_count = len(removed)
    return dict(doc_id=doc_id, deleted_entities=deleted_count)
@app.get("/health")
async def health():
    """Liveness probe: always reports a healthy status."""
    return dict(status="healthy")
References
- Neo4j: https://neo4j.com/
- LlamaIndex Knowledge Graphs: https://docs.llamaindex.ai/en/stable/examples/index_structs/knowledge_graph/
- GraphRAG: https://github.com/microsoft/graphrag
- Knowledge Graph Embeddings: https://arxiv.org/abs/2002.00388
Conclusion
Knowledge graphs add structured reasoning to LLM applications. Start with entity extraction—use LLMs to identify people, organizations, concepts, and other entities from your documents. Extract relationships between entities to build a connected graph of knowledge. Store in a graph database like Neo4j for efficient traversal and querying. Combine graph context with vector search for enhanced RAG—the graph provides structured relationships while vectors capture semantic similarity. Generate Cypher queries from natural language to answer questions that require multi-hop reasoning. Keep your graph synchronized with source documents using change tracking and entity resolution. The key insight is that knowledge graphs capture information that’s implicit in text but explicit in structure—who reports to whom, what depends on what, how concepts relate. This structured knowledge enables reasoning patterns that pure text retrieval cannot support. Build knowledge graph capabilities incrementally, starting with entity extraction and expanding to full graph-enhanced RAG as your needs grow.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.