⚡ Calmops

Time Series Databases: InfluxDB, TimescaleDB, Prometheus

Introduction

Time series databases (TSDBs) have become critical infrastructure for modern applications. From monitoring system metrics to tracking IoT sensor data, applications generate massive volumes of time-stamped data. Traditional databases struggle with this workload, but specialized time series databases optimize for write-heavy, time-ordered data with efficient compression and fast queries.

This comprehensive guide covers time series database concepts, implementations, and real-world optimization strategies.


Core Concepts & Terminology

Time Series

Sequence of data points indexed in time order, typically at regular intervals.

Metric

Named measurement with tags and values (e.g., cpu_usage, memory_free).

Tag

Indexed label for grouping metrics (e.g., host, region, service).

Field

Actual measurement value (e.g., 75.5 for CPU usage percentage).

Timestamp

Point in time when measurement was taken.

Retention Policy

How long data is kept before deletion.

Downsampling

Reducing data resolution over time (e.g., 1-minute to 1-hour averages).

Cardinality

Number of unique tag combinations (high cardinality = many combinations).

Ingestion Rate

Number of data points written per second.

Query Latency

Time to retrieve data from database.

Compression Ratio

Reduction in data size through compression.
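These pieces combine directly in InfluxDB's line protocol, where every write is `measurement,tags fields timestamp`. A minimal sketch of composing such a line (the helper function is ours for illustration, not part of any client library, and it skips the escaping rules for spaces and commas):

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Compose an InfluxDB line-protocol string:
    measurement,tag=...,tag=... field=...,field=... timestamp"""
    tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_str = ",".join(f"{k}={v}" for k, v in sorted(fields.items()))
    return f"{measurement},{tag_str} {field_str} {timestamp_ns}"

line = to_line_protocol(
    "cpu_usage",
    tags={"host": "server-01", "region": "us-east-1"},
    fields={"value": 75.5},
    timestamp_ns=1640000000000000000,
)
# cpu_usage,host=server-01,region=us-east-1 value=75.5 1640000000000000000
```

Note that tags are part of the series identity (and are indexed), while fields are the payload; this distinction drives both cardinality and compression behavior later in this guide.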


Time Series Database Comparison

Feature Comparison Matrix

| Feature        | InfluxDB       | TimescaleDB          | Prometheus         |
|----------------|----------------|----------------------|--------------------|
| Model          | Time series    | PostgreSQL extension | Time series        |
| Storage        | Custom         | PostgreSQL           | Custom             |
| Query Language | InfluxQL/Flux  | SQL                  | PromQL             |
| Retention      | Configurable   | Configurable         | Configurable       |
| Clustering     | Enterprise     | Yes                  | No (single node)   |
| Compression    | Excellent      | Good                 | Good               |
| Cardinality    | Medium         | High                 | Low                |
| Pricing        | $0-$7k+/month  | Free (self-hosted)   | Free (open-source) |
| Best For       | Metrics, IoT   | High cardinality     | Monitoring         |

InfluxDB Implementation

Setup and Configuration

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Connect to InfluxDB
client = InfluxDBClient(
    url="http://localhost:8086",
    token="your-token",
    org="your-org"
)

# Get write and query APIs
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

Writing Data

from influxdb_client import WritePrecision

def write_metrics():
    """Write metrics to InfluxDB"""
    
    # Bare integer timestamps are interpreted as nanoseconds by default,
    # so pass WritePrecision.S for unix-second timestamps
    points = [
        Point("cpu_usage")
            .tag("host", "server-01")
            .tag("region", "us-east-1")
            .field("value", 75.5)
            .time(1640000000, WritePrecision.S),
        Point("memory_usage")
            .tag("host", "server-01")
            .tag("region", "us-east-1")
            .field("value", 4096)
            .time(1640000000, WritePrecision.S),
        Point("disk_usage")
            .tag("host", "server-01")
            .tag("region", "us-east-1")
            .field("value", 512000)
            .time(1640000000, WritePrecision.S),
    ]
    
    # Write to InfluxDB
    write_api.write(bucket="metrics", org="your-org", records=points)
    print("Metrics written successfully")

def write_batch_metrics(metrics_list):
    """Write batch of metrics"""
    
    points = []
    for metric in metrics_list:
        point = Point(metric['name'])
        
        # Add tags
        for tag_key, tag_value in metric.get('tags', {}).items():
            point.tag(tag_key, tag_value)
        
        # Add fields
        for field_key, field_value in metric.get('fields', {}).items():
            point.field(field_key, field_value)
        
        # Add timestamp
        if 'timestamp' in metric:
            point.time(metric['timestamp'])
        
        points.append(point)
    
    # Write batch
    write_api.write(bucket="metrics", org="your-org", records=points)
    print(f"Wrote {len(points)} metrics")
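Very large batches are usually split into fixed-size chunks before each `write_api.write` call so a single failure doesn't lose the whole batch. A stdlib-only sketch of the chunking step (the 5,000-point chunk size is an arbitrary assumption, tune it for your workload):

```python
def chunked(points, size=5000):
    """Yield successive fixed-size chunks of a point list."""
    for i in range(0, len(points), size):
        yield points[i:i + size]

# Each chunk would be passed to write_api.write(...) in turn.
batches = list(chunked(list(range(12000)), size=5000))
# 3 chunks: 5000 + 5000 + 2000 points
```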

Querying Data

def query_cpu_usage():
    """Query CPU usage metrics"""
    
    query = '''
        from(bucket:"metrics")
            |> range(start: -24h)
            |> filter(fn: (r) => r._measurement == "cpu_usage")
            |> filter(fn: (r) => r.host == "server-01")
    '''
    
    result = query_api.query(org="your-org", query=query)
    
    for table in result:
        for record in table.records:
            print(f"{record.time}: {record.values}")

def query_aggregated_metrics():
    """Query aggregated metrics"""
    
    query = '''
        from(bucket:"metrics")
            |> range(start: -7d)
            |> filter(fn: (r) => r._measurement == "cpu_usage")
            |> aggregateWindow(every: 1h, fn: mean)
            |> sort(columns: ["_time"])
    '''
    
    result = query_api.query(org="your-org", query=query)
    
    for table in result:
        for record in table.records:
            print(f"{record.time}: {record.values}")
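Conceptually, `aggregateWindow(every: 1h, fn: mean)` floors each timestamp to the start of its window and averages the values within it. The same bucketing, sketched in pure Python:

```python
from collections import defaultdict

def window_mean(samples, every=3600):
    """samples: (unix_ts, value) pairs -> {window_start: mean of window}."""
    buckets = defaultdict(list)
    for ts, value in samples:
        buckets[ts - ts % every].append(value)  # floor timestamp to window start
    return {start: sum(vals) / len(vals) for start, vals in buckets.items()}

result = window_mean([(3600, 10.0), (3700, 20.0), (7300, 30.0)])
# {3600: 15.0, 7200: 30.0}
```

This is also exactly what TimescaleDB's `time_bucket('1 hour', time)` does on the SQL side.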

def query_percentiles():
    """Query percentile metrics"""
    
    query = '''
        from(bucket:"metrics")
            |> range(start: -24h)
            |> filter(fn: (r) => r._measurement == "cpu_usage")
            |> quantile(q: 0.95)
    '''
    
    result = query_api.query(org="your-org", query=query)
    
    for table in result:
        for record in table.records:
            print(f"95th percentile: {record.values}")

TimescaleDB Implementation

Setup and Configuration

import psycopg2
from psycopg2.extras import execute_values

# Connect to TimescaleDB
conn = psycopg2.connect(
    host="localhost",
    database="metrics",
    user="postgres",
    password="password"
)

cursor = conn.cursor()

def create_hypertable():
    """Create hypertable for metrics"""
    
    # Create table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS metrics (
            time TIMESTAMPTZ NOT NULL,
            host TEXT NOT NULL,
            region TEXT NOT NULL,
            metric_name TEXT NOT NULL,
            value FLOAT8 NOT NULL
        )
    """)
    
    # Convert to hypertable
    cursor.execute("""
        SELECT create_hypertable('metrics', 'time', if_not_exists => TRUE)
    """)
    
    # Create indexes
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_metrics_host_time
        ON metrics (host, time DESC)
    """)
    
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_metrics_region_time
        ON metrics (region, time DESC)
    """)
    
    conn.commit()
    print("Hypertable created")

Writing Data

def write_metrics_timescaledb(metrics_list):
    """Write metrics to TimescaleDB"""
    
    data = []
    for metric in metrics_list:
        data.append((
            metric['timestamp'],
            metric['host'],
            metric['region'],
            metric['metric_name'],
            metric['value']
        ))
    
    # Batch insert
    execute_values(
        cursor,
        """
            INSERT INTO metrics (time, host, region, metric_name, value)
            VALUES %s
        """,
        data
    )
    
    conn.commit()
    print(f"Inserted {len(data)} metrics")

Querying Data

def query_timescaledb():
    """Query metrics from TimescaleDB"""
    
    # Average CPU usage per hour
    cursor.execute("""
        SELECT
            time_bucket('1 hour', time) as hour,
            host,
            AVG(value) as avg_value
        FROM metrics
        WHERE metric_name = 'cpu_usage'
            AND time > NOW() - INTERVAL '7 days'
        GROUP BY hour, host
        ORDER BY hour DESC
    """)
    
    results = cursor.fetchall()
    for row in results:
        print(f"{row[0]}: {row[1]} - {row[2]:.2f}%")

def query_percentiles_timescaledb():
    """Query percentiles"""
    
    cursor.execute("""
        SELECT
            host,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY value) as p50,
            PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95,
            PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY value) as p99
        FROM metrics
        WHERE metric_name = 'cpu_usage'
            AND time > NOW() - INTERVAL '24 hours'
        GROUP BY host
    """)
    
    results = cursor.fetchall()
    for row in results:
        print(f"{row[0]}: p50={row[1]:.2f}, p95={row[2]:.2f}, p99={row[3]:.2f}")
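`PERCENTILE_CONT` interpolates linearly between the two nearest ranks rather than picking an existing sample. A minimal stdlib version of the same calculation, useful for sanity-checking query results:

```python
def percentile_cont(values, q):
    """Linear-interpolation percentile, matching SQL PERCENTILE_CONT."""
    s = sorted(values)
    pos = q * (len(s) - 1)            # fractional rank into the sorted list
    lo, frac = int(pos), pos - int(pos)
    if frac == 0:
        return s[lo]
    return s[lo] + frac * (s[lo + 1] - s[lo])  # interpolate between neighbors

data = [10, 20, 30, 40, 50]
p50 = percentile_cont(data, 0.50)   # 30
p95 = percentile_cont(data, 0.95)   # 48.0 (between 40 and 50)
```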

def continuous_aggregates():
    """Create continuous aggregates for faster queries"""
    
    # A continuous aggregate cannot be created inside a transaction
    # block, so switch the connection to autocommit for this statement
    conn.autocommit = True
    cursor.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_hourly
        WITH (timescaledb.continuous) AS
        SELECT
            time_bucket('1 hour', time) as hour,
            host,
            metric_name,
            AVG(value) as avg_value,
            MIN(value) as min_value,
            MAX(value) as max_value,
            STDDEV(value) as stddev_value
        FROM metrics
        GROUP BY hour, host, metric_name
        WITH DATA
    """)
    conn.autocommit = False
    print("Continuous aggregate created")

Prometheus Implementation

Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    monitor: 'prometheus'

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
  
  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100']
  
  - job_name: 'application'
    static_configs:
      - targets: ['localhost:8080']
    metrics_path: '/metrics'
    scrape_interval: 5s

Querying with PromQL

import requests

def query_prometheus(query, start_time, end_time):
    """Query Prometheus"""
    
    url = "http://localhost:9090/api/v1/query_range"
    
    params = {
        'query': query,
        'start': start_time,
        'end': end_time,
        'step': '60s'
    }
    
    response = requests.get(url, params=params)
    response.raise_for_status()
    data = response.json()
    
    return data['data']['result']

# Example queries
queries = {
    'cpu_usage': 'rate(cpu_usage_seconds_total[5m])',
    'memory_usage': 'memory_usage_bytes',
    'request_rate': 'rate(http_requests_total[5m])',
    'error_rate': 'rate(http_requests_total{status=~"5.."}[5m])',
    'p95_latency': 'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))'
}

# Execute queries
for name, query in queries.items():
    results = query_prometheus(query, 1640000000, 1640086400)
    print(f"{name}: {results}")
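PromQL's `rate()` computes the per-second increase of a counter over the window, compensating for counter resets (when a counter's value drops, the process must have restarted near zero). A simplified stdlib sketch of that logic (real `rate()` also extrapolates to the window boundaries, which we skip here):

```python
def simple_rate(samples):
    """samples: time-ordered (unix_ts, counter_value) pairs.
    Per-second rate with counter-reset handling."""
    increase = 0.0
    for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
        # On a reset the counter restarted near 0, so the post-reset
        # value itself approximates the increase since the reset
        increase += v1 - v0 if v1 >= v0 else v1
    span = samples[-1][0] - samples[0][0]
    return increase / span

rate = simple_rate([(0, 100), (60, 160), (120, 220)])
# (60 + 60) / 120 = 1.0 requests/second
```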

Custom Metrics

from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time

# Define metrics
request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration', ['endpoint'])
active_connections = Gauge('active_connections', 'Active connections')

def track_request(method, endpoint, duration):
    """Track HTTP request"""
    request_count.labels(method=method, endpoint=endpoint).inc()
    request_duration.labels(endpoint=endpoint).observe(duration)

def update_connections(count):
    """Update active connections"""
    active_connections.set(count)

# Start metrics server
if __name__ == '__main__':
    start_http_server(8000)
    
    # Simulate requests
    for i in range(100):
        track_request('GET', '/api/users', 0.05)
        track_request('POST', '/api/users', 0.1)
        update_connections(i % 50)
        time.sleep(1)

Performance Optimization

Data Retention and Downsampling

def setup_retention_policies():
    """Setup retention policies"""
    
    # InfluxDB 1.x retention (InfluxQL); in InfluxDB 2.x, retention is
    # configured on the bucket instead (via the UI, CLI, or API)
    influxql = '''
        CREATE RETENTION POLICY "30days" ON "metrics"
        DURATION 30d REPLICATION 1 DEFAULT
    '''
    
    # TimescaleDB retention
    cursor.execute("""
        SELECT add_retention_policy('metrics', INTERVAL '30 days');
    """)
    conn.commit()
    
    # Prometheus retention (command-line flag, not an API call)
    # --storage.tsdb.retention.time=30d
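Under the hood, all three systems implement retention the same way: periodically delete everything older than `now - retention`. The core operation, in stdlib Python:

```python
import time

def apply_retention(points, retention_seconds, now=None):
    """Keep only points newer than now - retention_seconds.
    points: (unix_ts, value) pairs."""
    now = time.time() if now is None else now
    cutoff = now - retention_seconds
    return [(ts, v) for ts, v in points if ts >= cutoff]

kept = apply_retention([(100, 1.0), (500, 2.0)], retention_seconds=300, now=700)
# cutoff = 400, so only (500, 2.0) survives
```

In practice, TSDBs drop whole time-partitioned chunks rather than individual rows, which is why retention deletes are cheap.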

def downsample_data():
    """Downsample and compress old data"""
    
    # InfluxDB 1.x downsampling (InfluxQL); in InfluxDB 2.x, use a
    # scheduled Flux task with aggregateWindow() instead
    influxql = '''
        SELECT MEAN(value) INTO "metrics_1h" FROM "metrics"
        WHERE time < now() - 7d
        GROUP BY time(1h), *
    '''
    
    # TimescaleDB: compress chunks older than 7 days
    cursor.execute("""
        SELECT compress_chunk(c, if_not_compressed => TRUE)
        FROM show_chunks('metrics', older_than => INTERVAL '7 days') c
    """)
    conn.commit()

def compression_settings():
    """Configure compression"""
    
    # TimescaleDB compression
    cursor.execute("""
        ALTER TABLE metrics SET (
            timescaledb.compress,
            timescaledb.compress_orderby = 'time DESC',
            timescaledb.compress_segmentby = 'host, region'
        )
    """)
    
    # Add compression policy
    cursor.execute("""
        SELECT add_compression_policy('metrics', INTERVAL '7 days')
    """)
    conn.commit()
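Time-ordered data compresses so well because consecutive timestamps differ by a near-constant interval: storing deltas (or deltas-of-deltas, as TimescaleDB and Gorilla-style engines do) turns large repeating timestamps into tiny repeating values. A sketch of the basic idea:

```python
def delta_encode(timestamps):
    """Store the first timestamp plus successive differences."""
    return [timestamps[0]] + [b - a for a, b in zip(timestamps, timestamps[1:])]

def delta_decode(encoded):
    """Invert delta_encode by running-sum reconstruction."""
    out = [encoded[0]]
    for d in encoded[1:]:
        out.append(out[-1] + d)
    return out

ts = [1640000000, 1640000015, 1640000030, 1640000045]
enc = delta_encode(ts)
# [1640000000, 15, 15, 15] -- small repeated deltas compress extremely well
```

Real engines go further (delta-of-delta bit packing, XOR encoding for float values), but the regularity of time series data is what makes all of it work.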

Cardinality Management

def analyze_cardinality():
    """Analyze metric cardinality"""
    
    import time
    
    # InfluxDB 1.x (InfluxQL)
    influxql = '''
        SHOW SERIES CARDINALITY
    '''
    
    # Prometheus: count series per metric name
    url = "http://localhost:9090/api/v1/label/__name__/values"
    response = requests.get(url)
    metrics = response.json()['data']
    
    # The range API expects RFC 3339 or unix timestamps, not now()-1h
    end = int(time.time())
    start = end - 3600
    for metric in metrics:
        query = f'count({{__name__="{metric}"}})'
        cardinality = query_prometheus(query, start, end)
        print(f"{metric}: {cardinality}")

def reduce_cardinality():
    """Reduce high cardinality metrics"""
    
    # Drop unnecessary labels
    cursor.execute("""
        ALTER TABLE metrics DROP COLUMN IF EXISTS unnecessary_tag
    """)
    
    # Aggregate high cardinality data, dropping per-host labels
    cursor.execute("""
        CREATE MATERIALIZED VIEW metrics_aggregated AS
        SELECT
            time_bucket('1 hour', time) as hour,
            metric_name,
            AVG(value) as value
        FROM metrics
        GROUP BY hour, metric_name
    """)
    conn.commit()
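Series cardinality is the number of distinct tag combinations actually observed, while the worst case is the product of per-tag distinct counts. Both are easy to sketch in stdlib Python, which is handy for estimating the impact of a new tag before shipping it:

```python
from math import prod

def series_cardinality(points):
    """points: dicts of tag -> value. Count distinct tag combinations."""
    return len({tuple(sorted(p.items())) for p in points})

def worst_case_cardinality(tag_values):
    """Upper bound: product of distinct values per tag."""
    return prod(len(vals) for vals in tag_values.values())

points = [
    {"host": "a", "region": "us"},
    {"host": "a", "region": "us"},   # duplicate combination, not counted twice
    {"host": "b", "region": "eu"},
]
actual = series_cardinality(points)   # 2 distinct series
bound = worst_case_cardinality({"host": {"a", "b"}, "region": {"us", "eu"}})  # 4
```

The gap between the two is why adding a tag like `request_id` is dangerous: the worst case multiplies by the number of distinct values, and unbounded tags approach that worst case.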

Real-World Use Cases

1. Infrastructure Monitoring

class InfrastructureMonitor:
    def __init__(self, client):
        self.client = client
    
    def get_system_health(self):
        """Get overall system health"""
        
        query = '''
            from(bucket:"metrics")
                |> range(start: -1h)
                |> filter(fn: (r) => r._measurement =~ /cpu|memory|disk/)
                |> aggregateWindow(every: 5m, fn: mean)
        '''
        
        result = self.client.query_api().query(query=query)
        return result
    
    def detect_anomalies(self):
        """Detect anomalies in metrics"""
        
        # Flux has no built-in anomaly detector; a simple approach
        # flags hourly means above a fixed threshold
        query = '''
            from(bucket:"metrics")
                |> range(start: -24h)
                |> filter(fn: (r) => r._measurement == "cpu_usage")
                |> aggregateWindow(every: 1h, fn: mean)
                |> filter(fn: (r) => r._value > 90.0)
        '''
        
        result = self.client.query_api().query(query=query)
        return result
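A fixed threshold misses metrics whose "normal" level varies; a statistical alternative flags points more than N standard deviations from the mean. A stdlib z-score detector you could run on query results client-side:

```python
from statistics import mean, stdev

def zscore_anomalies(values, threshold=2.0):
    """Return (index, value) pairs more than `threshold` stddevs from the mean."""
    mu, sigma = mean(values), stdev(values)
    if sigma == 0:
        return []                     # flat series has no outliers
    return [(i, v) for i, v in enumerate(values)
            if abs(v - mu) / sigma > threshold]

anoms = zscore_anomalies([50, 52, 51, 49, 50, 95], threshold=2.0)
# flags the spike at index 5
```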

2. Application Performance Monitoring

class APMMonitor:
    def __init__(self, client):
        self.client = client
    
    def get_request_metrics(self):
        """Get request performance metrics"""
        
        query = '''
            from(bucket:"metrics")
                |> range(start: -1h)
                |> filter(fn: (r) => r._measurement == "http_request_duration")
                |> aggregateWindow(every: 1m, fn: mean)
        '''
        
        return self.client.query_api().query(query=query)
    
    def get_error_rate(self):
        """Get error rate"""
        
        query = '''
            from(bucket:"metrics")
                |> range(start: -1h)
                |> filter(fn: (r) => r._measurement == "http_errors")
                |> aggregateWindow(every: 1m, fn: sum)
        '''
        
        return self.client.query_api().query(query=query)

Best Practices & Common Pitfalls

Best Practices

  1. Plan Cardinality: Limit tag combinations
  2. Set Retention: Define data retention policies
  3. Use Downsampling: Aggregate old data
  4. Batch Writes: Write data in batches
  5. Index Strategically: Index frequently queried tags
  6. Monitor Ingestion: Track write rates
  7. Compress Data: Enable compression for old data
  8. Query Optimization: Use aggregation windows
  9. Alerting: Set up alerts for anomalies
  10. Capacity Planning: Plan for growth

Common Pitfalls

  1. High Cardinality: Too many unique tag combinations
  2. Unbounded Retention: Keeping data forever
  3. Inefficient Queries: Querying too much data
  4. Missing Indexes: Slow queries on unindexed tags
  5. Inadequate Retention: Deleting data too quickly
  6. No Downsampling: Keeping high-resolution data forever
  7. Slow Ingestion: Not batching writes
  8. Poor Alerting: Missing important anomalies
  9. Inadequate Monitoring: Not monitoring the monitor
  10. Scalability Issues: Not planning for growth


Conclusion

Time series databases are essential for modern monitoring and observability. InfluxDB excels in flexibility, TimescaleDB in SQL compatibility, and Prometheus in simplicity. Success requires proper cardinality management, retention policies, and query optimization.

Start with clear metrics design, implement retention policies, and continuously monitor performance. As your data grows, leverage downsampling and compression to maintain query performance.

Time series databases unlock insights from your operational data.
