Cross-Module Communication¶
Modules communicate using three mechanisms, in order of preference:
- Direct service calls --- when the caller needs an immediate result
- Django signals --- decoupled side-effect notifications where no return value is needed
- Celery tasks --- eventual work that can be deferred
Direct Service Calls¶
The most common communication pattern. One app's service directly calls another app's service:
# In apps/payments/services.py
from apps.ledger.services import post_journal_entry
async def complete_payment(payment: Payment) -> None:
# ... process payment ...
# Direct call to ledger service for GL entries
await post_journal_entry(
source=payment,
lines=[
{"account": "cash", "debit": payment.amount},
{"account": "loans_receivable", "credit": payment.principal_applied},
{"account": "interest_receivable", "credit": payment.interest_applied},
],
)
Dependency Rules¶
- Cross-module service calls are allowed, but dependencies must be explicit.
- No circular dependencies: if app A calls app B's service, B must not call A's service.
- If you need bidirectional communication, use Django signals or Celery tasks to break the cycle.
Django Signals¶
Use signals for decoupled notifications where the sender doesn't need a return value:
# Loan status change triggers webhook dispatch
# The loans app doesn't import or depend on the webhooks app
from django.dispatch import Signal
loan_status_changed = Signal() # Defined in apps/loans/signals.py
Receivers in other apps connect to these signals to perform side effects (webhook dispatch, notification sending, etc.).
Celery Tasks¶
For work that can be deferred or needs to run asynchronously:
- Report generation
- Batch notifications
- Portfolio snapshots
- Daily batch processing (interest accrual, delinquency updates)
Tenant-Aware Tasks¶
All Celery tasks that operate on tenant data use the TenantTask base class, which activates the correct PostgreSQL schema before task execution:
from celery import shared_task
from config.celery import TenantTask
@shared_task(bind=True, base=TenantTask)
def generate_report(self, report_id: str) -> None:
# Schema is already activated by TenantTask.__call__
report = Report.objects.get(id=report_id)
# ... generate report ...
Celery Canvas Workflows¶
Multi-step async workflows use Celery Canvas primitives instead of manual callback chains:
| Primitive | Use Case |
|---|---|
chain |
Sequential pipeline --- each task's result feeds the next |
group |
Parallel fan-out --- all tasks run concurrently |
chord |
Group + callback --- run tasks in parallel, then aggregate |
chunks |
Split large iterables into batches for parallel processing |
Sequential Pipeline (chain)¶
from celery import chain
# Daily end-of-day workflow for a loan
chain(
accrue_daily_interest.s(loan_id),
recalculate_dpd.s(),
assess_late_fees_if_due.s(),
dispatch_dunning.s(),
).apply_async()
Parallel Fan-Out (group)¶
from celery import group
# Snapshot all portfolios concurrently
group(
take_snapshot.s(portfolio_id) for portfolio_id in portfolio_ids
).apply_async()
Fan-Out Then Aggregate (chord)¶
from celery import chord
# Run compliance checks in parallel, then summarize
chord(
[run_compliance_check.s(loan_id, rule_id) for rule_id in rule_ids],
summarize_compliance.s(loan_id),
).apply_async()
Canvas conventions
- Use immutable signatures (
.si()) when the next task should not receive the previous result - Prefer
chainover nestedapply_asynccallbacks - Canvas workflows live in
tasks.pyalongside regular Celery tasks - Chord requires a result backend (Redis) --- never use with
ignore_result=True
Scheduled Tasks (Celery Beat)¶
The system runs daily Celery Beat tasks for batch processing:
| Task | Schedule | Purpose |
|---|---|---|
| Interest accrual | Daily | Accrue daily interest on all active loans |
| Delinquency engine | Daily 2:30 AM | Recalculate DPD, update buckets, fire events |
| Default detection | Daily | Auto-transition loans exceeding DPD threshold |
| Late fee assessment | Daily | Assess late fees per fee schedule rules |
| Broken promise detection | Daily | Check for unfulfilled promise-to-pay commitments |
| Dunning execution | Daily 4:00 AM | Send dunning communications based on DPD triggers |
| Scheduled fees | Daily | Assess recurring servicing fees |
| Variable rate adjustment | Daily | Update variable-rate loan rates from index |
| Portfolio snapshots | Daily | Capture aggregate portfolio metrics |
| Lien expiration check | Daily | Alert on liens expiring within 180 days |
| Forbearance expiry | Daily | Auto-complete expired forbearance agreements |
Beat Dispatch Pattern¶
Beat dispatch tasks run in the public schema and fan out to individual tenants:
@shared_task
def dispatch_daily_interest_accrual() -> None:
"""Fan out interest accrual to all active tenants."""
for tenant in Tenant.objects.filter(status="active"):
for loan_id in get_active_loan_ids(tenant):
accrue_daily_interest.delay(str(loan_id), schema_name=f"tenant_{tenant.slug}")
Webhook Event Emission¶
Services emit webhook events to notify external subscribers of business events:
# In a service function, after a business action completes
await emit_webhook_event(
event_type="payment.received",
payload={"payment_id": str(payment.id), "loan_id": str(loan.id), "amount": str(payment.amount)},
)
Events are delivered asynchronously via Celery tasks with retry logic.
See Also¶
- Architecture Overview --- Where communication fits in the system
- Async Patterns --- Async task dispatch from services
- Layered Architecture --- Which layers can call which