Introduction
Heaps are specialized tree-based data structures that satisfy the heap property. They are the foundation for priority queues and efficient sorting algorithms. This guide covers heap fundamentals, implementations, and practical applications.
Heap Fundamentals
What is a Heap?
A heap is a complete binary tree where each parent node is either greater than or equal to (max heap) or less than or equal to (min heap) its children.
Max Heap (Parent >= Children):      Min Heap (Parent <= Children):

          50                                  10
        /    \                              /    \
      30      40                          20      30
     /  \    /  \                        /  \    /  \
   10   20  15   35                    40   50  35   45
Properties
- Complete Binary Tree: Filled level by level, left to right
- Heap Property:
  - Max Heap: Parent >= Children
  - Min Heap: Parent <= Children
- Array Representation: Can be stored efficiently in an array
Array Representation
For a 0-indexed array:
- Parent of node i: (i-1) // 2
- Left child of node i: 2*i + 1
- Right child of node i: 2*i + 2
Index:   0   1   2   3   4   5   6
Array: [50, 30, 40, 10, 20, 15, 35]

Tree:
               50(0)
             /       \
         30(1)       40(2)
        /     \     /     \
    10(3)  20(4)  15(5)  35(6)
Heap Implementation
class MinHeap:
    """Binary min-heap stored in a flat list; the smallest value lives at index 0."""

    def __init__(self):
        # Level-order layout of the implicit complete binary tree.
        self.heap = []

    def parent(self, i):
        """Return the index of the parent of node ``i``."""
        return (i - 1) // 2

    def left_child(self, i):
        """Return the index of the left child of node ``i``."""
        return 2 * i + 1

    def right_child(self, i):
        """Return the index of the right child of node ``i``."""
        return 2 * i + 2

    def swap(self, i, j):
        """Exchange the values stored at indices ``i`` and ``j``."""
        items = self.heap
        items[i], items[j] = items[j], items[i]

    def insert(self, value):
        """Add ``value`` to the heap. O(log n)"""
        items = self.heap
        items.append(value)
        # The new leaf may be smaller than its ancestors; bubble it up.
        self._heapify_up(len(items) - 1)

    def _heapify_up(self, i):
        """Bubble the value at index ``i`` towards the root until its parent is not larger."""
        while i > 0:
            up = self.parent(i)
            if not self.heap[up] > self.heap[i]:
                break
            self.swap(i, up)
            i = up

    def extract_min(self):
        """Remove and return the smallest value, or None if the heap is empty. O(log n)"""
        if not self.heap:
            return None
        last = self.heap.pop()
        if not self.heap:
            # The heap held a single value, which was also the minimum.
            return last
        smallest = self.heap[0]
        # Move the former last leaf to the root and sink it into place.
        self.heap[0] = last
        self._heapify_down(0)
        return smallest

    def _heapify_down(self, i):
        """Sink the value at index ``i`` until neither child is smaller than it."""
        count = len(self.heap)
        while True:
            smallest = i
            for child in (self.left_child(i), self.right_child(i)):
                if child < count and self.heap[child] < self.heap[smallest]:
                    smallest = child
            if smallest == i:
                return
            self.swap(i, smallest)
            i = smallest

    def get_min(self):
        """Return the smallest value without removing it, or None if the heap is empty."""
        if not self.heap:
            return None
        return self.heap[0]

    def size(self):
        """Return the number of stored values."""
        return len(self.heap)

    def is_empty(self):
        """Return True when the heap holds no values."""
        return not self.heap
class MaxHeap:
    """Max heap stored in a list; mirrors MinHeap with the comparisons reversed.

    The largest value is always at index 0. ``size`` and ``is_empty`` are
    provided so that both heap classes expose the same helper methods.
    """

    def __init__(self):
        # Level-order layout of the implicit complete binary tree.
        self.heap = []

    def parent(self, i):
        """Return the index of the parent of node ``i``."""
        return (i - 1) // 2

    def left_child(self, i):
        """Return the index of the left child of node ``i``."""
        return 2 * i + 1

    def right_child(self, i):
        """Return the index of the right child of node ``i``."""
        return 2 * i + 2

    def swap(self, i, j):
        """Exchange the values stored at indices ``i`` and ``j``."""
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

    def insert(self, value):
        """Insert ``value`` into the heap. O(log n)"""
        self.heap.append(value)
        self._heapify_up(len(self.heap) - 1)

    def _heapify_up(self, i):
        """Move the element at index ``i`` up while its parent is smaller."""
        while i > 0 and self.heap[self.parent(i)] < self.heap[i]:
            self.swap(i, self.parent(i))
            i = self.parent(i)

    def extract_max(self):
        """Remove and return the largest element, or None if the heap is empty. O(log n)"""
        if not self.heap:
            return None
        max_val = self.heap[0]
        if len(self.heap) == 1:
            self.heap.pop()
        else:
            # Replace the root with the last leaf, then restore the heap property.
            self.heap[0] = self.heap.pop()
            self._heapify_down(0)
        return max_val

    def _heapify_down(self, i):
        """Move the element at index ``i`` down while a child is larger."""
        largest = i
        left = self.left_child(i)
        right = self.right_child(i)
        if left < len(self.heap) and self.heap[left] > self.heap[largest]:
            largest = left
        if right < len(self.heap) and self.heap[right] > self.heap[largest]:
            largest = right
        if largest != i:
            self.swap(i, largest)
            self._heapify_down(largest)

    def get_max(self):
        """Return the largest element without removing it, or None if empty."""
        return self.heap[0] if self.heap else None

    def size(self):
        """Return the number of stored elements."""
        return len(self.heap)

    def is_empty(self):
        """Return True when the heap holds no elements."""
        return len(self.heap) == 0
Priority Queue
A priority queue is an abstract data type where elements have priorities, and higher-priority elements are dequeued first.
import heapq
# Python's heapq is a min-heap
# For max priority queue, negate values
class PriorityQueue:
    """Priority queue built on heapq (min-heap) with removal and re-prioritisation.

    Each heap entry is a mutable list ``[priority, counter, task]``. The
    counter is a unique, increasing tie-breaker, so tasks with equal priority
    pop in insertion order and the tasks themselves are never compared.
    Removal is lazy: the entry stays in the heap but its task slot is
    overwritten with ``REMOVED`` and it is skipped when popped.
    """

    # Placeholder written into the task slot of a removed heap entry.
    REMOVED = '$REMOVED$'

    def __init__(self):
        self.heap = []
        # Maps each live task to the entry list that sits in the heap.
        self.entry_finder = {}
        self.counter = 0

    def add_task(self, task, priority=0):
        """Add a task, or update its priority if it is already queued.

        Lower ``priority`` values are popped first (min-heap).
        """
        if task in self.entry_finder:
            self.remove_task(task)
        # The heap and entry_finder must share the SAME list object, otherwise
        # remove_task cannot invalidate the entry that is inside the heap.
        entry = [priority, self.counter, task]
        self.entry_finder[task] = entry
        heapq.heappush(self.heap, entry)
        self.counter += 1

    def remove_task(self, task):
        """Mark a queued task as removed; do nothing if the task is unknown."""
        entry = self.entry_finder.pop(task, None)
        if entry is not None:
            entry[-1] = self.REMOVED

    def pop_task(self):
        """Remove and return ``(task, priority)`` for the lowest priority value.

        Returns ``(None, None)`` when no live task remains.
        """
        while self.heap:
            priority, _count, task = heapq.heappop(self.heap)
            if task != self.REMOVED:
                del self.entry_finder[task]
                return task, priority
        return None, None

    def __len__(self):
        """Return the number of live (not removed) tasks."""
        return len(self.entry_finder)

    def __bool__(self):
        """Return True while at least one live task is queued."""
        return bool(self.entry_finder)
# Example: Max priority queue using negation
class MaxPriorityQueue:
    """Max priority queue (higher values have higher priority).

    Built on heapq's min-heap by storing negated priorities. Every entry also
    carries an insertion counter, so items with equal priority are returned in
    the order they were enqueued and the values themselves are never compared
    (values such as dicts, which do not support ``<``, are therefore safe).
    """

    def __init__(self):
        self.heap = []
        # Monotonic tie-breaker for entries that share a priority.
        self._counter = 0

    def enqueue(self, val, priority=0):
        """Add ``val`` with the given priority."""
        # Negate priority for max-heap behavior
        heapq.heappush(self.heap, (-priority, self._counter, val))
        self._counter += 1

    def dequeue(self):
        """Remove and return ``(val, priority)`` for the highest priority, or None if empty."""
        if not self.heap:
            return None
        neg_priority, _order, val = heapq.heappop(self.heap)
        return val, -neg_priority

    def peek(self):
        """Return ``(val, priority)`` for the highest priority without removing it, or None if empty."""
        if not self.heap:
            return None
        neg_priority, _order, val = self.heap[0]
        return val, -neg_priority
# Usage: enqueue three tasks out of order, then drain the queue
pq = MaxPriorityQueue()
for label, level in (("Task 1", 3), ("Task 2", 1), ("Task 3", 5)):
    pq.enqueue(label, priority=level)

# Items come back highest priority first.
print(pq.dequeue())  # ('Task 3', 5)
print(pq.dequeue())  # ('Task 1', 3)
print(pq.dequeue())  # ('Task 2', 1)
Heap Sort
Heap sort uses a heap to sort elements in-place with O(n log n) time complexity.
def heap_sort(arr):
    """Sort ``arr`` in place in ascending order with heap sort and return it."""

    def sift_down(root, end):
        # Sink arr[root] within arr[:end] until neither child is larger.
        while True:
            largest = root
            for child in (2 * root + 1, 2 * root + 2):
                if child < end and arr[child] > arr[largest]:
                    largest = child
            if largest == root:
                return
            arr[root], arr[largest] = arr[largest], arr[root]
            root = largest

    size = len(arr)

    # Phase 1: build a max heap, starting from the last node that has a child.
    for root in reversed(range(size // 2)):
        sift_down(root, size)

    # Phase 2: move the current maximum behind the shrinking heap, then repair the heap.
    for end in reversed(range(1, size)):
        arr[0], arr[end] = arr[end], arr[0]
        sift_down(0, end)

    return arr
def heapify(arr, n, i):
    """Restore the max-heap property for the subtree rooted at index ``i``.

    Only the first ``n`` items of ``arr`` are treated as part of the heap.
    The list is modified in place; nothing is returned.
    """
    while True:
        largest = i
        # Check the left child first, then the right, against the current largest.
        for child in (2 * i + 1, 2 * i + 2):
            if child < n and arr[child] > arr[largest]:
                largest = child
        if largest == i:
            break
        arr[i], arr[largest] = arr[largest], arr[i]
        # Continue sinking from the child position that received the old root.
        i = largest
# Example: sort a small unsorted list (heap_sort works in place and returns the list)
arr = [12, 11, 13, 5, 6, 7]
sorted_arr = heap_sort(arr)
print(sorted_arr)  # [5, 6, 7, 11, 12, 13]
Practical Applications
1. Kth Largest/Smallest Element
import heapq
def find_kth_largest(nums, k):
    """Find the kth largest element using a min-heap of size k.

    Args:
        nums: Sequence of mutually comparable values (not modified).
        k: 1-based rank; ``k=1`` is the maximum.

    Returns:
        The kth largest element of ``nums``.

    Raises:
        ValueError: If ``k`` is not between 1 and ``len(nums)``.
    """
    # Without this check k <= 0 or k > len(nums) would either fail with an
    # unrelated IndexError or silently return the wrong element.
    if not 1 <= k <= len(nums):
        raise ValueError(f"k must be between 1 and {len(nums)}, got {k}")
    # list() so that tuples and other sequences can be heapified too.
    min_heap = list(nums[:k])
    heapq.heapify(min_heap)
    for num in nums[k:]:
        # The heap root is the smallest of the k largest values seen so far.
        if num > min_heap[0]:
            heapq.heapreplace(min_heap, num)
    return min_heap[0]
def find_kth_smallest(nums, k):
    """Find the kth smallest element using a max-heap of size k.

    The max-heap is simulated with heapq by storing negated values, so the
    elements must support unary minus.

    Args:
        nums: Sequence of numbers (not modified).
        k: 1-based rank; ``k=1`` is the minimum.

    Returns:
        The kth smallest element of ``nums``.

    Raises:
        ValueError: If ``k`` is not between 1 and ``len(nums)``.
    """
    # Without this check k <= 0 or k > len(nums) would either fail with an
    # unrelated IndexError or silently return the wrong element.
    if not 1 <= k <= len(nums):
        raise ValueError(f"k must be between 1 and {len(nums)}, got {k}")
    max_heap = [-x for x in nums[:k]]
    heapq.heapify(max_heap)
    for num in nums[k:]:
        # -max_heap[0] is the largest of the k smallest values seen so far.
        if num < -max_heap[0]:
            heapq.heapreplace(max_heap, -num)
    return -max_heap[0]
# Example: pick order statistics from an unsorted list
nums = [3, 2, 1, 5, 6, 4]
second_largest = find_kth_largest(nums, 2)
third_smallest = find_kth_smallest(nums, 3)
print(second_largest)  # 5 (2nd largest)
print(third_smallest)  # 3 (3rd smallest)
2. Top K Frequent Elements
from collections import Counter
def top_k_frequent(nums, k):
    """Return the k most frequent values in ``nums``, most frequent first.

    Uses bucket sort: slot ``c`` of the bucket list collects every value that
    occurs exactly ``c`` times. Fewer than k values are returned when ``nums``
    has fewer than k distinct values.
    """
    counts = Counter(nums)
    total = len(nums)

    # No value can occur more than len(nums) times, so total + 1 slots suffice.
    by_count = [[] for _ in range(total + 1)]
    for value, occurrences in counts.items():
        by_count[occurrences].append(value)

    ranked = []
    # Walk from the highest possible count down to 1, stopping once k values are collected.
    for occurrences in reversed(range(1, total + 1)):
        ranked += by_count[occurrences]
        if len(ranked) >= k:
            break
    return ranked[:k]
# Using heap
def top_k_frequent_heap(nums, k):
    """Return the k most frequent values in ``nums`` using a heap.

    Frequencies are negated so that heapq's min-heap yields the most frequent
    value first; values with the same frequency come out in ascending value
    order. When ``nums`` has fewer than k distinct values, all of them are
    returned (the same contract as ``top_k_frequent``).
    """
    freq = Counter(nums)
    # (-frequency, element)
    heap = [(-count, num) for num, count in freq.items()]
    heapq.heapify(heap)
    result = []
    # Never pop more entries than the heap holds; popping an empty heap raises IndexError.
    for _ in range(min(k, len(heap))):
        _neg_count, num = heapq.heappop(heap)
        result.append(num)
    return result
# Example: 1 occurs three times and 2 twice, so they are the two most frequent values
nums = [1, 1, 1, 2, 2, 3]
most_frequent_two = top_k_frequent(nums, 2)
print(most_frequent_two)  # [1, 2]
3. Merge K Sorted Arrays
import heapq
def merge_k_sorted(arrays):
    """Merge several individually sorted arrays into one sorted list.

    The heap holds one ``(value, array index, element index)`` tuple per
    non-exhausted array, so the smallest pending value is always at the root.
    """
    # Seed the heap with the head of every non-empty array.
    frontier = [(seq[0], src, 0) for src, seq in enumerate(arrays) if seq]
    heapq.heapify(frontier)

    merged = []
    while frontier:
        value, src, pos = heapq.heappop(frontier)
        merged.append(value)
        # Replace the consumed value with its successor from the same array, if any.
        following = pos + 1
        if following < len(arrays[src]):
            heapq.heappush(frontier, (arrays[src][following], src, following))
    return merged
# Example: three sorted inputs interleave into one sorted list
arrays = [[1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12]]
merged = merge_k_sorted(arrays)
print(merged)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
4. Median of Data Stream
import heapq
class MedianFinder:
    """Find the median of a data stream using two heaps.

    ``low`` is a max-heap (values stored negated) holding the smaller half of
    the numbers; ``high`` is a min-heap holding the larger half. ``low`` always
    holds either the same number of values as ``high`` or exactly one more.
    """

    def __init__(self):
        # max-heap for lower half (values are stored negated)
        self.low = []
        # min-heap for upper half
        self.high = []

    def add_num(self, num):
        """Add ``num`` to the stream. O(log n)"""
        # Add to max-heap (negate for max behavior)
        heapq.heappush(self.low, -num)
        # Balance: move the largest of low to high so that max(low) <= min(high)
        heapq.heappush(self.high, -heapq.heappop(self.low))
        # Ensure low never has fewer elements than high
        if len(self.low) < len(self.high):
            heapq.heappush(self.low, -heapq.heappop(self.high))

    def find_median(self):
        """Return the median of the numbers added so far, or None if there are none.

        With an odd count the middle value is returned as stored; with an even
        count the mean of the two middle values is returned as a float.
        """
        if not self.low:
            # Nothing added yet; low is never empty while high is non-empty.
            return None
        if len(self.low) > len(self.high):
            return -self.low[0]
        return (-self.low[0] + self.high[0]) / 2.0
# Example: report the running median after each new number
mf = MedianFinder()
stream = [5, 15, 1, 3]
for num in stream:
    mf.add_num(num)
    print(f"Added {num}, Median: {mf.find_median()}")
# Expected output:
# Added 5, Median: 5
# Added 15, Median: 10.0
# Added 1, Median: 5
# Added 3, Median: 4.0
5. Dijkstra’s Algorithm (already shown in graphs)
def dijkstra(graph, start):
    """Compute shortest-path distances from ``start`` using a min-heap.

    Args:
        graph: Mapping of node -> iterable of ``(neighbor, weight)`` pairs.
            Weights must be non-negative. A node that only appears as a
            neighbor (no adjacency entry of its own) is treated as having no
            outgoing edges.
        start: Source node.

    Returns:
        Dict mapping every key of ``graph`` and every node reached from
        ``start`` to its distance from ``start``. Keys of ``graph`` that
        cannot be reached keep the distance ``float('inf')``.
    """
    infinity = float('inf')
    distances = {node: infinity for node in graph}
    distances[start] = 0
    pq = [(0, start)]
    while pq:
        dist, node = heapq.heappop(pq)
        # Stale entry: a shorter path to this node was already processed.
        if dist > distances[node]:
            continue
        # .get() so that nodes without an adjacency entry do not raise KeyError.
        for neighbor, weight in graph.get(node, ()):
            new_dist = dist + weight
            if new_dist < distances.get(neighbor, infinity):
                distances[neighbor] = new_dist
                heapq.heappush(pq, (new_dist, neighbor))
    return distances
Time Complexity Summary
| Operation | Binary Heap | Priority Queue (heapq) |
|---|---|---|
| Insert | O(log n) | O(log n) |
| Extract Min/Max | O(log n) | O(log n) |
| Peek | O(1) | O(1) |
| Build Heap | O(n) | O(n) |
| Search | O(n) | O(n) |
When to Use Heaps
- Finding Kth largest/smallest: Maintain heap of size K
- Merging sorted data: Efficiently merge multiple sorted streams
- Priority scheduling: Task scheduling with priorities
- Median maintenance: Dynamic median calculation
- Dijkstra’s algorithm: Shortest path with non-negative weights
- Heap sort: In-place sorting with O(n log n)
Conclusion
Heaps and priority queues are essential data structures for many algorithmic problems. Their O(log n) operations make them ideal for scenarios requiring frequent insertions and deletions while maintaining order. Master these patterns to efficiently solve problems involving top-K elements, merging sorted data, and priority-based processing.
Comments