Skip to main content
โšก Calmops

Neo4j for AI: Knowledge Graphs, Machine Learning, and RAG Pipelines

Introduction

Artificial intelligence applications increasingly require structured knowledge and complex relationship reasoning that traditional databases cannot provide. Neo4j’s graph database is uniquely positioned to power AI applications, from building knowledge graphs that augment LLMs to enabling graph neural networks and feature engineering for machine learning. This article explores how to leverage Neo4j for AI applications.

Knowledge Graphs for AI

Knowledge graphs provide structured, interpretable representations of domain knowledge that AI systems can reason over.

Building Domain Knowledge Graphs

// Create a domain knowledge graph
CREATE 
  // Entities
  (ai:Concept {name: 'Artificial Intelligence', category: 'field'}),
  (ml:Concept {name: 'Machine Learning', category: 'subfield'}),
  (dl:Concept {name: 'Deep Learning', category: 'subfield'}),
  (nn:Concept {name: 'Neural Networks', category: 'technique'}),
  (transformer:Concept {name: 'Transformer', category: 'architecture'}),
  
  // Relationships
  (ml)-[:IS_A]->(ai),
  (dl)-[:IS_A]->(ml),
  (nn)-[:IS_A]->(ml),
  (transformer)-[:USES]->(nn),
  (dl)-[:ENABLES]->(transformer),

  // Additional context
  (ai)-[:HAS_APPLICATION]->(nlp:NLP {name: 'Natural Language Processing'})

Extracting Knowledge from Text

# Extract entities and relationships from text
from neo4j import GraphDatabase
import spacy

nlp = spacy.load("en_core_web_sm")
driver = GraphDatabase.driver("bolt://localhost:7687")

def extract_knowledge(text):
    doc = nlp(text)
    
    with driver.session() as session:
        # Extract entities
        for ent in doc.ents:
            session.run("""
                MERGE (e:Entity {name: $name})
                SET e.label = $label
            """, name=ent.text, label=ent.label_)
        
        # Extract relationships
        for sent in doc.sents:
            for token in sent:
                if token.dep_ == "nsubj" and token.head.pos_ == "VERB":
                    subject = token.text
                    verb = token.head.text
                    for child in token.head.children:
                        if child.dep_ == "dobj":
                            obj = child.text
                            session.run("""
                                MATCH (s:Entity {name: $subject})
                                MERGE (o:Entity {name: $obj})
                                MERGE (s)-[:RELATES {verb: $verb}]->(o)
                            """, subject=subject, verb=verb, obj=obj)

extract_knowledge("Machine learning uses neural networks to learn patterns.")

Knowledge Graph Completion

// Predict missing relationships
CALL gds.linkPrediction.prediction(
  'knowledgeGraph',
  'Concept',
  'RELATES_TO',
  {
    topN: 10,
    threshold: 0.7
  }
)
YIELD relationships, probability

Vector Embeddings in Neo4j

Neo4j can store and query vector embeddings for similarity search.

Storing Embeddings

// Add embedding property to nodes
MATCH (p:Person)
SET p.embedding = [0.123, -0.456, 0.789, ...]  // 128-dim vector

// Create index for similarity search
// Note: Neo4j 5.x+ supports vector indexes
CREATE INDEX embedding_idx IF NOT EXISTS 
FOR (n:Entity) ON (n.embedding)
# Generate embeddings and store in Neo4j
from neo4j import GraphDatabase
from openai import OpenAI

client = OpenAI()
driver = GraphDatabase.driver("bolt://localhost:7687")

def store_with_embeddings(text, metadata):
    # Generate embedding
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    embedding = response.data[0].embedding
    
    # Store in Neo4j
    with driver.session() as session:
        session.run("""
            CREATE (d:Document {
                text: $text,
                embedding: $embedding,
                metadata: $metadata
            })
        """, text=text, embedding=embedding, metadata=metadata)

# Store documents
store_with_embeddings(
    "Machine learning is a subset of artificial intelligence.",
    {"topic": "AI", "source": "textbook"}
)

Similarity Queries

# Find similar documents
def find_similar(query, top_k=5):
    # Generate query embedding
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=query
    )
    query_embedding = response.data[0].embedding
    
    with driver.session() as session:
        result = session.run("""
            MATCH (d:Document)
            WITH d, 
                 apoc.algo.similarity(d.embedding, $embedding) AS sim
            RETURN d.text, sim
            ORDER BY sim DESC
            LIMIT $topK
        """, embedding=query_embedding, topK=top_k)
        
        return [(record['d.text'], record['sim']) for record in result]

# Find similar documents
results = find_similar("What is deep learning?")
for text, score in results:
    print(f"Score: {score:.3f} - {text[:50]}...")

GraphRAG: Graph + Retrieval Augmented Generation

GraphRAG combines knowledge graphs with LLMs for improved question answering.

Building the RAG Pipeline

# Complete GraphRAG implementation
from neo4j import GraphDatabase
from openai import OpenAI

class GraphRAG:
    def __init__(self, neo4j_uri, neo4j_user, neo4j_password, openai_key):
        self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
        self.client = OpenAI(api_key=openai_key)
    
    def retrieve_context(self, question, max_nodes=10):
        """Extract relevant subgraph from knowledge graph"""
        with self.driver.session() as session:
            # Extract key entities from question
            entity_result = session.run("""
                MATCH (e:Entity)
                WHERE toLower(e.name) CONTAINS toLower($question)
                RETURN e
                LIMIT 5
            """, question=question)
            
            entities = [record['e'] for record in entity_result]
            
            # Get related context
            context = []
            for entity in entities:
                result = session.run("""
                    MATCH path = (e)-[r]-(related)
                    WHERE e.name = $entity_name
                    RETURN path
                    LIMIT 3
                """, entity_name=entity['name'])
                
                for record in result:
                    path = record['path']
                    for rel in path.relationships:
                        context.append({
                            'from': rel.start_node['name'],
                            'relationship': rel.type,
                            'to': rel.end_node['name']
                        })
            
            return context
    
    def answer(self, question):
        # Retrieve context
        context = self.retrieve_context(question)
        
        # Build prompt
        context_str = "\n".join([
            f"{c['from']} -{c['relationship']}-> {c['to']}"
            for c in context
        ])
        
        prompt = f"""Based on this knowledge graph information:
{context_str}

Question: {question}

Answer:"""
        
        # Generate answer
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )
        
        return response.choices[0].message.content
    
    def close(self):
        self.driver.close()

# Usage
rag = GraphRAG(
    "bolt://localhost:7687",
    "neo4j", "password",
    "your-openai-key"
)

answer = rag.answer("What is the relationship between machine learning and neural networks?")
print(answer)
rag.close()

Hybrid Search with Vector + Graph

# Combine vector similarity with graph traversal
def hybrid_search(query, topic_filter=None):
    # Get query embedding
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=query
    )
    query_embedding = response.data[0].embedding
    
    with driver.session() as session:
        # Vector similarity search
        result = session.run("""
            MATCH (d:Document)
            WITH d, 
                 apoc.algo.similarity(d.embedding, $embedding) AS sim
            WHERE $topic IS NULL OR d.topic = $topic
            RETURN d.text, d.topic, sim
            ORDER BY sim DESC
            LIMIT 10
        """, embedding=query_embedding, topic=topic_filter)
        
        return [(dict(record['d']), record['sim']) for record in result]

Machine Learning Features

Neo4j Graph Data Science (GDS) library enables ML on graph data.

Feature Engineering

// Generate node features using GDS

// 1. PageRank
CALL gds.pageRank.write('myGraph', {
  writeProperty: 'pageRank'
})

// 2. Betweenness centrality
CALL gds.betweenness.write('myGraph', {
  writeProperty: 'betweenness'
})

// 3. Node degree
CALL gds.degree.write('myGraph', {
  writeProperty: 'degree',
  relationshipWeightProperty: 'strength'
})

// 4. Community detection
CALL gds.labelPropagation.write('myGraph', {
  writeProperty: 'community'
})

// 5. Node2Vec embeddings
CALL gds.node2vec.write('myGraph', {
  embeddingDimension: 128,
  walkLength: 80,
  walksPerNode: 10,
  windowSize: 10,
  writeProperty: 'embedding'
})

Export Features for ML

# Export graph features for ML training
import pandas as pd

def export_node_features():
    with driver.session() as session:
        result = session.run("""
            MATCH (p:Person)
            RETURN 
                p.id AS id,
                p.pageRank AS pagerank,
                p.betweenness AS betweenness,
                p.degree AS degree,
                p.community AS community,
                p.embedding AS embedding,
                p.label AS label  // Target variable
        """)
        
        data = []
        for record in result:
            row = {
                'id': record['id'],
                'pagerank': record['pagerank'],
                'betweenness': record['betweenness'],
                'degree': record['degree'],
                'community': record['community'],
                'label': record['label']
            }
            # Flatten embedding
            if record['embedding']:
                for i, val in enumerate(record['embedding']):
                    row[f'emb_{i}'] = val
            data.append(row)
        
        return pd.DataFrame(data)

# Train ML model
df = export_node_features()
X = df.drop(['id', 'label'], axis=1)
y = df['label']

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

X_train, X_test, y_train, y_test = train_test_split(X, y)
model = RandomForestClassifier().fit(X_train, y_train)
accuracy = model.score(X_test, y_test)
// Predict future connections

// Create graph projection
CALL gds.graph.project(
  'linkPredGraph',
  'Person',
  'KNOWS',
  {
    relationshipProperties: 'weight'
  }
)

// Train link prediction model
CALL gds.linkPrediction.train(
  'linkPredGraph',
  'KNOWS',
  {
    featureProperties: ['weight'],
    algorithm: 'adamicAdar'
  }
)
YIELD modelInfo

// Make predictions
CALL gds.linkPrediction.predict.mutate(
  'linkPredGraph',
  'KNOWS',
  {
    topN: 100,
    mutateRelationshipType: 'PREDICTED_KNOWS'
  }
)
YIELD relationshipsWritten

Entity Resolution

Graphs excel at identity resolution and entity matching.

# Entity resolution using graph matching
def resolve_entities(records):
    """
    Match duplicate entities across data sources
    """
    with driver.session() as session:
        # Create entity nodes
        for record in records:
            session.run("""
                MERGE (e:Entity {source: $source, external_id: $id})
                SET e.name = $name,
                    e.properties = $props
            """, 
            source=record['source'],
            id=record['id'],
            name=record['name'],
            props=record.get('properties', {})
            )
        
        # Link similar entities
        session.run("""
            MATCH (e1:Entity), (e2:Entity)
            WHERE e1 <> e2
              AND e1.name = e2.name
              AND e1.source <> e2.source
            MERGE (e1)-[:SAME_AS]->(e2)
        """)
        
        # Find canonical entities
        result = session.run("""
            MATCH (e1:Entity)-[:SAME_AS]->(e2:Entity)
            WITH e1, collect(e2) AS duplicates
            WHERE size(duplicates) > 0
            RETURN e1.name AS canonical, 
                   size(duplicates) AS dup_count
        """)
        
        return [(r['canonical'], r['dup_count']) for r in result]

Recommendation Systems

Graph-based recommendations leverage relationships.

# Collaborative filtering with Neo4j
def recommend_items(user_id, limit=10):
    """
    Recommend items based on similar users
    """
    with driver.session() as session:
        result = session.run("""
            // Find similar users
            MATCH (user1:User {id: $userId})-[:RATED]->(item)<-[:RATED]-(user2:User)
            WITH user2, count(item) AS commonRatings
            
            // Find items user2 liked that user1 hasn't seen
            MATCH (user2:User)-[r:RATED]->(item)
            WHERE r.rating >= 4
              AND NOT (user1:User {id: $userId})-[:RATED]->(item)
            
            // Score and rank
            WITH item, commonRatings, r.rating AS score
            RETURN item.name AS item, SUM(score * commonRatings) AS recommendationScore
            ORDER BY recommendationScore DESC
            LIMIT $limit
        """, userId=user_id, limit=limit)
        
        return [dict(record) for record in result]

Graph Neural Networks

For advanced ML, export graphs for GNN training.

# Export to PyTorch Geometric
import torch
from torch_geometric.data import Data

def export_to_pytorch_geometric():
    with driver.session() as session:
        # Get edge list
        edges = session.run("""
            MATCH (a:Entity)-[r]-(b:Entity)
            RETURN id(a) AS source, id(b) AS target
        """)
        
        edge_list = [[r['source'], r['target']] for r in edges]
        
        # Get node features
        features = session.run("""
            MATCH (n:Entity)
            RETURN id(n) AS node_id, n.features AS features
        """)
        
        node_features = {}
        for f in features:
            node_features[f['node_id']] = f['features']
        
        # Create PyTorch Geometric data
        edge_index = torch.tensor(edge_list).t().contiguous()
        x = torch.tensor([node_features[i] for i in range(len(node_features))])
        
        data = Data(x=x, edge_index=edge_index)
        
        return data

# Train GNN
data = export_to_pytorch_geometric()

Complete AI Pipeline Example

# End-to-end Neo4j AI pipeline

class Neo4jAIPipeline:
    def __init__(self, config):
        self.driver = GraphDatabase.driver(config['neo4j_uri'], auth=config['auth'])
        self.embedding_model = config.get('embedding_model', 'text-embedding-3-small')
    
    def ingest_documents(self, documents):
        """Ingest documents into knowledge graph"""
        for doc in documents:
            self._store_document(doc)
    
    def _store_document(self, doc):
        with self.driver.session() as session:
            # Extract entities
            entities = self._extract_entities(doc['content'])
            
            # Create document node
            session.run("""
                CREATE (d:Document {
                    id: $id,
                    content: $content,
                    metadata: $metadata,
                    embedding: $embedding
                })
            """, 
            id=doc['id'],
            content=doc['content'],
            metadata=doc.get('metadata', {}),
            embedding=self._get_embedding(doc['content'])
            )
            
            # Link entities to document
            for entity in entities:
                session.run("""
                    MATCH (d:Document {id: $docId})
                    MERGE (e:Entity {name: $entity})
                    MERGE (d)-[:MENTIONS]->(e)
                """, docId=doc['id'], entity=entity)
    
    def query(self, question):
        """Answer questions using RAG"""
        # Get relevant context
        context = self._get_context(question)
        
        # Generate answer
        prompt = f"""Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"""
        
        # Return context for display
        return {
            'answer': prompt,  # In production, call LLM
            'context': context
        }
    
    def get_insights(self):
        """Generate graph insights"""
        with self.driver.session() as session:
            # Key entities
            entities = session.run("""
                MATCH (e:Entity)
                RETURN e.name AS entity, 
                       size((e)<-[:MENTIONS]-()) AS mentions
                ORDER BY mentions DESC
                LIMIT 10
            """)
            
            # Community structure
            communities = session.run("""
                CALL gds.labelPropagation.stream('knowledgeGraph')
                YIELD nodeId, communityId
                RETURN communityId, count(*) AS size
                ORDER BY size DESC
            """)
            
            return {
                'top_entities': [dict(r) for r in entities],
                'communities': [dict(r) for r in communities]
            }

Conclusion

Neo4j provides a powerful foundation for AI applications. From building knowledge graphs that augment LLMs to enabling sophisticated machine learning on graph data, Neo4j’s capabilities align perfectly with modern AI requirements. The GraphRAG pattern, vector embeddings, graph neural networks, and recommendation systems all benefit from Neo4j’s native graph representation.

Key capabilities include:

  • Knowledge graphs for structured AI knowledge
  • Vector embeddings for semantic search
  • GraphRAG for enhanced LLM responses
  • GDS library for graph machine learning
  • Entity resolution and recommendation systems

In the final article, we’ll explore real-world Neo4j use cases across industries.

Resources

Comments