
Cross-Module Communication

Modules communicate using three mechanisms, in order of preference:

  1. Direct service calls: when the caller needs an immediate result
  2. Django signals: decoupled side-effect notifications where no return value is needed
  3. Celery tasks: work that can be deferred and completed later

Direct Service Calls

This is the most common communication pattern: one app's service calls another app's service directly.

# In apps/payments/services.py
from apps.ledger.services import post_journal_entry

async def complete_payment(payment: Payment) -> None:
    # ... process payment ...

    # Direct call to ledger service for GL entries
    await post_journal_entry(
        source=payment,
        lines=[
            {"account": "cash", "debit": payment.amount},
            {"account": "loans_receivable", "credit": payment.principal_applied},
            {"account": "interest_receivable", "credit": payment.interest_applied},
        ],
    )

Dependency Rules

  • Cross-module service calls are allowed, but dependencies must be explicit.
  • No circular dependencies: if app A calls app B's service, B must not call A's service.
  • If you need bidirectional communication, use Django signals or Celery tasks to break the cycle.
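
The no-cycle rule can be checked mechanically. The following is a minimal sketch, assuming the app-to-app service dependencies have already been collected into a dictionary (by hand or by a separate import scan). The `find_cycle` helper and the `loans` entry in the example are hypothetical, not part of the codebase.

```python
from typing import Dict, List, Optional


def find_cycle(deps: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of app names, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color = {app: WHITE for app in deps}
    path: List[str] = []

    def visit(app: str) -> Optional[List[str]]:
        color[app] = GRAY
        path.append(app)
        for callee in deps.get(app, []):
            state = color.get(callee, WHITE)
            if state == GRAY:
                # callee is already on the current path, so this edge closes a cycle
                return path[path.index(callee):] + [callee]
            if state == WHITE:
                found = visit(callee)
                if found:
                    return found
        path.pop()
        color[app] = BLACK
        return None

    for app in list(deps):
        if color[app] == WHITE:
            found = visit(app)
            if found:
                return found
    return None


# payments -> ledger is allowed; ledger -> payments would close a cycle.
allowed = {"payments": ["ledger"], "ledger": [], "loans": ["payments", "ledger"]}
broken = {"payments": ["ledger"], "ledger": ["payments"]}

print(find_cycle(allowed))
print(find_cycle(broken))
```

A check like this could run in CI, failing the build when a cycle is reported.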

Django Signals

Use signals for decoupled notifications where the sender doesn't need a return value:

# Loan status change triggers webhook dispatch
# The loans app doesn't import or depend on the webhooks app
from django.dispatch import Signal

loan_status_changed = Signal()  # Defined in apps/loans/signals.py

Receivers in other apps connect to these signals to perform side effects (webhook dispatch, notification sending, etc.).
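
To make the decoupling concrete, here is a minimal sketch of the sender and receiver sides. So that it runs without a Django project, it uses `MiniSignal`, a hypothetical stand-in that mimics only the `connect`/`send` behaviour of `django.dispatch.Signal`. In the real code the receiver would connect to `loan_status_changed` from `apps/loans/signals.py`. The receiver name, the `loan.status_changed` event type, and the keyword arguments are assumptions, not taken from the codebase.

```python
from typing import Any, Callable, Dict, List, Tuple


class MiniSignal:
    """Hypothetical stand-in for django.dispatch.Signal (connect/send only)."""

    def __init__(self) -> None:
        self._receivers: List[Callable[..., Any]] = []

    def connect(self, receiver: Callable[..., Any]) -> None:
        self._receivers.append(receiver)

    def send(self, sender: Any, **kwargs: Any) -> List[Tuple[Callable[..., Any], Any]]:
        # Like Django, call every receiver and return (receiver, response) pairs.
        return [(r, r(sender=sender, **kwargs)) for r in self._receivers]


# Loans side: defines and sends the signal, and knows nothing about webhooks.
loan_status_changed = MiniSignal()


def change_loan_status(loan_id: str, old_status: str, new_status: str) -> None:
    # ... persist the new status ...
    loan_status_changed.send(
        sender="loans", loan_id=loan_id, old_status=old_status, new_status=new_status
    )


# Webhooks side: imports the signal and connects a receiver.
dispatched: List[Dict[str, Any]] = []  # stands in for the real webhook queue


def queue_status_webhook(sender: Any, **kwargs: Any) -> None:
    dispatched.append({"event_type": "loan.status_changed", **kwargs})


loan_status_changed.connect(queue_status_webhook)

change_loan_status("loan-1", "current", "delinquent")
```

The import points from webhooks to loans only, so the dependency rule above still holds.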

Celery Tasks

Use Celery tasks for work that can be deferred or needs to run asynchronously:

  • Report generation
  • Batch notifications
  • Portfolio snapshots
  • Daily batch processing (interest accrual, delinquency updates)

Tenant-Aware Tasks

All Celery tasks that operate on tenant data use the TenantTask base class, which activates the correct PostgreSQL schema before task execution:

from celery import shared_task
from config.celery import TenantTask

@shared_task(bind=True, base=TenantTask)
def generate_report(self, report_id: str) -> None:
    # Schema is already activated by TenantTask.__call__
    report = Report.objects.get(id=report_id)
    # ... generate report ...
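
`TenantTask` itself is not shown here. As a rough sketch of the idea, the wrapper below models the behaviour in plain Python. It assumes the base class takes a `schema_name` keyword argument supplied by the caller, activates that schema for the duration of the call, and then restores the previous one. `TenantTaskSketch`, `schema_context`, and `current_schema` are hypothetical names. The real class would subclass `celery.Task` and switch the database connection's schema rather than a context variable.

```python
import contextvars
from contextlib import contextmanager
from typing import Any, Callable, Iterator

# Stand-in for the database connection's active schema.
_active_schema = contextvars.ContextVar("active_schema", default="public")


def current_schema() -> str:
    return _active_schema.get()


@contextmanager
def schema_context(schema_name: str) -> Iterator[None]:
    """Activate a schema, then restore the previous one even on error."""
    token = _active_schema.set(schema_name)
    try:
        yield
    finally:
        _active_schema.reset(token)


class TenantTaskSketch:
    """Wraps a task body so it always runs inside its tenant's schema."""

    def __init__(self, body: Callable[..., Any]) -> None:
        self.body = body

    def __call__(self, *args: Any, schema_name: str, **kwargs: Any) -> Any:
        # schema_name is consumed here and never reaches the task body.
        with schema_context(schema_name):
            return self.body(*args, **kwargs)


@TenantTaskSketch
def generate_report(report_id: str) -> str:
    # The body can assume the tenant schema is already active.
    return f"{current_schema()}:{report_id}"


result = generate_report("42", schema_name="tenant_acme")
```

Restoring the previous schema in a `finally` block matters because worker processes are reused, so a task that raises must not leave its tenant's schema active for the next task.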

Celery Canvas Workflows

Multi-step async workflows use Celery Canvas primitives instead of manual callback chains:

Primitive | Use Case
chain | Sequential pipeline: each task's result feeds the next
group | Parallel fan-out: all tasks run concurrently
chord | Group + callback: run tasks in parallel, then aggregate
chunks | Split large iterables into batches for parallel processing

Sequential Pipeline (chain)

from celery import chain

# Daily end-of-day workflow for a loan
chain(
    accrue_daily_interest.s(loan_id),
    recalculate_dpd.s(),
    assess_late_fees_if_due.s(),
    dispatch_dunning.s(),
).apply_async()

Parallel Fan-Out (group)

from celery import group

# Snapshot all portfolios concurrently
group(
    take_snapshot.s(portfolio_id) for portfolio_id in portfolio_ids
).apply_async()

Fan-Out Then Aggregate (chord)

from celery import chord

# Run compliance checks in parallel, then summarize
chord(
    [run_compliance_check.s(loan_id, rule_id) for rule_id in rule_ids],
    summarize_compliance.s(loan_id),
).apply_async()

Canvas conventions

  • Use immutable signatures (.si()) when the next task should not receive the previous result.
  • Prefer chain over nested apply_async callbacks.
  • Canvas workflows live in tasks.py alongside regular Celery tasks.
  • chord requires a result backend (Redis), so never set ignore_result=True on the tasks in its header.
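
The difference between `.s()` and `.si()` is easiest to see in a toy model. The sketch below is plain Python, not Celery. `Sig` and `run_chain` are hypothetical helpers that imitate how a chain prepends the previous task's return value to the next signature's arguments unless that signature is immutable. The three task functions are placeholders.

```python
from dataclasses import dataclass
from typing import Any, Callable, Sequence, Tuple


@dataclass(frozen=True)
class Sig:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    immutable: bool = False  # True models .si(), False models .s()


def run_chain(steps: Sequence[Sig]) -> Any:
    """Run steps in order, threading each result into the next mutable step."""
    result: Any = None
    for index, step in enumerate(steps):
        if index == 0 or step.immutable:
            result = step.func(*step.args)  # previous result is not passed in
        else:
            result = step.func(result, *step.args)  # previous result comes first
    return result


def accrue(loan_id: str) -> dict:
    return {"loan_id": loan_id, "accrued": True}


def recalc_dpd(state: dict) -> dict:
    return {**state, "dpd": 0}


def notify_ops(message: str) -> str:
    return message


# Mutable second step: receives the dict returned by accrue().
threaded = run_chain([Sig(accrue, ("loan-1",)), Sig(recalc_dpd)])

# Immutable second step: runs with its own arguments only.
ignored = run_chain([Sig(accrue, ("loan-1",)), Sig(notify_ops, ("done",), immutable=True)])
```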

Scheduled Tasks (Celery Beat)

The system runs daily Celery Beat tasks for batch processing:

Task | Schedule | Purpose
Interest accrual | Daily | Accrue daily interest on all active loans
Delinquency engine | Daily 2:30 AM | Recalculate DPD, update buckets, fire events
Default detection | Daily | Auto-transition loans exceeding DPD threshold
Late fee assessment | Daily | Assess late fees per fee schedule rules
Broken promise detection | Daily | Check for unfulfilled promise-to-pay commitments
Dunning execution | Daily 4:00 AM | Send dunning communications based on DPD triggers
Scheduled fees | Daily | Assess recurring servicing fees
Variable rate adjustment | Daily | Update variable-rate loan rates from index
Portfolio snapshots | Daily | Capture aggregate portfolio metrics
Lien expiration check | Daily | Alert on liens expiring within 180 days
Forbearance expiry | Daily | Auto-complete expired forbearance agreements
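
A schedule like this is typically expressed through Celery's `beat_schedule` setting. The configuration fragment below is a sketch covering only the two tasks with a stated time of day. The `app` object, the entry names, and the dotted task paths are hypothetical, and the times are interpreted in whatever timezone the project configures.

```python
from celery.schedules import crontab

# Assumes `app` is the project's Celery application instance.
app.conf.beat_schedule = {
    "delinquency-engine": {
        "task": "apps.delinquency.tasks.dispatch_delinquency_engine",  # hypothetical path
        "schedule": crontab(hour=2, minute=30),  # daily at 2:30 AM
    },
    "dunning-execution": {
        "task": "apps.collections.tasks.dispatch_dunning",  # hypothetical path
        "schedule": crontab(hour=4, minute=0),  # daily at 4:00 AM
    },
}
```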

Beat Dispatch Pattern

Beat dispatch tasks run in the public schema and fan out to individual tenants:

@shared_task
def dispatch_daily_interest_accrual() -> None:
    """Fan out interest accrual to all active tenants."""
    for tenant in Tenant.objects.filter(status="active"):
        for loan_id in get_active_loan_ids(tenant):
            accrue_daily_interest.delay(str(loan_id), schema_name=f"tenant_{tenant.slug}")

Webhook Event Emission

Services emit webhook events to notify external subscribers of business events:

# In a service function, after a business action completes
await emit_webhook_event(
    event_type="payment.received",
    payload={"payment_id": str(payment.id), "loan_id": str(loan.id), "amount": str(payment.amount)},
)

Events are delivered asynchronously via Celery tasks with retry logic.
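
The retry policy itself is not specified here. As an illustration only, the sketch below computes a capped exponential backoff schedule, which is one common way to space delivery attempts. `retry_delays`, the base delay, the cap, and the retry count are all hypothetical values rather than the system's actual settings.

```python
from typing import List


def retry_delays(max_retries: int, base_seconds: int = 30, cap_seconds: int = 3600) -> List[int]:
    """Delay before each retry: base * 2**attempt, capped at cap_seconds."""
    return [min(base_seconds * 2**attempt, cap_seconds) for attempt in range(max_retries)]


delays = retry_delays(max_retries=8)
print(delays)
```

In a Celery delivery task, a delay computed this way would typically be passed as the `countdown` argument to `self.retry(...)`.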
