Task Scheduling with APScheduler and Celery: Background Jobs and Automation
Background task scheduling is essential for production applications. Python offers two main approaches: APScheduler for simple scheduling and Celery for distributed task queues.
APScheduler: Simple Scheduling
Installation and Setup
pip install apscheduler
Basic Scheduling
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time


def my_job():
    """Print a timestamp each time the scheduler fires this job."""
    print(f"Job executed at {datetime.now()}")


# One scheduler instance drives all three trigger styles below.
scheduler = BackgroundScheduler()

# interval: every 10 s; cron: daily at 02:00; date: one-shot at Christmas noon.
for trigger, kwargs in (
    ('interval', {'seconds': 10, 'id': 'job1'}),
    ('cron', {'hour': 2, 'minute': 0, 'id': 'job2'}),
    ('date', {'run_date': '2025-12-25 12:00:00', 'id': 'job3'}),
):
    scheduler.add_job(my_job, trigger, **kwargs)

scheduler.start()

# The scheduler runs in a daemon thread; keep the main thread alive
# until Ctrl-C, then stop cleanly.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
Trigger Types
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta

scheduler = BackgroundScheduler()


def job():
    """Demo callable shared by every trigger example below."""
    print(f"Job at {datetime.now()}")


# --- Interval trigger: repeat every fixed span of time ---
for span in ({'seconds': 30}, {'minutes': 5}, {'hours': 1}, {'days': 1}):
    scheduler.add_job(job, 'interval', **span)

# --- Cron trigger: Unix-cron-like field matching ---
scheduler.add_job(job, 'cron', hour=0, minute=0)               # daily at midnight
scheduler.add_job(job, 'cron', day_of_week='mon-fri', hour=9)  # weekdays at 9 AM
scheduler.add_job(job, 'cron', month='1-6', day='1')           # 1st of Jan-Jun

# --- Date trigger: fire exactly once at a point in time ---
scheduler.add_job(job, 'date', run_date=datetime(2025, 12, 25, 12, 0, 0))
scheduler.add_job(job, 'date', run_date=datetime.now() + timedelta(hours=1))

scheduler.start()
Job Management
from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()


def job(name):
    """Print which job instance ran."""
    print(f"Job {name} executed")


# Two interval jobs distinguished both by id and by the argument passed in.
scheduler.add_job(job, 'interval', seconds=10, args=['job1'], id='job1')
scheduler.add_job(job, 'interval', seconds=20, args=['job2'], id='job2')
scheduler.start()

# Let the scheduler run briefly before managing the jobs.
time.sleep(5)

# Look up a single job by its id.
job_obj = scheduler.get_job('job1')
print(f"Job: {job_obj}")

# List every scheduled job.  NOTE: the loop variable must not be named
# `job` -- the original code did that and shadowed the task function
# defined above, so any later call to job(...) would have failed.
for scheduled in scheduler.get_jobs():
    print(f"Job ID: {scheduled.id}, Next run: {scheduled.next_run_time}")

# Pause, resume, and finally remove job1.
scheduler.pause_job('job1')
scheduler.resume_job('job1')
scheduler.remove_job('job1')

# Change job2's cadence in place without re-adding it.
scheduler.reschedule_job('job2', trigger='interval', seconds=30)

scheduler.shutdown()
Error Handling and Logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging

# Route scheduler events through the standard logging module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = BackgroundScheduler()


def job_with_error():
    """Job that might fail"""
    import random
    if random.random() < 0.5:
        raise Exception("Random error occurred")
    print("Job succeeded")


def job_executed(event):
    # Fired after a job completes without raising.
    logger.info(f"Job {event.job_id} executed successfully")


def job_error(event):
    # Fired when a job raises; the exception is attached to the event.
    logger.error(f"Job {event.job_id} failed: {event.exception}")


# Subscribe each listener to exactly one event class.
scheduler.add_listener(job_executed, EVENT_JOB_EXECUTED)
scheduler.add_listener(job_error, EVENT_JOB_ERROR)

scheduler.add_job(job_with_error, 'interval', seconds=5)
scheduler.start()

# Keep the process alive until Ctrl-C, then stop cleanly.
try:
    import time
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
Persistent Scheduling
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import create_engine


def my_job():
    print("Job executed")


# Back the default job store with a SQLite database so job definitions
# survive process restarts.
engine = create_engine('sqlite:///jobs.db')
scheduler = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(engine=engine)}
)

# The job is serialized into jobs.db; on the next start the scheduler
# reloads it automatically.
scheduler.add_job(my_job, 'interval', seconds=10, id='persistent_job')
scheduler.start()
Celery: Distributed Task Queue
Installation and Setup
pip install celery redis
# or
pip install celery[redis]
Basic Celery Application
from celery import Celery
import time

# Broker: Redis database 0 on localhost.
app = Celery('tasks', broker='redis://localhost:6379/0')

# JSON everywhere keeps messages inspectable and language-neutral.
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)


@app.task
def add(x, y):
    """Simple addition task"""
    return x + y


@app.task
def long_running_task(duration):
    """Task that takes time"""
    time.sleep(duration)
    return f"Completed after {duration} seconds"


@app.task(bind=True)
def task_with_retry(self, max_retries=3):
    """Task with retry logic.

    bind=True provides the task instance as ``self``, which is the
    documented way to reach ``self.retry()`` and the per-invocation
    retry counter ``self.request.retries``.  The previous version called
    retry() on the module-level task object and silently ignored its
    ``max_retries`` parameter; it is now passed through to retry().
    """
    try:
        result = 1 / 0  # Intentional error
        return result
    except Exception as exc:
        # Exponential backoff: wait 1s, 2s, 4s, ... between attempts.
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=max_retries,
        )


if __name__ == '__main__':
    # delay() enqueues the task and returns an AsyncResult immediately.
    result = add.delay(4, 6)
    print(f"Task ID: {result.id}")
    # get() blocks (up to 10 s here) until a worker produces the result.
    print(f"Result: {result.get(timeout=10)}")
    # PENDING / STARTED / SUCCESS / FAILURE / RETRY ...
    print(f"Status: {result.status}")
    long_task = long_running_task.delay(5)
    print(f"Long task status: {long_task.status}")
Task Scheduling with Celery Beat
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def add(x, y):
    return x + y


@app.task
def multiply(x, y):
    return x * y


@app.task
def cleanup():
    print("Cleanup task executed")
    return "Cleanup completed"


# Periodic schedule consumed by the `celery beat` process.  Each entry
# maps a label to a task name, a schedule, and optional arguments.
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),  # simple fixed interval
        'args': (16, 16),
    },
    'multiply-at-midnight': {
        'task': 'tasks.multiply',
        'schedule': crontab(hour=0, minute=0),  # once a day at 00:00
        'args': (10, 20),
    },
    'cleanup-every-hour': {
        'task': 'tasks.cleanup',
        'schedule': crontab(minute=0),  # at the top of every hour
    },
}

# Start the scheduler process with:
#   celery -A tasks beat
Task Chains and Workflows
from celery import Celery, chain, group, chord
import time

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def add(x, y):
    time.sleep(1)
    return x + y


@app.task
def multiply(x, y):
    time.sleep(1)
    return x * y


@app.task
def xsum(values):
    """Sum a list of values (used below as the chord callback)."""
    return sum(values)


# Chain: each task's return value feeds the next task's first argument.
workflow = chain(add.s(2, 2), multiply.s(4), add.s(4))
result = workflow.apply_async()
print(f"Chain result: {result.get()}")  # ((2+2)*4)+4 = 20

# Group: independent tasks run in parallel; results come back as a list.
job = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = job.apply_async()
print(f"Group results: {result.get()}")  # [4, 8, 16]

# Chord: run the header group in parallel, then pass the collected
# results to the callback.  chord(header)(callback) submits the whole
# workflow and returns an AsyncResult directly -- the original code
# then called .apply_async() on that AsyncResult, which raises
# AttributeError; just wait on the returned result instead.
callback = xsum.s()
header = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = chord(header)(callback)
print(f"Chord result: {result.get()}")  # 4+8+16 = 28
Error Handling and Retries
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
import random

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task(bind=True, max_retries=3)
def task_with_retry(self, x, y):
    """Task with automatic retry"""
    try:
        if random.random() < 0.5:
            raise Exception("Random error")
        return x + y
    except Exception as exc:
        # Exponential backoff: wait 2**retries seconds before each attempt.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


# SoftTimeLimitExceeded is raised *inside* the task only when
# soft_time_limit is set; the hard `time_limit` kills the worker process
# outright, so the original version (hard limit only) could never reach
# its except-clause.  Set both: the soft limit fires first and lets the
# task clean up, the hard limit is the backstop.
@app.task(bind=True, soft_time_limit=30, time_limit=60)
def task_with_timeout(self):
    """Task with timeout"""
    try:
        import time
        time.sleep(60)  # Will exceed the 30 s soft limit
    except SoftTimeLimitExceeded:
        print("Task exceeded time limit")
        return "Timeout"


@app.task(bind=True)
def task_with_error_handling(self):
    """Task with error handling"""
    try:
        result = 1 / 0
    except ZeroDivisionError as exc:
        # Log error and return default value
        print(f"Error: {exc}")
        return None
Monitoring and Management
from celery import Celery
from celery.app.control import Inspect

app = Celery('tasks', broker='redis://localhost:6379/0')

# Use the documented factory instead of constructing Inspect by hand;
# app.control.inspect() is bound to this app's broker settings.
inspect = app.control.inspect()

# NOTE: every inspect method returns None (not an empty dict) when no
# workers are online -- guard before iterating the results.

# Tasks currently executing on each worker.
active = inspect.active()
print(f"Active tasks: {active}")

# Tasks received with an ETA/countdown but not yet executed.
scheduled = inspect.scheduled()
print(f"Scheduled tasks: {scheduled}")

# Every task name each worker has registered.
registered = inspect.registered()
print(f"Registered tasks: {registered}")

# Per-worker statistics (pool size, totals, broker info, ...).
stats = inspect.stats()
print(f"Worker stats: {stats}")

# Revoke a task by id; terminate=True also kills it if already running.
app.control.revoke('task_id', terminate=True)

# Drop every message waiting in the queue (irreversible).
app.control.purge()
Comparison: APScheduler vs Celery
| Feature | APScheduler | Celery |
|---|---|---|
| Complexity | Simple | Complex |
| Distributed | No | Yes |
| Persistence | Optional (pluggable job stores) | Yes (via broker and result backend) |
| Scalability | Single machine | Multiple machines |
| Learning Curve | Easy | Moderate |
| Use Case | Simple scheduling | Complex workflows |
Best Practices
- Use appropriate tool: APScheduler for simple tasks, Celery for complex workflows
- Error handling: Always implement retry logic and error handling
- Monitoring: Monitor task execution and failures
- Logging: Log all task executions for debugging
- Resource limits: Set timeouts and resource limits
- Testing: Test tasks independently and in workflows
Common Pitfalls
Bad Practice:
# Don't: No error handling
@app.task
def risky_task():
    return 1 / 0  # Raises ZeroDivisionError; the task is marked FAILED with no retry


# Don't: No timeout
@app.task
def infinite_task():
    while True:
        pass  # Never returns; occupies a worker slot forever


# Don't: Blocking operations
@app.task
def blocking_task():
    import time
    time.sleep(3600)  # Blocks worker
Good Practice:
# Do: Error handling and retry
@app.task(bind=True, max_retries=3)
def safe_task(self):
    try:
        return 1 / 0  # placeholder for a fallible operation
    except Exception as exc:
        # Re-enqueue the task; gives up after max_retries attempts.
        raise self.retry(exc=exc)


# Do: Set timeout
@app.task(time_limit=300)  # hard-kill the task after 5 minutes
def timed_task():
    pass


# Do: Use async operations
@app.task
def async_task():
    import asyncio
    asyncio.run(async_operation())  # async_operation assumed defined elsewhere
Conclusion
Task scheduling is crucial for production applications. APScheduler handles simple scheduling needs, while Celery provides distributed task processing for complex workflows. Choose based on your application’s requirements and scale. Always implement proper error handling, monitoring, and logging.
Comments