Introduction
DuckDB’s unique combination of embedded simplicity and analytical power makes it suitable for a wide range of applications. From quick data exploration to production analytics pipelines, DuckDB excels where traditional databases would fall short and data warehouses would be overkill.
This article explores practical DuckDB use cases, implementation patterns, and best practices for production deployments.
Data Analysis and Exploration
Quick Data Exploration
import duckdb
import pandas as pd

# Spin up an in-memory DuckDB connection for throwaway analysis.
connection = duckdb.connect()

# Pull a sample of the CSV into a pandas DataFrame for a first look.
sample = connection.execute("""
SELECT *
FROM read_csv_auto('data.csv')
LIMIT 1000
""").df()

# Pandas-side summary of the sampled rows.
print(sample.describe())
print(sample.info())

# Aggregate straight off the file — no table load required.
summary = connection.execute("""
SELECT
category,
COUNT(*) as count,
AVG(amount) as avg_amount,
MIN(amount) as min_amount,
MAX(amount) as max_amount
FROM read_csv_auto('data.csv')
GROUP BY category
ORDER BY count DESC
""").df()
print(summary)
Ad-Hoc Analysis
-- Quick queries on large datasets
-- No setup required
-- Aggregate millions of rows
-- Monthly revenue and distinct-customer counts per category for
-- completed orders, read directly from the CSV file.
SELECT
date_trunc('month', order_date) as month,
category,
SUM(revenue) as revenue,
COUNT(DISTINCT customer_id) as customers
FROM read_csv_auto('orders_2024.csv')
WHERE status = 'completed'
GROUP BY 1, 2
ORDER BY 1, 3 DESC;
-- Complex joins
-- Customers with more than 10 orders, ranked by total revenue; the
-- derived table pre-aggregates orders once per customer before joining.
SELECT
c.name,
o.order_count,
o.total_revenue
FROM customers c
JOIN (
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_revenue
FROM orders
GROUP BY customer_id
) o ON c.id = o.customer_id
WHERE o.order_count > 10
ORDER BY o.total_revenue DESC;
ETL and Data Processing
CSV to Parquet Conversion
import duckdb
import glob
import os
class CSVToParquetETL:
    """Convert CSV files to Parquet using DuckDB as the engine."""

    def __init__(self, db_path='etl.db'):
        self.con = duckdb.connect(db_path)

    @staticmethod
    def _quote_ident(name):
        # Double-quote an identifier so filenames with odd characters
        # (spaces, dashes, quotes) still produce a valid table name.
        return '"' + name.replace('"', '""') + '"'

    @staticmethod
    def _quote_literal(value):
        # Escape single quotes for embedding a value in a SQL string
        # literal (COPY targets cannot be bound parameters).
        return str(value).replace("'", "''")

    def convert_directory(self, input_dir, output_dir):
        """Convert every *.csv in input_dir to a Parquet file in output_dir."""
        os.makedirs(output_dir, exist_ok=True)
        for csv_file in glob.glob(os.path.join(input_dir, '*.csv')):
            table_name = os.path.splitext(os.path.basename(csv_file))[0]
            ident = self._quote_ident(table_name)
            # Load the CSV into a staging table named after the file.
            self.con.execute(
                f"CREATE OR REPLACE TABLE {ident} AS "
                f"SELECT * FROM read_csv_auto('{self._quote_literal(csv_file)}')"
            )
            # Export to Parquet.
            output_file = os.path.join(output_dir, f'{table_name}.parquet')
            self.con.execute(
                f"COPY {ident} TO '{self._quote_literal(output_file)}' (FORMAT PARQUET)"
            )
            print(f"Converted: {csv_file} -> {output_file}")

    def convert_incremental(self, input_file, output_dir, partition_by=None):
        """Convert one CSV, optionally writing one Parquet file per
        distinct value of *partition_by* (e.g. a date column).

        Fix over the original: when *partition_by* is None the staged
        data is now exported as a single Parquet file instead of being
        silently dropped after the staging table was created.
        """
        table_name = 'temp_data'
        self.con.execute(
            f"CREATE OR REPLACE TABLE {table_name} AS "
            f"SELECT * FROM read_csv_auto('{self._quote_literal(input_file)}')"
        )
        os.makedirs(output_dir, exist_ok=True)
        if partition_by:
            part_ident = self._quote_ident(partition_by)
            partitions = self.con.execute(
                f"SELECT DISTINCT {part_ident} FROM {table_name}"
            ).fetchall()
            for (part_val,) in partitions:
                output_file = os.path.join(
                    output_dir,
                    f"{table_name}_{part_val}.parquet"
                )
                # Partition value and path are escaped before inlining.
                self.con.execute(
                    f"COPY (SELECT * FROM {table_name} "
                    f"WHERE {part_ident} = '{self._quote_literal(part_val)}') "
                    f"TO '{self._quote_literal(output_file)}' (FORMAT PARQUET)"
                )
        else:
            # No partition column: write the whole table as one file.
            output_file = os.path.join(output_dir, f"{table_name}.parquet")
            self.con.execute(
                f"COPY {table_name} TO '{self._quote_literal(output_file)}' (FORMAT PARQUET)"
            )
        print(f"Converted: {input_file}")


# Usage
etl = CSVToParquetETL()
etl.convert_directory('/data/csv', '/data/parquet')
Data Cleaning Pipeline
import duckdb
class DataCleaner:
    """Data cleaning with DuckDB."""

    def __init__(self):
        # In-memory database: cleaned_data lives only for this session.
        self.con = duckdb.connect()

    def clean_and_transform(self, input_file):
        """Clean *input_file* into a `cleaned_data` table and return
        (total_rows, unique_emails, earliest_date, latest_date).

        Fixes over the original: the query was a plain (non-f) string,
        so the literal text '{input_file}' was passed to read_csv_auto
        instead of the argument; and STR_TO_DATE is not a DuckDB
        function — strptime is the DuckDB equivalent.
        """
        # Escape quotes so a path containing ' cannot break the SQL.
        path_sql = input_file.replace("'", "''")
        self.con.execute(f"""
CREATE TABLE cleaned_data AS
SELECT
-- Clean strings
TRIM(name) as name,
LOWER(email) as email,
-- Handle nulls
COALESCE(phone, 'N/A') as phone,
-- Type conversions
CAST(age AS INTEGER) as age,
CAST(amount AS DOUBLE) as amount,
-- Date transformations (strptime is DuckDB's string-to-timestamp parser)
STRPTIME(date_str, '%Y-%m-%d') as date,
-- Derived fields
UPPER(category) as category,
CASE
WHEN amount > 1000 THEN 'high'
WHEN amount > 100 THEN 'medium'
ELSE 'low'
END as amount_category
FROM read_csv_auto('{path_sql}')
WHERE email IS NOT NULL
""")
        # Validate the cleaned data before handing stats back.
        stats = self.con.execute("""
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT email) as unique_emails,
MIN(date) as earliest_date,
MAX(date) as latest_date
FROM cleaned_data
""").fetchone()
        return stats


# Usage
cleaner = DataCleaner()
stats = cleaner.clean_and_transform('raw_data.csv')
print(f"Cleaned {stats[0]} rows, {stats[1]} unique emails")
Business Intelligence
Dashboard Backend
import streamlit as st
import duckdb
class DashboardBackend:
    """Read-only DuckDB backend for the Streamlit dashboard."""

    def __init__(self, db_path='analytics.db'):
        # read_only lets multiple dashboard processes share the file.
        self.con = duckdb.connect(db_path, read_only=True)

    def kpi_metrics(self):
        """Return (total_orders, total_revenue, avg_order_value, unique_customers)."""
        return self.con.execute("""
SELECT
(SELECT COUNT(*) FROM orders) as total_orders,
(SELECT SUM(amount) FROM orders) as total_revenue,
(SELECT AVG(amount) FROM orders) as avg_order_value,
(SELECT COUNT(DISTINCT customer_id) FROM orders) as unique_customers
""").fetchone()

    def sales_trend(self, days=30):
        """Daily order counts and revenue for the last *days* days."""
        # Coerce to int so only a number can ever reach the SQL text.
        days = int(days)
        return self.con.execute(f"""
SELECT
DATE_TRUNC('day', order_date) as date,
COUNT(*) as orders,
SUM(amount) as revenue
FROM orders
WHERE order_date > CURRENT_DATE - INTERVAL '{days} days'
GROUP BY 1
ORDER BY 1
""").df()

    def top_products(self, limit=10):
        """Top *limit* products ranked by units sold."""
        # Same guard as sales_trend: interpolate numbers only.
        limit = int(limit)
        return self.con.execute(f"""
SELECT
p.name,
COUNT(*) as order_count,
SUM(oi.quantity) as units_sold,
SUM(oi.amount) as revenue
FROM products p
JOIN order_items oi ON p.id = oi.product_id
GROUP BY 1
ORDER BY 3 DESC
LIMIT {limit}
""").df()

    def customer_cohorts(self):
        """Monthly first-order cohorts with size and average lifetime value."""
        return self.con.execute("""
SELECT
DATE_TRUNC('month', first_order_date) as cohort,
COUNT(*) as customers,
AVG(total_lifetime) as avg_ltv
FROM (
SELECT
customer_id,
MIN(order_date) as first_order_date,
SUM(amount) as total_lifetime
FROM orders
GROUP BY customer_id
)
GROUP BY 1
ORDER BY 1
""").df()
# Streamlit Dashboard
st.title("Sales Dashboard")
backend = DashboardBackend()

# Headline KPIs laid out across four columns.
metrics = backend.kpi_metrics()
orders_col, revenue_col, avg_col, customers_col = st.columns(4)
orders_col.metric("Orders", f"{metrics[0]:,}")
revenue_col.metric("Revenue", f"${metrics[1]:,.2f}")
avg_col.metric("Avg Order", f"${metrics[2]:,.2f}")
customers_col.metric("Customers", f"{metrics[3]:,}")

# Charts below the KPI row.
st.subheader("Sales Trend")
daily = backend.sales_trend(30)
st.line_chart(daily.set_index('date')['revenue'])
st.subheader("Top Products")
st.dataframe(backend.top_products())
Report Generation
import duckdb
from datetime import datetime
class ReportGenerator:
    """Generate analytics reports."""

    def __init__(self, db_path='analytics.db'):
        self.con = duckdb.connect(db_path)

    def generate_monthly_report(self, year, month):
        """Build a summary report dict for the given year and month.

        Returns a dict with keys: period, generated_at, orders, revenue,
        avg_order, customers, top_categories.
        """
        report = {
            'period': f'{year}-{month:02d}',
            'generated_at': datetime.now().isoformat(),
        }
        # Summary stats — year/month are bound as parameters rather
        # than interpolated into the SQL text.
        summary = self.con.execute("""
SELECT
COUNT(*) as order_count,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value,
COUNT(DISTINCT customer_id) as customer_count
FROM orders
WHERE EXTRACT(YEAR FROM order_date) = ?
AND EXTRACT(MONTH FROM order_date) = ?
""", [year, month]).fetchone()
        report['orders'] = summary[0]
        report['revenue'] = summary[1]
        report['avg_order'] = summary[2]
        report['customers'] = summary[3]
        # Top five categories by revenue for the month.
        categories = self.con.execute("""
SELECT
category,
SUM(amount) as revenue,
COUNT(*) as orders
FROM orders
WHERE EXTRACT(YEAR FROM order_date) = ?
AND EXTRACT(MONTH FROM order_date) = ?
GROUP BY category
ORDER BY revenue DESC
LIMIT 5
""", [year, month]).fetchall()
        report['top_categories'] = [
            {'category': name, 'revenue': revenue, 'orders': orders}
            for name, revenue, orders in categories
        ]
        return report


# Usage
generator = ReportGenerator()
report = generator.generate_monthly_report(2026, 2)
print(report)
Embedded Analytics
Desktop Application
import tkinter as tk
from tkinter import ttk
import duckdb
class AnalyticsApp:
    """Desktop analytics application (Tkinter UI over a DuckDB file)."""

    def __init__(self, db_path):
        self.db_path = db_path
        self.setup_ui()

    def setup_ui(self):
        """Build the window and enter the Tk main loop (blocks)."""
        self.root = tk.Tk()
        self.root.title("Analytics App")
        # Query input
        self.query_input = ttk.Entry(self.root, width=50)
        self.query_input.pack(pady=10)
        # Run button
        ttk.Button(
            self.root,
            text="Run Query",
            command=self.run_query
        ).pack()
        # Results
        self.results = ttk.Treeview(self.root)
        self.results.pack(fill=tk.BOTH, expand=True)
        self.root.mainloop()

    def run_query(self):
        """Run the typed query read-only and show the rows in the tree.

        Fix over the original: the connection is closed in a `finally`
        block, so the handle is released even if something other than
        an Exception (e.g. KeyboardInterrupt) escapes the handler.
        """
        query = self.query_input.get()
        # A fresh read-only connection per query avoids holding a lock
        # on the database file between runs.
        con = duckdb.connect(self.db_path, read_only=True)
        try:
            rows = con.execute(query).fetchall()
            # Clear the previous result set before inserting new rows.
            for item in self.results.get_children():
                self.results.delete(item)
            for row in rows:
                self.results.insert('', tk.END, values=row)
        except Exception as e:
            print(f"Error: {e}")
        finally:
            con.close()


# Usage
app = AnalyticsApp('analytics.db')
Mobile App (with SQLite backup)
# For mobile, use SQLite-compatible mode
# Then export to DuckDB for heavy analysis
import sqlite3
import duckdb
class MobileAnalytics:
    """Mobile analytics with DuckDB backend."""

    def __init__(self):
        # Local SQLite for offline capture on the device.
        self.sqlite = sqlite3.connect('mobile_data.db')
        # DuckDB file used for the heavier analytical queries.
        self.duckdb = duckdb.connect('analytics.db')

    def sync_to_duckdb(self):
        """Copy all rows from the SQLite `events` table into DuckDB.

        NOTE(review): assumes a DuckDB `events` table already exists
        with three columns matching the SQLite schema — confirm against
        the schema-creation code, which is not shown here.
        """
        data = self.sqlite.execute("SELECT * FROM events").fetchall()
        if not data:
            # Nothing captured since the last sync; skip the insert
            # rather than calling executemany with an empty batch.
            return
        self.duckdb.executemany(
            "INSERT INTO events VALUES (?, ?, ?)",
            data
        )
        self.duckdb.commit()

    def run_analytics(self):
        """Daily event counts computed on the DuckDB side."""
        return self.duckdb.execute("""
SELECT
DATE(event_time) as date,
COUNT(*) as events
FROM events
GROUP BY 1
""").df()
Log Analysis
Application Logs
import duckdb
import gzip
from datetime import datetime
class LogAnalyzer:
    """Analyze application logs with DuckDB."""

    def __init__(self):
        self.con = duckdb.connect('logs.duckdb')

    def ingest_logs(self, log_file):
        """Parse *log_file* and append its entries to the `logs` table.

        Expected line shape: `<timestamp> <level> <message...> <service>`
        (whitespace-separated, service as the last token — TODO confirm
        against the actual log format). Lines with fewer than four
        tokens are skipped.
        """
        self.con.execute("""
CREATE TABLE IF NOT EXISTS logs (
timestamp TIMESTAMP,
level VARCHAR,
message VARCHAR,
service VARCHAR
)
""")
        # Collect rows first and insert them in a single batch — one
        # executemany is far cheaper than a round trip per log line.
        rows = []
        with open(log_file) as f:
            for line in f:
                parts = line.strip().split(' ')
                if len(parts) >= 4:
                    rows.append(
                        (parts[0], parts[1], ' '.join(parts[2:-1]), parts[-1])
                    )
        if rows:
            self.con.executemany(
                "INSERT INTO logs VALUES (?, ?, ?, ?)", rows
            )
        self.con.commit()

    def error_analysis(self):
        """Daily counts of ERROR/FATAL entries, newest day first."""
        return self.con.execute("""
SELECT
DATE(timestamp) as date,
level,
COUNT(*) as count
FROM logs
WHERE level IN ('ERROR', 'FATAL')
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC
""").df()

    def service_health(self):
        """Per-service log volume, error count, and last activity time."""
        return self.con.execute("""
SELECT
service,
COUNT(*) as total_logs,
SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors,
MAX(timestamp) as last_activity
FROM logs
GROUP BY service
""").df()


# Usage
analyzer = LogAnalyzer()
analyzer.ingest_logs('/var/log/app.log')
print(analyzer.error_analysis())
Financial Analysis
Portfolio Analytics
import duckdb
import pandas as pd
class PortfolioAnalyzer:
    """Financial portfolio analytics."""

    def __init__(self, db_path='portfolio.duckdb'):
        self.con = duckdb.connect(db_path)

    def calculate_returns(self):
        """Per-symbol daily prices with previous price and % return."""
        return self.con.execute("""
SELECT
symbol,
DATE_TRUNC('day', date) as date,
price,
LAG(price) OVER (PARTITION BY symbol ORDER BY date) as prev_price,
(price - LAG(price) OVER (PARTITION BY symbol ORDER BY date))
/ LAG(price) OVER (PARTITION BY symbol ORDER BY date) * 100 as return_pct
FROM prices
ORDER BY symbol, date
""").df()

    def portfolio_summary(self, holdings):
        """Market value and gain/loss per holding.

        *holdings* is a list of dicts with symbol/shares/cost_basis keys.

        Fixes over the original: the holdings DataFrame is now
        registered as the `holdings` view the query actually reads (it
        was created and never used, so the query hit a missing table),
        and "latest price" is the price at the most recent date rather
        than the maximum price ever seen.
        """
        holdings_df = pd.DataFrame(holdings)
        # Expose the DataFrame to SQL under the name the query uses.
        self.con.register('holdings', holdings_df)
        return self.con.execute("""
SELECT
h.symbol,
h.shares,
p.latest_price,
h.shares * p.latest_price as market_value,
h.cost_basis,
(h.shares * p.latest_price) - h.cost_basis as gain_loss,
((h.shares * p.latest_price) - h.cost_basis) / h.cost_basis * 100 as return_pct
FROM holdings h
JOIN (
-- arg_max picks the price at the most recent date per symbol
SELECT symbol, arg_max(price, date) as latest_price
FROM prices
GROUP BY symbol
) p ON h.symbol = p.symbol
""").df()

    def risk_metrics(self):
        """Return the 95% value-at-risk of daily returns.

        Fixes over the original: LAG() requires an OVER clause, so the
        daily return is now computed per symbol in date order; the
        unused z-score query (computed and thrown away) was removed.
        """
        var_95 = self.con.execute("""
SELECT
0.05 as quantile,
PERCENTILE_CONT(0.05) WITHIN GROUP (ORDER BY daily_return) as var_95
FROM (
SELECT
symbol,
(price - LAG(price) OVER (PARTITION BY symbol ORDER BY date))
/ LAG(price) OVER (PARTITION BY symbol ORDER BY date) as daily_return
FROM prices
)
""").fetchone()
        return {'var_95': var_95}


# Usage
portfolio = PortfolioAnalyzer()
summary = portfolio.portfolio_summary([
    {'symbol': 'AAPL', 'shares': 100, 'cost_basis': 15000},
    {'symbol': 'GOOGL', 'shares': 50, 'cost_basis': 7500},
])
print(summary)
Best Practices Summary
When to Use DuckDB
| Use Case | Recommendation |
|---|---|
| Data exploration | ✅ Perfect |
| ETL pipelines | ✅ Excellent |
| BI dashboards | ✅ Good |
| ML features | ✅ Excellent |
| Embedded analytics | ✅ Ideal |
| Transactional apps | ❌ Use PostgreSQL |
| Real-time apps | ❌ Use Redis |
| Distributed analytics | ❌ Use ClickHouse |
Performance Tips
# Use appropriate memory settings
con = duckdb.connect(config={
'memory_limit': '8GB',
'threads': 8
})
# Filter early
# Good
SELECT category, COUNT(*) FROM orders WHERE date > '2025-01-01' GROUP BY category
# Use Parquet for large data
SELECT * FROM read_parquet('data/*.parquet')
# Batch operations
con.executemany("INSERT INTO table VALUES (?, ?)", data)
Resources
Conclusion
DuckDB’s versatility makes it an excellent choice for analytical workloads across many domains. From quick data exploration to production ML pipelines, DuckDB provides a simple yet powerful solution for modern data analytics.
With this comprehensive guide, you now have the knowledge to implement DuckDB effectively in various production scenarios.
Comments