Introduction
Time series data (measurements indexed by time) powers everything from financial trading systems to IoT sensor networks, from application monitoring to weather forecasting. Traditional databases struggle with the demands of these workloads: high write throughput, efficient time-range queries, and data lifecycle management. Time series databases (TSDBs) are purpose-built for them.
In 2026, time series databases have matured significantly, with solutions ranging from specialized databases like InfluxDB and TimescaleDB to cloud-native offerings. This comprehensive guide explores time series databases in depth, covering data models, query languages, architecture patterns, and practical implementation with popular TSDB solutions.
Understanding Time Series Data
Time series data has unique characteristics that distinguish it from relational or document data. Understanding these characteristics is crucial for choosing and implementing the right solution.
Data Characteristics
┌─────────────────────────────────────────────────────────────────────┐
│ Time Series Data Pattern │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Temperature sensor data (1 sample/second): │
│ │
│ Timestamp | Sensor | Location | Value │
│ ---------------------|--------|----------|------- │
│ 2026-01-01T00:00:00Z| temp-01| room-1 | 22.5 │
│ 2026-01-01T00:00:01Z| temp-01| room-1 | 22.6 │
│ 2026-01-01T00:00:02Z| temp-01| room-1 | 22.4 │
│ 2026-01-01T00:00:03Z| temp-01| room-1 | 22.5 │
│ ... ... │
│ │
│ Key characteristics: │
│ - Sequential, time-ordered │
│ - Typically append-only │
│ - High ingestion rate │
│ - Queries focus on ranges │
│ - Data retention based on age │
│ - Often downsampled for long-term storage │
│ │
└─────────────────────────────────────────────────────────────────────┘
Core Concepts
- Time Series: Sequence of data points indexed in time order, typically at regular intervals.
- Metric: Named measurement with tags and values (e.g., cpu_usage, memory_free).
- Tag: Indexed label for grouping metrics (e.g., host, region, service).
- Field: Actual measurement value (e.g., 75.5 for CPU usage percentage).
- Timestamp: Point in time when measurement was taken.
- Retention Policy: How long data is kept before deletion.
- Downsampling: Reducing data resolution over time (e.g., 1-minute to 1-hour averages).
- Cardinality: Number of unique tag combinations (high cardinality = many combinations).
- Ingestion Rate: Number of data points written per second.
- Query Latency: Time to retrieve data from database.
- Compression Ratio: Reduction in data size through compression.
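To make the downsampling concept concrete, here is a minimal sketch in plain Python, with no database involved. The function name `downsample_mean` and the sample readings are illustrative assumptions; a real TSDB performs this server-side.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from statistics import mean
from typing import Dict, List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def downsample_mean(
    points: List[Tuple[datetime, float]], bucket: timedelta
) -> List[Tuple[datetime, float]]:
    """Average (timestamp, value) samples into fixed-width time buckets."""
    grouped: Dict[datetime, List[float]] = defaultdict(list)
    for ts, value in points:
        # Floor each timestamp to the start of its bucket
        # (timedelta // timedelta yields an integer bucket index).
        index = (ts - EPOCH) // bucket
        grouped[EPOCH + index * bucket].append(value)
    return [(start, mean(values)) for start, values in sorted(grouped.items())]


# Hypothetical sample: four one-second readings reduced to two 2-second averages.
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
raw = [(base + timedelta(seconds=i), 22.0 + i) for i in range(4)]
reduced = downsample_mean(raw, timedelta(seconds=2))
```

Timestamps must be timezone-aware here, because they are compared against a UTC epoch.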
Data Models
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime


@dataclass
class DataPoint:
    measurement: str
    timestamp: datetime
    fields: Dict[str, float]
    tags: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # Avoid a shared mutable default: give each point its own tag dict.
        if self.tags is None:
            self.tags = {}


@dataclass
class TimeSeriesQuery:
    measurement: str
    start_time: datetime
    end_time: Optional[datetime] = None
    fields: Optional[List[str]] = None
    tags: Optional[Dict[str, str]] = None
    aggregation: Optional[str] = None
    granularity: Optional[str] = None

    def to_dict(self) -> Dict:
        return {
            'measurement': self.measurement,
            'start': self.start_time.isoformat(),
            'end': self.end_time.isoformat() if self.end_time else None,
            'fields': self.fields,
            'filters': self.tags,
            'aggregation': self.aggregation,
            'granularity': self.granularity
        }


class TimeSeriesModel:
    def __init__(self):
        self.data_points: List[DataPoint] = []

    def add_point(self, measurement: str, timestamp: datetime,
                  fields: Dict[str, float], tags: Optional[Dict[str, str]] = None):
        point = DataPoint(measurement, timestamp, fields, tags)
        self.data_points.append(point)

    def get_measurement_names(self) -> List[str]:
        return list(set(p.measurement for p in self.data_points))

    def filter_by_time_range(self, start: datetime, end: datetime) -> List[DataPoint]:
        return [p for p in self.data_points
                if start <= p.timestamp <= end]

    def filter_by_tags(self, tags: Dict[str, str]) -> List[DataPoint]:
        return [p for p in self.data_points
                if all(p.tags.get(k) == v for k, v in tags.items())]
Time Series Database Comparison
Feature Comparison Matrix
| Feature | InfluxDB | TimescaleDB | Prometheus |
|---|---|---|---|
| Model | Time series | PostgreSQL extension | Time series |
| Storage | Custom | PostgreSQL | Custom |
| Query Language | InfluxQL/Flux | SQL | PromQL |
| Retention | Configurable | Configurable | Configurable |
| Clustering | Enterprise | Yes | No (single node) |
| Compression | Excellent | Good | Good |
| High-cardinality tolerance | Medium | High | Low |
| Pricing | Free to Enterprise | Free (self-hosted) | Free (open-source) |
| Best For | Metrics, IoT | High cardinality | Monitoring |
InfluxDB
InfluxDB is one of the most widely used purpose-built time series databases, known for its Flux query language and an ecosystem that includes the Telegraf collection agent and official client libraries.
InfluxDB Data Model
InfluxDB uses a flexible data model with measurements, tags, fields, and timestamps:
from typing import List, Optional
from datetime import datetime
# Import the library client under an alias so the wrapper class below
# can keep the name InfluxDBClient without shadowing it.
from influxdb_client import InfluxDBClient as _InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS


class InfluxDBClient:
    def __init__(
        self,
        url: str = "http://localhost:8086",
        token: Optional[str] = None,
        org: Optional[str] = None
    ):
        self.org = org  # reused by every write call below
        self.client = _InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.delete_api = self.client.delete_api()

    def write_point(self, bucket: str, point: Point):
        return self.write_api.write(bucket=bucket, org=self.org, record=point)

    def write_points(self, bucket: str, points: List[Point]):
        return self.write_api.write(bucket=bucket, org=self.org, record=points)

    def write_line_protocol(self, bucket: str, line_protocol: str):
        return self.write_api.write(bucket=bucket, org=self.org, record=line_protocol)

    def query(self, query: str) -> List[dict]:
        # query_data_frame returns a list of DataFrames when the result
        # contains several tables, so normalize to a list first.
        frames = self.query_api.query_data_frame(query)
        if not isinstance(frames, list):
            frames = [frames]
        return [row for frame in frames for row in frame.to_dict('records')]

    def close(self):
        self.client.close()
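The measurement, tag, field, and timestamp model maps directly onto InfluxDB line protocol, the text format accepted by `write_line_protocol` above. The sketch below shows how one point could be serialized by hand. The helper names are hypothetical, only the common escaping rules are covered, and in practice the client library's `Point` class does this for you.

```python
from typing import Dict, Optional, Union

FieldValue = Union[float, int, bool, str]


def _escape(text: str, chars: str) -> str:
    """Backslash-escape each character in `chars` wherever it occurs in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value: FieldValue) -> str:
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an 'i' suffix
    if isinstance(value, float):
        return repr(value)
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'  # strings are double-quoted


def to_line_protocol(
    measurement: str,
    tags: Dict[str, str],
    fields: Dict[str, FieldValue],
    timestamp_ns: Optional[int] = None,
) -> str:
    """Serialize one point as: measurement,tag=value field=value timestamp."""
    if not fields:
        raise ValueError("line protocol requires at least one field")
    line = _escape(measurement, ", ")
    for key in sorted(tags):  # sorted tag keys are recommended for write performance
        line += f",{_escape(key, ',= ')}={_escape(tags[key], ',= ')}"
    line += " " + ",".join(
        f"{_escape(k, ',= ')}={_format_field(v)}" for k, v in fields.items()
    )
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


# Hypothetical reading taken at 2026-01-01T00:00:00Z (nanosecond precision).
line = to_line_protocol(
    "temperature",
    {"sensor": "temp-01", "location": "room 1"},
    {"value": 22.5},
    1767225600000000000,
)
```

Tags are written as strings and indexed, while fields keep their type, which is why the choice between tag and field matters for both query speed and cardinality.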
Writing Data
class InfluxDBWriter:
def __init__(self, client: InfluxDBClient, bucket: str):
self.client = client
self.bucket = bucket
def write_single_point(self, measurement: str, fields: dict, tags: dict = None,
timestamp: datetime = None):
point = Point(measurement)
for key, value in (tags or {}).items():
point.tag(key, value)
for key, value in fields.items():
point.field(key, value)
if timestamp:
point.time(timestamp)
self.client.write_point(self.bucket, point)
def write_multiple_points(self, measurements: List[dict]):
points = []
for m in measurements:
point = Point(m['measurement'])
for key, value in m.get('tags', {}).items():
point.tag(key, value)
for key, value in m.get('fields', {}).items():
point.field(key, value)
if 'timestamp' in m:
point.time(m['timestamp'])
points.append(point)
self.client.write_points(self.bucket, points)
def write_with_write_options(self, bucket: str):
write_api = self.client.client.write_api(
write_options=WriteOptions(
batch_size=1000,
flush_interval=5000,
jitter_interval=2000,
retry_interval=5000,
max_retries=5
)
)
points = [Point("cpu").tag("host", "server01").field("value", i)
for i in range(100)]
write_api.write(bucket=bucket, org=self.client.org, record=points)
Querying Data with Flux
class InfluxDBQuerier:
def __init__(self, client: InfluxDBClient):
self.client = client
def simple_query(self, measurement: str, start: str = "-1h"):
query = f'''
from(bucket: "my-bucket")
|> range(start: {start})
|> filter(fn: (r) => r._measurement == "{measurement}")
'''
return self.client.query(query)
def query_with_fields(self, measurement: str, field: str, start: str = "-24h"):
query = f'''
from(bucket: "my-bucket")
|> range(start: {start})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => r._field == "{field}")
'''
return self.client.query(query)
def query_with_tags(self, measurement: str, tag_filters: dict, start: str = "-1h"):
filter_expr = ' and '.join(
f'r["{key}"] == "{value}"'
for key, value in tag_filters.items()
)
query = f'''
from(bucket: "my-bucket")
|> range(start: {start})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => {filter_expr})
'''
return self.client.query(query)
def aggregate_query(self, measurement: str, window: str = "1m",
function: str = "mean"):
query = f'''
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "{measurement}")
|> aggregateWindow(every: {window}, fn: {function}, createEmpty: false)
'''
return self.client.query(query)
def downsample_query(self, measurement: str, original_bucket: str,
downsampled_bucket: str, window: str = "5m"):
query = f'''
from(bucket: "{original_bucket}")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "{measurement}")
|> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
|> to(bucket: "{downsampled_bucket}", org: "my-org")
'''
return self.client.query(query)
InfluxDB Schema Design
class InfluxDBSchema:
@staticmethod
def design_measurement(measurement_name: str, data_pattern: str) -> dict:
if data_pattern == "high_cardinality":
return {
"measurement": measurement_name,
"tags": [
"device_id",
"location"
],
"fields": [
"value"
]
}
elif data_pattern == "low_cardinality":
return {
"measurement": measurement_name,
"tags": [
"host",
"service",
"status"
],
"fields": [
"cpu_percent",
"memory_percent",
"disk_percent"
]
}
return {}
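    @staticmethod
    def get_tag_cardinality(client: InfluxDBClient, measurement: str) -> dict:
        # List the tag keys first, then count the distinct values of each key.
        keys_query = f'''
        import "influxdata/influxdb/schema"
        schema.measurementTagKeys(bucket: "my-bucket", measurement: "{measurement}")
        '''
        cardinality = {}
        for record in client.query(keys_query):
            tag_key = record.get('_value', '')
            if not tag_key or tag_key.startswith('_'):
                continue  # skip system columns such as _measurement and _field
            values_query = f'''
            import "influxdata/influxdb/schema"
            schema.measurementTagValues(bucket: "my-bucket", measurement: "{measurement}", tag: "{tag_key}")
            '''
            cardinality[tag_key] = len(client.query(values_query))
        return cardinality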
@staticmethod
def optimize_schema(client: InfluxDBClient, measurement: str):
cardinality = InfluxDBSchema.get_tag_cardinality(client, measurement)
high_cardinality = {k: v for k, v in cardinality.items() if v > 10000}
recommendations = []
if high_cardinality:
recommendations.append(
f"Move {list(high_cardinality.keys())} to fields or bucket by time"
)
return recommendations
TimescaleDB
TimescaleDB is a PostgreSQL extension that brings time series capabilities to the world’s most popular relational database. It combines the power of SQL with time series optimizations.
TimescaleDB Setup and Hypertable
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
import time
class TimescaleDBConnection:
def __init__(self, host: str, port: int, database: str,
user: str, password: str):
self.conn = psycopg2.connect(
host=host,
port=port,
database=database,
user=user,
password=password
)
self.conn.autocommit = True
self.cursor = self.conn.cursor()
def execute(self, query: str, params: tuple = None):
self.cursor.execute(query, params)
return self.cursor
def executemany(self, query: str, params_list: list):
execute_values(self.cursor, query, params_list)
def fetchall(self):
return self.cursor.fetchall()
def close(self):
self.cursor.close()
self.conn.close()
class TimescaleDBManager:
def __init__(self, connection: TimescaleDBConnection):
self.conn = connection
def create_extension(self):
self.conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb")
def create_table(self, table_name: str):
query = f'''
CREATE TABLE IF NOT EXISTS {table_name} (
time TIMESTAMPTZ NOT NULL,
device_id TEXT,
location TEXT,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
pressure DOUBLE PRECISION
)
'''
self.conn.execute(query)
def convert_to_hypertable(self, table_name: str, time_column: str = "time"):
query = f'''
SELECT create_hypertable(
'{table_name}',
'{time_column}',
if_not_exists => TRUE,
migrate_data => TRUE
)
'''
self.conn.execute(query)
    def create_hypertable_compressed(self, table_name: str, time_column: str = "time"):
        # Create the hypertable first, then enable compression in a second statement.
        create_query = f'''
        SELECT create_hypertable(
            '{table_name}',
            '{time_column}',
            chunk_time_interval => INTERVAL '1 day',
            if_not_exists => TRUE
        )
        '''
        self.conn.execute(create_query)
        compress_query = f'''
        ALTER TABLE {table_name} SET (
            timescaledb.compress,
            timescaledb.compress_orderby = '{time_column} DESC',
            timescaledb.compress_segmentby = 'device_id, location'
        )
        '''
        self.conn.execute(compress_query)
def add_compression_policy(self, table_name: str, compress_after: str = "INTERVAL '7 days'"):
query = f'''
SELECT add_compression_policy(
'{table_name}',
{compress_after}
)
'''
self.conn.execute(query)
def create_continuous_aggregate(self, table_name: str, aggregate_name: str,
bucket: str = "1 hour"):
query = f'''
CREATE MATERIALIZED VIEW {aggregate_name}
WITH (timescaledb.continuous) AS
SELECT
time_bucket('{bucket}', time) AS bucket,
device_id,
location,
AVG(temperature) AS avg_temperature,
AVG(humidity) AS avg_humidity,
MAX(temperature) AS max_temperature,
MIN(temperature) AS min_temperature,
COUNT(*) AS sample_count
FROM {table_name}
GROUP BY bucket, device_id, location
'''
self.conn.execute(query)
def add_refresh_policy(self, aggregate_name: str, start_offset: str = "INTERVAL '1 week'",
end_offset: str = "INTERVAL '1 hour'"):
query = f'''
SELECT add_continuous_aggregate_policy(
'{aggregate_name}',
start_offset => {start_offset},
end_offset => {end_offset},
schedule_interval => INTERVAL '1 hour'
)
'''
self.conn.execute(query)
TimescaleDB Queries
class TimescaleDBQuerier:
def __init__(self, connection: TimescaleDBConnection):
self.conn = connection
def insert_data(self, table_name: str, data: list):
query = f'''
INSERT INTO {table_name} (time, device_id, location, temperature, humidity)
VALUES %s
'''
self.conn.executemany(query, data)
def insert_single(self, table_name: str, time: datetime, device_id: str,
location: str, temperature: float, humidity: float):
query = f'''
INSERT INTO {table_name} (time, device_id, location, temperature, humidity)
VALUES (%s, %s, %s, %s, %s)
'''
self.conn.execute(query, (time, device_id, location, temperature, humidity))
def query_range(self, table_name: str, start: datetime, end: datetime):
query = f'''
SELECT * FROM {table_name}
WHERE time BETWEEN %s AND %s
ORDER BY time DESC
'''
self.conn.execute(query, (start, end))
return self.conn.fetchall()
def query_device(self, table_name: str, device_id: str, start: datetime):
query = f'''
SELECT * FROM {table_name}
WHERE device_id = %s AND time >= %s
ORDER BY time DESC
'''
self.conn.execute(query, (device_id, start))
return self.conn.fetchall()
def time_bucket_query(self, table_name: str, bucket: str, start: datetime):
query = f'''
SELECT
time_bucket('{bucket}', time) AS bucket,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp,
MIN(temperature) AS min_temp
FROM {table_name}
WHERE time >= %s
GROUP BY bucket
ORDER BY bucket
'''
self.conn.execute(query, (start,))
return self.conn.fetchall()
def last_value_query(self, table_name: str):
query = f'''
SELECT device_id, location, temperature, time
FROM {table_name}
WHERE time = (
SELECT MAX(time) FROM {table_name} AS t2
WHERE t2.device_id = {table_name}.device_id
)
'''
self.conn.execute(query)
return self.conn.fetchall()
    def moving_average_query(self, table_name: str, start: datetime, window: str = "1 hour"):
        # A time-based window needs RANGE with an interval offset;
        # ROWS only accepts a row count.
        query = f'''
        SELECT
            time,
            temperature,
            AVG(temperature) OVER (
                ORDER BY time
                RANGE BETWEEN INTERVAL '{window}' PRECEDING AND CURRENT ROW
            ) AS moving_avg
        FROM {table_name}
        WHERE time >= %s
        ORDER BY time
        '''
        self.conn.execute(query, (start,))
        return self.conn.fetchall()
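    def gap_fill_query(self, table_name: str, start: datetime, bucket: str = "1 minute",
                       end: datetime = None):
        # LOCF and INTERPOLATE only work with time_bucket_gapfill, which needs
        # both a lower and an upper time bound in the WHERE clause.
        upper_bound = "%s" if end is not None else "NOW()"
        params = (start, end) if end is not None else (start,)
        query = f'''
        SELECT
            time_bucket_gapfill('{bucket}', time) AS bucket,
            device_id,
            LOCF(avg(temperature)) AS temperature,
            INTERPOLATE(avg(humidity)) AS humidity
        FROM {table_name}
        WHERE time >= %s AND time < {upper_bound}
        GROUP BY bucket, device_id
        ORDER BY bucket
        '''
        self.conn.execute(query, params)
        return self.conn.fetchall()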
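The gap-filling query above relies on LOCF (last observation carried forward). As a rough illustration of what that means, the sketch below fills a regular bucket grid in plain Python. The function name and sample data are assumptions for illustration, and the bucket keys are assumed to be aligned to the grid that starts at `start`.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple


def gapfill_locf(
    buckets: Dict[datetime, float],
    start: datetime,
    end: datetime,
    width: timedelta,
) -> List[Tuple[datetime, Optional[float]]]:
    """Emit one row per bucket in [start, end), carrying the last value forward."""
    filled: List[Tuple[datetime, Optional[float]]] = []
    last: Optional[float] = None  # stays None until the first observed bucket
    current = start
    while current < end:
        if current in buckets:
            last = buckets[current]
        filled.append((current, last))
        current += width
    return filled


# Hypothetical sparse data: readings exist only for minute 0 and minute 2.
t0 = datetime(2026, 1, 1, tzinfo=timezone.utc)
minute = timedelta(minutes=1)
sparse = {t0: 1.0, t0 + 2 * minute: 3.0}
dense = gapfill_locf(sparse, t0, t0 + 4 * minute, minute)
```

Buckets before the first observation stay empty (`None`), since there is nothing to carry forward yet.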
TimescaleDB Performance
class TimescaleDBPerformance:
def __init__(self, connection: TimescaleDBConnection):
self.conn = connection
def create_index(self, table_name: str, columns: list):
index_name = f"idx_{table_name}_{'_'.join(columns)}"
query = f'''
CREATE INDEX IF NOT EXISTS {index_name}
ON {table_name} ({', '.join(columns)})
'''
self.conn.execute(query)
def create_time_index(self, table_name: str, time_column: str = "time"):
query = f'''
CREATE INDEX IF NOT EXISTS idx_{table_name}_time
ON {table_name} ({time_column})
'''
self.conn.execute(query)
def create_composite_index(self, table_name: str):
query = f'''
CREATE INDEX IF NOT EXISTS idx_{table_name}_composite
ON {table_name} (device_id, time DESC)
'''
self.conn.execute(query)
    def get_chunk_info(self, table_name: str):
        # Column names follow the TimescaleDB 2.x information view.
        query = '''
        SELECT
            chunk_schema,
            chunk_name,
            range_start,
            range_end
        FROM timescaledb_information.chunks
        WHERE hypertable_name = %s
        '''
        self.conn.execute(query, (table_name,))
        return self.conn.fetchall()
    def get_compression_stats(self, table_name: str):
        # Assumes TimescaleDB 2.x, where per-chunk statistics come from the
        # chunk_compression_stats() function; the ratio is computed here.
        query = '''
        SELECT
            chunk_name,
            before_compression_total_bytes,
            after_compression_total_bytes,
            before_compression_total_bytes::numeric
                / NULLIF(after_compression_total_bytes, 0) AS compression_ratio
        FROM chunk_compression_stats(%s)
        ORDER BY compression_ratio DESC NULLS LAST
        '''
        self.conn.execute(query, (table_name,))
        return self.conn.fetchall()
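    def reorder_chunk(self, chunk_name: str, index_name: str):
        # reorder_chunk is a function that rewrites one chunk in the order of
        # an index, so it takes a chunk and an index rather than an age cutoff.
        query = "SELECT reorder_chunk(%s, %s)"
        self.conn.execute(query, (chunk_name, index_name))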
def drop_chunks(self, table_name: str, older_than: str):
query = f'''
SELECT drop_chunks('{table_name}', older_than => {older_than})
'''
self.conn.execute(query)
Prometheus
Prometheus is a leading open-source monitoring and alerting toolkit designed for reliability and operational simplicity. It uses a pull-based model to scrape metrics from targets.
Configuration
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
monitor: 'prometheus'
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
- job_name: 'application'
static_configs:
- targets: ['localhost:8080']
metrics_path: '/metrics'
scrape_interval: 5s
Querying with PromQL
import requests
def query_prometheus(query, start_time, end_time):
url = "http://localhost:9090/api/v1/query_range"
params = {
'query': query,
'start': start_time,
'end': end_time,
'step': '60s'
}
response = requests.get(url, params=params)
data = response.json()
return data['data']['result']
# Example queries
queries = {
'cpu_usage': 'rate(cpu_usage_seconds_total[5m])',
'memory_usage': 'memory_usage_bytes',
'request_rate': 'rate(http_requests_total[5m])',
'error_rate': 'rate(http_requests_total{status=~"5.."}[5m])',
'p95_latency': 'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))'
}
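Several of the example queries use `rate()` over a counter. The sketch below approximates the idea in plain Python: sum the increases between consecutive samples, treat any drop as a counter reset, and divide by the elapsed time. The function name and sample values are illustrative assumptions, and Prometheus additionally extrapolates to the window boundaries, which this simplified version does not do.

```python
from typing import List, Tuple


def simple_rate(samples: List[Tuple[float, float]]) -> float:
    """Per-second increase of a counter.

    `samples` holds (unix_seconds, counter_value) pairs sorted by time.
    """
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    elapsed = samples[-1][0] - samples[0][0]
    if elapsed <= 0:
        raise ValueError("samples must span a positive time range")
    increase = 0.0
    for (_, previous), (_, current) in zip(samples, samples[1:]):
        if current >= previous:
            increase += current - previous
        else:
            # The counter dropped, so the process restarted from zero:
            # count the new value itself as the increase.
            increase += current
    return increase / elapsed


# Hypothetical scrapes every 15 seconds, with a restart before the third one.
scrapes = [(0.0, 100.0), (15.0, 130.0), (30.0, 10.0), (45.0, 40.0)]
per_second = simple_rate(scrapes)  # (30 + 10 + 30) / 45
```

This reset handling is why counters should only ever be queried through `rate()` or `increase()` rather than read as raw values.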
Custom Metrics
from prometheus_client import Counter, Gauge, Histogram, start_http_server
request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration', ['endpoint'])
active_connections = Gauge('active_connections', 'Active connections')
def track_request(method, endpoint, duration):
request_count.labels(method=method, endpoint=endpoint).inc()
request_duration.labels(endpoint=endpoint).observe(duration)
def update_connections(count):
active_connections.set(count)
# Start metrics server
if __name__ == '__main__':
start_http_server(8000)
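A `Histogram` like `request_duration` exports cumulative bucket counters, and the `p95_latency` query shown earlier estimates a quantile from them. The following is a simplified sketch of that estimate using linear interpolation inside the matching bucket. The function is a hypothetical stand-in rather than the PromQL implementation, and it assumes `0 < q <= 1` and a lowest bucket starting at zero.

```python
import math
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate a quantile from cumulative histogram buckets.

    `buckets` holds (upper_bound, cumulative_count) pairs sorted by upper
    bound; the last upper bound is expected to be +Inf.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total <= 0:
        raise ValueError("histogram has no observations")
    rank = q * total
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound):
                # The quantile falls in the +Inf bucket: report the highest
                # finite bound instead of interpolating towards infinity.
                return lower_bound
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return lower_bound


# Hypothetical request durations: 50 under 0.1s, 90 under 0.5s, 100 under 1s.
latency_buckets = [(0.1, 50.0), (0.5, 90.0), (1.0, 100.0), (math.inf, 100.0)]
p95 = estimate_quantile(0.95, latency_buckets)
```

Because the result is interpolated, its accuracy depends on how closely the bucket boundaries bracket the latencies you care about.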
Architecture Patterns
Data Collection Pipeline
import threading
import time
from datetime import datetime, timezone
from typing import Callable, Optional


class TimeSeriesCollector:
    def __init__(self, batch_size: int = 100, flush_interval: float = 5.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.buffer_lock = threading.Lock()
        self.running = False

    def add_data_point(self, measurement: str, fields: dict,
                       tags: dict = None, timestamp: datetime = None) -> Optional[list]:
        point = {
            'measurement': measurement,
            'fields': fields,
            'tags': tags or {},
            'timestamp': timestamp or datetime.now(timezone.utc)
        }
        with self.buffer_lock:
            self.buffer.append(point)
            if len(self.buffer) >= self.batch_size:
                # The lock is already held, so take the batch directly;
                # calling _flush() here would deadlock on the same lock.
                return self._take_batch()
        return None

    def _take_batch(self) -> Optional[list]:
        # Caller must hold buffer_lock.
        if not self.buffer:
            return None
        batch = self.buffer[:self.batch_size]
        self.buffer = self.buffer[self.batch_size:]
        return batch

    def _flush(self) -> Optional[list]:
        with self.buffer_lock:
            return self._take_batch()

    def start_flush_timer(self, flush_callback: Callable):
        def timer_loop():
            while self.running:
                time.sleep(self.flush_interval)
                batch = self._flush()
                if batch:
                    flush_callback(batch)

        self.running = True
        thread = threading.Thread(target=timer_loop, daemon=True)
        thread.start()

    def stop(self):
        self.running = False
        batch = self._flush()
        return batch
Metrics Aggregation
class MetricsAggregator:
def __init__(self, db_client):
self.client = db_client
def aggregate_metrics(self, table_name: str, window: str = "1 minute"):
queries = {
"min": f'''
SELECT
time_bucket('{window}', time) AS bucket,
MIN(cpu) AS min_cpu,
MIN(memory) AS min_memory
FROM {table_name}
GROUP BY bucket
''',
"max": f'''
SELECT
time_bucket('{window}', time) AS bucket,
MAX(cpu) AS max_cpu,
MAX(memory) AS max_memory
FROM {table_name}
GROUP BY bucket
''',
"avg": f'''
SELECT
time_bucket('{window}', time) AS bucket,
AVG(cpu) AS avg_cpu,
AVG(memory) AS avg_memory
FROM {table_name}
GROUP BY bucket
''',
"percentile": f'''
SELECT
time_bucket('{window}', time) AS bucket,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY cpu) AS p95_cpu,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY memory) AS p99_memory
FROM {table_name}
GROUP BY bucket
'''
}
results = {}
for name, query in queries.items():
self.client.execute(query)
results[name] = self.client.fetchall()
return results
def calculate_rate(self, table_name: str, counter_column: str):
query = f'''
SELECT
time,
{counter_column},
({counter_column} - LAG({counter_column}) OVER (ORDER BY time))
/ NULLIF(EXTRACT(EPOCH FROM time - LAG(time) OVER (ORDER BY time)), 0) AS rate
FROM {table_name}
WHERE time >= NOW() - INTERVAL '1 hour'
'''
self.client.execute(query)
return self.client.fetchall()
Data Retention
class DataRetentionManager:
def __init__(self, db_client, table_name: str):
self.client = db_client
self.table_name = table_name
def create_retention_policy(self, keep_duration: str):
if isinstance(self.client, TimescaleDBConnection):
query = f'''
SELECT add_retention_policy('{self.table_name}', INTERVAL '{keep_duration}')
'''
self.client.execute(query)
    def get_retention_info(self):
        if isinstance(self.client, TimescaleDBConnection):
            # TimescaleDB 2.x lists retention policies as background jobs.
            query = '''
            SELECT * FROM timescaledb_information.jobs
            WHERE proc_name = 'policy_retention'
            AND hypertable_name = %s
            '''
            self.client.execute(query, (self.table_name,))
            return self.client.fetchall()
        return []
def manual_cleanup(self, older_than: str):
if isinstance(self.client, TimescaleDBConnection):
query = f'''
SELECT drop_chunks('{self.table_name}', older_than => INTERVAL '{older_than}')
'''
self.client.execute(query)
def archive_old_data(self, archive_table: str, older_than: str):
if isinstance(self.client, TimescaleDBConnection):
query = f'''
INSERT INTO {archive_table}
SELECT * FROM {self.table_name}
WHERE time < NOW() - INTERVAL '{older_than}'
'''
self.client.execute(query)
self.manual_cleanup(older_than)
Integration Patterns
Telegraf Integration
class TelegrafConfig:
@staticmethod
def generateInfluxDB_output(url: str, database: str, username: str, password: str) -> str:
return f'''
[[outputs.influxdb]]
urls = ["{url}"]
database = "{database}"
username = "{username}"
password = "{password}"
retention_policy = "autogen"
write_consistency = "any"
timeout = "5s"
'''
@staticmethod
def generateKafka_input(topic: str, brokers: list) -> str:
brokers_str = ', '.join(f'"{b}"' for b in brokers)
return f'''
[[inputs.kafka_consumer]]
brokers = [{brokers_str}]
topics = ["{topic}"]
data_format = "influx"
'''
@staticmethod
def generateHTTPListener_input(port: int = 8186) -> str:
return f'''
[[inputs.http_listener_v2]]
service_address = ":{port}"
paths = ["/telegraf"]
data_format = "influx"
'''
Prometheus Integration
class PrometheusRemoteStorage:
def __init__(self, influx_client: InfluxDBClient):
self.client = influx_client
    def write_prometheus_remote_write(self, data: bytes):
        # Note: this parses the Prometheus text exposition format. Real
        # remote-write requests are snappy-compressed protobuf and would
        # need to be decoded before reaching this point.
        points = []
        for line in data.decode('utf-8').strip().split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            if '{' in line:
                metric_name, rest = line.split('{', 1)
                labels_str, _, remainder = rest.rpartition('}')
                # Simplification: assumes label values contain no commas.
                labels = dict(
                    pair.split('=', 1) for pair in labels_str.split(',') if pair
                )
                labels = {k.strip(): v.strip().strip('"') for k, v in labels.items()}
            else:
                metric_name, _, remainder = line.partition(' ')
                labels = {}
            # An optional timestamp may follow the value; only the value is used.
            value = float(remainder.split()[0])
            point = Point(metric_name).field("value", value)
            for key, val in labels.items():
                point.tag(key, val)
            points.append(point)
        if points:
            self.client.write_points("prometheus", points)
def query_prometheus_range(self, query: str, start: str, end: str, step: str):
flux_query = f'''
from(bucket: "prometheus")
|> range(start: {start}, stop: {end})
|> filter(fn: (r) => r._measurement == "{query}")
|> aggregateWindow(every: {step}, fn: mean)
'''
return self.client.query(flux_query)
Monitoring and Observability
Cardinality Management
High cardinality — too many unique tag combinations — is the most common performance issue in time series databases.
import requests


def analyze_cardinality():
    # InfluxDB 1.x (InfluxQL): run `SHOW SERIES CARDINALITY` against the database.
    # Prometheus: count the active series behind each metric name.
    base_url = "http://localhost:9090/api/v1"
    metrics = requests.get(f"{base_url}/label/__name__/values").json()['data']
    for metric in metrics:
        # An instant query is enough here; it returns one sample holding the count.
        response = requests.get(
            f"{base_url}/query",
            params={'query': f'count({{__name__="{metric}"}})'}
        )
        result = response.json()['data']['result']
        series = int(result[0]['value'][1]) if result else 0
        print(f"{metric}: {series}")


def reduce_cardinality(cursor):
    # `cursor` is an open database cursor, for example from psycopg2.
    # Drop unnecessary labels
    cursor.execute("ALTER TABLE metrics DROP COLUMN IF EXISTS unnecessary_tag")
    # Aggregate high cardinality data
    cursor.execute("""
        CREATE MATERIALIZED VIEW metrics_aggregated AS
        SELECT
            time_bucket('1 hour', time) as hour,
            metric_name,
            AVG(value) as value
        FROM metrics
        GROUP BY hour, metric_name
    """)
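To see why cardinality grows so quickly, it helps to count series directly. The sketch below is plain Python with hypothetical function names and sample tags: one function counts distinct measurement and tag-set combinations in a batch of points, the other computes the worst-case bound as the product of unique values per tag.

```python
import math
from typing import Dict, Iterable, Tuple


def series_cardinality(points: Iterable[Tuple[str, Dict[str, str]]]) -> int:
    """Count distinct (measurement, tag set) combinations."""
    # frozenset makes the tag set hashable and independent of key order.
    series = {(measurement, frozenset(tags.items())) for measurement, tags in points}
    return len(series)


def worst_case_cardinality(unique_values_per_tag: Dict[str, int]) -> int:
    """Upper bound on series count: every tag value combined with every other."""
    return math.prod(unique_values_per_tag.values())


# Hypothetical points: the first two describe the same series.
observed = [
    ("cpu", {"host": "a", "region": "eu"}),
    ("cpu", {"region": "eu", "host": "a"}),
    ("cpu", {"host": "b", "region": "eu"}),
    ("mem", {"host": "a"}),
]
actual = series_cardinality(observed)

# 1,000 hosts x 5 regions x 4 statuses can produce up to 20,000 series.
bound = worst_case_cardinality({"host": 1000, "region": 5, "status": 4})
```

Adding a single tag with many unique values, such as a request ID, multiplies the bound by that count, which is why such values belong in fields.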
Performance Monitoring
class TimeSeriesMonitoring:
def __init__(self, db_client):
self.client = db_client
def get_write_latency(self, table_name: str):
query = f'''
SELECT
time,
latency
FROM {table_name}
WHERE metric = 'write_latency'
AND time >= NOW() - INTERVAL '1 hour'
'''
self.client.execute(query)
return self.client.fetchall()
def get_query_performance(self, table_name: str):
query = f'''
SELECT
time_bucket('1 minute', time) AS bucket,
COUNT(*) AS query_count,
AVG(duration_ms) AS avg_duration,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95_duration
FROM {table_name}
WHERE metric = 'query_duration'
AND time >= NOW() - INTERVAL '24 hours'
GROUP BY bucket
ORDER BY bucket
'''
self.client.execute(query)
return self.client.fetchall()
    def get_storage_usage(self, table_name: str):
        # Assumes TimescaleDB 2.x, where per-chunk compression statistics
        # come from the chunk_compression_stats() function.
        query = '''
        SELECT
            chunk_name,
            before_compression_total_bytes,
            after_compression_total_bytes
        FROM chunk_compression_stats(%s)
        ORDER BY chunk_name DESC
        LIMIT 10
        '''
        self.client.execute(query, (table_name,))
        return self.client.fetchall()
Best Practices
Schema Design
TSDB_BEST_PRACTICES = {
"schema_design": [
"Use low-cardinality tags for filtering and grouping",
"Use fields for high-cardinality values",
"Keep tag keys consistent across measurements",
"Avoid too many unique tag values (high cardinality)",
"Use meaningful measurement names",
"Consider partitioning by time and location"
],
"data_ingestion": [
"Batch writes for better throughput",
"Use appropriate compression",
"Set appropriate retention policies",
"Monitor write latency and error rates",
"Use bulk inserts when possible",
"Configure appropriate batch sizes"
],
"queries": [
"Always filter by time range",
"Use appropriate aggregation functions",
"Leverage continuous aggregates for common queries",
"Create indexes on frequently queried tags",
"Use time bucketing for large datasets",
"Consider pre-computed views"
],
"retention": [
"Define clear retention requirements",
"Use automated compression policies",
"Archive data before deletion",
"Monitor storage growth",
"Implement tiered storage",
"Test restore procedures"
]
}
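The ingestion advice to batch writes can be sketched with a small generator that groups any stream of points into fixed-size lists. The name `batched` and the batch size are illustrative assumptions; each yielded list would be handed to a single bulk write call.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # source exhausted
        yield batch


# Five hypothetical points grouped into batches of two.
groups = list(batched(range(5), 2))
```

Because it consumes an iterator lazily, the same helper works for an in-memory list or for points streamed from a queue or file.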
Performance Tuning
class TSDBPerformanceTuner:
@staticmethod
def tune_write_performance(batch_size: int = 1000, flush_interval: int = 5):
return {
"batch_size": batch_size,
"flush_interval": flush_interval,
"max_retries": 3,
"use_compression": True
}
@staticmethod
def tune_query_performance():
return {
"result_cache_size": "256MB",
"max_cache_size": "1GB",
"enable_parallel_queries": True,
"enable_async_query": True
}
@staticmethod
def tune_storage():
return {
"compression_enabled": True,
"chunk_size": "7 days",
"compress_after": "7 days",
"index_on_time": True,
"index_on_tags": True
}
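Compression settings like these pay off because time series are highly regular. One commonly described technique for timestamps is delta-of-delta encoding, sketched below under the assumption of integer timestamps. The function names are hypothetical, and real storage engines add bit-packing on top of this step.

```python
from typing import List


def delta_of_delta_encode(timestamps: List[int]) -> List[int]:
    """Store the first timestamp, then the change in interval between samples."""
    if not timestamps:
        return []
    encoded = [timestamps[0]]
    previous_delta = 0
    for previous, current in zip(timestamps, timestamps[1:]):
        delta = current - previous
        encoded.append(delta - previous_delta)
        previous_delta = delta
    return encoded


def delta_of_delta_decode(encoded: List[int]) -> List[int]:
    """Invert delta_of_delta_encode."""
    if not encoded:
        return []
    timestamps = [encoded[0]]
    delta = 0
    for change in encoded[1:]:
        delta += change  # rebuild the interval
        timestamps.append(timestamps[-1] + delta)  # rebuild the timestamp
    return timestamps


# Hypothetical samples every 10 seconds, with the last one arriving 1 second late.
stamps = [1000, 1010, 1020, 1030, 1041]
compact = delta_of_delta_encode(stamps)
restored = delta_of_delta_decode(compact)
```

With a steady sampling interval almost every encoded value is zero, and runs of zeros take very little space once packed.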
Comparison and Selection
Feature Comparison
| Feature | InfluxDB | TimescaleDB | Prometheus | ClickHouse |
|---|---|---|---|---|
| SQL Support | Limited | Full | No | Full |
| Retention Policies | Native | Native | Native | TTL |
| Continuous Aggregates | Tasks | Native | Recording Rules | Materialized |
| Compression | Native | Native | Native | Native |
| Clustering | Enterprise | Native | Federation | Native |
| Downsampling | Native | Native | Recording Rules | Materialized |
When to Use Each
Use InfluxDB when:
- Building IoT or sensor data applications
- Need Flux query language
- Cloud-native deployment preferred
- Simpler operational requirements
Use TimescaleDB when:
- Already using PostgreSQL
- Need full SQL capabilities
- Complex relational + time series queries
- Strong ACID requirements
Use Prometheus when:
- Monitoring infrastructure and applications
- Need pull-based collection
- Working with Kubernetes
- Short-term metrics retention
Use ClickHouse when:
- Massive scale (billions of rows)
- Analytics-focused workloads
- Need complex aggregations
- Cost-effective storage required
Conclusion
Time series databases have become essential infrastructure for modern applications. Whether you choose InfluxDB for its purpose-built simplicity, TimescaleDB for its PostgreSQL compatibility, or another solution for specific requirements, understanding time series data patterns and best practices is crucial.
The key to successful time series database implementation lies in proper schema design, appropriate retention policies, efficient querying patterns, and thoughtful scaling strategies. With the right approach, time series databases can handle massive data volumes while providing fast queries for analytics and monitoring applications.