Skip to main content
⚡ Calmops

DuckDB Use Cases: Real-World Applications and Production Patterns

Introduction

DuckDB’s unique combination of embedded simplicity and analytical power makes it suitable for a wide range of applications. From quick data exploration to production analytics pipelines, DuckDB excels where traditional databases would struggle with analytical workloads and data warehouses would be overkill.

This article explores practical DuckDB use cases, implementation patterns, and best practices for production deployments.


Data Analysis and Exploration

Quick Data Exploration

import duckdb
import pandas as pd

# In-memory connection: nothing is written to disk, perfect for one-off analysis.
session = duckdb.connect()

# Pull a 1000-row sample of the CSV into pandas for a first look.
sample = session.execute("""
    SELECT * 
    FROM read_csv_auto('data.csv')
    LIMIT 1000
""").df()

# Quick statistics on the sample
print(sample.describe())
print(sample.info())

# Aggregate straight from the file -- no table needs to be created first.
per_category = session.execute("""
    SELECT 
        category,
        COUNT(*) as count,
        AVG(amount) as avg_amount,
        MIN(amount) as min_amount,
        MAX(amount) as max_amount
    FROM read_csv_auto('data.csv')
    GROUP BY category
    ORDER BY count DESC
""").df()

print(per_category)

Ad-Hoc Analysis

-- Quick queries on large datasets
-- No setup required

-- Aggregate millions of rows
-- Monthly revenue and distinct customers per category, read straight from
-- the CSV. GROUP BY 1, 2 / ORDER BY 1, 3 refer to SELECT-list positions
-- (month, category / month, revenue).
SELECT 
    date_trunc('month', order_date) as month,
    category,
    SUM(revenue) as revenue,
    COUNT(DISTINCT customer_id) as customers
FROM read_csv_auto('orders_2024.csv')
WHERE status = 'completed'
GROUP BY 1, 2
ORDER BY 1, 3 DESC;

-- Complex joins
-- Per-customer order counts and revenue are pre-aggregated in the derived
-- table `o`, then joined back to customers; only customers with more than
-- ten orders are kept, highest spenders first.
SELECT 
    c.name,
    o.order_count,
    o.total_revenue
FROM customers c
JOIN (
    SELECT 
        customer_id,
        COUNT(*) as order_count,
        SUM(amount) as total_revenue
    FROM orders
    GROUP BY customer_id
) o ON c.id = o.customer_id
WHERE o.order_count > 10
ORDER BY o.total_revenue DESC;

ETL and Data Processing

CSV to Parquet Conversion

import duckdb
import glob
import os

class CSVToParquetETL:
    """Convert CSV files to Parquet with DuckDB.

    Uses a persistent database file so intermediate tables survive between
    runs; pass ':memory:' as db_path for a throwaway session.
    """
    
    def __init__(self, db_path='etl.db'):
        self.con = duckdb.connect(db_path)
    
    @staticmethod
    def _quote_ident(name):
        """Return *name* as a safely double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'
    
    @staticmethod
    def _quote_str(value):
        """Return *value* as a safely single-quoted SQL string literal."""
        return "'" + str(value).replace("'", "''") + "'"
    
    def convert_directory(self, input_dir, output_dir):
        """Convert every *.csv file in input_dir to a Parquet file in output_dir."""
        os.makedirs(output_dir, exist_ok=True)
        
        csv_files = glob.glob(os.path.join(input_dir, '*.csv'))
        
        for csv_file in csv_files:
            # Table is named after the file (without extension).
            table_name = os.path.splitext(os.path.basename(csv_file))[0]
            ident = self._quote_ident(table_name)
            
            # BUG FIX: the original spliced raw paths and names into the SQL
            # text; a filename containing a quote, space, or SQL keyword broke
            # the statement. Quoting the identifier and escaping the path
            # literals makes arbitrary filenames safe.
            self.con.execute(f"""
                CREATE OR REPLACE TABLE {ident} AS
                SELECT * FROM read_csv_auto({self._quote_str(csv_file)})
            """)
            
            # Export to Parquet
            output_file = os.path.join(output_dir, f'{table_name}.parquet')
            self.con.execute(f"""
                COPY {ident} TO {self._quote_str(output_file)} (FORMAT PARQUET)
            """)
            
            print(f"Converted: {csv_file} -> {output_file}")
    
    def convert_incremental(self, input_file, output_dir, partition_by=None):
        """Convert one CSV, optionally writing one Parquet file per distinct
        value of *partition_by* (e.g. a date column)."""
        table_name = 'temp_data'
        
        self.con.execute(f"""
            CREATE OR REPLACE TABLE {table_name} AS
            SELECT * FROM read_csv_auto({self._quote_str(input_file)})
        """)
        
        if partition_by:
            part_ident = self._quote_ident(partition_by)
            partitions = self.con.execute(f"""
                SELECT DISTINCT {part_ident} FROM {table_name}
            """).fetchall()
            
            for partition in partitions:
                date_val = partition[0]
                output_file = os.path.join(
                    output_dir, 
                    f"{table_name}_{date_val}.parquet"
                )
                # Escape the partition value so values containing quotes
                # cannot break (or inject into) the generated SQL.
                self.con.execute(f"""
                    COPY (SELECT * FROM {table_name} WHERE {part_ident} = {self._quote_str(date_val)})
                    TO {self._quote_str(output_file)} (FORMAT PARQUET)
                """)
        
        print(f"Converted: {input_file}")

# Usage
etl = CSVToParquetETL()
etl.convert_directory('/data/csv', '/data/parquet')

Data Cleaning Pipeline

import duckdb

class DataCleaner:
    """Data cleaning with DuckDB."""
    
    def __init__(self):
        self.con = duckdb.connect()
    
    def clean_and_transform(self, input_file):
        """Clean *input_file* (a CSV) into a `cleaned_data` table.

        Returns a tuple (total_rows, unique_emails, earliest_date, latest_date).
        """
        # Escape single quotes so the path forms a valid SQL string literal.
        path_literal = "'" + str(input_file).replace("'", "''") + "'"
        
        # BUG FIX: the original query was a plain (non-f) string, so the
        # literal text '{input_file}' was sent to DuckDB instead of the path.
        # Also, STR_TO_DATE is MySQL; DuckDB's equivalent is strptime.
        self.con.execute(f"""
            CREATE TABLE cleaned_data AS
            SELECT 
                -- Clean strings
                TRIM(name) as name,
                LOWER(email) as email,
                
                -- Handle nulls
                COALESCE(phone, 'N/A') as phone,
                
                -- Type conversions
                CAST(age AS INTEGER) as age,
                CAST(amount AS DOUBLE) as amount,
                
                -- Date transformations
                STRPTIME(date_str, '%Y-%m-%d') as date,
                
                -- Derived fields
                UPPER(category) as category,
                CASE 
                    WHEN amount > 1000 THEN 'high'
                    WHEN amount > 100 THEN 'medium'
                    ELSE 'low'
                END as amount_category
                
            FROM read_csv_auto({path_literal})
            WHERE email IS NOT NULL
        """)
        
        # Validate cleaned data with simple summary statistics.
        stats = self.con.execute("""
            SELECT 
                COUNT(*) as total_rows,
                COUNT(DISTINCT email) as unique_emails,
                MIN(date) as earliest_date,
                MAX(date) as latest_date
            FROM cleaned_data
        """).fetchone()
        
        return stats

# Usage
cleaner = DataCleaner()
stats = cleaner.clean_and_transform('raw_data.csv')
print(f"Cleaned {stats[0]} rows, {stats[1]} unique emails")

Business Intelligence

Dashboard Backend

import streamlit as st
import duckdb

class DashboardBackend:
    """Read-only DuckDB backend for a Streamlit dashboard.

    The connection is opened read_only so several dashboard processes can
    share the same database file.
    """
    
    def __init__(self, db_path='analytics.db'):
        self.con = duckdb.connect(db_path, read_only=True)
    
    def kpi_metrics(self):
        """Return (total_orders, total_revenue, avg_order_value, unique_customers)."""
        return self.con.execute("""
            SELECT 
                (SELECT COUNT(*) FROM orders) as total_orders,
                (SELECT SUM(amount) FROM orders) as total_revenue,
                (SELECT AVG(amount) FROM orders) as avg_order_value,
                (SELECT COUNT(DISTINCT customer_id) FROM orders) as unique_customers
        """).fetchone()
    
    def sales_trend(self, days=30):
        """Daily order count and revenue for the trailing *days* days."""
        # int() guards the f-string interpolation below: a non-integer
        # (possibly attacker-controlled) argument raises instead of being
        # spliced into the SQL text.
        days = int(days)
        return self.con.execute(f"""
            SELECT 
                DATE_TRUNC('day', order_date) as date,
                COUNT(*) as orders,
                SUM(amount) as revenue
            FROM orders
            WHERE order_date > CURRENT_DATE - INTERVAL '{days} days'
            GROUP BY 1
            ORDER BY 1
        """).df()
    
    def top_products(self, limit=10):
        """Top *limit* products ordered by units sold (ORDER BY 3)."""
        limit = int(limit)  # same injection guard as sales_trend
        return self.con.execute(f"""
            SELECT 
                p.name,
                COUNT(*) as order_count,
                SUM(oi.quantity) as units_sold,
                SUM(oi.amount) as revenue
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            GROUP BY 1
            ORDER BY 3 DESC
            LIMIT {limit}
        """).df()
    
    def customer_cohorts(self):
        """Customer count and average lifetime value grouped by first-order month."""
        return self.con.execute("""
            SELECT 
                DATE_TRUNC('month', first_order_date) as cohort,
                COUNT(*) as customers,
                AVG(total_lifetime) as avg_ltv
            FROM (
                SELECT 
                    customer_id,
                    MIN(order_date) as first_order_date,
                    SUM(amount) as total_lifetime
                FROM orders
                GROUP BY customer_id
            )
            GROUP BY 1
            ORDER BY 1
        """).df()

# Streamlit Dashboard
st.title("Sales Dashboard")

# Backend opens analytics.db read-only (safe for concurrent dashboard runs).
backend = DashboardBackend()

# KPI Row: kpi_metrics() returns a 4-tuple
# (total_orders, total_revenue, avg_order_value, unique_customers).
kpis = backend.kpi_metrics()
col1, col2, col3, col4 = st.columns(4)
col1.metric("Orders", f"{kpis[0]:,}")
col2.metric("Revenue", f"${kpis[1]:,.2f}")
col3.metric("Avg Order", f"${kpis[2]:,.2f}")
col4.metric("Customers", f"{kpis[3]:,}")

# Charts: daily revenue over the last 30 days, indexed by date for st.line_chart.
st.subheader("Sales Trend")
trend = backend.sales_trend(30)
st.line_chart(trend.set_index('date')['revenue'])

st.subheader("Top Products")
st.dataframe(backend.top_products())

Report Generation

import duckdb
from datetime import datetime

class ReportGenerator:
    """Generate analytics reports."""
    
    def __init__(self, db_path='analytics.db'):
        self.con = duckdb.connect(db_path)
    
    def generate_monthly_report(self, year, month):
        """Build a monthly report dict for the given year and month.

        Returns a dict with the period, a generation timestamp, summary
        metrics, and the five highest-revenue categories. Summary values
        are None when the month has no orders (aggregates over zero rows).
        """
        # int() both validates the arguments and guards the f-string SQL
        # interpolation below against non-numeric input.
        year, month = int(year), int(month)
        
        report = {
            'period': f'{year}-{month:02d}',
            'generated_at': datetime.now().isoformat(),
        }
        
        # Summary stats for the month
        summary = self.con.execute(f"""
            SELECT 
                COUNT(*) as order_count,
                SUM(amount) as total_revenue,
                AVG(amount) as avg_order_value,
                COUNT(DISTINCT customer_id) as customer_count
            FROM orders
            WHERE EXTRACT(YEAR FROM order_date) = {year}
            AND EXTRACT(MONTH FROM order_date) = {month}
        """).fetchone()
        
        report['orders'] = summary[0]
        report['revenue'] = summary[1]
        report['avg_order'] = summary[2]
        report['customers'] = summary[3]
        
        # Top five categories by revenue for the same month
        categories = self.con.execute(f"""
            SELECT 
                category,
                SUM(amount) as revenue,
                COUNT(*) as orders
            FROM orders
            WHERE EXTRACT(YEAR FROM order_date) = {year}
            AND EXTRACT(MONTH FROM order_date) = {month}
            GROUP BY category
            ORDER BY revenue DESC
            LIMIT 5
        """).fetchall()
        
        report['top_categories'] = [
            {'category': c[0], 'revenue': c[1], 'orders': c[2]}
            for c in categories
        ]
        
        return report

# Usage
generator = ReportGenerator()
report = generator.generate_monthly_report(2026, 2)
print(report)

Embedded Analytics

Desktop Application

import tkinter as tk
from tkinter import ttk
import duckdb

class AnalyticsApp:
    """Desktop analytics application: a minimal tkinter SQL console."""
    
    def __init__(self, db_path):
        self.db_path = db_path
        self.setup_ui()  # blocks here: mainloop() runs until the window closes
    
    def setup_ui(self):
        """Build the window and enter the tkinter main loop (blocking)."""
        self.root = tk.Tk()
        self.root.title("Analytics App")
        
        # Query input
        self.query_input = ttk.Entry(self.root, width=50)
        self.query_input.pack(pady=10)
        
        # Run button
        ttk.Button(
            self.root, 
            text="Run Query", 
            command=self.run_query
        ).pack()
        
        # Results
        self.results = ttk.Treeview(self.root)
        self.results.pack(fill=tk.BOTH, expand=True)
        
        self.root.mainloop()
    
    def run_query(self):
        """Run the typed query against the database and show rows in the tree."""
        query = self.query_input.get()
        
        # A fresh read-only connection per query keeps the database file
        # available to writers between runs.
        con = duckdb.connect(self.db_path, read_only=True)
        try:
            results = con.execute(query).fetchall()
            
            # Clear previous results
            for item in self.results.get_children():
                self.results.delete(item)
            
            # Show results
            for row in results:
                self.results.insert('', tk.END, values=row)
                
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # BUG FIX: the original closed the connection after the
            # try/except, leaking it if anything outside `except Exception`
            # (e.g. KeyboardInterrupt) escaped; finally guarantees cleanup.
            con.close()

# Usage
app = AnalyticsApp('analytics.db')

Mobile App (with SQLite backup)

# For mobile, use SQLite-compatible mode
# Then export to DuckDB for heavy analysis

import sqlite3
import duckdb

class MobileAnalytics:
    """Mobile analytics with DuckDB backend.

    Pattern: a local SQLite database collects events offline; data is copied
    into DuckDB when heavier analytical queries are needed.
    """
    
    def __init__(self):
        # Local SQLite for offline collection
        self.sqlite = sqlite3.connect('mobile_data.db')
        
        # Copy to DuckDB when needed for analytics
        self.duckdb = duckdb.connect('analytics.db')
    
    def sync_to_duckdb(self):
        """Sync SQLite data to DuckDB.

        NOTE(review): assumes an `events` table with exactly three columns
        already exists in both databases — confirm the schema. Each call
        re-inserts every SQLite row, so repeated syncs duplicate data;
        deduplicate or truncate first if that matters.
        """
        # Export from SQLite (the whole table is loaded into memory).
        data = self.sqlite.execute("SELECT * FROM events").fetchall()
        
        # Insert into DuckDB in one batched call
        self.duckdb.executemany(
            "INSERT INTO events VALUES (?, ?, ?)",
            data
        )
        self.duckdb.commit()
    
    def run_analytics(self):
        """Return per-day event counts as a pandas DataFrame."""
        return self.duckdb.execute("""
            SELECT 
                DATE(event_time) as date,
                COUNT(*) as events
            FROM events
            GROUP BY 1
        """).df()

Log Analysis

Application Logs

import duckdb
import gzip
from datetime import datetime

class LogAnalyzer:
    """Analyze application logs with DuckDB."""
    
    def __init__(self):
        self.con = duckdb.connect('logs.duckdb')
    
    def ingest_logs(self, log_file):
        """Ingest a space-separated log file.

        Expected line shape: timestamp level message... service — the
        message may itself contain spaces; the final token is the service.
        Lines with fewer than four tokens are skipped.
        """
        # Create table on first use
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS logs (
                timestamp TIMESTAMP,
                level VARCHAR,
                message VARCHAR,
                service VARCHAR
            )
        """)
        
        # PERF FIX: the original issued one INSERT per line; collecting the
        # rows and calling executemany once is dramatically faster on large
        # files (and matches the batching advice elsewhere in this article).
        rows = []
        with open(log_file) as f:
            for line in f:
                parts = line.strip().split(' ')
                if len(parts) >= 4:
                    rows.append(
                        (parts[0], parts[1], ' '.join(parts[2:-1]), parts[-1])
                    )
        
        if rows:
            self.con.executemany(
                "INSERT INTO logs VALUES (?, ?, ?, ?)", rows
            )
        self.con.commit()
    
    def error_analysis(self):
        """Daily ERROR/FATAL counts, most recent day first."""
        return self.con.execute("""
            SELECT 
                DATE(timestamp) as date,
                level,
                COUNT(*) as count
            FROM logs
            WHERE level IN ('ERROR', 'FATAL')
            GROUP BY 1, 2
            ORDER BY 1 DESC, 3 DESC
        """).df()
    
    def service_health(self):
        """Per-service totals, error counts, and last activity timestamp."""
        return self.con.execute("""
            SELECT 
                service,
                COUNT(*) as total_logs,
                SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors,
                MAX(timestamp) as last_activity
            FROM logs
            GROUP BY service
        """).df()

# Usage
analyzer = LogAnalyzer()
analyzer.ingest_logs('/var/log/app.log')
print(analyzer.error_analysis())

Financial Analysis

Portfolio Analytics

import duckdb
import pandas as pd

class PortfolioAnalyzer:
    """Financial portfolio analytics."""
    
    def __init__(self, db_path='portfolio.duckdb'):
        self.con = duckdb.connect(db_path)
    
    def calculate_returns(self):
        """Per-symbol daily price series with previous price and % return."""
        return self.con.execute("""
            SELECT 
                symbol,
                DATE_TRUNC('day', date) as date,
                price,
                LAG(price) OVER (PARTITION BY symbol ORDER BY date) as prev_price,
                (price - LAG(price) OVER (PARTITION BY symbol ORDER BY date)) 
                    / LAG(price) OVER (PARTITION BY symbol ORDER BY date) * 100 as return_pct
            FROM prices
            ORDER BY symbol, date
        """).df()
    
    def portfolio_summary(self, holdings):
        """Market value and gain/loss per holding.

        *holdings* is a list of dicts with keys symbol, shares, cost_basis.
        """
        holdings_df = pd.DataFrame(holdings)
        # BUG FIX: the query refers to a table named `holdings`, but the
        # original never registered the DataFrame under that name (the local
        # variable is `holdings_df`), so DuckDB could not resolve the table.
        self.con.register('holdings', holdings_df)
        
        return self.con.execute("""
            SELECT 
                h.symbol,
                h.shares,
                p.latest_price,
                h.shares * p.latest_price as market_value,
                h.cost_basis,
                (h.shares * p.latest_price) - h.cost_basis as gain_loss,
                ((h.shares * p.latest_price) - h.cost_basis) / h.cost_basis * 100 as return_pct
            FROM holdings h
            JOIN (
                -- BUG FIX: MAX(price) is the highest price ever seen, not the
                -- most recent one; arg_max picks the price at the latest date.
                SELECT symbol, arg_max(price, date) as latest_price
                FROM prices
                GROUP BY symbol
            ) p ON h.symbol = p.symbol
        """).df()
    
    def risk_metrics(self):
        """Calculate risk metrics (currently 95% one-day historical VaR)."""
        # NOTE(review): z-scores are computed but never returned; kept for
        # parity with the original implementation.
        returns = self.con.execute("""
            SELECT 
                symbol,
                (price - AVG(price) OVER (PARTITION BY symbol)) / STDDEV(price) OVER (PARTITION BY symbol) as z_score
            FROM prices
        """).fetchall()
        
        # 95% historical VaR: the 5th percentile of daily returns.
        var_95 = self.con.execute("""
            SELECT 
                0.05 as quantile,
                PERCENTILE_CONT(0.05) WITHIN GROUP (ORDER BY daily_return) as var_95
            FROM (
                SELECT 
                    symbol,
                    -- BUG FIX: LAG is a window function and requires an OVER
                    -- clause; the original bare LAG(price) is invalid SQL.
                    (price - LAG(price) OVER (PARTITION BY symbol ORDER BY date))
                        / LAG(price) OVER (PARTITION BY symbol ORDER BY date) as daily_return
                FROM prices
            )
        """).fetchone()
        
        return {'var_95': var_95}

# Usage
portfolio = PortfolioAnalyzer()
summary = portfolio.portfolio_summary([
    {'symbol': 'AAPL', 'shares': 100, 'cost_basis': 15000},
    {'symbol': 'GOOGL', 'shares': 50, 'cost_basis': 7500},
])
print(summary)

Best Practices Summary

When to Use DuckDB

Use Case — Recommendation
Data exploration — ✅ Perfect
ETL pipelines — ✅ Excellent
BI dashboards — ✅ Good
ML features — ✅ Excellent
Embedded analytics — ✅ Ideal
Transactional apps — ❌ Use PostgreSQL
Real-time apps — ❌ Use Redis
Distributed analytics — ❌ Use ClickHouse

Performance Tips

# Use appropriate memory settings
con = duckdb.connect(config={
    'memory_limit': '8GB',
    'threads': 8
})

# Filter early — apply WHERE before aggregating so rows are pruned first
# Good
SELECT category, COUNT(*) FROM table WHERE date > '2025-01-01' GROUP BY category

# Use Parquet for large data
SELECT * FROM read_parquet('data/*.parquet')

# Batch operations
con.executemany("INSERT INTO table VALUES (?, ?)", data)

Resources


Conclusion

DuckDB’s versatility makes it an excellent choice for analytical workloads across many domains. From quick data exploration to production ML pipelines, DuckDB provides a simple yet powerful solution for modern data analytics.

With this comprehensive guide, you now have the knowledge to implement DuckDB effectively in various production scenarios.

Comments