⚡ Calmops

IoT at Scale: Device Management, Data Ingestion

Introduction

IoT is transforming industries from manufacturing to healthcare. Building systems that handle millions of devices requires careful architecture decisions around protocol selection, edge computing, and data processing.

Key Statistics:

  • 30 billion IoT devices by 2025
  • IoT data: 80 zettabytes annually
  • Edge computing reduces IoT latency by 90%
  • Average IoT device generates 2 GB of data daily
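
Figures like these become more useful once they are turned into a capacity target. The sketch below is a back-of-the-envelope calculation that takes the 2 GB per device per day figure at face value, uses decimal units (1 GB = 10^9 bytes), and assumes a hypothetical fleet of one million devices. `ingest_rate_bytes_per_sec` is an illustrative helper, not part of any library.

```python
SECONDS_PER_DAY = 24 * 60 * 60
GB = 10**9  # decimal gigabyte


def ingest_rate_bytes_per_sec(device_count, bytes_per_device_per_day):
    """Average sustained ingest rate for a fleet, in bytes per second."""
    return device_count * bytes_per_device_per_day / SECONDS_PER_DAY


# Hypothetical fleet: one million devices at 2 GB per device per day.
rate = ingest_rate_bytes_per_sec(1_000_000, 2 * GB)
print(f"{rate / GB:.1f} GB/s sustained")  # about 23.1 GB/s
```

This is an average; real traffic is bursty, so peak capacity would likely need headroom on top of it.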

IoT Architecture

┌───────────────────────────────────────────────────────────────────┐
│                      IoT System Architecture                      │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                           Devices                           │  │
│  │  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐  │  │
│  │  │ Sensors  │   │ Gateways │   │Actuators │   │   Edge   │  │  │
│  │  │  (Data)  │   │(Protocol)│   │(Control) │   │ Devices  │  │  │
│  │  └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘  │  │
│  └───────┼──────────────┼──────────────┼──────────────┼────────┘  │
│          │              │              │              │           │
│          └──────────────┴───────┬──────┴──────────────┘           │
│                                 ▼                                 │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                 Message Broker (MQTT, AMQP)                 │  │
│  └──────────────────────────────┬──────────────────────────────┘  │
│                                 │                                 │
│          ┌──────────────────────┼──────────────────────┐          │
│          ▼                      ▼                      ▼          │
│  ┌───────────────┐      ┌───────────────┐      ┌───────────────┐  │
│  │   Real-time   │      │    Storage    │      │   Analytics   │  │
│  │  Processing   │      │ (Time-series) │      │    (ML/AI)    │  │
│  │    (Flink)    │      │  (InfluxDB)   │      │               │  │
│  └───────────────┘      └───────────────┘      └───────────────┘  │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
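
The key property of this layout is fan-out: devices publish once, and the broker delivers a copy to each downstream consumer. The toy sketch below illustrates only that idea. `InMemoryBroker` is a hypothetical stand-in with exact-topic matching, not an MQTT or AMQP implementation, and the three lists stand in for the real-time, storage, and analytics stages.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a message broker: exact-topic fan-out only."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, message):
        # Every subscriber of the topic receives the same message.
        for handler in self._subscribers.get(topic, []):
            handler(message)


# Lists standing in for the three downstream stages.
realtime, stored, analytics = [], [], []

broker = InMemoryBroker()
broker.subscribe("telemetry", realtime.append)
broker.subscribe("telemetry", stored.append)
broker.subscribe("telemetry", analytics.append)

broker.publish("telemetry", {"device_id": "sensor-1", "temp": 21.5})
```

Because producers only know the topic, a new consumer can be added without touching device code.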

MQTT Implementation

#!/usr/bin/env python3
"""MQTT IoT client."""

import paho.mqtt.client as mqtt
import json
from datetime import datetime

class IoTDevice:
    """IoT device client."""
    
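    def __init__(self, device_id, broker_url, username=None, password=None):
        self.device_id = device_id
        self.broker_url = broker_url  # hostname or IP address, read by connect()
        # paho-mqtt 1.x signature; on paho-mqtt 2.x, pass
        # mqtt.CallbackAPIVersion.VERSION1 as the first argument.
        self.client = mqtt.Client(client_id=device_id)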
        
        if username and password:
            self.client.username_pw_set(username, password)
        
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        
        # Callbacks to override
        self.on_data_received = None
        self.on_command_received = None
    
    def connect(self):
        """Connect to broker."""
        self.client.connect(self.broker_url, 1883, 60)
        self.client.loop_start()
    
    def on_connect(self, client, userdata, flags, rc):
        """Handle connection."""
        if rc == 0:
            print(f"Connected: {self.device_id}")
            # Subscribe to commands
            client.subscribe(f"devices/{self.device_id}/command")
        else:
            print(f"Connection failed: {rc}")
    
    def on_disconnect(self, client, userdata, rc):
        """Handle disconnection."""
        print(f"Disconnected: {rc}")
    
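    def on_message(self, client, userdata, msg):
        """Handle incoming message."""
        try:
            payload = json.loads(msg.payload)
        except ValueError:
            # Ignore malformed JSON instead of raising inside the network loop
            print(f"Invalid JSON on {msg.topic}")
            return
        
        if msg.topic.endswith('/command'):
            if self.on_command_received:
                self.on_command_received(payload)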
    
    def publish(self, topic, data):
        """Publish data."""
        payload = json.dumps({
            'device_id': self.device_id,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data
        })
        
        self.client.publish(topic, payload)
    
    def publish_telemetry(self, sensor_data):
        """Publish sensor telemetry."""
        self.publish(f"devices/{self.device_id}/telemetry", sensor_data)
    
    def report_state(self, state):
        """Report device state."""
        self.publish(f"devices/{self.device_id}/state", state)
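
On the consuming side, the envelope that `publish()` produces (`device_id`, `timestamp`, `data`) has to be decoded and checked before it is trusted. The sketch below shows one way to do that without a broker. `build_envelope` and `parse_envelope` are hypothetical helpers, and the timezone-aware timestamp is an assumption rather than what the class above emits.

```python
import json
from datetime import datetime, timezone

REQUIRED_KEYS = {"device_id", "timestamp", "data"}


def build_envelope(device_id, data, now=None):
    """Serialize a reading using the same keys as IoTDevice.publish()."""
    now = now or datetime.now(timezone.utc)
    return json.dumps({
        "device_id": device_id,
        "timestamp": now.isoformat(),
        "data": data,
    })


def parse_envelope(raw):
    """Return the decoded envelope, or None if it is malformed."""
    try:
        message = json.loads(raw)
    except (TypeError, ValueError):
        return None
    if not isinstance(message, dict) or not REQUIRED_KEYS.issubset(message):
        return None
    try:
        # Convert the ISO 8601 string back into a datetime object.
        message["timestamp"] = datetime.fromisoformat(message["timestamp"])
    except (TypeError, ValueError):
        return None
    return message


raw = build_envelope("sensor-1", {"temp": 21.5},
                     now=datetime(2024, 1, 1, tzinfo=timezone.utc))
message = parse_envelope(raw)
```

Returning `None` for bad input keeps one misbehaving device from crashing the consumer; a production system would probably also count or log the rejects.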

AWS IoT Core

# Terraform AWS IoT Core
resource "aws_iot_thing" "device" {
  name            = "device-${var.device_id}"
  thing_type_name = aws_iot_thing_type.device_type.name

  attributes = {
    device_type = var.device_type
    location    = var.location
  }
}

resource "aws_iot_thing_type" "device_type" {
  name = var.device_type
}

resource "aws_iot_certificate" "device_cert" {
  active = true
}

resource "aws_iot_thing_principal_attachment" "attach" {
  thing     = aws_iot_thing.device.name
  principal = aws_iot_certificate.device_cert.arn
}

resource "aws_iot_topic_rule" "telemetry" {
  name        = "telemetry_rule" # rule names allow only letters, digits, and underscores
  enabled     = true
  sql         = "SELECT * FROM 'devices/+/telemetry'"
  sql_version = "2016-03-23"

  # Each action is its own top-level block; there is no wrapping "action" block
  lambda {
    function_arn = aws_lambda_function.telemetry_processor.arn
  }

  firehose {
    delivery_stream_name = aws_kinesis_firehose_delivery_stream.s3.name
    role_arn             = aws_iam_role.iot_firehose.arn
  }
}
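
The rule's `FROM 'devices/+/telemetry'` clause is an MQTT topic filter: `+` matches exactly one topic level and `#` matches all remaining levels. The sketch below shows that matching logic in plain Python. `topic_matches` is an illustrative helper, and it deliberately ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level,
            # where it matches the parent and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch

    # No '#' seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("devices/+/telemetry", "devices/pump-7/telemetry"))  # True
print(topic_matches("devices/+/telemetry", "devices/pump-7/state"))      # False
```

With this filter, the rule receives telemetry from every device but never sees state reports or commands.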

Time-Series Storage

#!/usr/bin/env python3
"""InfluxDB for IoT data."""

from datetime import datetime

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class IoTTimeSeries:
    """Store IoT data in InfluxDB."""
    
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
    
    def write_telemetry(self, device_id, measurements):
        """Write telemetry data."""
        
        points = []
        
        for key, value in measurements.items():
            point = (
                Point("telemetry")
                .tag("device_id", device_id)
                .field(key, value)
                .time(datetime.utcnow())
            )
            points.append(point)
        
        # org is omitted so the write uses the org passed to InfluxDBClient
        self.write_api.write(bucket=self.bucket, record=points)
    
    def query_device_data(self, device_id, start, stop):
        """Query device data."""
        
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start}, stop: {stop})
          |> filter(fn: (r) => r._measurement == "telemetry")
          |> filter(fn: (r) => r.device_id == "{device_id}")
        '''
        
        return self.query_api.query_data_frame(query)
    
    def aggregate_device_data(self, device_id, field, interval='1h'):
        """Aggregate device data."""
        
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -30d)
          |> filter(fn: (r) => r._measurement == "telemetry")
          |> filter(fn: (r) => r.device_id == "{device_id}")
          |> filter(fn: (r) => r._field == "{field}")
          |> aggregateWindow(every: {interval}, fn: mean, createEmpty: false)
        '''
        
        return self.query_api.query_data_frame(query)
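
To make the `aggregateWindow(every: ..., fn: mean, createEmpty: false)` step concrete, here is a rough pure-Python sketch of the same idea: bucket samples into fixed windows, average each bucket, and skip windows with no data. `window_mean` is a hypothetical helper. It keys results by window end on the assumption that this resembles Flux's default `timeSrc` of `_stop`, and it is not a reimplementation of Flux.

```python
from collections import defaultdict
from statistics import mean


def window_mean(samples, every_seconds):
    """Average (timestamp, value) samples per fixed window.

    Timestamps are Unix seconds. Windows with no samples are omitted,
    and each result is keyed by the end of its window.
    """
    buckets = defaultdict(list)
    for ts, value in samples:
        window_start = ts - (ts % every_seconds)
        buckets[window_start + every_seconds].append(value)
    return {end: mean(values) for end, values in sorted(buckets.items())}


samples = [(0, 10.0), (1800, 20.0), (3600, 30.0), (12600, 50.0)]
hourly = window_mean(samples, 3600)
print(hourly)  # {3600: 15.0, 7200: 30.0, 14400: 50.0}
```

The window ending at 10800 is missing from the output because it contains no samples, which is the effect of `createEmpty: false`.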

Edge Processing

#!/usr/bin/env python3
"""Edge processing for IoT."""

from datetime import datetime

class EdgeProcessor:
    """Process data at the edge."""
    
    def __init__(self):
        self.rules = []
    
    def add_rule(self, rule):
        """Add processing rule."""
        self.rules.append(rule)
    
    def process(self, data):
        """Process incoming data."""
        
        results = {'raw': data, 'processed': [], 'alerts': [], 'errors': []}
        
        for rule in self.rules:
            try:
                result = rule.execute(data)
            except Exception as e:
                # Record the failure and keep evaluating the remaining rules
                results['errors'].append(str(e))
                continue
            
            if result is None:
                # The rule did not fire for this reading
                continue
            
            if result.get('action') == 'alert':
                results['alerts'].append(result)
            elif result.get('action') == 'transform':
                results['processed'].append(result)
        
        return results

# Example edge rules
class ThresholdRule:
    """Alert when threshold exceeded."""
    
    def __init__(self, field, threshold, severity='warning'):
        self.field = field
        self.threshold = threshold
        self.severity = severity
    
    def execute(self, data):
        value = data.get(self.field)
        
        # Compare against None explicitly so a reading of 0 is still evaluated
        if value is not None and value > self.threshold:
            return {
                'action': 'alert',
                'rule': 'threshold',
                'field': self.field,
                'value': value,
                'threshold': self.threshold,
                'severity': self.severity,
                'timestamp': datetime.utcnow().isoformat()
            }
        
        return None

class AggregationRule:
    """Aggregate values at edge."""
    
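    def __init__(self, window='1m', function='mean', max_samples=1000):
        if function not in ('mean', 'sum'):
            raise ValueError(f"Unsupported function: {function}")
        self.window = window  # stored but not yet used to expire samples
        self.function = function
        self.max_samples = max_samples  # assumed cap on buffered readings
        self.buffer = []
    
    def execute(self, data):
        self.buffer.append(data)
        # Drop the oldest readings so the buffer cannot grow without bound
        self.buffer = self.buffer[-self.max_samples:]
        
        # Simple aggregation over the buffered readings
        values = [d.get('value', 0) for d in self.buffer]
        
        if self.function == 'mean':
            result = sum(values) / len(values)
        else:  # 'sum', validated in __init__
            result = sum(values)
        
        return {
            'action': 'transform',
            'aggregated': result,
            'count': len(values)
        }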
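
`AggregationRule` accepts a `window` such as `'1m'` but aggregates over everything it has buffered. One possible way to honor the window is a tumbling (fixed, non-overlapping) window keyed on each reading's timestamp, sketched below. `parse_window` and `TumblingMean` are hypothetical names, and the sketch assumes timestamps are Unix seconds that arrive in non-decreasing order.

```python
import re

_UNITS = {"s": 1, "m": 60, "h": 3600}


def parse_window(window):
    """Convert a string such as '30s', '1m' or '2h' to seconds."""
    match = re.fullmatch(r"([1-9]\d*)([smh])", window)
    if not match:
        raise ValueError(f"unsupported window: {window!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


class TumblingMean:
    """Emit one mean per fixed, non-overlapping time window."""

    def __init__(self, window="1m"):
        self.size = parse_window(window)
        self.window_start = None
        self.values = []

    def add(self, timestamp, value):
        """Add a reading; return a summary when a window closes, else None."""
        start = timestamp - (timestamp % self.size)
        closed = None
        if self.window_start is not None and start != self.window_start:
            # The reading belongs to a new window, so summarize the old one.
            closed = {
                "window_start": self.window_start,
                "mean": sum(self.values) / len(self.values),
                "count": len(self.values),
            }
            self.values = []
        self.window_start = start
        self.values.append(value)
        return closed


agg = TumblingMean("1m")
results = [agg.add(ts, v) for ts, v in [(0, 10), (30, 20), (60, 40)]]
print(results[-1])  # {'window_start': 0, 'mean': 15.0, 'count': 2}
```

Sending one summary per window instead of every reading is where the bandwidth saving from edge aggregation comes from.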
