Celery¶
Task queue configuration, tenant-aware tasks, Canvas workflows, and the beat schedule.
Configuration¶
File: backend/config/celery.py
CELERY_BROKER_URL = "redis://localhost:6379/1" # Redis DB 1
CELERY_RESULT_BACKEND = "redis://localhost:6379/2" # Redis DB 2
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
TenantTask Base Class¶
All tenant-scoped tasks inherit from TenantTask (common/tasks.py). The first argument to every such task must be schema_name:
from celery import shared_task, Task
from common.tasks import TenantTask
@shared_task(base=TenantTask, bind=True, max_retries=0)
def accrue_daily_interest(self: Task[..., Any], schema_name: str) -> None:
"""Accrue interest for all active loans in a tenant."""
# Schema is already activated by TenantTask.__call__
...
TenantTask.__call__ wraps the task body in Schema.create(schema_name=...), activating the correct PostgreSQL search path before any ORM queries execute.
async_task_delay¶
Dispatching tasks from async code requires async_task_delay() to avoid event loop conflicts:
from common.tasks import async_task_delay
# In an async service
await async_task_delay(accrue_daily_interest, schema_name)
In production, .delay() just serializes arguments (fast and safe). Under CELERY_TASK_ALWAYS_EAGER (test mode), .delay() executes synchronously in the calling thread, which can cause segfaults when an async event loop is running. async_task_delay() wraps the call in sync_to_async() to run it in a worker thread.
dispatch_to_all_tenants¶
The fan-out pattern dispatches a task to every active tenant using Celery groups:
@shared_task(bind=True, max_retries=0)
def dispatch_to_all_tenants(self, task_name: str, *args, **kwargs) -> None:
tenants = Tenant.objects.filter(status="active").values_list("schema_name", flat=True)
task = current_app.tasks[task_name]
group(task.s(schema_name, *args, **kwargs) for schema_name in tenants).apply_async()
All beat schedule tasks use this pattern to fan out to tenants.
Canvas Workflows¶
chain — Sequential Pipeline¶
Each task's result feeds the next. Use .si() (immutable signature) when tasks don't need the previous result:
# End-of-day processing: sequential steps
chain(
update_period_statuses_task.si(schema_name),
check_promo_expirations.si(schema_name),
accrue_daily_interest.si(schema_name),
run_delinquency_engine.si(schema_name),
assess_late_fees.si(schema_name),
execute_dunning_schedules.si(schema_name),
).apply_async()
group — Parallel Fan-Out¶
All tasks run concurrently:
# Snapshot all portfolios in parallel
portfolio_ids = Portfolio.objects.filter(status="open").values_list("id", flat=True)
group(
take_portfolio_snapshot.s(schema_name, str(pid), snapshot_date)
for pid in portfolio_ids
).apply_async()
chord — Parallel + Callback¶
Run tasks in parallel, then call a callback with the results:
chord(
[run_compliance_check.s(loan_id, rule_id) for rule_id in rule_ids],
summarize_compliance.s(loan_id),
).apply_async()
Warning
Chord requires a result backend (Redis). Never use with ignore_result=True.
Beat Schedule¶
26 scheduled tasks running daily, monthly, or hourly. All times UTC. Tasks fan out to all active tenants via dispatch_to_all_tenants.
Daily Tasks (01:00–07:00 UTC)¶
| Time | Task | Purpose |
|---|---|---|
| 01:00 | daily-compliance-monitor-poll |
Poll compliance monitoring engines |
| 01:30 | daily-pre-delinquency |
Pre-delinquency case detection |
| 01:45 | daily-promo-expirations |
Check loan promo expirations |
| 02:00 | daily-interest-accrual |
Daily interest accrual on active loans |
| 02:15 | daily-case-sla-breach |
Detect case SLA breaches |
| 02:30 | daily-delinquency-engine |
Delinquency bucket computation |
| 02:45 | daily-default-detection |
Default/charge-off detection |
| 03:00 | daily-late-fee-assessment |
Assess late fees |
| 03:00 | daily-queue-assignment |
Auto-assign delinquent loans to queues |
| 03:30 | daily-broken-promise-check |
Detect broken promises |
| 03:45 | daily-broken-payment-plan-check |
Payment plan expiry checks |
| 04:00 | daily-dunning-execution |
Execute dunning schedules |
| 04:30 | daily-scheduled-fee-assessment |
Assess scheduled fees |
| 05:00 | daily-variable-rate-adjustment |
Adjust variable-rate loans |
| 05:00 | daily-portfolio-auto-assignment |
Auto-assign loans to portfolios |
| 05:30 | daily-portfolio-snapshots |
Create portfolio snapshots |
| 06:00 | daily-recurring-payments |
Process recurring/scheduled payments |
| 06:00 | daily-lien-expiration-check |
Check lien expirations |
| 06:15 | daily-autopay-execution |
Execute autopay |
| 06:30 | daily-forbearance-expiry-check |
Check forbearance expiry |
| 07:00 | daily-gl-reconciliation |
Reconcile general ledger |
Periodic Tasks¶
| Schedule | Task | Purpose |
|---|---|---|
| Every hour | hourly-case-unsnooze |
Unsnooze snoozed cases |
| 1st of month, 08:00 | monthly-statement-generation |
Generate monthly statements |
| 1st & 15th, 08:00 | biweekly-statement-generation |
Generate biweekly statements |
| 1st Jan/Apr/Jul/Oct, 08:00 | quarterly-statement-generation |
Generate quarterly statements |
| 1st of month, 09:00 | monthly-metro2-generation |
Generate Metro 2 credit bureau files |
Execution Order¶
01:00-01:45 Compliance + promo checks
02:00-03:45 Interest → delinquency → fees → dunning
04:00-04:30 Dunning execution + scheduled fees
05:00-06:30 Portfolios → payments → autopay
07:00 GL reconciliation
08:00-09:00 Statements + credit reporting (monthly)
Running Celery¶
# Worker
cd backend && uv run celery -A config worker -l info
# Beat scheduler
cd backend && uv run celery -A config beat -l info
# Both in development
cd backend && uv run celery -A config worker -l info -B # Worker + beat combined
See Also¶
- Celery Beat Schedule --- Detailed schedule reference
- Async Patterns --- Async architecture
- Cross-Module Communication --- Task patterns