
Community Detection Algorithms: Graph Clustering Methods

Introduction

Networks and graphs permeate modern data analysis: from social media connections to biological protein interactions, from transportation systems to web page links. A fundamental challenge in network science is detecting communities: groups of nodes that are more densely connected internally than to the rest of the network.

Community detection reveals underlying structure in complex systems, enabling insights into organizational patterns, functional modules, and latent relationships. In 2026, community detection algorithms power recommendation systems, fraud detection, biological network analysis, and social network understanding.

This article explores the foundations of community detection, major algorithms, implementation strategies, and practical applications.

Fundamentals of Community Detection

What is a Community?

A community (or cluster) is a group of nodes with:

  • High internal connectivity: Many edges between members
  • Low external connectivity: Few edges to nodes outside the group

Mathematically, communities can be defined through various quality functions:

  • Modularity: Measures deviation from random edge distribution
  • Conductance: Ratio of external to internal edges
  • Normalized Cut: Graph partitioning objective
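
Of these three, normalized cut is the easiest to state directly on an adjacency matrix. A minimal sketch (the helper name and toy graph are illustrative):

```python
import numpy as np

def normalized_cut(A, S):
    """Normalized cut of node set S: cut/vol(S) + cut/vol(rest)."""
    mask = np.zeros(len(A), dtype=bool)
    mask[list(S)] = True

    cut = A[mask][:, ~mask].sum()   # weight of edges leaving S
    vol_S = A[mask].sum()           # total degree (volume) of S
    vol_rest = A[~mask].sum()
    return cut / vol_S + cut / vol_rest

# Two triangles joined by a single bridge edge
A = np.zeros((6, 6))
for u, v in [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5), (2, 3)]:
    A[u, v] = A[v, u] = 1

print(normalized_cut(A, {0, 1, 2}))  # 2/7 ≈ 0.2857: a good (low) cut
```

Lower values indicate better community boundaries; a random split of the same graph scores considerably higher.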

Types of Community Structure

  1. Disjoint communities: Each node belongs to exactly one community
  2. Overlapping communities: Nodes can belong to multiple communities
  3. Hierarchical communities: Nested community structure
  4. Fuzzy communities: Nodes have membership probabilities
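
These four types imply different output shapes. A minimal sketch of each representation (toy labels, purely illustrative):

```python
# Disjoint: one community id per node
disjoint = {0: 'a', 1: 'a', 2: 'b'}

# Overlapping: a set of communities per node
overlapping = {0: {'a'}, 1: {'a', 'b'}, 2: {'b'}}

# Hierarchical: nested partitions, from coarse to fine
hierarchical = [
    {0: 'a', 1: 'a', 2: 'a'},     # level 0: one community
    {0: 'a1', 1: 'a1', 2: 'a2'},  # level 1: split inside 'a'
]

# Fuzzy: membership probabilities per node (each row sums to 1)
fuzzy = {0: {'a': 1.0}, 1: {'a': 0.6, 'b': 0.4}, 2: {'b': 1.0}}
```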

Quality Metrics

import numpy as np

def modularity(adjacency_matrix, community_assignments):
    """
    Calculate modularity Q.
    
    Q = (1/2m) * Σ_ij [A_ij - (k_i * k_j)/(2m)] * δ(c_i, c_j)
    
    Where:
    - A_ij: Edge weight between i and j
    - k_i: Degree of node i
    - m: Total number of edges
    - δ(c_i, c_j): 1 if i and j are in the same community, 0 otherwise
    """
    n = len(adjacency_matrix)
    m = np.sum(adjacency_matrix) / 2
    
    degrees = np.sum(adjacency_matrix, axis=1)
    Q = 0
    
    for i in range(n):
        for j in range(n):
            if community_assignments[i] == community_assignments[j]:
                expected = (degrees[i] * degrees[j]) / (2 * m)
                Q += adjacency_matrix[i, j] - expected
    
    return Q / (2 * m)


def conductance(adjacency_matrix, community):
    """
    Conductance: external edges / total edges incident to community
    """
    internal_edges = 0
    external_edges = 0
    
    for i in community:
        for j in range(len(adjacency_matrix)):
            if adjacency_matrix[i, j] > 0:
                if j in community:
                    internal_edges += adjacency_matrix[i, j]
                else:
                    external_edges += adjacency_matrix[i, j]
    
    total = internal_edges + external_edges
    return external_edges / total if total > 0 else 0
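
As a quick sanity check of the modularity formula, here is a vectorized version on a toy graph of two triangles joined by one bridge edge (graph and values are illustrative):

```python
import numpy as np

# Two triangles joined by a bridge edge (2, 3)
A = np.zeros((6, 6))
for u, v in [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5), (2, 3)]:
    A[u, v] = A[v, u] = 1

comm = np.array([0, 0, 0, 1, 1, 1])  # the natural two-community split
m = A.sum() / 2
k = A.sum(axis=1)

# Same formula as above, without the explicit double loop
same = comm[:, None] == comm[None, :]
Q = ((A - np.outer(k, k) / (2 * m)) * same).sum() / (2 * m)
print(round(Q, 3))  # 0.357 -- well above 0, so the split is meaningful
```

Modularity above roughly 0.3 is conventionally taken as evidence of significant community structure.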

Algorithm Categories

1. Modularity-Based Methods

Louvain Algorithm

The most popular community detection algorithm, known for speed and quality:

import numpy as np
import random
from collections import defaultdict

class Louvain:
    def __init__(self, resolution=1.0):
        self.resolution = resolution
        self.partition = {}
        self.adjacency = {}
        self.degrees = {}
        self.m = 0
    
    def fit(self, edge_list):
        """Detect communities using the Louvain algorithm."""
        # Build adjacency structure
        self._build_graph(edge_list)
        
        # Initialize each node as its own community
        self.partition = {node: node for node in self.adjacency}
        
        # Track original nodes through successive aggregation levels
        node_map = {node: node for node in self.adjacency}
        
        while True:
            # Phase 1: Local modularity optimization
            improved = True
            while improved:
                improved = self._phase_one()
            
            # Compose this level's assignments back onto the original nodes
            node_map = {node: self.partition[super_node]
                        for node, super_node in node_map.items()}
            
            # Phase 2: Aggregate communities into supernodes and repeat
            if not self._phase_two():
                break
        
        self.partition = node_map
        return self.get_partition()
    
    def _build_graph(self, edge_list):
        """Build adjacency map and compute weighted degrees."""
        self.adjacency = defaultdict(lambda: defaultdict(float))
        self.degrees = defaultdict(float)
        self.m = 0
        
        for u, v, weight in edge_list:
            if weight is None:
                weight = 1
            
            # Accumulate so duplicate edges merge instead of overwriting
            self.adjacency[u][v] += weight
            if u != v:
                self.adjacency[v][u] += weight
            
            self.degrees[u] += weight
            self.degrees[v] += weight  # a self-loop contributes twice to its node
            self.m += weight
    
    def _phase_one(self):
        """Locally optimize modularity by single-node moves."""
        improved = False
        nodes = list(self.adjacency.keys())
        random.shuffle(nodes)
        
        for node in nodes:
            current_community = self.partition[node]
            
            # Gain from staying in the current community
            current_gain = self._modularity_gain(node, current_community, remove=True)
            
            # Find the best neighboring community to move to
            best_community = current_community
            best_gain = current_gain
            
            for community in set(self.partition[n] for n in self.adjacency[node]):
                if community == current_community:
                    continue
                
                gain = self._modularity_gain(node, community, remove=False)
                
                if gain > best_gain:
                    best_gain = gain
                    best_community = community
            
            # Move only on a strict improvement over staying put
            if best_community != current_community and best_gain > current_gain + 1e-10:
                self._move_node(node, current_community, best_community)
                improved = True
        
        return improved
    
    def _modularity_gain(self, node, community, remove=False):
        """
        Modularity gain from placing node in community.
        
        The node itself is excluded from the community totals, so the
        same formula evaluates both staying and moving (the `remove`
        flag is kept for call-site symmetry).
        """
        community_nodes = [n for n in self.adjacency
                           if self.partition[n] == community and n != node]
        
        k_i_in = sum(self.adjacency[node].get(n, 0) for n in community_nodes)
        k_i = self.degrees[node]
        k_community = sum(self.degrees[n] for n in community_nodes)
        
        # Links into the community minus the null-model expectation;
        # the resolution parameter scales only the expected term
        return (k_i_in - self.resolution * k_i * k_community / (2 * self.m)) / self.m
    
    def _move_node(self, node, old_community, new_community):
        """Move node from old to new community."""
        self.partition[node] = new_community
    
    def _phase_two(self):
        """Aggregate each community into a single supernode."""
        communities = defaultdict(list)
        for node, comm in self.partition.items():
            communities[comm].append(node)
        
        # If all communities are single nodes, we're done
        if all(len(nodes) == 1 for nodes in communities.values()):
            return False
        
        # Sum edge weights between communities; intra-community weight
        # becomes a self-loop on the supernode
        weights = defaultdict(float)
        seen = set()
        
        for u in self.adjacency:
            for v, weight in self.adjacency[u].items():
                if (v, u) in seen:
                    continue
                seen.add((u, v))
                cu, cv = self.partition[u], self.partition[v]
                weights[(min(cu, cv), max(cu, cv))] += weight
        
        new_edges = [(cu, cv, w) for (cu, cv), w in weights.items()]
        
        # Rebuild graph with communities as nodes
        self._build_graph(new_edges)
        self.partition = {node: node for node in self.adjacency}
        
        return True
    
    def get_partition(self):
        """Return final community assignments."""
        return dict(self.partition)
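
The gain driving phase one can be checked by hand on a small graph. This standalone sketch (toy graph, hypothetical helper `move_gain`) applies the gain formula ΔQ = k_i,in/m − k_i·Σ_tot/(2m²) to one candidate move:

```python
import numpy as np

# Two triangles joined by a bridge edge (2, 3)
A = np.zeros((6, 6))
for u, v in [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5), (2, 3)]:
    A[u, v] = A[v, u] = 1

degrees = A.sum(axis=1)
m = A.sum() / 2

def move_gain(node, community):
    """Gain from placing node in community (node itself excluded)."""
    others = [n for n in community if n != node]
    k_i_in = A[node, others].sum()       # links from node into community
    sigma_tot = degrees[others].sum()    # total degree of community
    return k_i_in / m - degrees[node] * sigma_tot / (2 * m**2)

# Node 2 gains more from staying with its triangle than crossing the bridge
print(move_gain(2, [0, 1, 2]))  # ≈ 0.163 (staying)
print(move_gain(2, [3, 4, 5]))  # ≈ -0.071 (moving)
```

Phase one repeats exactly this comparison for every node against every neighboring community.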

Greedy Modularity Optimization

def estimate_merge_gain(ci, cj, communities, adjacency, degrees, m):
    """Modularity change from merging communities ci and cj."""
    # Edge weight between the two communities
    e_ij = sum(adjacency[u, v] for u in communities[ci] for v in communities[cj])
    a_i = sum(degrees[u] for u in communities[ci]) / (2 * m)
    a_j = sum(degrees[u] for u in communities[cj]) / (2 * m)
    return e_ij / m - 2 * a_i * a_j


def greedy_modularity(adjacency):
    """
    Greedy agglomerative modularity optimization (CNM-style).
    
    Starts with each node as a separate community and repeatedly
    merges the pair of communities with the largest modularity gain.
    """
    n = len(adjacency)
    degrees = np.sum(adjacency, axis=1)
    m = np.sum(degrees) / 2
    
    # Initialize
    communities = {i: {i} for i in range(n)}
    community_list = list(range(n))
    
    current_Q = 0
    best_Q = 0
    best_partition = {i: i for i in range(n)}
    
    # Merge communities until no improvement
    while len(community_list) > 1:
        best_merge = None
        best_merge_gain = -float('inf')
        
        # Find the pair whose merge gains the most modularity
        for i, ci in enumerate(community_list):
            for cj in community_list[i+1:]:
                gain = estimate_merge_gain(ci, cj, communities, adjacency, degrees, m)
                
                if gain > best_merge_gain:
                    best_merge_gain = gain
                    best_merge = (ci, cj)
        
        if best_merge_gain <= 0:
            break
        
        # Merge communities
        ci, cj = best_merge
        communities[ci] |= communities[cj]
        del communities[cj]
        community_list.remove(cj)
        
        current_Q += best_merge_gain
        
        # Snapshot the best partition seen so far
        if current_Q > best_Q:
            best_Q = current_Q
            best_partition = {node: comm for comm, members in communities.items()
                              for node in members}
    
    return best_partition

2. Label Propagation

Simple and fast algorithm using network structure:

import random
import numpy as np
from collections import Counter

def label_propagation(adjacency, max_iterations=100):
    """
    Label Propagation Algorithm (LPA).
    
    Each node adopts the most common label among neighbors.
    """
    n = len(adjacency)
    
    # Initialize with unique labels
    labels = {i: i for i in range(n)}
    
    for iteration in range(max_iterations):
        nodes = list(range(n))
        random.shuffle(nodes)
        
        changed = 0
        
        for node in nodes:
            neighbors = np.where(adjacency[node] > 0)[0]
            
            if len(neighbors) == 0:
                continue
            
            # Get neighbor labels
            neighbor_labels = [labels[n] for n in neighbors]
            
            # Most common label
            most_common = Counter(neighbor_labels).most_common(1)[0][0]
            
            if labels[node] != most_common:
                labels[node] = most_common
                changed += 1
        
        # Convergence check
        if changed == 0:
            break
    
    return labels


def label_propagation_async(adjacency, max_iterations=100):
    """
    Asynchronous Label Propagation with tie-breaking.
    """
    n = len(adjacency)
    labels = list(range(n))
    
    for _ in range(max_iterations):
        nodes = list(range(n))
        random.shuffle(nodes)
        
        for node in nodes:
            neighbors = np.where(adjacency[node] > 0)[0]
            
            if len(neighbors) == 0:
                continue
            
            neighbor_labels = [labels[n] for n in neighbors]
            max_count = max(neighbor_labels.count(l) for l in set(neighbor_labels))
            candidates = [l for l in set(neighbor_labels) 
                         if neighbor_labels.count(l) == max_count]
            
            labels[node] = random.choice(candidates)
    
    # Group nodes by label
    communities = {}
    for node, label in enumerate(labels):
        if label not in communities:
            communities[label] = []
        communities[label].append(node)
    
    return communities
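
Note the difference in tie handling: `Counter.most_common(1)` breaks ties by first-seen order, which is deterministic, while the asynchronous variant chooses randomly among all maximal labels, which helps avoid oscillation. A stdlib-only illustration:

```python
import random
from collections import Counter

neighbor_labels = [1, 2, 1, 2]  # a tie between labels 1 and 2

# Deterministic: most_common keeps first-seen order on equal counts
deterministic = Counter(neighbor_labels).most_common(1)[0][0]

# Randomized: choose uniformly among all labels with the top count
counts = Counter(neighbor_labels)
top = max(counts.values())
randomized = random.choice([l for l, c in counts.items() if c == top])

print(deterministic)  # always 1 (first-seen label wins the tie)
print(randomized)     # 1 or 2
```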

3. Spectral Methods

import numpy as np
from scipy import linalg

def spectral_clustering(adjacency, n_clusters, n_components=None):
    """
    Spectral Clustering for community detection.
    
    Steps:
    1. Compute Laplacian matrix
    2. Find smallest eigenvectors
    3. Cluster in eigenspace
    """
    n = len(adjacency)
    
    if n_clusters is None:
        n_clusters = n // 2
    
    # Degree matrix
    D = np.diag(np.sum(adjacency, axis=1))
    
    # Normalized Laplacian: L = I - D^(-1/2) A D^(-1/2)
    D_inv_sqrt = np.diag(1.0 / np.sqrt(np.diag(D) + 1e-10))
    L_norm = np.eye(n) - D_inv_sqrt @ adjacency @ D_inv_sqrt
    
    # Eigen decomposition
    eigenvalues, eigenvectors = linalg.eigh(L_norm)
    
    # Use k smallest eigenvectors
    if n_components:
        k = n_components
    else:
        k = n_clusters
    
    embedding = eigenvectors[:, :k]
    
    # Normalize rows
    embedding = embedding / (np.linalg.norm(embedding, axis=1, keepdims=True) + 1e-10)
    
    # Simple k-means in embedding space
    labels = kmeans_init(embedding, n_clusters)
    
    return labels


def kmeans_init(X, k):
    """Simple k-means initialization."""
    n = len(X)
    
    # Random initial centers
    centers = X[np.random.choice(n, k, replace=False)]
    
    for _ in range(50):
        # Assign to nearest center
        distances = np.array([[np.linalg.norm(x - c) for c in centers] for x in X])
        labels = np.argmin(distances, axis=1)
        
        # Update centers
        new_centers = np.array([X[labels == i].mean(axis=0) if np.sum(labels == i) > 0 else centers[i] 
                               for i in range(k)])
        
        if np.allclose(centers, new_centers):
            break
        centers = new_centers
    
    return labels
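
For two communities, even the second-smallest eigenvector of the normalized Laplacian (the Fiedler vector) separates the groups by sign, with no k-means needed. A self-contained check on a toy two-cluster graph:

```python
import numpy as np

# Two triangles joined by a single bridge edge
A = np.zeros((6, 6))
for u, v in [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5), (2, 3)]:
    A[u, v] = A[v, u] = 1

d = A.sum(axis=1)
D_inv_sqrt = np.diag(1.0 / np.sqrt(d))
L = np.eye(6) - D_inv_sqrt @ A @ D_inv_sqrt

eigenvalues, eigenvectors = np.linalg.eigh(L)
fiedler = eigenvectors[:, 1]  # eigenvector of the second-smallest eigenvalue

# The sign of the Fiedler vector splits the two triangles
labels = (fiedler > 0).astype(int)
print(labels)  # e.g. [1 1 1 0 0 0] -- one triangle per side
```

The overall sign of an eigenvector is arbitrary, so only the grouping (not which side gets label 1) is determined.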

4. Girvan-Newman Algorithm

Edge betweenness-based algorithm:

from collections import deque, defaultdict

def girvan_newman(adjacency, n_communities=2):
    """
    Girvan-Newman algorithm.
    
    Repeatedly removes the edge with highest betweenness until the
    network splits into the requested number of components.
    """
    # Work on a copy
    adj = adjacency.copy()
    
    while True:
        communities = connected_components(adj)
        if len(communities) >= n_communities:
            return communities
        
        edge_betweenness = compute_all_betweenness(adj)
        if not edge_betweenness:
            return communities
        
        # Remove the edge with highest betweenness
        u, v = max(edge_betweenness.items(), key=lambda x: x[1])[0]
        adj[u, v] = 0
        adj[v, u] = 0


def connected_components(adjacency):
    """Extract connected components with BFS."""
    n = len(adjacency)
    visited = set()
    components = []
    
    for start in range(n):
        if start in visited:
            continue
        component = []
        queue = deque([start])
        visited.add(start)
        while queue:
            node = queue.popleft()
            component.append(node)
            for neighbor in np.where(adjacency[node] > 0)[0]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
        components.append(component)
    
    return components


def compute_all_betweenness(adjacency):
    """Edge betweenness via Brandes' algorithm."""
    n = len(adjacency)
    betweenness = defaultdict(float)
    
    for source in range(n):
        # BFS from source
        distances, predecessors, paths, order = bfs_predecessors(adjacency, source)
        
        # Dependency accumulation in reverse visit order (Brandes)
        delta = [0.0] * n
        for node in reversed(order):
            for pred in predecessors[node]:
                contribution = (paths[pred] / paths[node]) * (1 + delta[node])
                edge = tuple(sorted((pred, node)))
                betweenness[edge] += contribution
                delta[pred] += contribution
    
    # Each undirected edge was counted from both endpoints
    return {edge: value / 2 for edge, value in betweenness.items()}


def bfs_predecessors(adjacency, start):
    """BFS shortest paths: distances, predecessors, path counts, visit order."""
    n = len(adjacency)
    distances = [-1] * n
    predecessors = [[] for _ in range(n)]
    paths = [0] * n
    order = []
    
    queue = deque([start])
    distances[start] = 0
    paths[start] = 1
    
    while queue:
        node = queue.popleft()
        order.append(node)
        
        for neighbor in np.where(adjacency[node] > 0)[0]:
            if distances[neighbor] == -1:
                distances[neighbor] = distances[node] + 1
                queue.append(neighbor)
            
            if distances[neighbor] == distances[node] + 1:
                predecessors[neighbor].append(node)
                paths[neighbor] += paths[node]
    
    return distances, predecessors, paths, order
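
The betweenness computation can be sanity-checked with a much cruder scheme that follows a single BFS shortest path per node pair. Even that rough count singles out the bridge edge of a toy dumbbell graph (helper name illustrative):

```python
import numpy as np
from collections import deque

# Two triangles joined by a bridge edge (2, 3); the bridge should dominate
A = np.zeros((6, 6))
for u, v in [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5), (2, 3)]:
    A[u, v] = A[v, u] = 1

def shortest_path_edges(source, target):
    """Edges on one BFS shortest path from source to target."""
    parent = {source: None}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for nb in np.where(A[node] > 0)[0]:
            if nb not in parent:
                parent[nb] = node
                queue.append(nb)
    edges, node = [], target
    while parent[node] is not None:
        edges.append(tuple(sorted((int(node), int(parent[node])))))
        node = parent[node]
    return edges

# Count how often each edge appears on a shortest path, over all pairs
counts = {}
for s in range(6):
    for t in range(s + 1, 6):
        for e in shortest_path_edges(s, t):
            counts[e] = counts.get(e, 0) + 1

print(max(counts, key=counts.get))  # (2, 3) -- the bridge edge
```

Unlike Brandes' algorithm, this counts only one shortest path per pair, but the bridge still lies on every cross-triangle route.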

Overlapping Community Detection

Clique Percolation Method

def clique_percolation(adjacency, k=3):
    """
    CPM: Find overlapping communities based on k-cliques.
    
    Two k-cliques belong to the same community if they share k-1 nodes.
    """
    # Find all k-cliques
    cliques = find_k_cliques(adjacency, k)
    
    # Build clique graph: one node per clique, edges join adjacent cliques
    clique_graph = defaultdict(set)
    
    for i, c1 in enumerate(cliques):
        for j, c2 in enumerate(cliques[i+1:], i+1):
            if len(set(c1) & set(c2)) >= k - 1:
                clique_graph[i].add(j)
                clique_graph[j].add(i)
    
    # Connected components of the clique graph are the communities
    communities = []
    visited = set()
    
    for start in range(len(cliques)):  # include isolated cliques too
        if start in visited:
            continue
        
        # BFS over the clique graph
        component = {start}
        queue = deque([start])
        while queue:
            c = queue.popleft()
            for neighbor in clique_graph[c]:
                if neighbor not in component:
                    component.add(neighbor)
                    queue.append(neighbor)
        visited.update(component)
        
        # The union of member cliques gives the community's nodes
        community = set()
        for c in component:
            community.update(cliques[c])
        communities.append(sorted(community))
    
    return communities


def find_k_cliques(adjacency, k):
    """Find all k-cliques (complete subgraphs on exactly k nodes)."""
    # Note: maximal-clique algorithms such as Bron-Kerbosch report only
    # maximal cliques and would miss k-cliques nested inside larger ones,
    # so we enumerate size-k cliques directly. In practice, use networkx
    # or igraph for large graphs.
    n = len(adjacency)
    if k > n:
        return []
    
    neighbors = [set(np.where(adjacency[v] > 0)[0]) for v in range(n)]
    cliques = []
    
    def extend(clique, candidates):
        if len(clique) == k:
            cliques.append(list(clique))
            return
        for v in sorted(candidates):
            # Extend only with higher-indexed nodes to avoid duplicates
            extend(clique + [v], {u for u in candidates if u > v} & neighbors[v])
    
    for v in range(n):
        extend([v], {u for u in neighbors[v] if u > v})
    
    return cliques
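
For small graphs, k-cliques can also be brute-forced with `itertools.combinations`, which makes CPM easy to sanity-check: two triangles sharing an edge percolate into a single k=3 community (toy graph, illustrative):

```python
import itertools
import numpy as np

# Two triangles sharing edge (1, 2): node sets {0, 1, 2} and {1, 2, 3}
A = np.zeros((4, 4))
for u, v in [(0, 1), (0, 2), (1, 2), (1, 3), (2, 3)]:
    A[u, v] = A[v, u] = 1

k = 3
cliques = [c for c in itertools.combinations(range(4), k)
           if all(A[u, v] for u, v in itertools.combinations(c, 2))]
print(cliques)  # [(0, 1, 2), (1, 2, 3)]

# The two triangles share k-1 = 2 nodes, so CPM merges them
shared = set(cliques[0]) & set(cliques[1])
print(len(shared) >= k - 1)  # True
```

The merged community {0, 1, 2, 3} covers all four nodes, with nodes 1 and 2 sitting in both underlying cliques.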

Practical Applications

Social Network Analysis

def detect_social_communities(social_graph):
    """
    Analyze social network community structure.
    
    Expects social_graph['edges'] as (u, v, weight) tuples.
    
    Returns:
    - Community memberships
    - Inter-community edge count
    - Community size distribution
    """
    # Detect communities using Louvain
    louvain = Louvain()
    communities = louvain.fit(social_graph['edges'])
    
    results = {
        'communities': defaultdict(list),
        'inter_community_edges': 0,
        'size_distribution': []
    }
    
    for node, comm in communities.items():
        results['communities'][comm].append(node)
    
    # Count edges that cross community boundaries
    for u, v, _ in social_graph['edges']:
        if communities.get(u) != communities.get(v):
            results['inter_community_edges'] += 1
    
    results['size_distribution'] = [len(members)
                                    for members in results['communities'].values()]
    
    return results

Recommendation Systems

def matrix_to_edges(matrix):
    """Convert a weighted adjacency matrix to a (u, v, weight) edge list."""
    edges = []
    n = len(matrix)
    for i in range(n):
        for j in range(i + 1, n):
            if matrix[i, j] > 0:
                edges.append((i, j, matrix[i, j]))
    return edges


def community_based_recommendation(user_item_matrix, top_k=10):
    """
    Use community detection for recommendations.
    
    Recommend items popular in the communities of a user's items.
    """
    n_users, n_items = user_item_matrix.shape
    
    # Item communities based on co-purchase patterns
    item_adj = user_item_matrix.T @ user_item_matrix
    np.fill_diagonal(item_adj, 0)
    item_communities = Louvain().fit(matrix_to_edges(item_adj))
    
    # Group items by community
    members = defaultdict(set)
    for item, comm in item_communities.items():
        members[comm].add(item)
    
    recommendations = {}
    
    for user in range(n_users):
        user_items = set(np.where(user_item_matrix[user] > 0)[0])
        
        # Items sharing a community with the user's purchases
        community_items = set()
        for item in user_items:
            community_items.update(members[item_communities.get(item)])
        
        # Exclude already purchased, score by overall popularity
        candidates = community_items - user_items
        
        scores = [(item, np.sum(user_item_matrix[:, item])) for item in candidates]
        scores.sort(key=lambda x: x[1], reverse=True)
        
        recommendations[user] = scores[:top_k]
    
    return recommendations

Algorithm Comparison

Algorithm            | Time Complexity | Quality | Overlapping | Best For
---------------------|-----------------|---------|-------------|----------------------
Louvain              | O(n log n)      | High    | No          | Large networks
Label Propagation    | O(m)            | Medium  | No*         | Very large networks
Spectral             | O(n³)           | High    | No          | Small-medium networks
Girvan-Newman        | O(m²n)          | High    | No          | Dense networks
Clique Percolation   | O(n^k)          | High    | Yes         | Dense networks

*Basic LPA yields disjoint communities; overlapping variants (e.g., SLPA, COPRA) exist.

Best Practices

Choosing an Algorithm

  1. Large networks (>100K nodes): Louvain or Label Propagation
  2. Need overlapping communities: Clique Percolation (or overlapping label propagation variants such as SLPA)
  3. Small, dense networks: Spectral clustering
  4. Quality paramount: Girvan-Newman (slow) or Louvain

Preprocessing

  • Remove low-degree nodes to reduce noise
  • Consider edge weights if available
  • Handle disconnected components separately
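
A sketch of low-degree filtering on a weighted NumPy adjacency matrix; the toy matrix and threshold are illustrative:

```python
import numpy as np

# Hypothetical weighted adjacency with two low-degree hangers-on
A = np.array([
    [0, 2, 1, 0, 0],
    [2, 0, 3, 0, 0],
    [1, 3, 0, 1, 0],
    [0, 0, 1, 0, 0],   # node 3: weighted degree 1
    [0, 0, 0, 0, 0],   # node 4: isolated
], dtype=float)

# Keep nodes with weighted degree >= 2 to reduce noise
degrees = A.sum(axis=1)
keep = np.where(degrees >= 2)[0]
A_filtered = A[np.ix_(keep, keep)]

print(keep)              # [0 1 2]
print(A_filtered.shape)  # (3, 3)
```

Remember to keep the `keep` index array around so detected community labels can be mapped back to the original node ids.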

Validation

from sklearn.metrics import normalized_mutual_info_score, adjusted_rand_score

def evaluate_community_detection(predicted, ground_truth):
    """
    Compare detected communities to ground truth.
    
    Both arguments are label sequences indexed by node; scikit-learn
    provides standard implementations of NMI and ARI.
    """
    return {
        # Normalized Mutual Information: 1.0 means identical partitions
        'nmi': normalized_mutual_info_score(ground_truth, predicted),
        # Adjusted Rand Index: chance-corrected pair-counting agreement
        'ari': adjusted_rand_score(ground_truth, predicted)
    }

Conclusion

Community detection remains fundamental to network analysis, revealing hidden structure in complex systems. The field offers algorithms for every need:

  • Louvain: The workhorse, fast and high-quality
  • Label Propagation: When speed is paramount
  • Spectral Methods: When mathematical elegance matters
  • Girvan-Newman: For understanding edge importance
  • Clique Percolation: For overlapping communities

In 2026, community detection integrates with machine learning pipelines, enables sophisticated recommendations, and powers social network analysis. The algorithms continue to evolve, with research addressing dynamic networks, multi-resolution analysis, and distributed computation.

Understanding community detection provides essential tools for analyzing any networked dataโ€”social, biological, or technological.
