Introduction
IoT is transforming industries from manufacturing to healthcare. Building systems that handle millions of devices requires careful architecture decisions around protocol selection, edge computing, and data processing.
Key Statistics:
- 30 billion IoT devices by 2025
- IoT data: 80 zettabytes annually
- Edge computing reduces IoT latency by 90%
- Average IoT device generates 2GB data daily
IoT Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ IoT System Architecture โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Devices โ โ
โ โ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โ โ
โ โ โSensors โ โ Gateways โ โ Actuatorsโ โ Edge โ โ โ
โ โ โ(Data) โ โ(Protocol)โ โ(Control)โ โ Devices โ โ โ
โ โ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โ โ
โ โโโโโโโโโผโโโโโโโโโโโโโผโโโโโโโโโโโโโผโโโโโโโโโโโโโผโโโโโโโโ โ
โ โ โ โ โ โ
โ โโโโโโโโโโโโโโดโโโโโโโโโโโโโผโโโโโโโโโโโโโ โ
โ โผ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Message Broker (MQTT, AMQP) โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโโโโโโ โ
โ โผ โผ โผ โ
โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ
โ โ Real-time โ โ Storage โ โ Analyticsโ โ
โ โ Processing โ โ (Time-series)โ โ (ML/AI) โ โ
โ โ (Flink) โ โ (InfluxDB) โ โ โ โ
โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
MQTT Implementation
#!/usr/bin/env python3
"""MQTT IoT client."""
import paho.mqtt.client as mqtt
import json
from datetime import datetime
class IoTDevice:
"""IoT device client."""
def __init__(self, device_id, broker_url, username=None, password=None):
self.device_id = device_id
self.client = mqtt.Client(client_id=device_id)
if username and password:
self.client.username_pw_set(username, password)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# Callbacks to override
self.on_data_received = None
self.on_command_received = None
def connect(self):
"""Connect to broker."""
self.client.connect(self.broker_url, 1883, 60)
self.client.loop_start()
def on_connect(self, client, userdata, flags, rc):
"""Handle connection."""
if rc == 0:
print(f"Connected: {self.device_id}")
# Subscribe to commands
client.subscribe(f"devices/{self.device_id}/command")
else:
print(f"Connection failed: {rc}")
def on_disconnect(self, client, userdata, rc):
"""Handle disconnection."""
print(f"Disconnected: {rc}")
def on_message(self, client, userdata, msg):
"""Handle incoming message."""
topic = msg.topic
payload = json.loads(msg.payload)
if '/command' in topic:
if self.on_command_received:
self.on_command_received(payload)
def publish(self, topic, data):
"""Publish data."""
payload = json.dumps({
'device_id': self.device_id,
'timestamp': datetime.utcnow().isoformat(),
'data': data
})
self.client.publish(topic, payload)
def publish_telemetry(self, sensor_data):
"""Publish sensor telemetry."""
self.publish(f"devices/{self.device_id}/telemetry", sensor_data)
def report_state(self, state):
"""Report device state."""
self.publish(f"devices/{self.device_id}/state", state)
AWS IoT Core
# Terraform AWS IoT Core
resource "aws_iot_thing" "device" {
name = "device-${var.device_id}"
attributes = {
device_type = var.device_type
location = var.location
}
}
resource "aws_iot_thing_type" "device_type" {
name = var.device_type
}
resource "aws_iot_certificate" "device_cert" {
active = true
}
resource "aws_iot_thing_attachment" "attach" {
thing_name = aws_iot_thing.device.name
cert_arn = aws_iot_certificate.device_cert.arn
}
resource "aws_iot_topic_rule" "telemetry" {
name = "telemetry-rule"
sql = "SELECT * FROM 'devices/+/telemetry'"
action {
lambda {
function_arn = aws_lambda_function.telemetry_processor.arn
}
}
action {
firehose {
delivery_stream_name = aws_kinesis_firehose_delivery_stream.s3.name
role_arn = aws_iam_role.iot_firehose.arn
}
}
}
Time-Series Storage
#!/usr/bin/env python3
"""InfluxDB for IoT data."""
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
class IoTTimeSeries:
"""Store IoT data in InfluxDB."""
def __init__(self, url, token, org, bucket):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
self.bucket = bucket
def write_telemetry(self, device_id, measurements):
"""Write telemetry data."""
points = []
for key, value in measurements.items():
point = (
Point("telemetry")
.tag("device_id", device_id)
.field(key, value)
.time(datetime.utcnow())
)
points.append(point)
self.write_api.write(bucket=self.bucket, org="my-org", record=points)
def query_device_data(self, device_id, start, stop):
"""Query device data."""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start}, stop: {stop})
|> filter(fn: (r) => r._measurement == "telemetry")
|> filter(fn: (r) => r.device_id == "{device_id}")
'''
return self.query_api.query_data_frame(query)
def aggregate_device_data(self, device_id, field, interval='1h'):
"""Aggregate device data."""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "telemetry")
|> filter(fn: (r) => r.device_id == "{device_id}")
|> filter(fn: (r) => r._field == "{field}")
|> aggregateWindow(every: {interval}, fn: mean, createEmpty: false)
'''
return self.query_api.query_data_frame(query)
Edge Processing
#!/usr/bin/env python3
"""Edge processing for IoT."""
class EdgeProcessor:
"""Process data at the edge."""
def __init__(self):
self.rules = []
def add_rule(self, rule):
"""Add processing rule."""
self.rules.append(rule)
def process(self, data):
"""Process incoming data."""
results = {'raw': data, 'processed': [], 'alerts': []}
for rule in self.rules:
try:
result = rule.execute(data)
if result.get('action') == 'alert':
results['alerts'].append(result)
elif result.get('action') == 'transform':
results['processed'].append(result)
except Exception as e:
results['errors'] = str(e)
return results
# Example edge rules
class ThresholdRule:
"""Alert when threshold exceeded."""
def __init__(self, field, threshold, severity='warning'):
self.field = field
self.threshold = threshold
self.severity = severity
def execute(self, data):
value = data.get(self.field)
if value and value > self.threshold:
return {
'action': 'alert',
'rule': 'threshold',
'field': self.field,
'value': value,
'threshold': self.threshold,
'severity': self.severity,
'timestamp': datetime.utcnow().isoformat()
}
return None
class AggregationRule:
"""Aggregate values at edge."""
def __init__(self, window='1m', function='mean'):
self.window = window
self.function = function
self.buffer = []
def execute(self, data):
self.buffer.append(data)
# Simple aggregation
values = [d.get('value', 0) for d in self.buffer]
if self.function == 'mean':
result = sum(values) / len(values)
elif self.function == 'sum':
result = sum(values)
return {
'action': 'transform',
'aggregated': result,
'count': len(values)
}
Comments