
Heaps and Priority Queues: Complete Guide

Introduction

Heaps are specialized tree-based data structures that satisfy the heap property. They are the foundation for priority queues and efficient sorting algorithms. This guide covers heap fundamentals, implementations, and practical applications.

Heap Fundamentals

What is a Heap?

A heap is a complete binary tree where each parent node is either greater than or equal to (max heap) or less than or equal to (min heap) its children.

Max Heap (Parent >= Children):       Min Heap (Parent <= Children):
           50                                 10
         /    \                              /    \
       30      40                           20      30
      /  \    /  \                         /  \    /  \
    10   20  15   35                     40   50  35   45

Properties

  1. Complete Binary Tree: Every level is full except possibly the last, which is filled from left to right
  2. Heap Property:
    • Max Heap: Parent >= Children
    • Min Heap: Parent <= Children
  3. Array Representation: Can be stored efficiently in an array

Array Representation

For a 0-indexed array:

  • Parent of node i: (i-1) // 2
  • Left child of node i: 2*i + 1
  • Right child of node i: 2*i + 2

Index:    0   1   2   3   4   5   6
Array:  [50, 30, 40, 10, 20, 15, 35]

Tree:
           50(0)
         /      \
      30(1)    40(2)
      /  \     /  \
   10(3) 20(4) 15(5) 35(6)
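
The index formulas above are enough to check the heap property directly on the array, without building a tree. The following is a minimal sketch; the helper names `parent` and `is_max_heap` are illustrative and not part of any library.

```python
def parent(i):
    """Index of the parent of node i in a 0-indexed array."""
    return (i - 1) // 2


def is_max_heap(arr):
    """Return True if every non-root node is <= its parent."""
    return all(arr[parent(i)] >= arr[i] for i in range(1, len(arr)))


heap = [50, 30, 40, 10, 20, 15, 35]
print(parent(4))                      # 1: node 20 (index 4) sits under node 30 (index 1)
print(is_max_heap(heap))              # True
print(is_max_heap([50, 30, 40, 35]))  # False: 35 (index 3) exceeds its parent 30 (index 1)
```

Because the tree is complete, the array has no gaps, so a single pass over the indices visits every parent-child pair.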

Heap Implementation

class MinHeap:
    """Min Heap implementation using array."""
    
    def __init__(self):
        self.heap = []
    
    def parent(self, i):
        return (i - 1) // 2
    
    def left_child(self, i):
        return 2 * i + 1
    
    def right_child(self, i):
        return 2 * i + 2
    
    def swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
    
    def insert(self, value):
        """Insert value into heap. O(log n)"""
        self.heap.append(value)
        self._heapify_up(len(self.heap) - 1)
    
    def _heapify_up(self, i):
        """Move element up to maintain heap property."""
        while i > 0 and self.heap[self.parent(i)] > self.heap[i]:
            self.swap(i, self.parent(i))
            i = self.parent(i)
    
    def extract_min(self):
        """Remove and return minimum element. O(log n)"""
        if not self.heap:
            return None
        
        min_val = self.heap[0]
        
        if len(self.heap) == 1:
            self.heap.pop()
        else:
            self.heap[0] = self.heap.pop()
            self._heapify_down(0)
        
        return min_val
    
    def _heapify_down(self, i):
        """Move element down to maintain heap property."""
        smallest = i
        left = self.left_child(i)
        right = self.right_child(i)
        
        if left < len(self.heap) and self.heap[left] < self.heap[smallest]:
            smallest = left
        
        if right < len(self.heap) and self.heap[right] < self.heap[smallest]:
            smallest = right
        
        if smallest != i:
            self.swap(i, smallest)
            self._heapify_down(smallest)
    
    def get_min(self):
        """Return minimum without removing."""
        return self.heap[0] if self.heap else None
    
    def size(self):
        return len(self.heap)
    
    def is_empty(self):
        return len(self.heap) == 0


class MaxHeap:
    """Max Heap - same as MinHeap but with comparison reversed."""
    
    def __init__(self):
        self.heap = []
    
    def parent(self, i):
        return (i - 1) // 2
    
    def left_child(self, i):
        return 2 * i + 1
    
    def right_child(self, i):
        return 2 * i + 2
    
    def swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
    
    def insert(self, value):
        self.heap.append(value)
        self._heapify_up(len(self.heap) - 1)
    
    def _heapify_up(self, i):
        while i > 0 and self.heap[self.parent(i)] < self.heap[i]:
            self.swap(i, self.parent(i))
            i = self.parent(i)
    
    def extract_max(self):
        if not self.heap:
            return None
        
        max_val = self.heap[0]
        
        if len(self.heap) == 1:
            self.heap.pop()
        else:
            self.heap[0] = self.heap.pop()
            self._heapify_down(0)
        
        return max_val
    
    def _heapify_down(self, i):
        largest = i
        left = self.left_child(i)
        right = self.right_child(i)
        
        if left < len(self.heap) and self.heap[left] > self.heap[largest]:
            largest = left
        
        if right < len(self.heap) and self.heap[right] > self.heap[largest]:
            largest = right
        
        if largest != i:
            self.swap(i, largest)
            self._heapify_down(largest)
    
    def get_max(self):
        return self.heap[0] if self.heap else None

Priority Queue

A priority queue is an abstract data type where elements have priorities, and higher-priority elements are dequeued first.
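
As a first, minimal sketch of the idea, `heapq` can serve as a priority queue by pushing `(priority, item)` tuples. This assumes that a lower number means a higher priority; the task names are made up for illustration.

```python
import heapq

tasks = []
heapq.heappush(tasks, (2, "write report"))
heapq.heappush(tasks, (1, "fix outage"))
heapq.heappush(tasks, (3, "refactor"))

# Pop tasks in priority order: the smallest priority number comes out first
order = []
while tasks:
    priority, name = heapq.heappop(tasks)
    order.append(name)

print(order)  # ['fix outage', 'write report', 'refactor']
```

Tuples compare element by element, so two entries with equal priority fall back to comparing the items themselves. The class below avoids that with an insertion counter as a tie-breaker.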

import heapq

# Python's heapq is a min-heap
# For max priority queue, negate values

class PriorityQueue:
    """Priority queue using heapq (min-heap)."""
    
    def __init__(self):
        self.heap = []
        self.entry_finder = {}
        self.counter = 0
    
    def add_task(self, task, priority=0):
        """Add task with priority (lower = higher priority for min-heap)."""
        if task in self.entry_finder:
            self.remove_task(task)
        
        # Store the same list object in the dict and the heap,
        # so remove_task can mark the heap entry as removed.
        entry = [priority, self.counter, task]
        self.entry_finder[task] = entry
        heapq.heappush(self.heap, entry)
        self.counter += 1
    
    def remove_task(self, task):
        """Mark task as removed."""
        entry = self.entry_finder.pop(task, None)
        if entry:
            entry[-1] = '$REMOVED$'
    
    def pop_task(self):
        """Remove and return highest priority task."""
        while self.heap:
            priority, count, task = heapq.heappop(self.heap)
            
            if task != '$REMOVED$':
                del self.entry_finder[task]
                return task, priority
        
        return None, None
    
    def __len__(self):
        return len(self.entry_finder)
    
    def __bool__(self):
        return bool(self.entry_finder)


# Example: Max priority queue using negation
class MaxPriorityQueue:
    """Max priority queue (higher values have higher priority)."""
    
    def __init__(self):
        self.heap = []
    
    def enqueue(self, val, priority=0):
        # Negate priority for max-heap behavior
        heapq.heappush(self.heap, (-priority, val))
    
    def dequeue(self):
        if not self.heap:
            return None
        priority, val = heapq.heappop(self.heap)
        return val, -priority
    
    def peek(self):
        if not self.heap:
            return None
        priority, val = self.heap[0]
        return val, -priority


# Usage
pq = MaxPriorityQueue()
pq.enqueue("Task 1", priority=3)
pq.enqueue("Task 2", priority=1)
pq.enqueue("Task 3", priority=5)

print(pq.dequeue())  # ('Task 3', 5)
print(pq.dequeue())  # ('Task 1', 3)
print(pq.dequeue())  # ('Task 2', 1)

Heap Sort

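Heap sort sorts an array in place in O(n log n) time. It first builds a max heap, then repeatedly swaps the root (the current maximum) with the last element of the unsorted region and restores the heap property on the remaining elements.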

def heap_sort(arr):
    """Sort array using heap sort."""
    n = len(arr)
    
    # Build max heap
    for i in range(n // 2 - 1, -1, -1):
        heapify(arr, n, i)
    
    # Extract elements from heap
    for i in range(n - 1, 0, -1):
        arr[0], arr[i] = arr[i], arr[0]
        heapify(arr, i, 0)
    
    return arr


def heapify(arr, n, i):
    """Heapify subtree rooted at index i."""
    largest = i
    left = 2 * i + 1
    right = 2 * i + 2
    
    if left < n and arr[left] > arr[largest]:
        largest = left
    
    if right < n and arr[right] > arr[largest]:
        largest = right
    
    if largest != i:
        arr[i], arr[largest] = arr[largest], arr[i]
        heapify(arr, n, largest)


# Example
arr = [12, 11, 13, 5, 6, 7]
print(heap_sort(arr))  # [5, 6, 7, 11, 12, 13]
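
For comparison, the same idea can be sketched with `heapq`. This variant is not in place: it copies the input and returns a new list, trading O(n) extra space for brevity. The function name `heapq_sort` is illustrative.

```python
import heapq


def heapq_sort(items):
    """Return a new sorted list built by repeatedly popping a min-heap."""
    heap = list(items)   # copy so the caller's list is left untouched
    heapq.heapify(heap)  # O(n) bottom-up heap construction
    return [heapq.heappop(heap) for _ in range(len(heap))]


data = [12, 11, 13, 5, 6, 7]
print(heapq_sort(data))  # [5, 6, 7, 11, 12, 13]
print(data)              # [12, 11, 13, 5, 6, 7] (unchanged)
```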

Practical Applications

1. Kth Largest/Smallest Element

import heapq

def find_kth_largest(nums, k):
    """Find kth largest element using min-heap of size k."""
    min_heap = nums[:k]
    heapq.heapify(min_heap)
    
    for num in nums[k:]:
        if num > min_heap[0]:
            heapq.heapreplace(min_heap, num)
    
    return min_heap[0]


def find_kth_smallest(nums, k):
    """Find kth smallest element using max-heap of size k (values negated for heapq)."""
    max_heap = [-x for x in nums[:k]]
    heapq.heapify(max_heap)
    
    for num in nums[k:]:
        if num < -max_heap[0]:
            heapq.heapreplace(max_heap, -num)
    
    return -max_heap[0]


# Example
nums = [3, 2, 1, 5, 6, 4]
print(find_kth_largest(nums, 2))  # 5 (2nd largest)
print(find_kth_smallest(nums, 3))  # 3 (3rd smallest)
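
When hand-rolling the bounded heap is not required, the standard library's `heapq.nlargest` and `heapq.nsmallest` give the same answers. A short sketch, assuming 1 <= k <= len(nums):

```python
import heapq

nums = [3, 2, 1, 5, 6, 4]

# nlargest returns the k largest values in descending order,
# so the last one is the kth largest
kth_largest = heapq.nlargest(2, nums)[-1]

# nsmallest returns the k smallest values in ascending order
kth_smallest = heapq.nsmallest(3, nums)[-1]

print(kth_largest)   # 5
print(kth_smallest)  # 3
```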

2. Top K Frequent Elements

import heapq
from collections import Counter

def top_k_frequent(nums, k):
    """Find top k most frequent elements."""
    freq = Counter(nums)
    
    # Use bucket sort: frequency -> elements
    bucket = [[] for _ in range(len(nums) + 1)]
    
    for num, count in freq.items():
        bucket[count].append(num)
    
    result = []
    for i in range(len(nums), 0, -1):
        result.extend(bucket[i])
        if len(result) >= k:
            break
    
    return result[:k]


# Using heap
def top_k_frequent_heap(nums, k):
    """Heap approach: negate counts so heapq's min-heap pops the most frequent first."""
    freq = Counter(nums)
    
    # (-frequency, element)
    heap = [(-count, num) for num, count in freq.items()]
    heapq.heapify(heap)
    
    result = []
    for _ in range(k):
        count, num = heapq.heappop(heap)
        result.append(num)
    
    return result


# Example
nums = [1, 1, 1, 2, 2, 3]
print(top_k_frequent(nums, 2))  # [1, 2]
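
A more compact variant passes a `key` function to `heapq.nlargest`, which selects the k keys with the highest counts without negating anything. This is a sketch of an alternative, not a replacement for the functions above.

```python
import heapq
from collections import Counter

nums = [1, 1, 1, 2, 2, 3]
freq = Counter(nums)

# Iterate over the distinct elements and rank them by their count
top = heapq.nlargest(2, freq, key=freq.get)
print(top)  # [1, 2]
```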

3. Merge K Sorted Arrays

import heapq

def merge_k_sorted(arrays):
    """Merge k sorted arrays into one sorted array."""
    result = []
    heap = []
    
    # Initialize heap with first element from each array
    for i, arr in enumerate(arrays):
        if arr:
            heapq.heappush(heap, (arr[0], i, 0))
    
    while heap:
        val, arr_idx, elem_idx = heapq.heappop(heap)
        result.append(val)
        
        # Add next element from same array
        if elem_idx + 1 < len(arrays[arr_idx]):
            next_val = arrays[arr_idx][elem_idx + 1]
            heapq.heappush(heap, (next_val, arr_idx, elem_idx + 1))
    
    return result


# Example
arrays = [
    [1, 4, 7, 10],
    [2, 5, 8, 11],
    [3, 6, 9, 12]
]

print(merge_k_sorted(arrays))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
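
The standard library offers the same k-way merge as `heapq.merge`, which returns a lazy iterator over already-sorted inputs. A brief sketch:

```python
import heapq

arrays = [
    [1, 4, 7, 10],
    [2, 5, 8, 11],
    [3, 6, 9, 12],
]

# merge yields items one at a time; list() collects them all
merged = list(heapq.merge(*arrays))
print(merged)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
```

Because the result is an iterator, it suits inputs too large to hold in memory at once, such as sorted files read line by line.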

4. Median of Data Stream

import heapq

class MedianFinder:
    """Find median of a data stream."""
    
    def __init__(self):
        # max-heap for lower half
        self.low = []
        # min-heap for upper half
        self.high = []
    
    def add_num(self, num):
        # Add to max-heap (negate for max behavior)
        heapq.heappush(self.low, -num)
        
        # Balance: largest of low <= smallest of high
        heapq.heappush(self.high, -heapq.heappop(self.low))
        
        # Ensure low has at least as many elements as high
        if len(self.low) < len(self.high):
            heapq.heappush(self.low, -heapq.heappop(self.high))
    
    def find_median(self):
        if len(self.low) > len(self.high):
            return -self.low[0]
        return (-self.low[0] + self.high[0]) / 2.0


# Example
mf = MedianFinder()
for num in [5, 15, 1, 3]:
    mf.add_num(num)
    print(f"Added {num}, Median: {mf.find_median()}")
# Added 5, Median: 5
# Added 15, Median: 10.0
# Added 1, Median: 5
# Added 3, Median: 4.0

5. Dijkstra’s Algorithm (already shown in graphs)

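import heapq

def dijkstra(graph, start):
    """Shortest distances from start using a min-heap.

    graph maps each node to a list of (neighbor, weight) pairs;
    weights must be non-negative.
    """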
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    pq = [(0, start)]
    
    while pq:
        dist, node = heapq.heappop(pq)
        
        if dist > distances[node]:
            continue
        
        for neighbor, weight in graph[node]:
            new_dist = dist + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                heapq.heappush(pq, (new_dist, neighbor))
    
    return distances
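
To make the expected input shape concrete, here is a usage sketch. The function is restated so the block runs on its own, and the four-node graph is a made-up example in which every node appears as a key, even when it has no outgoing edges.

```python
import heapq


def dijkstra(graph, start):
    """Shortest distances from start; graph maps node -> [(neighbor, weight), ...]."""
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    pq = [(0, start)]

    while pq:
        dist, node = heapq.heappop(pq)
        if dist > distances[node]:
            continue  # stale entry: a shorter path was already found
        for neighbor, weight in graph[node]:
            new_dist = dist + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                heapq.heappush(pq, (new_dist, neighbor))

    return distances


# Hypothetical directed graph with non-negative weights
graph = {
    'A': [('B', 1), ('C', 4)],
    'B': [('C', 2), ('D', 5)],
    'C': [('D', 1)],
    'D': [],
}

result = dijkstra(graph, 'A')
print(result)  # {'A': 0, 'B': 1, 'C': 3, 'D': 4}
```

The heap may hold several entries for the same node. Rather than updating them in place, the `dist > distances[node]` check skips outdated ones when they are popped.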

Time Complexity Summary

Operation         Binary Heap   Priority Queue (heapq)
Insert            O(log n)      O(log n)
Extract Min/Max   O(log n)      O(log n)
Peek              O(1)          O(1)
Build Heap        O(n)          O(n)
Search            O(n)          O(n)
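
The Build Heap row is worth a short illustration: building a heap from existing data with `heapq.heapify` takes O(n), whereas inserting the same n items one at a time costs O(n log n) in the worst case. A minimal sketch with made-up data:

```python
import heapq

data = [9, 4, 7, 1, 8, 2]

# O(n): bottom-up heap construction, in place on a copy
bulk = list(data)
heapq.heapify(bulk)

# O(n log n) worst case: n successive pushes
incremental = []
for x in data:
    heapq.heappush(incremental, x)

# Both are valid min-heaps with the same minimum at index 0,
# although their internal layouts may differ
print(bulk[0], incremental[0])  # 1 1
```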

When to Use Heaps

  1. Finding Kth largest/smallest: Maintain heap of size K
  2. Merging sorted data: Efficiently merge multiple sorted streams
  3. Priority scheduling: Task scheduling with priorities
  4. Median maintenance: Dynamic median calculation
  5. Dijkstra’s algorithm: Shortest path with non-negative weights
  6. Heap sort: In-place sorting in O(n log n) time

Conclusion

Heaps and priority queues are essential data structures for many algorithmic problems. Their O(log n) insertions and extractions, together with O(1) access to the smallest or largest element, make them ideal for scenarios requiring frequent insertions and deletions while keeping the top-priority element immediately available. Master these patterns to efficiently solve problems involving top-K elements, merging sorted data, and priority-based processing.
