Introduction
Mesh networks enable devices to communicate directly without central infrastructure. This article covers mesh network architecture, routing protocols, and implementation patterns.
Key Statistics:
- Mesh networks: 50% coverage improvement over traditional networks
- FireChat: 30M+ users during disasters
- Helium: 400K+ hotspots
- Rural connectivity: 70% cost reduction
Mesh Network Architecture
┌─────────────────────────────────────────────────────────────────┐
│                     Mesh Network Topologies                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Full Mesh                                                      │
│  ├── Every node connects to every other node                    │
│  ├── N nodes: N*(N-1)/2 connections                             │
│  └── Use: Small networks, critical systems                      │
│                                                                 │
│  Partial Mesh                                                   │
│  ├── Nodes connect to nearby neighbors                          │
│  ├── Scalable to large networks                                 │
│  └── Use: Most practical implementations                        │
│                                                                 │
│  Hybrid Mesh                                                    │
│  ├── Mesh combined with infrastructure                          │
│  ├── Gateway nodes for internet access                          │
│  └── Use: Community networks, IoT                               │
│                                                                 │
│  Layered / Hierarchical                                         │
│  ├── Supernodes with full mesh                                  │
│  └── Child nodes connect to supernodes                          │
│                                                                 │
│  Key Protocols                                                  │
│  ├── BATMAN (Better Approach To Mobile Ad-hoc Networking)       │
│  ├── OLSR (Optimized Link State Routing)                        │
│  ├── Babel                                                      │
│  └── Yggdrasil (encrypted mesh routing)                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Mesh Routing Implementation
#!/usr/bin/env python3
"""Mesh network routing protocol implementation."""
import socket
import struct
import threading
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class MeshNode:
    """Record describing one node of the mesh.

    ``last_seen`` is compared against ``time.time()`` by :meth:`is_alive`,
    so it is expected to hold a value from the same clock.
    """

    node_id: str
    ip_address: str
    port: int
    neighbors: List[str]
    last_seen: float

    def is_alive(self, timeout: int = 30) -> bool:
        """Report whether the node was seen less than ``timeout`` seconds ago.

        Args:
            timeout: Maximum age of ``last_seen``, in seconds.

        Returns:
            True when the elapsed time since ``last_seen`` is strictly
            below ``timeout``; False otherwise.
        """
        elapsed = time.time() - self.last_seen
        return elapsed < timeout
class MeshPacket:
    """Mesh network packet with a fixed binary wire format.

    Wire layout (network byte order):

        1 byte    packet type
        1 byte    TTL
        2 bytes   hop limit
        2 bytes   payload length
        16 bytes  source ID (UTF-8, NUL padded)
        16 bytes  destination ID (UTF-8, NUL padded)
        N bytes   payload
    """

    TYPE_DATA = 1
    TYPE_ROUTE_REQUEST = 2
    TYPE_ROUTE_REPLY = 3
    TYPE_ROUTE_ERROR = 4
    TYPE_BEACON = 5

    # Width of each NUL-padded node ID field on the wire.
    ID_SIZE = 16
    # type (B), ttl (B), hop_limit (H), payload length (H).
    _HEADER = struct.Struct('!BBHH')

    def __init__(self, packet_type: int, source: str,
                 destination: str, payload: bytes):
        """Create a packet with the default TTL (64) and hop limit (10)."""
        self.type = packet_type
        self.source = source
        self.destination = destination
        self.payload = payload
        self.ttl = 64
        self.hop_limit = 10

    @classmethod
    def _encode_id(cls, value: str, field: str) -> bytes:
        """Encode a node ID into its fixed-width, NUL-padded wire form.

        Raises:
            ValueError: If the UTF-8 encoding exceeds ``ID_SIZE`` bytes.
                An over-long ID would otherwise shift every following
                field and corrupt the packet for the receiver.
        """
        raw = value.encode()
        if len(raw) > cls.ID_SIZE:
            raise ValueError(
                f"{field} ID is {len(raw)} bytes; maximum is {cls.ID_SIZE}"
            )
        return raw.ljust(cls.ID_SIZE, b'\x00')

    def serialize(self) -> bytes:
        """Serialize the packet to bytes.

        Returns:
            Header, source ID, destination ID and payload concatenated.

        Raises:
            ValueError: If the source or destination ID does not fit in
                ``ID_SIZE`` bytes.
            struct.error: If a header field is out of range for its width
                (for example a payload longer than 65535 bytes).
        """
        header = self._HEADER.pack(
            self.type,
            self.ttl,
            self.hop_limit,
            len(self.payload),
        )
        src = self._encode_id(self.source, "source")
        dst = self._encode_id(self.destination, "destination")
        return header + src + dst + self.payload

    @classmethod
    def deserialize(cls, data: bytes) -> 'MeshPacket':
        """Rebuild a packet from its wire form.

        Args:
            data: Bytes produced by :meth:`serialize`. Bytes beyond the
                declared payload length are ignored.

        Returns:
            The decoded packet, including its TTL and hop limit.

        Raises:
            ValueError: If ``data`` is shorter than the fixed header, the
                payload is shorter than the header declares, or an ID is
                not valid UTF-8 (``UnicodeDecodeError``).
        """
        src_start = cls._HEADER.size
        dst_start = src_start + cls.ID_SIZE
        body_start = dst_start + cls.ID_SIZE
        if len(data) < body_start:
            raise ValueError(
                f"packet too short: {len(data)} bytes, need {body_start}"
            )

        packet_type, ttl, hop_limit, length = cls._HEADER.unpack_from(data)
        body_end = body_start + length
        if len(data) < body_end:
            raise ValueError(
                f"truncated payload: header declares {length} bytes, "
                f"got {len(data) - body_start}"
            )

        packet = cls(
            packet_type=packet_type,
            # rstrip, not strip: only the trailing padding is ours to remove.
            source=data[src_start:dst_start].decode().rstrip('\x00'),
            destination=data[dst_start:body_start].decode().rstrip('\x00'),
            # Honour the declared length instead of taking everything left.
            payload=data[body_start:body_end],
        )
        packet.ttl = ttl
        packet.hop_limit = hop_limit
        return packet
class BABEL_Routing:
    """Minimal routing table keyed by destination, in the style of Babel.

    Each entry in ``routing_table`` maps a destination prefix to a dict
    with the keys ``gateway``, ``cost``, ``seqno`` and ``time``.
    ``adjacency`` maps a node ID to the list of its known neighbours.
    """

    def __init__(self, node_id: str):
        """Create an empty routing state for the local node ``node_id``."""
        self.node_id = node_id
        self.routing_table: Dict[str, Dict] = {}
        self.adjacency: Dict[str, List[str]] = defaultdict(list)
        self.seqno = 0

    def update_neighbor(self, neighbor_id: str, rxcost: int):
        """Record a directly reachable neighbour and its link cost.

        Args:
            neighbor_id: ID of the neighbour that was heard.
            rxcost: Cost to store for the direct route to that neighbour.
        """
        known = self.adjacency[self.node_id]
        # This is called for every received beacon; without the membership
        # check the adjacency list grows by one duplicate per beacon.
        if neighbor_id not in known:
            known.append(neighbor_id)
        # A neighbour is its own gateway.
        self._update_route(neighbor_id, neighbor_id, rxcost)

    def _update_route(self, prefix: str, gateway: str, cost: int):
        """Insert or overwrite the route for ``prefix``.

        The entry is stamped with the current ``seqno`` and wall-clock time.
        """
        self.routing_table[prefix] = {
            'gateway': gateway,
            'cost': cost,
            'seqno': self.seqno,
            'time': time.time(),
        }

    def calculate_route_cost(self, destination: str) -> float:
        """Return the stored cost of the route to ``destination``.

        Returns:
            The ``cost`` recorded in the routing table, or ``float('inf')``
            when no route to ``destination`` is known.
        """
        route = self.routing_table.get(destination)
        if route is None:
            return float('inf')
        return route['cost']

    def find_next_hop(self, destination: str) -> Optional[str]:
        """Return the gateway for ``destination``, or None if no route exists."""
        route = self.routing_table.get(destination)
        if route is None:
            return None
        return route['gateway']
class YggdrasilMesh:
    """Greedy coordinate-based routing over a set of known peers.

    Each node has a 32-byte coordinate; a packet is handed to whichever
    peer's coordinate is closest to the destination coordinate under the
    XOR metric.
    """

    def __init__(self, node_id: str):
        """Create a node with fresh coordinates and no peers."""
        self.node_id = node_id
        self.tree = {}  # Tree coordinates
        self.coords = self._generate_coords()
        self.peers = {}

    def _generate_coords(self) -> bytes:
        """Derive 32-byte coordinates from the node ID and the current time.

        Because the current time is hashed in, the result differs between
        calls even for the same ``node_id``.
        """
        import hashlib
        seed = f"{self.node_id}:{time.time()}".encode()
        # A SHA-256 digest is already exactly 32 bytes.
        return hashlib.sha256(seed).digest()

    def add_peer(self, peer_id: str, peer_coord: bytes):
        """Register (or replace) a peer and mark it connected as of now."""
        self.peers[peer_id] = {
            'coord': peer_coord,
            'connected': True,
            'last_seen': time.time(),
        }

    def find_route(self, destination_coord: bytes) -> Optional[str]:
        """Pick the peer whose coordinate is closest to ``destination_coord``.

        Returns:
            The ID of the closest peer (the earliest-added one on ties),
            or None when no peers are known.
        """
        if not self.peers:
            return None
        return min(
            self.peers,
            key=lambda peer_id: self._coord_distance(
                self.peers[peer_id]['coord'], destination_coord
            ),
        )

    def _coord_distance(self, coord1: bytes, coord2: bytes) -> int:
        """Return the XOR distance between two coordinates.

        The byte-wise XOR is read as a big-endian integer, so a difference
        in a more significant byte always yields a larger distance and
        identical coordinates yield 0. Only the common prefix length of the
        two inputs is compared (``zip`` stops at the shorter one).
        """
        xored = bytes(a ^ b for a, b in zip(coord1, coord2))
        return int.from_bytes(xored, 'big')
class MeshNetworkManager:
    """Run a mesh node: discover neighbours via UDP beacons and relay data.

    ``start()`` binds a broadcast-capable UDP socket and launches three
    daemon threads: one receiving packets, one broadcasting beacons and one
    evicting nodes that have not been heard from. ``stop()`` ends the loops
    and closes the socket.

    ``self.nodes`` is shared between the receive and cleanup threads, so
    every mutation and iteration of it is guarded by ``self._lock``.
    """

    BEACON_INTERVAL = 5    # seconds between beacons
    CLEANUP_INTERVAL = 10  # seconds between stale-node sweeps
    NODE_TIMEOUT = 60      # seconds without a beacon before eviction
    RECV_TIMEOUT = 1.0     # lets the receive loop notice stop() promptly

    def __init__(self, node_id: str, port: int = 45678):
        """Prepare the manager; no socket is opened until ``start()``."""
        self.node_id = node_id
        self.port = port
        self.nodes: Dict[str, MeshNode] = {}
        self.routing = BABEL_Routing(node_id)
        self.socket = None
        self.running = False
        self._lock = threading.Lock()

    def start(self):
        """Bind the UDP socket and start the worker threads.

        Calling ``start()`` on a node that is already running is a no-op.

        Raises:
            OSError: If the socket cannot be configured or bound; the
                socket is closed before the error propagates.
        """
        if self.running:
            return
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.settimeout(self.RECV_TIMEOUT)
            sock.bind(('0.0.0.0', self.port))
        except OSError:
            sock.close()
            raise
        self.socket = sock
        self.running = True
        # Start threads
        for worker in (self._receive_loop, self._beacon_loop,
                       self._cleanup_loop):
            threading.Thread(target=worker, daemon=True).start()

    def stop(self):
        """Stop the worker loops and close the socket, if one is open."""
        self.running = False
        if self.socket is not None:
            self.socket.close()

    def _receive_loop(self):
        """Receive and process packets until the node is stopped."""
        while self.running:
            try:
                data, addr = self.socket.recvfrom(4096)
            except socket.timeout:
                # Periodic wake-up so the loop can re-check self.running.
                continue
            except OSError as e:
                if not self.running:
                    break  # socket closed by stop()
                print(f"Receive error: {e}")
                continue
            try:
                packet = MeshPacket.deserialize(data)
                self._process_packet(packet, addr)
            except Exception as e:
                # A malformed packet from one peer must not end the loop.
                print(f"Receive error: {e}")

    def _process_packet(self, packet: MeshPacket, addr):
        """Dispatch a received packet by type; other types are ignored.

        Args:
            packet: The decoded packet.
            addr: ``(host, port)`` the datagram was received from.
        """
        if packet.type == MeshPacket.TYPE_BEACON:
            self._handle_beacon(packet, addr)
        elif packet.type == MeshPacket.TYPE_DATA:
            self._handle_data(packet)

    def _handle_beacon(self, packet: MeshPacket, addr=None):
        """Register or refresh the node that sent a beacon.

        Args:
            packet: The beacon packet.
            addr: ``(host, port)`` the beacon arrived from. When given it
                is used as the node's address; otherwise the address in
                the beacon payload and this node's own port are used.
        """
        source = packet.source
        if source == self.node_id:
            # Broadcasts are delivered to the sender too; never treat
            # ourselves as a neighbour.
            return
        now = time.time()
        with self._lock:
            node = self.nodes.get(source)
            if node is None:
                if addr:
                    # The observed source address is what we can actually
                    # reply to; the self-advertised payload comes from a
                    # hostname lookup on the sender and may not be routable.
                    ip_address, port = addr[0], addr[1]
                else:
                    ip_address, port = packet.payload.decode(), self.port
                self.nodes[source] = MeshNode(
                    node_id=source,
                    ip_address=ip_address,
                    port=port,
                    neighbors=[],
                    last_seen=now,
                )
            else:
                node.last_seen = now
        # Update routing
        self.routing.update_neighbor(source, 10)  # Default cost

    def _handle_data(self, packet: MeshPacket):
        """Deliver a data packet locally or forward it towards its destination.

        Packets with no known next hop are dropped.
        """
        if packet.destination == self.node_id:
            # For us
            print(f"Received: {packet.payload}")
            return
        # Forward
        next_hop = self.routing.find_next_hop(packet.destination)
        if next_hop:
            self._forward(packet, next_hop)

    def send_to(self, destination: str, data: bytes):
        """Send ``data`` to ``destination`` via the routing table.

        Returns:
            True if a next hop was known and the packet was handed to it,
            False if there is no usable route (the data is dropped).
        """
        packet = MeshPacket(
            MeshPacket.TYPE_DATA,
            self.node_id,
            destination,
            data,
        )
        next_hop = self.routing.find_next_hop(destination)
        if next_hop and next_hop in self.nodes:
            self._send_packet(packet, next_hop)
            return True
        return False

    def _send_packet(self, packet: MeshPacket, next_hop: str):
        """Transmit ``packet`` to ``next_hop``; a no-op if the hop is unknown."""
        node = self.nodes.get(next_hop)
        if node:
            self.socket.sendto(
                packet.serialize(),
                (node.ip_address, node.port),
            )

    def _forward(self, packet: MeshPacket, next_hop: str):
        """Decrement the TTL and relay the packet while the TTL stays positive."""
        packet.ttl -= 1
        if packet.ttl > 0:
            self._send_packet(packet, next_hop)

    def _beacon_loop(self):
        """Broadcast a beacon every ``BEACON_INTERVAL`` seconds."""
        while self.running:
            try:
                packet = MeshPacket(
                    MeshPacket.TYPE_BEACON,
                    self.node_id,
                    "broadcast",
                    socket.gethostbyname(socket.gethostname()).encode(),
                )
                self.socket.sendto(
                    packet.serialize(),
                    ('<broadcast>', self.port),
                )
            except OSError as e:
                if not self.running:
                    break  # socket closed by stop()
                # A transient resolver or network failure must not end
                # beaconing for good.
                print(f"Beacon error: {e}")
            time.sleep(self.BEACON_INTERVAL)

    def _cleanup_loop(self):
        """Evict nodes not heard from within ``NODE_TIMEOUT`` seconds."""
        while self.running:
            with self._lock:
                stale = [
                    nid for nid, node in self.nodes.items()
                    if not node.is_alive(self.NODE_TIMEOUT)
                ]
                for nid in stale:
                    del self.nodes[nid]
            time.sleep(self.CLEANUP_INTERVAL)
Peer-to-Peer Discovery
#!/usr/bin/env python3
"""P2P peer discovery and connection management."""
import asyncio
import random
from typing import Set, Dict
from dataclasses import dataclass
@dataclass
class Peer:
    """Record describing one known peer."""

    peer_id: str
    address: str
    port: int
    last_seen: float

    @property
    def endpoint(self) -> str:
        """Return the peer's address and port joined as ``address:port``."""
        return "{}:{}".format(self.address, self.port)
class P2PDiscovery:
    """P2P peer discovery over TCP with a one-line ``HELLO:<peer_id>`` handshake.

    Outbound connections send the handshake first and expect the same line
    back; inbound connections read it and answer. After the handshake each
    connection is served by :meth:`_peer_loop` until the peer disconnects.
    """

    HANDSHAKE_TAG = "HELLO"
    HANDSHAKE_TIMEOUT = 5  # seconds allowed for connecting and for the HELLO line

    def __init__(self, peer_id: str, port: int = 45679):
        """Create a discovery service that will listen on ``port``."""
        self.peer_id = peer_id
        self.port = port
        self.peers: Dict[str, Peer] = {}
        self.bootstrap_nodes: Set[str] = set()
        # Strong references to background tasks: the event loop only keeps
        # weak ones, so an unreferenced task may be garbage-collected.
        self._tasks: Set[asyncio.Task] = set()

    @staticmethod
    def _split_endpoint(endpoint: str):
        """Split ``host:port`` into ``(host, port_int)``.

        The split is on the last colon so IPv6 literals work, with or
        without surrounding brackets.

        Raises:
            ValueError: If there is no host, no colon or a non-numeric port.
        """
        host, sep, port = endpoint.rpartition(':')
        host = host.strip('[]')
        if not sep or not host:
            raise ValueError(f"invalid endpoint: {endpoint!r}")
        return host, int(port)

    @classmethod
    def _parse_hello(cls, line: bytes) -> str:
        """Extract the peer ID from a ``HELLO:<peer_id>`` handshake line.

        Raises:
            ValueError: If the line is empty, lacks the ``HELLO`` tag or
                carries no peer ID.
        """
        tag, sep, peer_id = line.decode().strip().partition(':')
        if tag != cls.HANDSHAKE_TAG or not sep or not peer_id:
            raise ValueError(f"malformed handshake: {line!r}")
        return peer_id

    async def start(self):
        """Connect to the bootstrap nodes, then serve inbound peers forever."""
        # Connect to bootstrap nodes
        for node in self.bootstrap_nodes:
            await self.connect_to_peer(node)
        # Start accepting connections
        server = await asyncio.start_server(
            self._handle_connection,
            '0.0.0.0',
            self.port,
        )
        async with server:
            await server.serve_forever()

    async def connect_to_peer(self, endpoint: str):
        """Connect to the peer at ``endpoint`` (``host:port``) and handshake.

        On success the peer is added to ``self.peers`` and its message loop
        runs as a background task. Any failure is reported with ``print``
        and the connection, if one was opened, is closed.
        """
        writer = None
        try:
            host, port = self._split_endpoint(endpoint)
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port),
                timeout=self.HANDSHAKE_TIMEOUT,
            )
            # Send handshake
            writer.write(f"HELLO:{self.peer_id}\n".encode())
            await writer.drain()
            # Wait for response; bounded so a silent peer cannot stall us.
            response = await asyncio.wait_for(
                reader.readline(), timeout=self.HANDSHAKE_TIMEOUT
            )
            peer_id = self._parse_hello(response)
            # Add to peers
            self.peers[peer_id] = Peer(
                peer_id=peer_id,
                address=host,
                port=port,
                last_seen=asyncio.get_running_loop().time(),
            )
        except Exception as e:
            print(f"Failed to connect to {endpoint}: {e}")
            if writer is not None:
                writer.close()
        else:
            # Start message loop
            task = asyncio.create_task(
                self._peer_loop(peer_id, reader, writer)
            )
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _handle_connection(self, reader, writer):
        """Handshake with an inbound connection, then run its message loop."""
        try:
            # Read handshake
            data = await asyncio.wait_for(
                reader.readline(), timeout=self.HANDSHAKE_TIMEOUT
            )
            peer_id = self._parse_hello(data)
            # Send response
            writer.write(f"HELLO:{self.peer_id}\n".encode())
            await writer.drain()
            # Add peer
            addr = writer.get_extra_info('peername')
            self.peers[peer_id] = Peer(
                peer_id=peer_id,
                address=addr[0],
                port=addr[1],
                last_seen=asyncio.get_running_loop().time(),
            )
        except Exception as e:
            print(f"Connection error: {e}")
            writer.close()
            return
        # Start message loop
        await self._peer_loop(peer_id, reader, writer)

    async def _peer_loop(self, peer_id: str, reader, writer):
        """Read newline-terminated messages from a peer until it disconnects.

        Each message refreshes the peer's ``last_seen`` and is passed to
        :meth:`_process_message`. When the loop ends for any reason the peer
        is removed from ``self.peers`` and the connection is closed.
        """
        try:
            while True:
                data = await reader.readline()
                if not data:
                    break  # EOF: peer closed the connection
                # Update last seen
                peer = self.peers.get(peer_id)
                if peer is not None:
                    peer.last_seen = asyncio.get_running_loop().time()
                # Process message
                await self._process_message(peer_id, data)
        except Exception as e:
            print(f"Peer loop error: {e}")
        finally:
            # Remove peer on disconnect
            self.peers.pop(peer_id, None)
            writer.close()

    async def _process_message(self, peer_id: str, data: bytes):
        """Process one message from ``peer_id``. Currently a no-op hook."""
        # Handle different message types
        pass

    def get_random_peers(self, count: int = 3) -> list:
        """Return up to ``count`` known peers chosen at random.

        When ``count`` or fewer peers are known, all of them are returned
        in insertion order. A negative ``count`` is treated as zero.
        """
        peer_list = list(self.peers.values())
        if len(peer_list) <= count:
            return peer_list
        return random.sample(peer_list, max(count, 0))
Network Topology Management
# Mesh network topology configuration
# NOTE(review): the nesting below was reconstructed from a flattened listing
# in which `protocol`, `enabled` and `interval` collided as duplicate keys —
# confirm the hierarchy against the schema of the tool that consumes it.
mesh_network:
  topology:
    type: "hybrid"  # full, partial, hybrid
    protocol: "babel"  # batman, olsr, babel, yggdrasil

  node_configuration:
    node_id: "mesh-node-001"
    ip_range: "10.0.0.0/24"
    port: 45678

    # Radio configuration
    radio:
      interface: "wlan0"
      mode: "ad-hoc"
      channel: 6
      ssid: "community-mesh"

  routing:
    protocol: "babel"
    options:
      # Babel specific
      hello_interval: 4  # seconds
      update_interval: 10  # seconds
      # NOTE(review): `honeybee_neighbors` does not look like a standard
      # Babel option name — confirm the intended key.
      honeybee_neighbors: 4
      max_queue_length: 32

      # Performance
      split_horizon: true
      path_metric: "etx"  # Expected Transmission Count

  security:
    encryption: "wireguard"  # or "ipsec"
    key_exchange: "curve25519"

  gateway:
    enabled: true
    upstream: "eth0"
    nat: true
    dns_servers:
      - "1.1.1.1"
      - "8.8.8.8"

  monitoring:
    metrics:
      - "packet_loss"
      - "latency"
      - "throughput"
      - "neighbors"
    interval: 30  # seconds

  failover:
    enabled: true
    threshold: 3  # failed pings before failover
    cooldown: 60  # seconds

  # Performance tuning
  optimization:
    mtu: 1500
    buffer_size: 4096
    window_size: 64
External Resources
Related Articles
- Edge Computing: Cloudflare Workers, AWS Lambda@Edge
- Decentralized Systems: IPFS, Distributed Storage
- 5G Infrastructure: Network Slicing, Edge MEC
Comments