Skip to content

Celery

Task queue configuration, tenant-aware tasks, Canvas workflows, and the beat schedule.

Configuration

File: backend/config/celery.py

CELERY_BROKER_URL = "redis://localhost:6379/1"       # Redis DB 1
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"    # Redis DB 2
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"

TenantTask Base Class

All tenant-scoped tasks inherit from TenantTask (common/tasks.py). The first argument to every such task must be schema_name:

from celery import shared_task, Task
from common.tasks import TenantTask

@shared_task(base=TenantTask, bind=True, max_retries=0)
def accrue_daily_interest(self: Task[..., Any], schema_name: str) -> None:
    """Accrue interest for all active loans in a tenant."""
    # Schema is already activated by TenantTask.__call__
    ...

TenantTask.__call__ wraps the task body in Schema.create(schema_name=...), activating the correct PostgreSQL search path before any ORM queries execute.

async_task_delay

Dispatching tasks from async code requires async_task_delay() to avoid event loop conflicts:

from common.tasks import async_task_delay

# In an async service
await async_task_delay(accrue_daily_interest, schema_name)

In production, .delay() just serializes arguments (fast and safe). Under CELERY_TASK_ALWAYS_EAGER (test mode), .delay() executes synchronously in the calling thread, which can cause segfaults when an async event loop is running. async_task_delay() wraps the call in sync_to_async() to run it in a worker thread.

dispatch_to_all_tenants

The fan-out pattern dispatches a task to every active tenant using Celery groups:

@shared_task(bind=True, max_retries=0)
def dispatch_to_all_tenants(self, task_name: str, *args, **kwargs) -> None:
    tenants = Tenant.objects.filter(status="active").values_list("schema_name", flat=True)
    task = current_app.tasks[task_name]
    group(task.s(schema_name, *args, **kwargs) for schema_name in tenants).apply_async()

All beat schedule tasks use this pattern to fan out to tenants.

Canvas Workflows

chain — Sequential Pipeline

Each task's result feeds the next. Use .si() (immutable signature) when tasks don't need the previous result:

# End-of-day processing: sequential steps
chain(
    update_period_statuses_task.si(schema_name),
    check_promo_expirations.si(schema_name),
    accrue_daily_interest.si(schema_name),
    run_delinquency_engine.si(schema_name),
    assess_late_fees.si(schema_name),
    execute_dunning_schedules.si(schema_name),
).apply_async()

group — Parallel Fan-Out

All tasks run concurrently:

# Snapshot all portfolios in parallel
portfolio_ids = Portfolio.objects.filter(status="open").values_list("id", flat=True)
group(
    take_portfolio_snapshot.s(schema_name, str(pid), snapshot_date)
    for pid in portfolio_ids
).apply_async()

chord — Parallel + Callback

Run tasks in parallel, then call a callback with the results:

chord(
    [run_compliance_check.s(loan_id, rule_id) for rule_id in rule_ids],
    summarize_compliance.s(loan_id),
).apply_async()

Warning

Chord requires a result backend (Redis). Never use with ignore_result=True.

Beat Schedule

26 scheduled tasks running daily, monthly, or hourly. All times UTC. Tasks fan out to all active tenants via dispatch_to_all_tenants.

Daily Tasks (01:00–07:00 UTC)

Time Task Purpose
01:00 daily-compliance-monitor-poll Poll compliance monitoring engines
01:30 daily-pre-delinquency Pre-delinquency case detection
01:45 daily-promo-expirations Check loan promo expirations
02:00 daily-interest-accrual Daily interest accrual on active loans
02:15 daily-case-sla-breach Detect case SLA breaches
02:30 daily-delinquency-engine Delinquency bucket computation
02:45 daily-default-detection Default/charge-off detection
03:00 daily-late-fee-assessment Assess late fees
03:00 daily-queue-assignment Auto-assign delinquent loans to queues
03:30 daily-broken-promise-check Detect broken promises
03:45 daily-broken-payment-plan-check Payment plan expiry checks
04:00 daily-dunning-execution Execute dunning schedules
04:30 daily-scheduled-fee-assessment Assess scheduled fees
05:00 daily-variable-rate-adjustment Adjust variable-rate loans
05:00 daily-portfolio-auto-assignment Auto-assign loans to portfolios
05:30 daily-portfolio-snapshots Create portfolio snapshots
06:00 daily-recurring-payments Process recurring/scheduled payments
06:00 daily-lien-expiration-check Check lien expirations
06:15 daily-autopay-execution Execute autopay
06:30 daily-forbearance-expiry-check Check forbearance expiry
07:00 daily-gl-reconciliation Reconcile general ledger

Periodic Tasks

Schedule Task Purpose
Every hour hourly-case-unsnooze Unsnooze snoozed cases
1st of month, 08:00 monthly-statement-generation Generate monthly statements
1st & 15th, 08:00 biweekly-statement-generation Generate biweekly statements
1st Jan/Apr/Jul/Oct, 08:00 quarterly-statement-generation Generate quarterly statements
1st of month, 09:00 monthly-metro2-generation Generate Metro 2 credit bureau files

Execution Order

01:00-01:45  Compliance + promo checks
02:00-03:45  Interest → delinquency → fees → dunning
04:00-04:30  Dunning execution + scheduled fees
05:00-06:30  Portfolios → payments → autopay
07:00        GL reconciliation
08:00-09:00  Statements + credit reporting (monthly)

Running Celery

# Worker
cd backend && uv run celery -A config worker -l info

# Beat scheduler
cd backend && uv run celery -A config beat -l info

# Both in development
cd backend && uv run celery -A config worker -l info -B  # Worker + beat combined

See Also