
Mesh Networks: Decentralized Communication and Networking

Introduction

Mesh networks enable devices to communicate directly without central infrastructure. This article covers mesh network architecture, routing protocols, and implementation patterns.

Key Statistics:

  • Mesh networks: 50% coverage improvement over traditional networks
  • FireChat: 30M+ users during disasters
  • Helium: 400K+ hotspots
  • Rural connectivity: 70% cost reduction

Mesh Network Architecture

Mesh Network Topologies

  Full Mesh
  ├── Every node connects to every other node
  ├── N nodes: N*(N-1)/2 connections
  └── Use: Small networks, critical systems

  Partial Mesh
  ├── Nodes connect to nearby neighbors
  ├── Scalable to large networks
  └── Use: Most practical implementations

  Hybrid Mesh
  ├── Mesh combined with infrastructure
  ├── Gateway nodes for internet access
  └── Use: Community networks, IoT

  Layered / Hierarchical
  ├── Supernodes with full mesh
  └── Child nodes connect to supernodes

  Key Protocols
  ├── BATMAN (Better Approach To Mobile Ad-hoc Networking)
  ├── OLSR (Optimized Link State Routing)
  ├── Babel
  └── Yggdrasil (encrypted mesh routing)
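
The N*(N-1)/2 figure for a full mesh grows quadratically, which is why partial meshes dominate in practice. The sketch below compares link counts; the function names and the fixed neighbor count of 4 are illustrative assumptions, not part of any protocol.

```python
def full_mesh_links(n: int) -> int:
    """Bidirectional links when every node connects to every other node."""
    return n * (n - 1) // 2


def partial_mesh_links(n: int, degree: int) -> int:
    """Links when each node keeps `degree` neighbors.

    Each link is shared by two nodes, so the total is n * degree / 2
    (assumes n * degree is even).
    """
    return n * degree // 2


# Compare the two topologies for a few network sizes.
for n in (5, 10, 50, 100):
    print(f"{n:>3} nodes: full={full_mesh_links(n):>4}  "
          f"partial(degree=4)={partial_mesh_links(n, 4):>3}")
```

At 100 nodes a full mesh needs 4950 links, while a partial mesh with four neighbors per node needs 200.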

Mesh Routing Implementation

#!/usr/bin/env python3
"""Mesh network routing protocol implementation."""

import socket
import struct
import threading
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class MeshNode:
    """Mesh network node."""
    
    node_id: str
    ip_address: str
    port: int
    neighbors: List[str]
    last_seen: float
    
    def is_alive(self, timeout: int = 30) -> bool:
        """Check if node is still alive."""
        return time.time() - self.last_seen < timeout

class MeshPacket:
    """Mesh network packet."""
    
    TYPE_DATA = 1
    TYPE_ROUTE_REQUEST = 2
    TYPE_ROUTE_REPLY = 3
    TYPE_ROUTE_ERROR = 4
    TYPE_BEACON = 5
    
    def __init__(self, packet_type: int, source: str, 
                 destination: str, payload: bytes):
        self.type = packet_type
        self.source = source
        self.destination = destination
        self.payload = payload
        self.ttl = 64
        self.hop_limit = 10
    
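    def serialize(self) -> bytes:
        """Serialize packet to a 38-byte header followed by the payload."""
        
        # Header: type (1 byte), TTL (1), hop limit (2), payload length (2)
        header = struct.pack('!BBHH', 
            self.type,
            self.ttl,
            self.hop_limit,
            len(self.payload)
        )
        
        # Source and destination: fixed 16-byte fields, NUL-padded.
        # Longer IDs are truncated so the field offsets stay valid.
        src = self.source.encode()[:16].ljust(16, b'\x00')
        dst = self.destination.encode()[:16].ljust(16, b'\x00')
        
        return header + src + dst + self.payload
    
    @classmethod
    def deserialize(cls, data: bytes) -> 'MeshPacket':
        """Deserialize packet."""
        
        if len(data) < 38:
            raise ValueError("packet shorter than 38-byte header")
        
        msg_type, ttl, hop_limit, length = struct.unpack('!BBHH', data[:6])
        
        packet = cls(
            packet_type=msg_type,
            # Strip only the trailing NUL padding added by serialize()
            source=data[6:22].rstrip(b'\x00').decode(errors='replace'),
            destination=data[22:38].rstrip(b'\x00').decode(errors='replace'),
            # Honor the declared payload length instead of reading to the end
            payload=data[38:38 + length]
        )
        
        packet.ttl = ttl
        packet.hop_limit = hop_limit
        
        return packet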

class BABEL_Routing:
    """Simplified distance-vector routing table, loosely modeled on Babel."""
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.routing_table: Dict[str, Dict] = {}
        self.adjacency: Dict[str, List[str]] = defaultdict(list)
        self.seqno = 0
    
    def update_neighbor(self, neighbor_id: str, rxcost: int):
        """Update neighbor information."""
        
        # Record the adjacency once; beacons arrive repeatedly
        if neighbor_id not in self.adjacency[self.node_id]:
            self.adjacency[self.node_id].append(neighbor_id)
        
        # Update routing table
        self._update_route(neighbor_id, neighbor_id, rxcost)
    
    def _update_route(self, prefix: str, gateway: str, cost: int):
        """Update route in routing table."""
        
        self.routing_table[prefix] = {
            'gateway': gateway,
            'cost': cost,
            'seqno': self.seqno,
            'time': time.time()
        }
    
    def calculate_route_cost(self, destination: str) -> float:
        """Return the stored route cost, or infinity if no route is known."""
        
        if destination not in self.routing_table:
            return float('inf')
        
        # Cost as recorded by _update_route (a link cost, not a hop count)
        return self.routing_table[destination]['cost']
    
    def find_next_hop(self, destination: str) -> Optional[str]:
        """Find next hop for destination."""
        
        if destination not in self.routing_table:
            return None
        
        return self.routing_table[destination]['gateway']

class YggdrasilMesh:
    """Simplified greedy coordinate routing, loosely inspired by Yggdrasil."""
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.tree = {}  # Tree coordinates
        self.coords = self._generate_coords()
        self.peers = {}
    
    def _generate_coords(self) -> bytes:
        """Generate placeholder coordinates for this node."""
        
        import hashlib
        
        # Stand-in only: a 32-byte hash of node ID and time. Yggdrasil itself
        # derives coordinates from a node's position in a spanning tree.
        coord = hashlib.sha256(
            f"{self.node_id}:{time.time()}".encode()
        ).digest()
        
        return coord
    
    def add_peer(self, peer_id: str, peer_coord: bytes):
        """Add peer connection."""
        
        self.peers[peer_id] = {
            'coord': peer_coord,
            'connected': True,
            'last_seen': time.time()
        }
    
    def find_route(self, destination_coord: bytes) -> Optional[str]:
        """Pick the peer whose coordinates are closest to the destination."""
        
        best_peer = None
        best_distance = float('inf')
        
        for peer_id, peer in self.peers.items():
            dist = self._coord_distance(peer['coord'], destination_coord)
            
            if dist < best_distance:
                best_distance = dist
                best_peer = peer_id
        
        # None when no peers are known
        return best_peer
    
    def _coord_distance(self, coord1: bytes, coord2: bytes) -> int:
        """Calculate XOR distance between coordinates (smaller is closer)."""
        
        # XOR the coordinates byte by byte
        xored = bytes(a ^ b for a, b in zip(coord1, coord2))
        
        # Read the result as a big-endian integer, so a difference in an
        # earlier (more significant) byte yields a larger distance
        return int.from_bytes(xored, 'big')

class MeshNetworkManager:
    """Manage mesh network operations."""
    
    def __init__(self, node_id: str, port: int = 45678):
        self.node_id = node_id
        self.port = port
        self.nodes: Dict[str, MeshNode] = {}
        self.routing = BABEL_Routing(node_id)
        self.socket = None
        self.running = False
    
    def start(self):
        """Start mesh network node."""
        
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.socket.bind(('0.0.0.0', self.port))
        
        self.running = True
        
        # Start threads
        threading.Thread(target=self._receive_loop, daemon=True).start()
        threading.Thread(target=self._beacon_loop, daemon=True).start()
        threading.Thread(target=self._cleanup_loop, daemon=True).start()
    
    def _receive_loop(self):
        """Receive and process packets."""
        
        while self.running:
            try:
                data, addr = self.socket.recvfrom(4096)
                packet = MeshPacket.deserialize(data)
                self._process_packet(packet, addr)
            except Exception as e:
                print(f"Receive error: {e}")
    
    def _process_packet(self, packet: MeshPacket, addr):
        """Process received packet."""
        
        if packet.type == MeshPacket.TYPE_BEACON:
            self._handle_beacon(packet, addr)
        
        elif packet.type == MeshPacket.TYPE_DATA:
            self._handle_data(packet)
    
    def _handle_beacon(self, packet: MeshPacket, addr):
        """Handle beacon packet."""
        
        source = packet.source
        
        # Broadcasts can loop back to the sender; ignore our own beacons
        if source == self.node_id:
            return
        
        if source not in self.nodes:
            self.nodes[source] = MeshNode(
                node_id=source,
                # Use the datagram's source address: the self-reported
                # address in the payload may be a loopback address
                ip_address=addr[0],
                port=self.port,
                neighbors=[],
                last_seen=time.time()
            )
        else:
            self.nodes[source].last_seen = time.time()
        
        # Update routing
        self.routing.update_neighbor(source, 10)  # Default cost
    
    def _handle_data(self, packet: MeshPacket):
        """Handle data packet."""
        
        if packet.destination == self.node_id:
            # For us
            print(f"Received: {packet.payload}")
        else:
            # Forward
            next_hop = self.routing.find_next_hop(packet.destination)
            if next_hop:
                self._forward(packet, next_hop)
    
    def send_to(self, destination: str, data: bytes):
        """Send data to destination."""
        
        packet = MeshPacket(
            MeshPacket.TYPE_DATA,
            self.node_id,
            destination,
            data
        )
        
        next_hop = self.routing.find_next_hop(destination)
        
        if next_hop and next_hop in self.nodes:
            self._send_packet(packet, next_hop)
    
    def _send_packet(self, packet: MeshPacket, next_hop: str):
        """Send packet to next hop."""
        
        node = self.nodes.get(next_hop)
        if node:
            self.socket.sendto(
                packet.serialize(),
                (node.ip_address, node.port)
            )
    
    def _forward(self, packet: MeshPacket, next_hop: str):
        """Forward packet."""
        
        packet.ttl -= 1
        
        if packet.ttl > 0:
            self._send_packet(packet, next_hop)
    
    def _beacon_loop(self):
        """Send periodic beacons."""
        
        while self.running:
            packet = MeshPacket(
                MeshPacket.TYPE_BEACON,
                self.node_id,
                "broadcast",
                socket.gethostbyname(socket.gethostname()).encode()
            )
            
            self.socket.sendto(
                packet.serialize(),
                ('<broadcast>', self.port)
            )
            
            time.sleep(5)
    
    def _cleanup_loop(self):
        """Remove stale nodes."""
        
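        while self.running:
            # Snapshot the dict: the receive thread may add nodes concurrently
            stale = [
                nid for nid, node in list(self.nodes.items())
                if not node.is_alive(60)
            ]
            
            for nid in stale:
                self.nodes.pop(nid, None)
                # Drop the route too, so packets are not sent to a dead hop
                self.routing.routing_table.pop(nid, None)
            
            time.sleep(10)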
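
The routing class above only records direct neighbors, so multi-hop forwarding never finds a route. A minimal sketch of how a distance-vector table could learn routes from neighbor advertisements follows. It is a plain Bellman-Ford style update, not Babel itself (Babel adds sequence numbers and a feasibility condition to avoid loops), and `merge_advertisement` and its parameters are hypothetical names.

```python
from typing import Dict, Tuple

# destination -> (next_hop, total_cost)
RouteTable = Dict[str, Tuple[str, int]]


def merge_advertisement(table: RouteTable, self_id: str, neighbor: str,
                        link_cost: int, advertised: Dict[str, int]) -> bool:
    """Merge a neighbor's advertised costs; return True if any route changed."""
    changed = False
    for dest, cost in advertised.items():
        if dest == self_id:
            continue  # never route to ourselves through a neighbor
        candidate = link_cost + cost
        current = table.get(dest)
        # Accept when there is no route yet, when the new path is cheaper,
        # or when the existing route already uses this neighbor (its cost
        # may have changed, so the stored value must follow it).
        if current is None or candidate < current[1] or current[0] == neighbor:
            if current != (neighbor, candidate):
                table[dest] = (neighbor, candidate)
                changed = True
    return changed


# Node A hears from neighbors B and D, both over links of cost 10.
table: RouteTable = {}
merge_advertisement(table, "A", "B", 10, {"B": 0, "C": 10, "A": 10})
merge_advertisement(table, "A", "D", 10, {"D": 0, "C": 30})
print(table)
```

Here A reaches C through B at cost 20 and ignores the more expensive path through D at cost 40.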

Peer-to-Peer Discovery

#!/usr/bin/env python3
"""P2P peer discovery and connection management."""

import asyncio
import random
from typing import Set, Dict
from dataclasses import dataclass

@dataclass
class Peer:
    """Peer information."""
    
    peer_id: str
    address: str
    port: int
    last_seen: float
    
    @property
    def endpoint(self) -> str:
        return f"{self.address}:{self.port}"

class P2PDiscovery:
    """P2P peer discovery protocol."""
    
    def __init__(self, peer_id: str, port: int = 45679):
        self.peer_id = peer_id
        self.port = port
        self.peers: Dict[str, Peer] = {}
        self.bootstrap_nodes: Set[str] = set()
    
    async def start(self):
        """Start peer discovery."""
        
        # Connect to bootstrap nodes
        for node in self.bootstrap_nodes:
            await self.connect_to_peer(node)
        
        # Start accepting connections
        server = await asyncio.start_server(
            self._handle_connection,
            '0.0.0.0',
            self.port
        )
        
        async with server:
            await server.serve_forever()
    
    async def connect_to_peer(self, endpoint: str):
        """Connect to a peer given as "host:port"."""
        
        try:
            # Split once from the right and pass the port as an integer
            host, port = endpoint.rsplit(':', 1)
            
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, int(port)),
                timeout=5
            )
            
            # Send handshake
            writer.write(f"HELLO:{self.peer_id}\n".encode())
            await writer.drain()
            
            # Wait for response
            response = await reader.readline()
            peer_id = response.decode().strip().split(':')[1]
            
            # Add to peers
            self.peers[peer_id] = Peer(
                peer_id=peer_id,
                address=host,
                port=int(port),
                last_seen=asyncio.get_event_loop().time()
            )
            
            # Start message loop
            asyncio.create_task(self._peer_loop(peer_id, reader, writer))
            
        except Exception as e:
            print(f"Failed to connect to {endpoint}: {e}")
    
    async def _handle_connection(self, reader, writer):
        """Handle incoming connection."""
        
        try:
            # Read handshake
            data = await reader.readline()
            peer_id = data.decode().strip().split(':')[1]
            
            # Send response
            writer.write(f"HELLO:{self.peer_id}\n".encode())
            await writer.drain()
            
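            # Add peer. Note: addr[1] is the remote side's ephemeral source
            # port, not necessarily the port it listens on.
            addr = writer.get_extra_info('peername')
            self.peers[peer_id] = Peer(
                peer_id=peer_id,
                address=addr[0],
                port=addr[1],
                last_seen=asyncio.get_event_loop().time()
            )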
            
            # Start message loop
            await self._peer_loop(peer_id, reader, writer)
            
        except Exception as e:
            print(f"Connection error: {e}")
    
    async def _peer_loop(self, peer_id: str, reader, writer):
        """Main peer message loop."""
        
        try:
            while True:
                data = await reader.readline()
                
                if not data:
                    break
                
                # Update last seen
                if peer_id in self.peers:
                    self.peers[peer_id].last_seen = asyncio.get_event_loop().time()
                
                # Process message
                await self._process_message(peer_id, data)
        
        except Exception as e:
            print(f"Peer loop error: {e}")
        
        finally:
            # Remove peer on disconnect and release the connection
            self.peers.pop(peer_id, None)
            writer.close()
    
    async def _process_message(self, peer_id: str, data: bytes):
        """Process peer message."""
        
        # Handle different message types
        pass
    
    def get_random_peers(self, count: int = 3) -> list:
        """Get random peers for discovery."""
        
        peer_list = list(self.peers.values())
        
        if len(peer_list) <= count:
            return peer_list
        
        return random.sample(peer_list, count)
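
The listing leaves `_process_message` as a stub and does not show how the output of `get_random_peers` would travel between nodes. One possible approach is a peer-exchange message. The sketch below assumes a hypothetical `PEERS:host:port,host:port` line format in the same style as the `HELLO` handshake; neither the format nor the helper names come from the original code.

```python
import random
from typing import List, Tuple

Endpoint = Tuple[str, int]


def encode_peers(endpoints: List[Endpoint], count: int = 3) -> bytes:
    """Build a PEERS line advertising at most `count` random endpoints."""
    if len(endpoints) <= count:
        sample = endpoints
    else:
        sample = random.sample(endpoints, count)
    body = ",".join(f"{host}:{port}" for host, port in sample)
    return f"PEERS:{body}\n".encode()


def decode_peers(line: bytes) -> List[Endpoint]:
    """Parse a PEERS line; malformed entries are skipped."""
    text = line.decode().strip()
    if not text.startswith("PEERS:"):
        return []
    result: List[Endpoint] = []
    for item in text[len("PEERS:"):].split(","):
        # Split on the last colon so the host part stays intact
        host, sep, port = item.rpartition(":")
        if sep and host and port.isdigit():
            result.append((host, int(port)))
    return result


known = [("10.0.0.2", 45679), ("10.0.0.3", 45679)]
message = encode_peers(known)
print(message)
print(decode_peers(message))
```

A receiver could pass each decoded endpoint to `connect_to_peer` after filtering out peers it already knows.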

Network Topology Management

# Mesh network topology configuration (illustrative schema, not tied to a specific routing daemon)
mesh_network:
  topology:
    type: "hybrid"  # full, partial, hybrid
    protocol: "babel"  # batman, olsr, babel, yggdrasil
    
  node_configuration:
    node_id: "mesh-node-001"
    ip_range: "10.0.0.0/24"
    port: 45678
    
    # Radio configuration
    radio:
      interface: "wlan0"
      mode: "ad-hoc"
      channel: 6
      ssid: "community-mesh"
      
  routing:
    protocol: "babel"
    options:
      # Babel specific
      hello_interval: 4  # seconds
      update_interval: 10  # seconds
      honeybee_neighbors: 4
      max_queue_length: 32
      
      # Performance
      split_horizon: true
      path_metric: "etx"  # Expected Transmission Count
      
  security:
    encryption: "wireguard"  # or "ipsec"
    key_exchange: "curve25519"
    
  gateway:
    enabled: true
    upstream: "eth0"
    nat: true
    dns_servers:
      - "1.1.1.1"
      - "8.8.8.8"
    
  monitoring:
    metrics:
      - "packet_loss"
      - "latency"
      - "throughput"
      - "neighbors"
    interval: 30  # seconds
    
  failover:
    enabled: true
    threshold: 3  # failed pings before failover
    cooldown: 60  # seconds

  # Performance tuning
  optimization:
    mtu: 1500
    buffer_size: 4096
    window_size: 64
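
The `path_metric: "etx"` setting refers to the Expected Transmission Count, which estimates how many transmissions a packet needs to cross a link, counting retries. A minimal sketch of the usual formula, ETX = 1 / (forward delivery ratio × reverse delivery ratio), is shown below. The function names and the sample ratios are illustrative assumptions.

```python
from typing import Iterable, Tuple


def link_etx(forward_ratio: float, reverse_ratio: float) -> float:
    """ETX of one link from its delivery ratios in both directions."""
    if forward_ratio <= 0 or reverse_ratio <= 0:
        return float("inf")  # nothing gets through: link is unusable
    return 1.0 / (forward_ratio * reverse_ratio)


def path_etx(links: Iterable[Tuple[float, float]]) -> float:
    """ETX of a path is the sum of the ETX values of its links."""
    return sum(link_etx(forward, reverse) for forward, reverse in links)


# One lossy direct link versus two good links through a relay.
direct = path_etx([(0.5, 0.5)])
relayed = path_etx([(0.9, 0.9), (0.9, 0.9)])
print(f"direct:  {direct:.2f}")
print(f"relayed: {relayed:.2f}")
```

The two-hop path scores lower (better) than the single lossy hop, which is the case where ETX and plain hop count disagree.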
