Introduction
Time series databases (TSDBs) have become critical infrastructure for modern applications. From monitoring system metrics to tracking IoT sensor data, applications generate massive volumes of time-stamped data. Traditional databases struggle with this workload, but specialized time series databases optimize for write-heavy, time-ordered data with efficient compression and fast queries.
This comprehensive guide covers time series database concepts, implementations, and real-world optimization strategies.
Core Concepts & Terminology
- **Time Series**: A sequence of data points indexed in time order, typically collected at regular intervals.
- **Metric**: A named measurement with tags and fields (e.g., cpu_usage, memory_free).
- **Tag**: An indexed label used for grouping and filtering metrics (e.g., host, region, service).
- **Field**: The actual measurement value (e.g., 75.5 for CPU usage percentage).
- **Timestamp**: The point in time at which a measurement was taken.
- **Retention Policy**: How long data is kept before automatic deletion.
- **Downsampling**: Reducing data resolution over time (e.g., rolling 1-minute samples up into 1-hour averages).
- **Cardinality**: The number of unique tag combinations; high cardinality means many distinct series.
- **Ingestion Rate**: The number of data points written per second.
- **Query Latency**: The time taken to retrieve data from the database.
- **Compression Ratio**: The reduction in data size achieved through compression.
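Downsampling is easy to picture in plain Python before any database gets involved. The sketch below (timestamps and values are made up for illustration) buckets 1-minute samples into hourly means — exactly what a TSDB does when it rolls raw data up to a coarser resolution:

```python
from collections import defaultdict

def downsample_hourly(points):
    """Reduce (unix_ts, value) samples to one mean value per hour bucket."""
    buckets = defaultdict(list)
    for ts, value in points:
        buckets[ts - ts % 3600].append(value)  # floor timestamp to the hour
    return {hour: sum(vals) / len(vals) for hour, vals in sorted(buckets.items())}

# Four 1-minute samples spanning two hours collapse to two hourly means
samples = [(3600, 10.0), (3660, 20.0), (7200, 30.0), (7260, 50.0)]
print(downsample_hourly(samples))  # {3600: 15.0, 7200: 40.0}
```

Four points become two, and the same idea scales: a year of 1-second data downsampled to hourly means shrinks by a factor of 3600.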
Time Series Database Comparison
Feature Comparison Matrix
| Feature | InfluxDB | TimescaleDB | Prometheus |
|---|---|---|---|
| Model | Time series | PostgreSQL extension | Time series |
| Storage | Custom | PostgreSQL | Custom |
| Query Language | InfluxQL/Flux | SQL | PromQL |
| Retention | Configurable | Configurable | Configurable |
| Clustering | Enterprise | Yes | No (single node) |
| Compression | Excellent | Good | Good |
| Cardinality tolerance | Medium | High | Low |
| Pricing | $0-$7k+/month | Free (self-hosted) | Free (open-source) |
| Best For | Metrics, IoT | High cardinality | Monitoring |
InfluxDB Implementation
Setup and Configuration
```python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Connect to InfluxDB 2.x
client = InfluxDBClient(
    url="http://localhost:8086",
    token="your-token",
    org="your-org"
)

# Get write and query APIs
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
```
Writing Data
```python
from influxdb_client import WritePrecision

def write_metrics():
    """Write a few example points to InfluxDB."""
    # Point.time() assumes nanoseconds by default, so pass the precision
    # explicitly when using second-resolution Unix timestamps
    points = [
        Point("cpu_usage")
        .tag("host", "server-01")
        .tag("region", "us-east-1")
        .field("value", 75.5)
        .time(1640000000, WritePrecision.S),
        Point("memory_usage")
        .tag("host", "server-01")
        .tag("region", "us-east-1")
        .field("value", 4096)
        .time(1640000000, WritePrecision.S),
        Point("disk_usage")
        .tag("host", "server-01")
        .tag("region", "us-east-1")
        .field("value", 512000)
        .time(1640000000, WritePrecision.S),
    ]
    # Write to InfluxDB
    write_api.write(bucket="metrics", org="your-org", record=points)
    print("Metrics written successfully")

def write_batch_metrics(metrics_list):
    """Write a batch of metrics described as dicts."""
    points = []
    for metric in metrics_list:
        point = Point(metric['name'])
        # Add tags
        for tag_key, tag_value in metric.get('tags', {}).items():
            point.tag(tag_key, tag_value)
        # Add fields
        for field_key, field_value in metric.get('fields', {}).items():
            point.field(field_key, field_value)
        # Add timestamp
        if 'timestamp' in metric:
            point.time(metric['timestamp'], WritePrecision.S)
        points.append(point)
    # Write the whole batch in one call
    write_api.write(bucket="metrics", org="your-org", record=points)
    print(f"Wrote {len(points)} metrics")
```
Querying Data
```python
def query_cpu_usage():
    """Query raw CPU usage metrics for one host."""
    query = '''
    from(bucket: "metrics")
        |> range(start: -24h)
        |> filter(fn: (r) => r._measurement == "cpu_usage")
        |> filter(fn: (r) => r.host == "server-01")
    '''
    result = query_api.query(org="your-org", query=query)
    for table in result:
        for record in table.records:
            print(f"{record.get_time()}: {record.values}")

def query_aggregated_metrics():
    """Query hourly averages over the last week."""
    query = '''
    from(bucket: "metrics")
        |> range(start: -7d)
        |> filter(fn: (r) => r._measurement == "cpu_usage")
        |> aggregateWindow(every: 1h, fn: mean)
        |> sort(columns: ["_time"])
    '''
    result = query_api.query(org="your-org", query=query)
    for table in result:
        for record in table.records:
            print(f"{record.get_time()}: {record.values}")

def query_percentiles():
    """Query the 95th percentile of CPU usage."""
    query = '''
    from(bucket: "metrics")
        |> range(start: -24h)
        |> filter(fn: (r) => r._measurement == "cpu_usage")
        |> quantile(q: 0.95)
    '''
    result = query_api.query(org="your-org", query=query)
    for table in result:
        for record in table.records:
            print(f"95th percentile: {record.get_value()}")
```
TimescaleDB Implementation
Setup and Configuration
```python
import psycopg2
from psycopg2.extras import execute_values

# Connect to TimescaleDB (standard PostgreSQL wire protocol)
conn = psycopg2.connect(
    host="localhost",
    database="metrics",
    user="postgres",
    password="password"
)
cursor = conn.cursor()

def create_hypertable():
    """Create a hypertable for metrics."""
    # Create the base table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS metrics (
            time        TIMESTAMPTZ NOT NULL,
            host        TEXT NOT NULL,
            region      TEXT NOT NULL,
            metric_name TEXT NOT NULL,
            value       FLOAT8 NOT NULL
        )
    """)
    # Convert it to a hypertable partitioned on time
    cursor.execute("""
        SELECT create_hypertable('metrics', 'time', if_not_exists => TRUE)
    """)
    # Indexes matching the common query patterns
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_metrics_host_time
        ON metrics (host, time DESC)
    """)
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_metrics_region_time
        ON metrics (region, time DESC)
    """)
    conn.commit()
    print("Hypertable created")
```
Writing Data
```python
def write_metrics_timescaledb(metrics_list):
    """Batch-insert metrics into TimescaleDB."""
    data = [
        (
            metric['timestamp'],
            metric['host'],
            metric['region'],
            metric['metric_name'],
            metric['value'],
        )
        for metric in metrics_list
    ]
    # execute_values expands all rows into a single INSERT statement
    execute_values(
        cursor,
        "INSERT INTO metrics (time, host, region, metric_name, value) VALUES %s",
        data
    )
    conn.commit()
    print(f"Inserted {len(data)} metrics")
```
Querying Data
```python
def query_timescaledb():
    """Average CPU usage per hour and host over the last week."""
    cursor.execute("""
        SELECT
            time_bucket('1 hour', time) AS hour,
            host,
            AVG(value) AS avg_value
        FROM metrics
        WHERE metric_name = 'cpu_usage'
          AND time > NOW() - INTERVAL '7 days'
        GROUP BY hour, host
        ORDER BY hour DESC
    """)
    for hour, host, avg_value in cursor.fetchall():
        print(f"{hour}: {host} - {avg_value:.2f}%")

def query_percentiles_timescaledb():
    """Per-host percentiles using ordered-set aggregates."""
    cursor.execute("""
        SELECT
            host,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY value) AS p50,
            PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) AS p95,
            PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY value) AS p99
        FROM metrics
        WHERE metric_name = 'cpu_usage'
          AND time > NOW() - INTERVAL '24 hours'
        GROUP BY host
    """)
    for host, p50, p95, p99 in cursor.fetchall():
        print(f"{host}: p50={p50:.2f}, p95={p95:.2f}, p99={p99:.2f}")

def continuous_aggregates():
    """Create a continuous aggregate for faster rollup queries."""
    # Continuous aggregates cannot be created inside an explicit
    # transaction block, so switch the connection to autocommit first
    conn.autocommit = True
    cursor.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_hourly
        WITH (timescaledb.continuous) AS
        SELECT
            time_bucket('1 hour', time) AS hour,
            host,
            metric_name,
            AVG(value) AS avg_value,
            MIN(value) AS min_value,
            MAX(value) AS max_value,
            STDDEV(value) AS stddev_value
        FROM metrics
        GROUP BY hour, host, metric_name
        WITH DATA
    """)
    conn.autocommit = False
    print("Continuous aggregate created")
```
Prometheus Implementation
Configuration
```yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    monitor: 'prometheus'

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100']

  - job_name: 'application'
    static_configs:
      - targets: ['localhost:8080']
    metrics_path: '/metrics'
    scrape_interval: 5s
```
Querying with PromQL
```python
import requests

def query_prometheus(query, start_time, end_time):
    """Run a range query against the Prometheus HTTP API."""
    url = "http://localhost:9090/api/v1/query_range"
    params = {
        'query': query,
        'start': start_time,   # Unix timestamp or RFC3339
        'end': end_time,
        'step': '60s'
    }
    response = requests.get(url, params=params)
    data = response.json()
    return data['data']['result']

# Example queries
queries = {
    'cpu_usage': 'rate(cpu_usage_seconds_total[5m])',
    'memory_usage': 'memory_usage_bytes',
    'request_rate': 'rate(http_requests_total[5m])',
    'error_rate': 'rate(http_requests_total{status=~"5.."}[5m])',
    'p95_latency': 'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))'
}

# Execute queries over a fixed time window
for name, query in queries.items():
    results = query_prometheus(query, 1640000000, 1640086400)
    print(f"{name}: {results}")
```
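The `histogram_quantile` query above is worth demystifying: Prometheus stores histograms as cumulative bucket counters and linearly interpolates within the bucket that contains the requested quantile. A simplified pure-Python rendition of that idea (Prometheus additionally handles the `+Inf` bucket and rate-adjusted counts, which this sketch ignores):

```python
def histogram_quantile(q, buckets):
    """Approximate PromQL histogram_quantile for cumulative (le, count) buckets.

    buckets: sorted list of (upper_bound, cumulative_count).
    Linearly interpolates within the bucket containing the q-th observation.
    """
    total = buckets[-1][1]
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for upper_bound, count in buckets:
        if count >= rank:
            # fraction of the way through this bucket's observations
            frac = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * frac
        lower_bound, lower_count = upper_bound, count
    return buckets[-1][0]

# 100 observations: 50 under 0.1s, 90 under 0.5s, all under 1.0s
buckets = [(0.1, 50), (0.5, 90), (1.0, 100)]
print(histogram_quantile(0.95, buckets))  # 0.75
```

The interpolation explains why quantile estimates get coarse when bucket boundaries are far apart: the 95th observation here is simply assumed to sit halfway through the 0.5–1.0s bucket.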
Custom Metrics
```python
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time

# Define metrics
request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration', ['endpoint'])
active_connections = Gauge('active_connections', 'Active connections')

def track_request(method, endpoint, duration):
    """Track an HTTP request."""
    request_count.labels(method=method, endpoint=endpoint).inc()
    request_duration.labels(endpoint=endpoint).observe(duration)

def update_connections(count):
    """Update the active connections gauge."""
    active_connections.set(count)

# Expose metrics on :8000/metrics and simulate traffic
if __name__ == '__main__':
    start_http_server(8000)
    for i in range(100):
        track_request('GET', '/api/users', 0.05)
        track_request('POST', '/api/users', 0.1)
        update_connections(i % 50)
        time.sleep(1)
```
Performance Optimization
Data Retention and Downsampling
```python
def setup_retention_policies():
    """Set up retention policies per database."""
    # InfluxDB 1.x retention (InfluxQL, shown for illustration only);
    # InfluxDB 2.x sets retention on the bucket instead
    influxql = '''
    CREATE RETENTION POLICY "30days" ON "metrics"
    DURATION 30d REPLICATION 1 DEFAULT
    '''
    # TimescaleDB retention
    cursor.execute("""
        SELECT add_retention_policy('metrics', INTERVAL '30 days')
    """)
    conn.commit()
    # Prometheus retention is a server flag:
    # --storage.tsdb.retention.time=30d

def downsample_data():
    """Downsample and compress old data."""
    # InfluxDB 1.x downsampling (InfluxQL); in 2.x use a Flux task
    influxql = '''
    SELECT MEAN(value) INTO "metrics_1h" FROM "metrics"
    WHERE time < now() - 7d
    GROUP BY time(1h), *
    '''
    # TimescaleDB: compress every chunk older than 7 days
    cursor.execute("""
        SELECT compress_chunk(c)
        FROM show_chunks('metrics', older_than => INTERVAL '7 days') c
    """)
    conn.commit()

def compression_settings():
    """Configure TimescaleDB native compression."""
    cursor.execute("""
        ALTER TABLE metrics SET (
            timescaledb.compress,
            timescaledb.compress_orderby = 'time DESC',
            timescaledb.compress_segmentby = 'host, region'
        )
    """)
    # Compress chunks automatically once they are 7 days old
    cursor.execute("""
        SELECT add_compression_policy('metrics', INTERVAL '7 days')
    """)
    conn.commit()
```
Cardinality Management
```python
import time

def analyze_cardinality():
    """Analyze metric cardinality."""
    # InfluxDB 1.x (InfluxQL): SHOW SERIES CARDINALITY
    # Prometheus: list metric names, then count series per metric
    url = "http://localhost:9090/api/v1/label/__name__/values"
    response = requests.get(url)
    metrics = response.json()['data']
    end = time.time()
    start = end - 3600  # last hour, as Unix timestamps
    for metric in metrics:
        query = f'count({{__name__="{metric}"}})'
        cardinality = query_prometheus(query, start, end)
        print(f"{metric}: {cardinality}")

def reduce_cardinality():
    """Reduce high-cardinality metrics in TimescaleDB."""
    # Drop tags that are never queried
    cursor.execute("""
        ALTER TABLE metrics DROP COLUMN IF EXISTS unnecessary_tag
    """)
    # Pre-aggregate away the high-cardinality dimensions entirely
    cursor.execute("""
        CREATE MATERIALIZED VIEW metrics_aggregated AS
        SELECT
            time_bucket('1 hour', time) AS hour,
            metric_name,
            AVG(value) AS value
        FROM metrics
        GROUP BY hour, metric_name
    """)
    conn.commit()
```
Real-World Use Cases
1. Infrastructure Monitoring
```python
class InfrastructureMonitor:
    def __init__(self, client):
        self.client = client

    def get_system_health(self):
        """Get recent CPU, memory, and disk metrics."""
        query = '''
        from(bucket: "metrics")
            |> range(start: -1h)
            |> filter(fn: (r) => r._measurement =~ /cpu|memory|disk/)
            |> aggregateWindow(every: 5m, fn: mean)
        '''
        return self.client.query_api().query(query=query)

    def detect_anomalies(self, threshold=90.0):
        """Flag hourly CPU averages above a threshold.

        Flux has no built-in anomaly detection function; a simple
        threshold filter stands in here (mean/stddev bands computed
        in Flux are a common next step).
        """
        query = f'''
        from(bucket: "metrics")
            |> range(start: -24h)
            |> filter(fn: (r) => r._measurement == "cpu_usage")
            |> aggregateWindow(every: 1h, fn: mean)
            |> filter(fn: (r) => r._value > {threshold})
        '''
        return self.client.query_api().query(query=query)
```
2. Application Performance Monitoring
```python
class APMMonitor:
    def __init__(self, client):
        self.client = client

    def get_request_metrics(self):
        """Get request duration, averaged per minute."""
        query = '''
        from(bucket: "metrics")
            |> range(start: -1h)
            |> filter(fn: (r) => r._measurement == "http_request_duration")
            |> aggregateWindow(every: 1m, fn: mean)
        '''
        return self.client.query_api().query(query=query)

    def get_error_rate(self):
        """Get errors per minute."""
        query = '''
        from(bucket: "metrics")
            |> range(start: -1h)
            |> filter(fn: (r) => r._measurement == "http_errors")
            |> aggregateWindow(every: 1m, fn: sum)
        '''
        return self.client.query_api().query(query=query)
```
Best Practices & Common Pitfalls
Best Practices
- Plan Cardinality: Limit tag combinations
- Set Retention: Define data retention policies
- Use Downsampling: Aggregate old data
- Batch Writes: Write data in batches
- Index Strategically: Index frequently queried tags
- Monitor Ingestion: Track write rates
- Compress Data: Enable compression for old data
- Query Optimization: Use aggregation windows
- Alerting: Set up alerts for anomalies
- Capacity Planning: Plan for growth
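The "batch writes" advice above can be sketched as a small buffering wrapper. The `flush_fn` sink here is a stand-in for any client's write call (e.g., the InfluxDB `write_api.write` shown earlier); the demo collects batches into a list so it runs without a database:

```python
class BatchWriter:
    """Buffer points and flush them to a sink in fixed-size batches."""

    def __init__(self, flush_fn, batch_size=500):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.buffer = []

    def write(self, point):
        self.buffer.append(point)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flush_fn(self.buffer)  # one round-trip per batch, not per point
            self.buffer = []

# Collect batches instead of hitting a real database
batches = []
writer = BatchWriter(batches.append, batch_size=100)
for i in range(250):
    writer.write({"metric": "cpu_usage", "value": i})
writer.flush()  # flush the partial tail batch
print([len(b) for b in batches])  # [100, 100, 50]
```

A production version would also flush on a timer so a slow trickle of points never sits in the buffer indefinitely.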
Common Pitfalls
- High Cardinality: Too many unique tag combinations
- Unbounded Retention: Keeping data forever
- Inefficient Queries: Querying too much data
- Missing Indexes: Slow queries on unindexed tags
- Inadequate Retention: Deleting data too quickly
- No Downsampling: Keeping high-resolution data forever
- Slow Ingestion: Not batching writes
- Poor Alerting: Missing important anomalies
- Inadequate Monitoring: Not monitoring the monitor
- Scalability Issues: Not planning for growth
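High cardinality is the pitfall that bites hardest, and it is cheap to estimate before it happens: the worst case is the product of the distinct value counts across all tags. A quick sketch over made-up tag values:

```python
from math import prod

def worst_case_cardinality(tag_values):
    """Upper bound on series count: product of distinct values per tag."""
    return prod(len(set(vals)) for vals in tag_values.values())

tags = {
    "host": ["server-01", "server-02", "server-03"],   # 3 distinct values
    "region": ["us-east-1", "eu-west-1"],              # 2 distinct values
    "status": ["200", "404", "500"],                   # 3 distinct values
}
print(worst_case_cardinality(tags))  # 18 potential series per metric
```

Eighteen series is harmless, but adding a `user_id` tag with 10,000 values multiplies the bound by 10,000, which is why per-request identifiers belong in fields or logs, never in tags.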
Conclusion
Time series databases are essential for modern monitoring and observability. InfluxDB excels in flexibility, TimescaleDB in SQL compatibility, and Prometheus in simplicity. Success requires proper cardinality management, retention policies, and query optimization.
Start with clear metrics design, implement retention policies, and continuously monitor performance. As your data grows, leverage downsampling and compression to maintain query performance.
Time series databases unlock insights from your operational data.