Skip to main content
โšก Calmops

Task Scheduling with APScheduler and Celery: Background Jobs and Automation

Task Scheduling with APScheduler and Celery: Background Jobs and Automation

Background task scheduling is essential for production applications. Python offers two main approaches: APScheduler for simple scheduling and Celery for distributed task queues.

APScheduler: Simple Scheduling

Installation and Setup

pip install apscheduler

Basic Scheduling

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time

# BackgroundScheduler runs jobs on a worker thread, leaving the main
# thread free.
scheduler = BackgroundScheduler()


def my_job():
    """Print a timestamp so every firing is visible on the console."""
    print(f"Job executed at {datetime.now()}")


# Register the same callable under three different trigger types.
scheduler.add_job(my_job, 'interval', seconds=10, id='job1')    # every 10 s
scheduler.add_job(my_job, 'cron', hour=2, minute=0, id='job2')  # Daily at 2 AM
scheduler.add_job(my_job, 'date', run_date='2025-12-25 12:00:00', id='job3')  # one-shot

scheduler.start()

# Park the main thread; the scheduler thread keeps firing jobs.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C: stop cleanly, letting running jobs finish.
    scheduler.shutdown()

Trigger Types

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta

scheduler = BackgroundScheduler()


def job():
    """Log the wall-clock time of each firing."""
    print(f"Job at {datetime.now()}")


# --- Interval triggers: fixed delay between consecutive runs ---
scheduler.add_job(job, 'interval', seconds=30)
scheduler.add_job(job, 'interval', minutes=5)
scheduler.add_job(job, 'interval', hours=1)
scheduler.add_job(job, 'interval', days=1)

# --- Cron triggers: calendar-based, Unix-cron semantics ---
scheduler.add_job(job, 'cron', hour=0, minute=0)               # daily at midnight
scheduler.add_job(job, 'cron', day_of_week='mon-fri', hour=9)  # weekdays at 9 AM
scheduler.add_job(job, 'cron', month='1-6', day='1')           # 1st of Jan-Jun

# --- Date triggers: fire exactly once at the given moment ---
scheduler.add_job(job, 'date', run_date=datetime(2025, 12, 25, 12, 0, 0))
scheduler.add_job(job, 'date', run_date=datetime.now() + timedelta(hours=1))

scheduler.start()

Job Management

from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()


def job(name):
    """Print which job fired; `name` arrives via add_job(args=...)."""
    print(f"Job {name} executed")


# Explicit ids make the jobs addressable by the management calls below.
scheduler.add_job(job, 'interval', seconds=10, args=['job1'], id='job1')
scheduler.add_job(job, 'interval', seconds=20, args=['job2'], id='job2')

scheduler.start()

# Let the jobs run briefly before managing them.
time.sleep(5)

# Look up a single job by id.
job_obj = scheduler.get_job('job1')
print(f"Job: {job_obj}")

# List all jobs. Use a distinct loop variable: `for job in ...` would
# shadow (and rebind) the job() function defined above.
for scheduled_job in scheduler.get_jobs():
    print(f"Job ID: {scheduled_job.id}, Next run: {scheduled_job.next_run_time}")

# Pause job
scheduler.pause_job('job1')

# Resume job
scheduler.resume_job('job1')

# Remove job
scheduler.remove_job('job1')

# Replace a job's trigger in place.
scheduler.reschedule_job('job2', trigger='interval', seconds=30)

scheduler.shutdown()

Error Handling and Logging

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import random
import time

# Route listener output through the logging pipeline.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = BackgroundScheduler()


def job_with_error():
    """Job that might fail: raises roughly half of the time."""
    if random.random() < 0.5:
        raise Exception("Random error occurred")
    print("Job succeeded")


def on_success(event):
    """Listener invoked after a job completes without raising."""
    logger.info(f"Job {event.job_id} executed successfully")


def on_failure(event):
    """Listener invoked when a job raises; the event carries the exception."""
    logger.error(f"Job {event.job_id} failed: {event.exception}")


scheduler.add_listener(on_success, EVENT_JOB_EXECUTED)
scheduler.add_listener(on_failure, EVENT_JOB_ERROR)

scheduler.add_job(job_with_error, 'interval', seconds=5)
scheduler.start()

# Park the main thread until Ctrl+C, then stop cleanly.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()

Persistent Scheduling

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import create_engine

# SQLite-backed job store: job definitions survive process restarts.
engine = create_engine('sqlite:///jobs.db')
jobstores = {'default': SQLAlchemyJobStore(engine=engine)}

scheduler = BackgroundScheduler(jobstores=jobstores)


def my_job():
    """Trivial payload; the point is that its schedule is persisted."""
    print("Job executed")


# The explicit id lets the stored job be found again on later runs.
scheduler.add_job(my_job, 'interval', seconds=10, id='persistent_job')

scheduler.start()

# After a restart, jobs are reloaded from jobs.db instead of re-registered.

Celery: Distributed Task Queue

Installation and Setup

pip install celery redis
# or
pip install celery[redis]

Basic Celery Application

from celery import Celery
import time

# The broker carries task messages; a result backend is REQUIRED for the
# result.get()/result.status calls below -- with a broker only, get()
# raises because there is nowhere to read results from.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

# Configure Celery
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Define tasks
@app.task
def add(x, y):
    """Simple addition task"""
    return x + y

@app.task
def long_running_task(duration):
    """Task that takes time"""
    time.sleep(duration)
    return f"Completed after {duration} seconds"

@app.task(bind=True)
def task_with_retry(self, max_retries=3):
    """Task with retry logic.

    bind=True exposes the task instance as `self`, which is the documented
    way to call self.retry() and read self.request.retries. The original
    unbound version relied on module-level attribute access, and its
    max_retries parameter was ignored; here it is forwarded to retry().
    """
    try:
        # Perform operation
        result = 1 / 0  # Intentional error
        return result
    except Exception as exc:
        # Retry with exponential backoff: 1, 2, 4, ... seconds
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=max_retries,
        )

# Usage
if __name__ == '__main__':
    # Call task asynchronously
    result = add.delay(4, 6)
    print(f"Task ID: {result.id}")

    # Get result (blocking; needs the result backend configured above)
    print(f"Result: {result.get(timeout=10)}")

    # Check task status
    print(f"Status: {result.status}")

    # Long running task
    long_task = long_running_task.delay(5)
    print(f"Long task status: {long_task.status}")

Task Scheduling with Celery Beat

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('tasks', broker='redis://localhost:6379/0')

# Celery Beat reads this mapping and enqueues each entry on its schedule.
app.conf.beat_schedule = {
    # timedelta schedule: fixed interval between runs
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16),
    },
    # crontab schedule: calendar-based
    'multiply-at-midnight': {
        'task': 'tasks.multiply',
        'schedule': crontab(hour=0, minute=0),
        'args': (10, 20),
    },
    # minute=0 alone means "at the top of every hour"
    'cleanup-every-hour': {
        'task': 'tasks.cleanup',
        'schedule': crontab(minute=0),
    },
}


@app.task
def add(x, y):
    """Return x + y."""
    return x + y


@app.task
def multiply(x, y):
    """Return x * y."""
    return x * y


@app.task
def cleanup():
    """Periodic housekeeping placeholder."""
    print("Cleanup task executed")
    return "Cleanup completed"


# Start the beat process alongside a worker:
# celery -A tasks beat

Task Chains and Workflows

from celery import Celery, chain, group, chord
import time

# A result backend is mandatory here: chains, groups, and chords all
# collect results via .get(), and chord synchronization itself needs one.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@app.task
def add(x, y):
    """Return x + y after a short delay."""
    time.sleep(1)
    return x + y

@app.task
def multiply(x, y):
    """Return x * y after a short delay."""
    time.sleep(1)
    return x * y

@app.task
def xsum(values):
    """Sum a list of values (used as the chord callback)."""
    return sum(values)

# Chain: each task's result is prepended to the next signature's args.
workflow = chain(add.s(2, 2), multiply.s(4), add.s(4))
result = workflow.apply_async()
print(f"Chain result: {result.get()}")  # ((2+2)*4)+4 = 20

# Group: execute independent signatures in parallel.
job = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = job.apply_async()
print(f"Group results: {result.get()}")  # [4, 8, 16]

# Chord: parallel header, then the callback receives the collected results.
# Calling chord(header)(callback) already dispatches the workflow and
# returns an AsyncResult -- the original code then called .apply_async()
# on that AsyncResult, which is an error.
callback = xsum.s()
header = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = chord(header)(callback)
print(f"Chord result: {result.get()}")  # 4+8+16 = 28

Error Handling and Retries

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
import random
import time

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def task_with_retry(self, x, y):
    """Task with automatic retry and exponential backoff."""
    try:
        if random.random() < 0.5:
            raise Exception("Random error")
        return x + y
    except Exception as exc:
        # Retry with exponential backoff: 1, 2, 4 seconds
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

# SoftTimeLimitExceeded is only raised when soft_time_limit is set.
# A bare time_limit (the original code) hard-kills the worker process
# with SIGKILL, so the except clause could never run.
@app.task(bind=True, soft_time_limit=30, time_limit=60)
def task_with_timeout(self):
    """Task with a catchable soft timeout and a hard backstop."""
    try:
        time.sleep(60)  # Will exceed the 30 s soft limit
    except SoftTimeLimitExceeded:
        print("Task exceeded time limit")
        return "Timeout"

@app.task(bind=True)
def task_with_error_handling(self):
    """Task that handles an expected error and degrades gracefully."""
    try:
        result = 1 / 0
    except ZeroDivisionError as exc:
        # Log error and return default value instead of failing the task
        print(f"Error: {exc}")
        return None

Monitoring and Management

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

# Use the documented entry point app.control.inspect() instead of
# importing Inspect directly; it returns an Inspect bound to this app.
inspect = app.control.inspect()

# Tasks currently being executed by workers (None if no worker replies).
active = inspect.active()
print(f"Active tasks: {active}")

# Tasks with an ETA/countdown waiting on workers.
scheduled = inspect.scheduled()
print(f"Scheduled tasks: {scheduled}")

# Task names each worker has registered.
registered = inspect.registered()
print(f"Registered tasks: {registered}")

# Per-worker statistics (pool size, totals, broker info, ...).
stats = inspect.stats()
print(f"Worker stats: {stats}")

# Revoke a task by id; terminate=True also kills it if already running.
app.control.revoke('task_id', terminate=True)

# Drop every message waiting in the queue -- irreversible.
app.control.purge()

Comparison: APScheduler vs Celery

| Feature        | APScheduler       | Celery            |
| -------------- | ----------------- | ----------------- |
| Complexity     | Simple            | Complex           |
| Distributed    | No                | Yes               |
| Persistence    | Optional          | Yes               |
| Scalability    | Single machine    | Multiple machines |
| Learning Curve | Easy              | Moderate          |
| Use Case       | Simple scheduling | Complex workflows |

Best Practices

  1. Use appropriate tool: APScheduler for simple tasks, Celery for complex workflows
  2. Error handling: Always implement retry logic and error handling
  3. Monitoring: Monitor task execution and failures
  4. Logging: Log all task executions for debugging
  5. Resource limits: Set timeouts and resource limits
  6. Testing: Test tasks independently and in workflows

Common Pitfalls

Bad Practice:

# Don't: No error handling -- the exception is only recorded by the
# worker/result backend, so failures go unnoticed without monitoring.
@app.task
def risky_task():
    """Anti-example: raises ZeroDivisionError with no retry or handling."""
    return 1 / 0  # Will fail silently

# Don't: No timeout -- this task never returns and permanently
# occupies one worker slot.
@app.task
def infinite_task():
    """Anti-example: unbounded loop with no time limit configured."""
    while True:
        pass

# Don't: Blocking operations -- the worker slot is held for the whole
# hour and cannot process anything else.
@app.task
def blocking_task():
    """Anti-example: an hour-long sleep ties up a worker."""
    import time
    time.sleep(3600)  # Blocks worker

Good Practice:

# Do: Error handling and retry
@app.task(bind=True, max_retries=3)
def safe_task(self):
    """Retry up to max_retries times when the body raises."""
    try:
        return 1 / 0
    except Exception as exc:
        # self.retry re-raises a Retry exception that re-queues the task.
        raise self.retry(exc=exc)

# Do: Set timeout
@app.task(time_limit=300)
def timed_task():
    """Hard 300 s limit; the worker child is replaced if it is exceeded."""
    pass

# Do: Use async operations
@app.task
def async_task():
    """Run an async coroutine to completion inside the task."""
    import asyncio
    asyncio.run(async_operation())  # NOTE(review): async_operation assumed defined elsewhere

Conclusion

Task scheduling is crucial for production applications. APScheduler handles simple scheduling needs, while Celery provides distributed task processing for complex workflows. Choose based on your application’s requirements and scale. Always implement proper error handling, monitoring, and logging.

Comments