Reconciliation Engine Architecture¶
The reconciliation engine compares CalcBridge's calculated values against trustee-provided source data (trustee tapes), identifying discrepancies across multiple categories with configurable severity thresholds. This is a critical component for CLO portfolio managers who must verify that their independent calculations match the official trustee records.
Overview¶
Reconciliation answers a fundamental question: do our calculations agree with the trustee's numbers? When they do not, the engine classifies the discrepancy, assigns a severity, and creates a trackable issue.
Key Capabilities¶
| Capability | Description |
|---|---|
| Multi-format tape parsing | US Bank, Deutsche Bank, Wilmington Trust, and custom formats |
| Rule-based discrepancy detection | Five categories with configurable tolerance thresholds |
| Severity classification | Critical, Error, Warning, Info levels with escalation rules |
| Multi-source reconciliation | Compare across multiple trustee sources simultaneously |
| Issue lifecycle management | Track discrepancies from detection through resolution |
| Batch and on-demand | Scheduled daily reconciliation or ad-hoc runs |
Architecture Components¶
src/reconciliation/
├── engine.py                        # Core reconciliation orchestrator
├── trustee_tapes/
│   ├── base_parser.py               # Abstract parser interface
│   ├── us_bank_parser.py            # US Bank tape format
│   ├── deutsche_bank_parser.py      # Deutsche Bank tape format
│   ├── wilmington_trust_parser.py   # Wilmington Trust tape format
│   ├── generic_parser.py            # Configurable generic parser
│   └── normalizer.py                # Cross-format normalization
├── detection/
│   ├── discrepancy_detector.py      # Rule-based detection engine
│   ├── rules/
│   │   ├── completeness_rules.py    # Missing loan/field checks
│   │   ├── calculation_rules.py     # Value comparison rules
│   │   ├── timing_rules.py          # Settlement date mismatches
│   │   ├── reference_rules.py       # Identifier consistency checks
│   │   └── business_rules.py        # Domain-specific validations
│   └── tolerance.py                 # Configurable threshold management
├── multi_source.py                  # Cross-source comparison
├── issues/
│   ├── issue_manager.py             # Issue lifecycle management
│   └── models.py                    # Issue data models
└── tasks/
    └── reconciliation_tasks.py      # Celery task definitions
Trustee Tape Processing¶
Upload and Parsing Pipeline¶
Trustee tapes arrive in various formats. The parsing pipeline normalizes them into a common internal representation:
flowchart LR
subgraph Upload["1. Upload"]
FILE["Trustee Tape<br/>(Excel/CSV)"]
DETECT["Format<br/>Detection"]
end
subgraph Parse["2. Parse"]
PARSER["Format-Specific<br/>Parser"]
EXTRACT["Loan<br/>Extraction"]
end
subgraph Normalize["3. Normalize"]
FIELDS["Field<br/>Mapping"]
TYPES["Type<br/>Conversion"]
DEDUP["Deduplication"]
end
subgraph Store["4. Store"]
DB[("PostgreSQL<br/>JSONB")]
end
FILE --> DETECT
DETECT --> PARSER
PARSER --> EXTRACT
EXTRACT --> FIELDS
FIELDS --> TYPES
TYPES --> DEDUP
DEDUP --> DB
style DETECT fill:#FEF3C7,stroke:#F59E0B
style FIELDS fill:#DCFCE7,stroke:#22C55E
style DB fill:#DBEAFE,stroke:#3B82F6
Supported Trustee Formats¶
| Trustee | File Format | Header Pattern | Key Identifier |
|---|---|---|---|
| US Bank | XLSX (multi-sheet) | Row 3, merged headers | CUSIP + LoanID |
| Deutsche Bank | CSV (pipe-delimited) | First row | ISIN + DealName |
| Wilmington Trust | XLSX (single sheet) | Row 1, standard headers | CUSIP |
| Generic | CSV/XLSX (configurable) | User-defined | Configurable |
Parser Architecture¶
Each trustee has unique formatting conventions. Parsers implement a common interface:
from abc import ABC, abstractmethod
from dataclasses import dataclass

from openpyxl import load_workbook


@dataclass
class ParsedLoan:
    """Normalized loan record extracted from a trustee tape."""

    identifier: str  # Primary key (CUSIP, ISIN, etc.)
    deal_name: str
    current_balance: float
    original_balance: float
    coupon_rate: float
    maturity_date: str
    payment_frequency: str
    day_count_convention: str
    accrued_interest: float
    price: float
    spread: float
    rating_moody: str | None
    rating_sp: str | None
    rating_fitch: str | None
    sector: str
    country: str
    currency: str
    raw_data: dict  # Original fields for audit trail


class BaseTrusteeParser(ABC):
    """Abstract base class for trustee tape parsers."""

    @abstractmethod
    def detect(self, file_path: str) -> bool:
        """Return True if this parser can handle the file."""
        ...

    @abstractmethod
    def parse(self, file_path: str) -> list[ParsedLoan]:
        """Parse the trustee tape into normalized loan records."""
        ...

    @abstractmethod
    def get_report_date(self, file_path: str) -> str:
        """Extract the report/determination date from the tape."""
        ...


class USBankParser(BaseTrusteeParser):
    """Parser for US Bank trustee tapes.

    US Bank tapes have a specific structure:

    - Rows 1-2: Deal metadata (name, date, etc.)
    - Row 3: Column headers (often merged cells)
    - Row 4+: Loan data
    - Last rows: Totals/summaries (must be excluded)
    """

    HEADER_ROW = 3
    KNOWN_HEADER_PATTERNS = [
        "CUSIP",
        "Loan ID",
        "Current Par",
        "Coupon",
        "Maturity",
    ]

    def detect(self, file_path: str) -> bool:
        """Detect US Bank format by checking header patterns."""
        wb = load_workbook(file_path, read_only=True, data_only=True)
        try:
            sheet = wb.active
            row_3 = [str(cell.value or "") for cell in sheet[self.HEADER_ROW]]
            return any(
                pattern in " ".join(row_3)
                for pattern in self.KNOWN_HEADER_PATTERNS
            )
        finally:
            wb.close()  # read-only workbooks hold the file handle open

    def parse(self, file_path: str) -> list[ParsedLoan]:
        """Parse US Bank tape into normalized records."""
        wb = load_workbook(file_path, read_only=True, data_only=True)
        try:
            sheet = wb.active
            headers = self._extract_headers(sheet)
            loans = []
            for row in sheet.iter_rows(
                min_row=self.HEADER_ROW + 1, values_only=False
            ):
                if self._is_summary_row(row):
                    break
                loan = self._row_to_parsed_loan(row, headers)
                if loan:
                    loans.append(loan)
            return loans
        finally:
            wb.close()
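Format detection then becomes a first-match scan over the registered parsers. The registry helper below is a sketch, not part of the documented module; the stub parser stands in for the real trustee-specific implementations:

```python
from abc import ABC, abstractmethod


class TapeParser(ABC):
    """Minimal stand-in for BaseTrusteeParser, kept self-contained."""

    @abstractmethod
    def detect(self, file_path: str) -> bool:
        """Return True if this parser can handle the file."""


class CsvStubParser(TapeParser):
    """Hypothetical parser that claims any .csv file by extension."""

    def detect(self, file_path: str) -> bool:
        return file_path.lower().endswith(".csv")


def find_parser(file_path: str, registry: list[TapeParser]) -> TapeParser:
    """Return the first registered parser whose detect() accepts the file."""
    for parser in registry:
        if parser.detect(file_path):
            return parser
    raise ValueError(f"No registered parser recognizes {file_path}")


registry = [CsvStubParser()]
parser = find_parser("db_tape_2026-02-15.csv", registry)
```

A production registry would try the trustee-specific parsers before the generic one, since a configurable generic parser is likely to accept almost anything.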
Normalization¶
After parsing, the normalizer standardizes field names, data types, and conventions:
class TrusteeDataNormalizer:
    """Normalizes parsed trustee data into a consistent schema."""

    # Field name mappings across trustee formats
    FIELD_MAPPINGS = {
        "Current Par": "current_balance",
        "Current Face": "current_balance",
        "Outstanding Balance": "current_balance",
        "Cpn": "coupon_rate",
        "Coupon": "coupon_rate",
        "Interest Rate": "coupon_rate",
        "Mat Date": "maturity_date",
        "Maturity": "maturity_date",
        "Final Maturity": "maturity_date",
    }

    # Day count convention normalization
    DAY_COUNT_MAP = {
        "30/360": "30_360",
        "ACT/360": "actual_360",
        "ACT/365": "actual_365",
        "ACTUAL/360": "actual_360",
    }

    def normalize(self, loans: list[ParsedLoan]) -> list[ParsedLoan]:
        """Apply normalization rules to all parsed loans."""
        normalized = []
        for loan in loans:
            loan.day_count_convention = self.DAY_COUNT_MAP.get(
                loan.day_count_convention, loan.day_count_convention
            )
            loan.identifier = self._normalize_cusip(loan.identifier)
            loan.coupon_rate = self._normalize_rate(loan.coupon_rate)
            normalized.append(loan)
        return normalized

    @staticmethod
    def _normalize_cusip(cusip: str) -> str:
        """Strip whitespace and uppercase CUSIP identifiers."""
        return cusip.strip().upper() if cusip else ""

    @staticmethod
    def _normalize_rate(rate: float) -> float:
        """Convert percentage to decimal if needed (5.25 -> 0.0525)."""
        if rate and rate > 1:
            return rate / 100.0
        return rate
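To make the heuristics concrete, here are standalone copies of the two helpers with example inputs (re-implemented so the snippet runs on its own):

```python
def normalize_cusip(cusip: str) -> str:
    """Strip whitespace and uppercase, mirroring _normalize_cusip above."""
    return cusip.strip().upper() if cusip else ""


def normalize_rate(rate: float) -> float:
    """Treat values above 1 as percentages, mirroring _normalize_rate above."""
    if rate and rate > 1:
        return rate / 100.0
    return rate


assert normalize_cusip("  12345x7a9 ") == "12345X7A9"
assert normalize_rate(5.25) == 0.0525    # percentage form -> decimal
assert normalize_rate(0.0525) == 0.0525  # already a decimal; unchanged
```

Note the greater-than-one heuristic is ambiguous for rates legitimately quoted at or above 100%; where a trustee's quoting convention is known, an explicit per-format flag is safer.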
Discrepancy Detection¶
DiscrepancyDetector¶
The rule-based engine evaluates each loan against five categories of rules:
from dataclasses import dataclass
from enum import Enum


class DiscrepancyCategory(str, Enum):
    COMPLETENESS = "completeness"
    CALCULATION = "calculation"
    TIMING = "timing"
    REFERENCE = "reference"
    BUSINESS = "business"


class Severity(str, Enum):
    CRITICAL = "critical"
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"


@dataclass
class Discrepancy:
    """A single identified discrepancy between CalcBridge and trustee data."""

    category: DiscrepancyCategory
    severity: Severity
    field: str
    calcbridge_value: str | float | None
    trustee_value: str | float | None
    difference: float | None
    difference_pct: float | None
    loan_identifier: str
    deal_name: str
    rule_id: str
    message: str


class DiscrepancyDetector:
    """Rule-based engine for identifying discrepancies between
    CalcBridge calculations and trustee-provided data.
    """

    def __init__(self, tolerance_config: ToleranceConfig):
        self._tolerances = tolerance_config
        self._rules = self._load_rules()

    def detect(
        self,
        calcbridge_data: list[dict],
        trustee_data: list[ParsedLoan],
    ) -> list[Discrepancy]:
        """Run all detection rules and collect discrepancies."""
        discrepancies = []

        # Build lookup maps keyed by loan identifier
        cb_by_id = {r["identifier"]: r for r in calcbridge_data}
        trustee_by_id = {loan.identifier: loan for loan in trustee_data}

        # 1. Completeness checks
        discrepancies.extend(
            self._check_completeness(cb_by_id, trustee_by_id)
        )

        # 2-5. Per-loan rule evaluation
        for identifier in cb_by_id.keys() & trustee_by_id.keys():
            cb_loan = cb_by_id[identifier]
            trustee_loan = trustee_by_id[identifier]
            for rule in self._rules:
                result = rule.evaluate(
                    cb_loan, trustee_loan, self._tolerances
                )
                if result:
                    discrepancies.append(result)

        return discrepancies
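With both sides indexed by identifier, the matching arithmetic reduces to set operations on the map keys; a minimal illustration with made-up identifiers:

```python
cb_ids = {"CUSIP001", "CUSIP002", "CUSIP003"}
trustee_ids = {"CUSIP002", "CUSIP003", "CUSIP004"}

matched = cb_ids & trustee_ids             # fed to the per-loan rules
only_in_calcbridge = cb_ids - trustee_ids  # COMP-001 candidates
only_in_trustee = trustee_ids - cb_ids     # COMP-002 candidates

assert matched == {"CUSIP002", "CUSIP003"}
assert only_in_calcbridge == {"CUSIP001"}
assert only_in_trustee == {"CUSIP004"}
```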
Five Discrepancy Categories¶
1. Completeness¶
Identifies loans present in one source but missing from the other:
| Rule ID | Check | Severity |
|---|---|---|
| COMP-001 | Loan in CalcBridge but not in trustee tape | Error |
| COMP-002 | Loan in trustee tape but not in CalcBridge | Critical |
| COMP-003 | Required field missing in CalcBridge record | Warning |
| COMP-004 | Required field missing in trustee record | Info |
Missing Trustee Loans
A loan present in the trustee tape but absent from CalcBridge (COMP-002) is classified as Critical because it may indicate a failed import or an untracked position, which has direct P&L implications.
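The _check_completeness method itself is not shown above; a plausible sketch of the COMP-001/COMP-002 checks over the same lookup maps (plain dicts stand in for the Discrepancy dataclass to keep the example self-contained):

```python
def check_completeness(cb_by_id: dict, trustee_by_id: dict) -> list[dict]:
    """Flag loans present in only one source (COMP-001 / COMP-002)."""
    findings = []
    # In CalcBridge but missing from the trustee tape -> Error
    for identifier in sorted(cb_by_id.keys() - trustee_by_id.keys()):
        findings.append(
            {"rule_id": "COMP-001", "severity": "error", "loan": identifier}
        )
    # In the trustee tape but missing from CalcBridge -> Critical
    for identifier in sorted(trustee_by_id.keys() - cb_by_id.keys()):
        findings.append(
            {"rule_id": "COMP-002", "severity": "critical", "loan": identifier}
        )
    return findings


findings = check_completeness({"A": {}, "B": {}}, {"B": {}, "C": {}})
```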
2. Calculation¶
Compares numeric values with tolerance-based thresholds:
| Rule ID | Field | Critical | Error | Warning |
|---|---|---|---|---|
| CALC-001 | Price | > 2% diff | > 1% diff | > 0.5% diff |
| CALC-002 | Spread | > 50 bps | > 25 bps | > 10 bps |
| CALC-003 | Current Balance | > $10,000 | > $1,000 | > $100 |
| CALC-004 | Accrued Interest | > 1% diff | > 0.5% diff | > 0.1% diff |
| CALC-005 | Coupon Rate | > 10 bps | > 5 bps | > 1 bp |
class PriceComparisonRule:
    """Compare price values with percentage-based tolerance."""

    RULE_ID = "CALC-001"

    def evaluate(
        self,
        cb_loan: dict,
        trustee_loan: ParsedLoan,
        tolerances: ToleranceConfig,
    ) -> Discrepancy | None:
        cb_price = cb_loan.get("price")
        trustee_price = trustee_loan.price

        if cb_price is None or trustee_price is None:
            return None

        if trustee_price == 0:
            if cb_price != 0:
                return Discrepancy(
                    category=DiscrepancyCategory.CALCULATION,
                    severity=Severity.ERROR,
                    field="price",
                    calcbridge_value=cb_price,
                    trustee_value=trustee_price,
                    difference=cb_price,
                    difference_pct=None,
                    loan_identifier=trustee_loan.identifier,
                    deal_name=trustee_loan.deal_name,
                    rule_id=self.RULE_ID,
                    message="Price mismatch: trustee reports 0",
                )
            return None

        diff_pct = abs(cb_price - trustee_price) / abs(trustee_price) * 100
        if diff_pct <= tolerances.price_warning_pct:
            return None

        severity = Severity.WARNING
        if diff_pct > tolerances.price_critical_pct:
            severity = Severity.CRITICAL
        elif diff_pct > tolerances.price_error_pct:
            severity = Severity.ERROR

        return Discrepancy(
            category=DiscrepancyCategory.CALCULATION,
            severity=severity,
            field="price",
            calcbridge_value=cb_price,
            trustee_value=trustee_price,
            difference=cb_price - trustee_price,
            difference_pct=diff_pct,
            loan_identifier=trustee_loan.identifier,
            deal_name=trustee_loan.deal_name,
            rule_id=self.RULE_ID,
            message=(
                f"Price diff: {diff_pct:.2f}% "
                f"(CB: {cb_price}, Trustee: {trustee_price})"
            ),
        )
3. Timing¶
Detects settlement date and reporting period mismatches:
| Rule ID | Check | Severity |
|---|---|---|
| TIME-001 | Settlement date mismatch | Error |
| TIME-002 | Payment date mismatch (> 1 business day) | Warning |
| TIME-003 | Report date does not match determination date | Info |
| TIME-004 | Maturity date mismatch | Error |
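TIME-002's "> 1 business day" threshold needs a weekend-aware day count. A minimal sketch that ignores market holidays (a production version would consult a holiday calendar):

```python
from datetime import date, timedelta


def business_days_between(start: date, end: date) -> int:
    """Count weekdays strictly after `start`, up to and including `end`."""
    if end < start:
        return -business_days_between(end, start)
    days = 0
    current = start
    while current < end:
        current += timedelta(days=1)
        if current.weekday() < 5:  # Mon=0 .. Fri=4
            days += 1
    return days


# Friday 2026-02-13 to Monday 2026-02-16 spans only one business day,
# so that pair of payment dates would stay within the TIME-002 threshold.
assert business_days_between(date(2026, 2, 13), date(2026, 2, 16)) == 1
```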
4. Reference¶
Validates identifier consistency across sources:
| Rule ID | Check | Severity |
|---|---|---|
| REF-001 | CUSIP format invalid | Warning |
| REF-002 | ISIN checksum failure | Warning |
| REF-003 | Deal name mismatch (fuzzy match < 90%) | Info |
| REF-004 | Currency code inconsistency | Error |
5. Business¶
Domain-specific validations for CLO portfolios:
| Rule ID | Check | Severity |
|---|---|---|
| BIZ-001 | Rating downgrade not reflected in CalcBridge | Critical |
| BIZ-002 | Defaulted loan not flagged | Critical |
| BIZ-003 | Sector classification mismatch | Warning |
| BIZ-004 | Country of risk disagreement | Warning |
| BIZ-005 | Day count convention mismatch | Error |
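BIZ-005 reduces to comparing conventions after normalization. A sketch reusing the normalizer's mapping (the canonicalization helper is illustrative, not part of the documented module):

```python
DAY_COUNT_MAP = {
    "30/360": "30_360",
    "ACT/360": "actual_360",
    "ACT/365": "actual_365",
    "ACTUAL/360": "actual_360",
}


def _canonical(convention: str) -> str:
    """Map a raw convention string onto the normalizer's canonical form."""
    cleaned = convention.strip().upper()
    return DAY_COUNT_MAP.get(cleaned, cleaned.lower())


def day_count_mismatch(cb_convention: str, trustee_convention: str) -> bool:
    """True when the two conventions differ after canonicalization (BIZ-005)."""
    return _canonical(cb_convention) != _canonical(trustee_convention)


assert not day_count_mismatch("actual_360", "ACT/360")  # same after mapping
assert day_count_mismatch("30_360", "ACT/360")          # genuine mismatch
```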
Tolerance Configuration¶
Tolerances are configurable per tenant and can be overridden per deal:
from dataclasses import dataclass


@dataclass
class ToleranceConfig:
    """Configurable tolerance thresholds for reconciliation."""

    # Price tolerance (percentage)
    price_warning_pct: float = 0.5
    price_error_pct: float = 1.0
    price_critical_pct: float = 2.0

    # Spread tolerance (basis points)
    spread_warning_bps: float = 10.0
    spread_error_bps: float = 25.0
    spread_critical_bps: float = 50.0

    # Balance tolerance (absolute value)
    balance_warning_abs: float = 100.0
    balance_error_abs: float = 1_000.0
    balance_critical_abs: float = 10_000.0

    # Accrued interest tolerance (percentage)
    accrued_warning_pct: float = 0.1
    accrued_error_pct: float = 0.5
    accrued_critical_pct: float = 1.0

    # Coupon rate tolerance (basis points)
    coupon_warning_bps: float = 1.0
    coupon_error_bps: float = 5.0
    coupon_critical_bps: float = 10.0

    @classmethod
    def from_tenant_config(cls, config: dict) -> "ToleranceConfig":
        """Build tolerance config from tenant settings, ignoring unknown keys."""
        return cls(**{
            k: v for k, v in config.items()
            if k in cls.__dataclass_fields__
        })
Per-Deal Overrides
Some deals have known timing differences or use different pricing sources. Tolerance overrides can be configured at the deal level to suppress expected discrepancies without disabling detection globally.
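A per-deal override can be layered onto the tenant config with dataclasses.replace. The sketch below uses a trimmed copy of ToleranceConfig so it runs standalone; the override dict's shape is an assumption:

```python
from dataclasses import dataclass, replace


@dataclass
class ToleranceConfig:
    """Trimmed copy of the config above, for a self-contained example."""

    price_warning_pct: float = 0.5
    price_error_pct: float = 1.0
    price_critical_pct: float = 2.0


def apply_deal_overrides(base: ToleranceConfig, overrides: dict) -> ToleranceConfig:
    """Return a new config with known fields overridden; unknown keys ignored."""
    known = {k: v for k, v in overrides.items() if k in base.__dataclass_fields__}
    return replace(base, **known)


tenant_cfg = ToleranceConfig()
deal_cfg = apply_deal_overrides(tenant_cfg, {"price_warning_pct": 1.5, "junk": 9})
assert deal_cfg.price_warning_pct == 1.5
assert tenant_cfg.price_warning_pct == 0.5  # tenant baseline untouched
```

Returning a new instance rather than mutating the tenant config keeps the baseline reusable across deals within one run.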
Multi-Source Reconciliation¶
The MultiSourceReconciler handles cases where multiple trustee reports or data sources must be compared against each other and against CalcBridge:
class MultiSourceReconciler:
    """Reconciles data across multiple sources simultaneously.

    Use cases:

    - Comparing CalcBridge vs. Trustee vs. Bloomberg
    - Cross-validating two trustee tapes for the same period
    - Verifying data migration accuracy
    """

    def reconcile(
        self,
        sources: dict[str, list[dict]],
        primary_source: str = "calcbridge",
        tolerance_config: ToleranceConfig | None = None,
    ) -> MultiSourceResult:
        """Compare all source pairs and aggregate discrepancies.

        Args:
            sources: Map of source name to loan records.
            primary_source: The source treated as "expected" values.
            tolerance_config: Threshold configuration.

        Returns:
            MultiSourceResult with per-pair and aggregated discrepancies.
        """
        detector = DiscrepancyDetector(
            tolerance_config or ToleranceConfig()
        )
        pair_results = {}
        for source_name, source_data in sources.items():
            if source_name == primary_source:
                continue
            discrepancies = detector.detect(
                calcbridge_data=sources[primary_source],
                trustee_data=source_data,
            )
            pair_results[f"{primary_source}_vs_{source_name}"] = discrepancies
        return MultiSourceResult(
            pairs=pair_results,
            summary=self._build_summary(pair_results),
            consensus=self._find_consensus_values(sources),
        )
Reconciliation Pipeline¶
Full Flowchart¶
flowchart TB
subgraph Input["Input Sources"]
TAPE["Trustee Tape<br/>(Upload)"]
CB["CalcBridge<br/>Workbook Data"]
end
subgraph Parse["Tape Processing"]
DETECT["Format Detection"]
PARSE["Format-Specific<br/>Parser"]
NORM["Normalizer"]
end
subgraph Match["Loan Matching"]
INDEX["Build Identifier<br/>Index"]
MATCH["Match Loans<br/>by CUSIP/ISIN"]
UNMATCH["Identify<br/>Unmatched"]
end
subgraph Detect["Discrepancy Detection"]
COMP["Completeness<br/>Rules"]
CALC["Calculation<br/>Rules"]
TIME["Timing<br/>Rules"]
REF["Reference<br/>Rules"]
BIZ["Business<br/>Rules"]
end
subgraph Classify["Classification"]
SEV["Severity<br/>Assignment"]
AGG["Aggregation"]
end
subgraph Output["Output"]
ISSUES["Issue<br/>Creation"]
REPORT["Reconciliation<br/>Report"]
NOTIFY["Notifications"]
end
TAPE --> DETECT
DETECT --> PARSE
PARSE --> NORM
CB --> INDEX
NORM --> INDEX
INDEX --> MATCH
INDEX --> UNMATCH
MATCH --> COMP
MATCH --> CALC
MATCH --> TIME
MATCH --> REF
MATCH --> BIZ
UNMATCH --> COMP
COMP --> SEV
CALC --> SEV
TIME --> SEV
REF --> SEV
BIZ --> SEV
SEV --> AGG
AGG --> ISSUES
AGG --> REPORT
AGG --> NOTIFY
style DETECT fill:#FEF3C7,stroke:#F59E0B
style SEV fill:#FEE2E2,stroke:#EF4444
style ISSUES fill:#DBEAFE,stroke:#3B82F6
Issue Lifecycle¶
Discrepancies are tracked as issues with a defined lifecycle:
stateDiagram-v2
[*] --> open : Discrepancy detected
open --> acknowledged : Analyst reviews
open --> auto_resolved : Re-reconciliation clears it
acknowledged --> investigating : Root cause analysis
acknowledged --> false_positive : Known difference
investigating --> resolved : Fix applied
investigating --> false_positive : Confirmed expected
resolved --> [*]
false_positive --> [*]
auto_resolved --> [*]
Issue States¶
| State | Description | SLA |
|---|---|---|
| open | Newly detected discrepancy, awaiting review | Review within 24 hours (Critical), 72 hours (Error) |
| acknowledged | Analyst has reviewed and is aware | Begin investigation within 48 hours |
| investigating | Root cause analysis in progress | Resolve within 5 business days |
| resolved | Discrepancy corrected in source or CalcBridge | Verify in next reconciliation cycle |
| false_positive | Confirmed as expected difference | Document reason, exclude from future runs (optional) |
| auto_resolved | Cleared automatically by subsequent reconciliation | No action required |
class IssueManager:
    """Manages the lifecycle of reconciliation issues."""

    def create_issues(
        self, discrepancies: list[Discrepancy], run_id: str
    ) -> list[Issue]:
        """Create or update issues from detected discrepancies.

        Existing open issues for the same loan+field combination
        are updated rather than duplicated.
        """
        issues = []
        for disc in discrepancies:
            existing = self._find_existing(
                disc.loan_identifier, disc.field, disc.rule_id
            )
            if existing and existing.status in ("open", "acknowledged"):
                existing.occurrence_count += 1
                existing.latest_run_id = run_id
                existing.latest_values = {
                    "calcbridge": disc.calcbridge_value,
                    "trustee": disc.trustee_value,
                }
                issues.append(existing)
            else:
                issue = Issue(
                    discrepancy=disc,
                    run_id=run_id,
                    status="open",
                    occurrence_count=1,
                )
                issues.append(issue)
        return issues

    def auto_resolve(
        self, run_id: str, current_discrepancies: list[Discrepancy]
    ) -> None:
        """Auto-resolve open issues that no longer appear in the latest run."""
        open_issues = self._get_open_issues()
        current_keys = {
            (d.loan_identifier, d.field, d.rule_id)
            for d in current_discrepancies
        }
        for issue in open_issues:
            key = (issue.loan_identifier, issue.field, issue.rule_id)
            if key not in current_keys:
                issue.status = "auto_resolved"
                issue.resolved_run_id = run_id
Reconciliation Report¶
Each reconciliation run produces a structured report:
{
"run_id": "recon-2026-02-15-001",
"deal_name": "CLO 2024-1",
"report_date": "2026-02-15",
"trustee": "US Bank",
"summary": {
"total_loans_calcbridge": 245,
"total_loans_trustee": 247,
"matched_loans": 243,
"unmatched_calcbridge": 2,
"unmatched_trustee": 4,
"total_discrepancies": 18,
"by_severity": {
"critical": 1,
"error": 4,
"warning": 8,
"info": 5
},
"by_category": {
"completeness": 6,
"calculation": 7,
"timing": 2,
"reference": 1,
"business": 2
}
},
"discrepancies": [ ... ],
"match_rate": 98.4,
"clean_rate": 92.7
}
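The two headline rates follow from the summary counts. The exact denominators are not spelled out above, so the definitions below are assumptions: match rate over the trustee loan count (which reproduces the sample's 98.4), and clean rate over matched loans (the sample's 92.7 suggests some of the 18 discrepancies share a loan):

```python
def match_rate(matched: int, trustee_total: int) -> float:
    """Percent of trustee loans matched to a CalcBridge record (assumed defn)."""
    return round(matched / trustee_total * 100, 1)


def clean_rate(matched: int, loans_with_discrepancies: int) -> float:
    """Percent of matched loans carrying no discrepancies (assumed defn)."""
    return round((matched - loans_with_discrepancies) / matched * 100, 1)


assert match_rate(243, 247) == 98.4  # matches the sample report above
```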
| Metric | Description | Target |
|---|---|---|
| Match Rate | Percentage of loans present in both sources | > 99% |
| Clean Rate | Percentage of matched loans with no discrepancies | > 95% |
| Critical Count | Number of critical-severity discrepancies | 0 |
| Resolution Time | Average time from detection to resolution | < 3 business days |
Celery Task Integration¶
from celery import shared_task


@shared_task(
    bind=True,
    queue="default",
    max_retries=1,
    soft_time_limit=600,
    time_limit=900,
)
def execute_reconciliation(
    self,
    tenant_id: str,
    workbook_id: str,
    tape_upload_id: str,
    tolerance_overrides: dict | None = None,
):
    """Run reconciliation between a workbook and a trustee tape.

    Steps:
    1. Load and parse the trustee tape
    2. Extract CalcBridge calculated data
    3. Run discrepancy detection
    4. Create/update issues
    5. Generate reconciliation report
    6. Send notifications for critical findings
    """
    ...
Scheduled Reconciliation
Celery Beat can schedule recurring (e.g., daily) reconciliation runs. Event-driven runs are triggered separately by the upload webhook, which passes the tape_upload_id into the reconciliation pipeline as soon as a new trustee tape arrives.
Metrics and Monitoring¶
| Metric | Type | Description |
|---|---|---|
| reconciliation_runs_total | Counter | Total reconciliation runs by deal |
| reconciliation_duration_seconds | Histogram | End-to-end reconciliation time |
| reconciliation_discrepancies_total | Gauge | Current open discrepancies by severity |
| reconciliation_match_rate | Gauge | Loan match rate per deal |
| reconciliation_clean_rate | Gauge | Clean (no discrepancy) rate per deal |
| reconciliation_issues_open | Gauge | Total open issues by category |
| reconciliation_resolution_days | Histogram | Time to resolve issues |
Configuration Reference¶
# Reconciliation engine
RECON_ENABLED: true
RECON_AUTO_RUN_ON_UPLOAD: true # Trigger on tape upload
RECON_DEFAULT_TOLERANCE_PROFILE: standard
# Issue management
RECON_ISSUE_SLA_CRITICAL_HOURS: 24
RECON_ISSUE_SLA_ERROR_HOURS: 72
RECON_ISSUE_AUTO_RESOLVE: true # Auto-resolve cleared discrepancies
# Notifications
RECON_NOTIFY_ON_CRITICAL: true # Send alerts for critical findings
RECON_NOTIFY_CHANNEL: email,websocket # Notification channels
# Retention
RECON_REPORT_RETENTION_DAYS: 365 # Keep reports for 1 year
RECON_TAPE_RETENTION_DAYS: 180 # Keep uploaded tapes for 6 months
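A loader for these settings might look like the following sketch (the helper names and accepted truthy strings are illustrative, not part of the documented configuration API):

```python
import os


def env_bool(name: str, default: bool) -> bool:
    """Parse a boolean setting such as RECON_ENABLED from the environment."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")


def env_int(name: str, default: int) -> int:
    """Parse an integer setting such as RECON_ISSUE_SLA_CRITICAL_HOURS."""
    raw = os.environ.get(name)
    return int(raw) if raw is not None else default


os.environ["RECON_ENABLED"] = "true"
enabled = env_bool("RECON_ENABLED", default=False)
```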
Related Documentation¶
- Data Flow & Processing Pipeline -- Upstream data ingestion
- Domain Modules -- CLO and Servicer domain configuration
- Export Pipeline -- Exporting reconciliation reports
- Security Architecture -- Tenant isolation in reconciliation data