Introduction
Database connection pooling is one of the most critical optimization techniques for high-performance applications. Every database connection incurs significant overhead: TCP handshakes, authentication, memory allocation, and thread management. Without proper pooling, your application can quickly exhaust database limits, experience latency spikes, or fail under load.
Modern applications handle thousands of concurrent requests, but databases can typically support only hundreds to thousands of connections. Connection pooling bridges this gap by maintaining a pool of reusable connections, dramatically reducing the overhead of establishing new database connections for each request.
The Connection Problem
Without Connection Pooling
Request 1 → Create Connection → Authenticate → Execute Query → Close Connection → Response
Request 2 → Create Connection → Authenticate → Execute Query → Close Connection → Response
Request 3 → Create Connection → Authenticate → Execute Query → Close Connection → Response
...
Each request creates a new connection, which involves:
- TCP three-way handshake
- SSL/TLS negotiation (if enabled)
- Database authentication
- Session initialization
- Memory allocation for connection state
- Thread/Goroutine allocation
With Connection Pooling
Connection Pool: [Conn1] [Conn2] [Conn3] [Conn4]
                    │       │       │       │
Request 1 ──→ Conn1 ──→ Execute ──→ Return to Pool
Request 2 ──→ Conn2 ──→ Execute ──→ Return to Pool
Request 3 ──→ Conn3 ──→ Execute ──→ Return to Pool
Request 4 ──→ Conn4 ──→ Execute ──→ Return to Pool
The pool maintains persistent connections that are reused across requests.
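The mechanics can be sketched with a toy pool (a hypothetical `MiniPool`, not any real driver's API): connections are created once up front, handed out on demand, and returned to the pool instead of closed.

```python
import queue

class MiniPool:
    """Toy illustration of pooling mechanics. Real drivers layer health
    checks, acquire timeouts, and connection lifetime limits on top."""

    def __init__(self, connect, size):
        self._idle = queue.Queue()
        self.created = 0
        for _ in range(size):
            self._idle.put(connect())  # pay connection cost once, up front
            self.created += 1

    def acquire(self, timeout=5.0):
        # Block until a connection is free instead of opening a new one
        return self._idle.get(timeout=timeout)

    def release(self, conn):
        self._idle.put(conn)

# Fake "connection" factory standing in for TCP connect + auth
def fake_connect():
    return object()

pool = MiniPool(fake_connect, size=2)
for _ in range(100):           # 100 requests...
    conn = pool.acquire()
    pool.release(conn)         # ...served by the same 2 connections
print(pool.created)            # → 2
```

The queue doubles as the waiting mechanism: when all connections are checked out, `acquire` simply blocks until one is released, which is exactly the back-pressure behavior production pools provide.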
Connection Pool Concepts
Core Components
┌─────────────────────────────────────────────────────────────┐
│                      Connection Pool                        │
├─────────────────────────────────────────────────────────────┤
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐     │
│  │  Conn 1  │  │  Conn 2  │  │  Conn 3  │  │  Conn N  │     │
│  │  ACTIVE  │  │   IDLE   │  │   IDLE   │  │   IDLE   │     │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘     │
│                                                             │
│  Min Connections:     10                                    │
│  Max Connections:     100                                   │
│  Acquire Timeout:     30s                                   │
│  Idle Timeout:        10min                                 │
│  Connection Lifetime: 1h                                    │
└─────────────────────────────────────────────────────────────┘
Pool States
| State | Description |
|---|---|
| Idle | Connection available for use |
| Active | Currently executing a query |
| Acquiring | Waiting to obtain connection |
| Closing | Connection being terminated |
| Broken | Connection marked as unusable |
Language-Specific Connection Pools
Go: pgxpool for PostgreSQL
package main
import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jackc/pgx/v5/tracelog"
)
type DatabaseConfig struct {
Host string
Port int
Database string
User string
Password string
MinConnections int32
MaxConnections int32
MaxConnLifetime time.Duration
MaxConnIdleTime time.Duration
HealthCheckPeriod time.Duration
}
func NewConnectionPool(config DatabaseConfig) (*pgxpool.Pool, error) {
poolConfig, err := pgxpool.ParseConfig(fmt.Sprintf(
"postgres://%s:%s@%s:%d/%s?sslmode=require",
config.User,
config.Password,
config.Host,
config.Port,
config.Database,
))
if err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}
// Connection pool settings
poolConfig.MinConns = config.MinConnections
poolConfig.MaxConns = config.MaxConnections
poolConfig.MaxConnLifetime = config.MaxConnLifetime
poolConfig.MaxConnIdleTime = config.MaxConnIdleTime
// Health check
poolConfig.HealthCheckPeriod = config.HealthCheckPeriod
	// Optional query logging via the pgx tracelog adapter
	poolConfig.ConnConfig.Tracer = &tracelog.TraceLog{
		Logger: tracelog.LoggerFunc(func(ctx context.Context, level tracelog.LogLevel, msg string, data map[string]any) {
			log.Printf("pgx %s: %v", msg, data)
		}),
		LogLevel: tracelog.LogLevelDebug,
	}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
return nil, fmt.Errorf("failed to create pool: %w", err)
}
// Verify connection
if err := pool.Ping(ctx); err != nil {
return nil, fmt.Errorf("failed to ping database: %w", err)
}
return pool, nil
}
func main() {
pool, err := NewConnectionPool(DatabaseConfig{
Host: os.Getenv("DB_HOST"),
Port: 5432,
Database: os.Getenv("DB_NAME"),
User: os.Getenv("DB_USER"),
Password: os.Getenv("DB_PASSWORD"),
MinConnections: 10,
MaxConnections: 100,
MaxConnLifetime: time.Hour,
MaxConnIdleTime: 30 * time.Minute,
HealthCheckPeriod: 1 * time.Minute,
})
if err != nil {
panic(err)
}
defer pool.Close()
// Use pool
ctx := context.Background()
// Query with automatic connection handling
var result int
err = pool.QueryRow(ctx, "SELECT COUNT(*) FROM users").Scan(&result)
if err != nil {
panic(err)
}
fmt.Printf("User count: %d\n", result)
// Use Acquire/Release pattern for complex operations
conn, err := pool.Acquire(ctx)
if err != nil {
panic(err)
}
defer conn.Release()
// Connection is now active
_, err = conn.Conn().Exec(ctx, "INSERT INTO logs (message) VALUES ($1)", "test")
if err != nil {
panic(err)
}
}
Python: asyncpg for PostgreSQL
import asyncio
from typing import Optional
import asyncpg
from asyncpg import Pool, Connection
import logging
logger = logging.getLogger(__name__)
class DatabasePool:
def __init__(
self,
host: str,
port: int,
database: str,
user: str,
password: str,
min_size: int = 10,
max_size: int = 100,
command_timeout: float = 60.0,
pool_timeout: float = 30.0,
max_queries: int = 50000,
max_inactive_connection_lifetime: float = 300.0,
):
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.min_size = min_size
self.max_size = max_size
self.command_timeout = command_timeout
self.pool_timeout = pool_timeout
self.max_queries = max_queries
self.max_inactive_connection_lifetime = max_inactive_connection_lifetime
self._pool: Optional[Pool] = None
    async def init_pool(self) -> None:
        # Note: asyncpg.create_pool has no pool_timeout argument; acquisition
        # timeouts belong on pool.acquire(timeout=...) instead
        self._pool = await asyncpg.create_pool(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=self.command_timeout,
            max_queries=self.max_queries,
            max_inactive_connection_lifetime=self.max_inactive_connection_lifetime,
        )
        logger.info(f"Database pool initialized: {self.min_size}-{self.max_size} connections")
async def close_pool(self) -> None:
if self._pool:
await self._pool.close()
logger.info("Database pool closed")
@property
def pool(self) -> Pool:
if not self._pool:
raise RuntimeError("Pool not initialized. Call init_pool() first.")
return self._pool
    async def fetch(self, query: str, *args) -> list[asyncpg.Record]:
        async with self.pool.acquire(timeout=self.pool_timeout) as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args) -> Optional[asyncpg.Record]:
        async with self.pool.acquire(timeout=self.pool_timeout) as conn:
            return await conn.fetchrow(query, *args)

    async def execute(self, query: str, *args) -> str:
        async with self.pool.acquire(timeout=self.pool_timeout) as conn:
            return await conn.execute(query, *args)

    async def executemany(self, query: str, args_list: list[tuple]) -> None:
        async with self.pool.acquire(timeout=self.pool_timeout) as conn:
            await conn.executemany(query, args_list)
async def get_pool_status(self) -> dict:
pool = self.pool
return {
"size": pool.get_size(),
"free": pool.get_idle_size(),
"min_size": self.min_size,
"max_size": self.max_size,
}
async def main():
db = DatabasePool(
host="localhost",
port=5432,
database="mydb",
user="user",
password="password",
min_size=10,
max_size=100,
)
await db.init_pool()
try:
# Simple query
users = await db.fetch("SELECT * FROM users WHERE active = $1", True)
print(f"Found {len(users)} active users")
# Single row
user = await db.fetchrow("SELECT * FROM users WHERE id = $1", 1)
if user:
print(f"User: {user['username']}")
# Insert
await db.execute(
"INSERT INTO users (username, email) VALUES ($1, $2)",
"john", "[email protected]"
)
# Batch insert
await db.executemany(
"INSERT INTO logs (level, message) VALUES ($1, $2)",
[("INFO", "User logged in"), ("ERROR", "Failed login")]
)
# Pool status
status = await db.get_pool_status()
print(f"Pool status: {status}")
finally:
await db.close_pool()
if __name__ == "__main__":
asyncio.run(main())
Node.js: pg Pool
const { Pool } = require('pg');
class DatabasePool {
constructor(config) {
this.config = {
host: config.host || 'localhost',
port: config.port || 5432,
database: config.database,
user: config.user,
password: config.password,
max: config.maxConnections || 20,
min: config.minConnections || 5,
idleTimeoutMillis: config.idleTimeoutMillis || 30000,
connectionTimeoutMillis: config.connectionTimeoutMillis || 2000,
maxUses: config.maxUses || 5000,
allowExitOnIdle: config.allowExitOnIdle || false,
};
this.pool = new Pool(this.config);
this.pool.on('error', (err) => {
console.error('Unexpected error on idle client', err);
});
this.pool.on('connect', () => {
console.log('New client connected to database');
});
}
async query(text, params) {
const start = Date.now();
try {
const result = await this.pool.query(text, params);
const duration = Date.now() - start;
console.log('Executed query', {
text,
duration,
rows: result.rowCount
});
return result;
} catch (error) {
console.error('Query error:', error);
throw error;
}
}
async getClient() {
return await this.pool.connect();
}
async transaction(callback) {
const client = await this.getClient();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getPoolStatus() {
const pool = this.pool;
return {
totalCount: pool.totalCount,
idleCount: pool.idleCount,
waitingCount: pool.waitingCount,
max: this.config.max,
min: this.config.min,
};
}
async healthCheck() {
try {
const result = await this.pool.query('SELECT 1');
return result.rows.length > 0;
} catch (error) {
console.error('Health check failed:', error);
return false;
}
}
async close() {
await this.pool.end();
console.log('Pool closed');
}
}
// Usage (inside an async function; top-level await is not available in CommonJS)
async function main() {
  const db = new DatabasePool({
    host: process.env.DB_HOST,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
    maxConnections: 20,
    minConnections: 5,
  });

  // Simple query
  const users = await db.query('SELECT * FROM users WHERE active = $1', [true]);
  console.log(`Found ${users.rowCount} active users`);

  // Transaction (userId and total are placeholder values)
  const userId = 1;
  const total = 99.5;
  const order = await db.transaction(async (client) => {
    const inserted = await client.query(
      'INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id',
      [userId, total]
    );
    await client.query(
      'UPDATE users SET orders_count = orders_count + 1 WHERE id = $1',
      [userId]
    );
    return inserted.rows[0];
  });
  console.log('Created order', order);

  // Pool status
  const status = await db.getPoolStatus();
  console.log(status);

  await db.close();
}

main().catch(console.error);
Connection Pool Proxies
For applications with many instances or when you need centralized connection management, use a connection pool proxy like PgBouncer or ProxySQL.
PgBouncer
PgBouncer is a lightweight connection pooler for PostgreSQL.
# Installation
sudo apt-get install pgbouncer
# Configuration: /etc/pgbouncer/pgbouncer.ini
[databases]
mydb = host=127.0.0.1 port=5432 dbname=mydb

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
admin_users = admin
stats_users = stats_reader

# Pooling mode:
#   transaction - connection returns to the pool when the transaction completes (recommended)
#   session     - connection returns to the pool when the client disconnects
#   statement   - connection returns after each statement (autocommit only; rarely appropriate)
pool_mode = transaction

max_client_conn = 1000
default_pool_size = 25
min_pool_size = 10
reserve_pool_size = 5
reserve_pool_timeout = 5
max_db_connections = 100
max_user_connections = 100

log_connections = 1
log_disconnections = 1
log_pooler_errors = 1

# User list: /etc/pgbouncer/userlist.txt
# Each entry is "username" followed by "md5" + md5(password + username) in hex
"user" "md5<32 hex digits of md5(password + username)>"
"admin" "md5<32 hex digits of md5(password + username)>"
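The md5 entries in userlist.txt follow a fixed recipe: the literal prefix `md5` followed by the hex digest of `password + username`. A small helper (the name `pgbouncer_md5` is mine) generates them:

```python
import hashlib

def pgbouncer_md5(username: str, password: str) -> str:
    """Build a PgBouncer userlist.txt entry: 'md5' + md5(password + username)."""
    digest = hashlib.md5((password + username).encode()).hexdigest()
    return f"md5{digest}"

entry = pgbouncer_md5("user", "secret")
print(f'"user" "{entry}"')
```

Note the concatenation order is password first, then username, matching how PostgreSQL itself stores md5 password hashes.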
# Start PgBouncer
sudo systemctl start pgbouncer
sudo systemctl enable pgbouncer
# Connect through PgBouncer
psql -h localhost -p 6432 -U user mydb
Connection String with PgBouncer
# Direct PostgreSQL connection
DATABASE_URL = "postgresql://user:pass@localhost:5432/mydb"
# Through PgBouncer (transaction pooling)
# Note: in transaction mode, session state (prepared statements, SET,
# advisory locks) does not carry over between transactions
DATABASE_URL = "postgresql://user:pass@localhost:6432/mydb?sslmode=require"
ProxySQL for MySQL
# Installation
sudo apt-get install proxysql
# Configuration: /etc/proxysql.cnf (ProxySQL uses libconfig syntax, not YAML)
datadir = "/var/lib/proxysql"

mysql_variables =
{
    interfaces = "0.0.0.0:6033"
    monitor_username = "monitor"
    monitor_password = "monitor_password"
    monitor_connect_interval = 60000
    monitor_ping_interval = 10000
    monitor_ping_timeout = 1000
    max_connections = 10000
    default_query_timeout = 3600000
}

# Hostgroup 0 = writer, hostgroup 1 = reader
mysql_servers =
(
    { address = "127.0.0.1", port = 3306, hostgroup = 0 },
    { address = "127.0.0.1", port = 3307, hostgroup = 1 }
)

mysql_users =
(
    { username = "appuser", password = "apppassword", default_hostgroup = 0 }
)

# Route SELECT ... FOR UPDATE to the writer, all other SELECTs to the reader
mysql_query_rules =
(
    { rule_id = 1, active = 1, match_pattern = "^SELECT.*FOR UPDATE$", destination_hostgroup = 0, apply = 1 },
    { rule_id = 2, active = 1, match_pattern = "^SELECT", destination_hostgroup = 1, apply = 1 }
)
Connection Pool Configuration
Determining Optimal Pool Size
A good first estimate comes from Little's law: the average number of queries in flight equals throughput multiplied by query latency.

Formula: Pool Size = (P * C / 1000) + R
Where:
- P = Queries per second
- C = Average query time (ms)
- R = Headroom for bursts and health checks (1-5)

# Example calculation
avg_query_time_ms = 50    # 50ms average query
queries_per_second = 1000 # 1000 QPS

pool_size = (1000 * 50 / 1000) + 3
pool_size = 50 + 3 = 53

# Start with this, then tune based on metrics
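Little's law (in-flight work = arrival rate × service time) makes a handy cross-check on any pool-size estimate; a hypothetical `estimate_pool_size` helper:

```python
def estimate_pool_size(qps: float, avg_query_ms: float, headroom: int = 3) -> int:
    """Little's law: average in-flight queries = QPS * latency (in seconds).
    headroom covers bursts and health-check traffic."""
    concurrent = qps * (avg_query_ms / 1000.0)
    return int(concurrent) + headroom

# 1000 QPS at 50 ms/query keeps ~50 queries in flight at any instant
print(estimate_pool_size(qps=1000, avg_query_ms=50))  # → 53
```

If the result exceeds what the database can comfortably hold, that is the signal to reach for an external pooler like PgBouncer rather than a bigger in-process pool.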
Configuration Best Practices
// Go: Balanced configuration
poolConfig := &pgxpool.Config{
MinConns: 10, // Pre-warm connection pool
MaxConns: 100, // Don't exceed DB limits
MaxConnLifetime: time.Hour, // Rotate connections
MaxConnIdleTime: 30 * time.Minute, // Clean up idle
HealthCheckPeriod: 5 * time.Minute, // Detect broken
}
# Python: Production settings
pool = await asyncpg.create_pool(
host='localhost',
min_size=20,
max_size=100,
command_timeout=60,
pool_timeout=30,
max_queries=50000, # Recycle after 50k queries
max_inactive_connection_lifetime=300, # 5 minutes
)
Monitoring Metrics
import time
from dataclasses import dataclass

import asyncpg
@dataclass
class PoolMetrics:
active_connections: int
idle_connections: int
waiting_requests: int
total_connections: int
connection_acquire_time_ms: float
query_execution_time_ms: float
connection_errors: int
timeout_errors: int
async def collect_pool_metrics(pool: asyncpg.Pool) -> PoolMetrics:
    return PoolMetrics(
        active_connections=pool.get_size() - pool.get_idle_size(),
        idle_connections=pool.get_idle_size(),
        waiting_requests=0,  # Not directly exposed by asyncpg
        total_connections=pool.get_size(),
        connection_acquire_time_ms=await measure_acquire_time(pool),
        query_execution_time_ms=await measure_query_time(pool),
        connection_errors=0,
        timeout_errors=0,
    )

async def measure_acquire_time(pool: asyncpg.Pool, samples: int = 100) -> float:
    """Average time to check a connection out of the pool."""
    times = []
    for _ in range(samples):
        start = time.perf_counter()
        async with pool.acquire():
            elapsed = (time.perf_counter() - start) * 1000
        times.append(elapsed)
    return sum(times) / len(times)

async def measure_query_time(pool: asyncpg.Pool, samples: int = 100) -> float:
    """Average round trip for a trivial query on an already-acquired connection."""
    times = []
    async with pool.acquire() as conn:
        for _ in range(samples):
            start = time.perf_counter()
            await conn.fetchval('SELECT 1')
            times.append((time.perf_counter() - start) * 1000)
    return sum(times) / len(times)
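Averages hide tail latency, which is where pool exhaustion shows up first; a nearest-rank percentile helper (illustrative, not part of asyncpg) is worth adding to any metrics collector:

```python
import math

def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile; pct in (0, 100]."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct / 100 * len(ordered))
    return ordered[rank - 1]

# Ten acquire times (ms); two requests waited on an exhausted pool
acquire_times = [1.0, 1.2, 0.9, 30.0, 1.1, 1.3, 0.8, 1.0, 1.1, 50.0]
print(percentile(acquire_times, 50))   # → 1.1  (median looks healthy)
print(percentile(acquire_times, 95))   # → 50.0 (tail tells the real story)
```

Tracking p95/p99 acquire time over time is the most direct signal that the pool is undersized.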
Connection Pool Patterns
Circuit Breaker Pattern
import logging
from datetime import datetime
from typing import Awaitable, Callable, TypeVar

import asyncpg

logger = logging.getLogger(__name__)
T = TypeVar('T')
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
expected_exception: type = Exception,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
        self.state = "closed"  # one of: closed, open, half_open
async def call(self, func: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
if self.state == "open":
if self._should_attempt_reset():
self.state = "half_open"
else:
raise Exception("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
if self.last_failure_time is None:
return False
return (datetime.now() - self.last_failure_time).total_seconds() >= self.recovery_timeout
def _on_success(self):
self.failure_count = 0
self.state = "closed"
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = "open"
logger.warning(f"Circuit breaker opened after {self.failure_count} failures")
class DatabaseWithCircuitBreaker:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
self.circuit_breaker = CircuitBreaker(failure_threshold=5)
    async def execute(self, query: str, *args):
        # asyncpg pools expose execute()/fetch(); there is no pool.query()
        return await self.circuit_breaker.call(self.pool.execute, query, *args)
async def fetch(self, query: str, *args):
return await self.circuit_breaker.call(self.pool.fetch, query, *args)
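To watch the state machine work without a database, here is a condensed synchronous sketch of the same closed/open/half-open logic (class and function names are mine), driven by a stub that fails like a downed database:

```python
import time

class SimpleBreaker:
    """Condensed synchronous version of the circuit-breaker state machine."""
    def __init__(self, failure_threshold=3, recovery_timeout=0.05):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None
        self.state = "closed"

    def call(self, func):
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "half_open"   # let one probe request through
            else:
                raise RuntimeError("circuit open")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        self.state = "closed"
        return result

breaker = SimpleBreaker()

def flaky():
    raise ConnectionError("db down")

for _ in range(3):              # three consecutive failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

time.sleep(0.06)                # wait past recovery_timeout
breaker.call(lambda: "ok")      # probe succeeds, breaker closes again
print(breaker.state)            # → closed
```

While open, the breaker fails fast instead of queuing more work onto an exhausted pool, which is exactly what prevents a struggling database from being buried under retries.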
Retry Pattern with Backoff
import asyncio
import logging
import random
from typing import Callable

import asyncpg

logger = logging.getLogger(__name__)
async def execute_with_retry(
func: Callable,
*args,
max_retries: int = 3,
base_delay: float = 0.1,
max_delay: float = 10.0,
exponential_base: float = 2.0,
**kwargs
):
last_exception = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
        except (asyncpg.PostgresConnectionError, asyncio.TimeoutError) as e:
last_exception = e
delay = min(base_delay * (exponential_base ** attempt), max_delay)
# Add jitter
delay = delay * (0.5 + random.random())
if attempt < max_retries - 1:
logger.warning(
f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s: {e}"
)
await asyncio.sleep(delay)
else:
logger.error(f"All {max_retries} attempts failed")
raise last_exception
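The delay schedule this produces is easier to reason about in isolation; a hypothetical `backoff_delays` helper computes the capped exponential series before jitter is applied:

```python
def backoff_delays(max_retries: int, base: float = 0.1,
                   factor: float = 2.0, cap: float = 10.0) -> list[float]:
    """Capped exponential backoff: base * factor**attempt, never above cap.
    Jitter (e.g. multiplying each delay by 0.5-1.5) is applied separately."""
    return [min(base * factor ** attempt, cap) for attempt in range(max_retries)]

print(backoff_delays(5))        # → [0.1, 0.2, 0.4, 0.8, 1.6]
print(backoff_delays(9)[-1])    # → 10.0 (capped; 0.1 * 2**8 would be 25.6)
```

The jitter step matters as much as the exponent: without it, clients that failed together retry together, re-creating the same load spike on every attempt.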
Read/Write Splitting
class ReadWriteSplitter:
def __init__(self, write_pool: asyncpg.Pool, read_pools: list[asyncpg.Pool]):
self.write_pool = write_pool
self.read_pools = read_pools
self.read_index = 0
async def execute_write(self, query: str, *args):
return await self.write_pool.fetch(query, *args)
async def execute_read(self, query: str, *args):
# Round-robin across read pools
pool = self.read_pools[self.read_index]
self.read_index = (self.read_index + 1) % len(self.read_pools)
return await pool.fetch(query, *args)
async def execute(self, query: str, *args):
query_upper = query.strip().upper()
if query_upper.startswith('SELECT') and 'FOR UPDATE' not in query_upper:
return await self.execute_read(query, *args)
else:
return await self.execute_write(query, *args)
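The routing predicate is worth testing on its own; here is a standalone version of the same SELECT / FOR UPDATE check used by the splitter above:

```python
def routes_to_replica(query: str) -> bool:
    """True if the query can be served by a read replica.
    Plain SELECTs go to replicas; everything else, including
    SELECT ... FOR UPDATE, goes to the primary."""
    q = query.strip().upper()
    return q.startswith("SELECT") and "FOR UPDATE" not in q

print(routes_to_replica("SELECT * FROM users"))                      # → True
print(routes_to_replica("select id from t where x = 1 for update"))  # → False
print(routes_to_replica("UPDATE users SET name = 'x'"))              # → False
```

Note the heuristic is conservative but crude: read-only queries that begin with `WITH` are sent to the primary, and it cannot see writes hidden inside functions. Production routers usually parse queries properly or rely on explicit read/write APIs.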
Troubleshooting Connection Pool Issues
Common Problems
| Problem | Symptom | Solution |
|---|---|---|
| Connection Exhaustion | “too many connections” | Increase pool size or add PgBouncer |
| Long Waits | High latency | Increase pool size, optimize queries |
| Connection Leaks | Connections not returned | Use context managers, add timeouts |
| Stale Connections | Timeouts, errors | Configure health checks, reduce lifetime |
| Thundering Herd | All connections in use | Implement request queuing, circuit breaker |
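Connection leaks in particular can be caught early by tracking how long each checkout stays outstanding. A sketch (the `LeakTracker` class is hypothetical, wrapping whatever acquire/release hooks your pool exposes):

```python
import time

class LeakTracker:
    """Records checkout timestamps and counts acquisitions held too long."""
    def __init__(self, max_hold_seconds: float = 30.0):
        self.max_hold_seconds = max_hold_seconds
        self._held: dict[int, float] = {}   # id(conn) -> checkout time

    def on_acquire(self, conn) -> None:
        self._held[id(conn)] = time.monotonic()

    def on_release(self, conn) -> None:
        self._held.pop(id(conn), None)

    def suspected_leaks(self) -> int:
        now = time.monotonic()
        return sum(1 for t in self._held.values()
                   if now - t > self.max_hold_seconds)

# Demo with stand-in connection objects and a tiny threshold
tracker = LeakTracker(max_hold_seconds=0.01)
conn_a, conn_b = object(), object()
tracker.on_acquire(conn_a)
tracker.on_acquire(conn_b)
tracker.on_release(conn_b)          # b was returned promptly
time.sleep(0.02)                    # a is still checked out past the limit
print(tracker.suspected_leaks())    # → 1
```

Exposing `suspected_leaks()` as a metric turns silent leaks into an alertable signal long before the pool is exhausted.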
Debugging Techniques
# Log connection pool events
import logging
import asyncpg
logging.basicConfig(level=logging.DEBUG)
# Or create custom logging
class PoolLogger:
@staticmethod
def log_pool_acquire(con):
print(f"Acquired connection: {con}")
@staticmethod
def log_pool_release(con):
print(f"Released connection: {con}")
# PostgreSQL: View active connections
# SELECT * FROM pg_stat_activity WHERE datname = 'mydb';
# PostgreSQL: Connection statistics
# SELECT * FROM pg_stat_database WHERE datname = 'mydb';
Health Checks
// Go: Implement health checks
func (s *Server) HealthCheck(ctx context.Context) (*healthpb.HealthCheckResponse, error) {
if err := s.db.Ping(ctx); err != nil {
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_NOT_SERVING,
}, nil
}
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_SERVING,
}, nil
}
Conclusion
Database connection pooling is essential for building scalable, high-performance applications. By maintaining a pool of reusable connections, you eliminate the overhead of connection establishment and enable your application to handle thousands of concurrent requests efficiently.
Key takeaways:
- Use built-in connection pools (pgxpool, asyncpg, pg Pool)
- Configure appropriate min/max connections based on your workload
- Implement health checks and connection lifetime limits
- Use PgBouncer or ProxySQL for centralized pooling
- Monitor pool metrics and adjust configuration
- Implement circuit breakers and retry patterns for resilience
Resources
- PgBouncer Documentation
- ProxySQL Documentation
- PostgreSQL Connection Pooling
- Go pgxpool Library
- asyncpg Python Library