Skip to content

Reconciliation Engine Architecture

The reconciliation engine compares CalcBridge's calculated values against trustee-provided source data (trustee tapes), identifying discrepancies across multiple categories with configurable severity thresholds. This is a critical component for CLO portfolio managers who must verify that their independent calculations match the official trustee records.


Overview

Reconciliation answers a fundamental question: do our calculations agree with the trustee's numbers? When they do not, the engine classifies the discrepancy, assigns a severity, and creates a trackable issue.

Key Capabilities

Capability Description
Multi-format tape parsing US Bank, Deutsche Bank, Wilmington Trust, and custom formats
Rule-based discrepancy detection Five categories with configurable tolerance thresholds
Severity classification Critical, Error, Warning, Info levels with escalation rules
Multi-source reconciliation Compare across multiple trustee sources simultaneously
Issue lifecycle management Track discrepancies from detection through resolution
Batch and on-demand Scheduled daily reconciliation or ad-hoc runs

Architecture Components

src/reconciliation/
├── engine.py                       # Core reconciliation orchestrator
├── trustee_tapes/
│   ├── base_parser.py              # Abstract parser interface
│   ├── us_bank_parser.py           # US Bank tape format
│   ├── deutsche_bank_parser.py     # Deutsche Bank tape format
│   ├── wilmington_trust_parser.py  # Wilmington Trust tape format
│   ├── generic_parser.py           # Configurable generic parser
│   └── normalizer.py               # Cross-format normalization
├── detection/
│   ├── discrepancy_detector.py     # Rule-based detection engine
│   ├── rules/
│   │   ├── completeness_rules.py   # Missing loan/field checks
│   │   ├── calculation_rules.py    # Value comparison rules
│   │   ├── timing_rules.py         # Settlement date mismatches
│   │   ├── reference_rules.py      # Identifier consistency checks
│   │   └── business_rules.py       # Domain-specific validations
│   └── tolerance.py                # Configurable threshold management
├── multi_source.py                 # Cross-source comparison
├── issues/
│   ├── issue_manager.py            # Issue lifecycle management
│   └── models.py                   # Issue data models
└── tasks/
    └── reconciliation_tasks.py     # Celery task definitions

Trustee Tape Processing

Upload and Parsing Pipeline

Trustee tapes arrive in various formats. The parsing pipeline normalizes them into a common internal representation:

flowchart LR
    subgraph Upload["1. Upload"]
        FILE["Trustee Tape<br/>(Excel/CSV)"]
        DETECT["Format<br/>Detection"]
    end

    subgraph Parse["2. Parse"]
        PARSER["Format-Specific<br/>Parser"]
        EXTRACT["Loan<br/>Extraction"]
    end

    subgraph Normalize["3. Normalize"]
        FIELDS["Field<br/>Mapping"]
        TYPES["Type<br/>Conversion"]
        DEDUP["Deduplication"]
    end

    subgraph Store["4. Store"]
        DB[("PostgreSQL<br/>JSONB")]
    end

    FILE --> DETECT
    DETECT --> PARSER
    PARSER --> EXTRACT
    EXTRACT --> FIELDS
    FIELDS --> TYPES
    TYPES --> DEDUP
    DEDUP --> DB

    style DETECT fill:#FEF3C7,stroke:#F59E0B
    style FIELDS fill:#DCFCE7,stroke:#22C55E
    style DB fill:#DBEAFE,stroke:#3B82F6

Supported Trustee Formats

Trustee File Format Header Pattern Key Identifier
US Bank XLSX (multi-sheet) Row 3, merged headers CUSIP + LoanID
Deutsche Bank CSV (pipe-delimited) First row ISIN + DealName
Wilmington Trust XLSX (single sheet) Row 1, standard headers CUSIP
Generic CSV/XLSX (configurable) User-defined Configurable

Parser Architecture

Each trustee has unique formatting conventions. Parsers implement a common interface:

src/reconciliation/trustee_tapes/base_parser.py
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class ParsedLoan:
    """Normalized loan record extracted from a trustee tape."""

    identifier: str           # Primary key (CUSIP, ISIN, etc.)
    deal_name: str
    current_balance: float
    original_balance: float
    coupon_rate: float
    maturity_date: str
    payment_frequency: str
    day_count_convention: str
    accrued_interest: float
    price: float
    spread: float
    rating_moody: str | None
    rating_sp: str | None
    rating_fitch: str | None
    sector: str
    country: str
    currency: str
    raw_data: dict            # Original fields for audit trail


class BaseTrusteeParser(ABC):
    """Abstract base class for trustee tape parsers."""

    @abstractmethod
    def detect(self, file_path: str) -> bool:
        """Return True if this parser can handle the file."""
        ...

    @abstractmethod
    def parse(self, file_path: str) -> list[ParsedLoan]:
        """Parse the trustee tape into normalized loan records."""
        ...

    @abstractmethod
    def get_report_date(self, file_path: str) -> str:
        """Extract the report/determination date from the tape."""
        ...
src/reconciliation/trustee_tapes/us_bank_parser.py
class USBankParser(BaseTrusteeParser):
    """Parser for US Bank trustee tapes.

    US Bank tapes have a specific structure:
    - Row 1-2: Deal metadata (name, date, etc.)
    - Row 3: Column headers (often merged cells)
    - Row 4+: Loan data
    - Last rows: Totals/summaries (must be excluded)
    """

    HEADER_ROW = 3
    KNOWN_HEADER_PATTERNS = [
        "CUSIP",
        "Loan ID",
        "Current Par",
        "Coupon",
        "Maturity",
    ]

    def detect(self, file_path: str) -> bool:
        """Detect US Bank format by checking header patterns."""
        wb = load_workbook(file_path, read_only=True, data_only=True)
        sheet = wb.active
        row_3 = [str(cell.value or "") for cell in sheet[self.HEADER_ROW]]
        return any(
            pattern in " ".join(row_3)
            for pattern in self.KNOWN_HEADER_PATTERNS
        )

    def parse(self, file_path: str) -> list[ParsedLoan]:
        """Parse US Bank tape into normalized records."""
        wb = load_workbook(file_path, read_only=True, data_only=True)
        sheet = wb.active

        headers = self._extract_headers(sheet)
        loans = []

        for row in sheet.iter_rows(
            min_row=self.HEADER_ROW + 1, values_only=False
        ):
            if self._is_summary_row(row):
                break
            loan = self._row_to_parsed_loan(row, headers)
            if loan:
                loans.append(loan)

        return loans

Normalization

After parsing, the normalizer standardizes field names, data types, and conventions:

src/reconciliation/trustee_tapes/normalizer.py
class TrusteeDataNormalizer:
    """Normalizes parsed trustee data into a consistent schema."""

    # Field name mappings across trustee formats
    FIELD_MAPPINGS = {
        "Current Par": "current_balance",
        "Current Face": "current_balance",
        "Outstanding Balance": "current_balance",
        "Cpn": "coupon_rate",
        "Coupon": "coupon_rate",
        "Interest Rate": "coupon_rate",
        "Mat Date": "maturity_date",
        "Maturity": "maturity_date",
        "Final Maturity": "maturity_date",
    }

    # Day count convention normalization
    DAY_COUNT_MAP = {
        "30/360": "30_360",
        "ACT/360": "actual_360",
        "ACT/365": "actual_365",
        "ACTUAL/360": "actual_360",
    }

    def normalize(self, loans: list[ParsedLoan]) -> list[ParsedLoan]:
        """Apply normalization rules to all parsed loans."""
        normalized = []
        for loan in loans:
            loan.day_count_convention = self.DAY_COUNT_MAP.get(
                loan.day_count_convention, loan.day_count_convention
            )
            loan.identifier = self._normalize_cusip(loan.identifier)
            loan.coupon_rate = self._normalize_rate(loan.coupon_rate)
            normalized.append(loan)
        return normalized

    @staticmethod
    def _normalize_cusip(cusip: str) -> str:
        """Strip whitespace and uppercase CUSIP identifiers."""
        return cusip.strip().upper() if cusip else ""

    @staticmethod
    def _normalize_rate(rate: float) -> float:
        """Convert percentage to decimal if needed (5.25 -> 0.0525)."""
        if rate and rate > 1:
            return rate / 100.0
        return rate

Discrepancy Detection

DiscrepancyDetector

The rule-based engine evaluates each loan against five categories of rules:

src/reconciliation/detection/discrepancy_detector.py
from dataclasses import dataclass
from enum import Enum


class DiscrepancyCategory(str, Enum):
    COMPLETENESS = "completeness"
    CALCULATION = "calculation"
    TIMING = "timing"
    REFERENCE = "reference"
    BUSINESS = "business"


class Severity(str, Enum):
    CRITICAL = "critical"
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"


@dataclass
class Discrepancy:
    """A single identified discrepancy between CalcBridge and trustee data."""

    category: DiscrepancyCategory
    severity: Severity
    field: str
    calcbridge_value: str | float | None
    trustee_value: str | float | None
    difference: float | None
    difference_pct: float | None
    loan_identifier: str
    deal_name: str
    rule_id: str
    message: str


class DiscrepancyDetector:
    """Rule-based engine for identifying discrepancies between
    CalcBridge calculations and trustee-provided data.
    """

    def __init__(self, tolerance_config: ToleranceConfig):
        self._tolerances = tolerance_config
        self._rules = self._load_rules()

    def detect(
        self,
        calcbridge_data: list[dict],
        trustee_data: list[ParsedLoan],
    ) -> list[Discrepancy]:
        """Run all detection rules and collect discrepancies."""
        discrepancies = []

        # Build lookup maps
        cb_by_id = {r["identifier"]: r for r in calcbridge_data}
        trustee_by_id = {l.identifier: l for l in trustee_data}

        # 1. Completeness checks
        discrepancies.extend(
            self._check_completeness(cb_by_id, trustee_by_id)
        )

        # 2-5. Per-loan rule evaluation
        for identifier in cb_by_id.keys() & trustee_by_id.keys():
            cb_loan = cb_by_id[identifier]
            trustee_loan = trustee_by_id[identifier]

            for rule in self._rules:
                result = rule.evaluate(
                    cb_loan, trustee_loan, self._tolerances
                )
                if result:
                    discrepancies.append(result)

        return discrepancies

Five Discrepancy Categories

1. Completeness

Identifies loans present in one source but missing from the other:

Rule ID Check Severity
COMP-001 Loan in CalcBridge but not in trustee tape Error
COMP-002 Loan in trustee tape but not in CalcBridge Critical
COMP-003 Required field missing in CalcBridge record Warning
COMP-004 Required field missing in trustee record Info

Missing Trustee Loans

A loan present in the trustee tape but absent from CalcBridge (COMP-002) is classified as Critical because it may indicate a failed import or an untracked position, which has direct P&L implications.

2. Calculation

Compares numeric values with tolerance-based thresholds:

Rule ID Field Critical Error Warning
CALC-001 Price > 2% diff > 1% diff > 0.5% diff
CALC-002 Spread > 50 bps > 25 bps > 10 bps
CALC-003 Current Balance > $10,000 > $1,000 > $100
CALC-004 Accrued Interest > 1% diff > 0.5% diff > 0.1% diff
CALC-005 Coupon Rate > 10 bps > 5 bps > 1 bp
src/reconciliation/detection/rules/calculation_rules.py
class PriceComparisonRule:
    """Compare price values with percentage-based tolerance."""

    RULE_ID = "CALC-001"

    def evaluate(
        self,
        cb_loan: dict,
        trustee_loan: ParsedLoan,
        tolerances: ToleranceConfig,
    ) -> Discrepancy | None:
        cb_price = cb_loan.get("price")
        trustee_price = trustee_loan.price

        if cb_price is None or trustee_price is None:
            return None

        if trustee_price == 0:
            if cb_price != 0:
                return Discrepancy(
                    category=DiscrepancyCategory.CALCULATION,
                    severity=Severity.ERROR,
                    field="price",
                    calcbridge_value=cb_price,
                    trustee_value=trustee_price,
                    difference=cb_price,
                    difference_pct=None,
                    loan_identifier=trustee_loan.identifier,
                    deal_name=trustee_loan.deal_name,
                    rule_id=self.RULE_ID,
                    message="Price mismatch: trustee reports 0",
                )
            return None

        diff_pct = abs(cb_price - trustee_price) / abs(trustee_price) * 100

        if diff_pct <= tolerances.price_warning_pct:
            return None

        severity = Severity.WARNING
        if diff_pct > tolerances.price_critical_pct:
            severity = Severity.CRITICAL
        elif diff_pct > tolerances.price_error_pct:
            severity = Severity.ERROR

        return Discrepancy(
            category=DiscrepancyCategory.CALCULATION,
            severity=severity,
            field="price",
            calcbridge_value=cb_price,
            trustee_value=trustee_price,
            difference=cb_price - trustee_price,
            difference_pct=diff_pct,
            loan_identifier=trustee_loan.identifier,
            deal_name=trustee_loan.deal_name,
            rule_id=self.RULE_ID,
            message=f"Price diff: {diff_pct:.2f}% "
                    f"(CB: {cb_price}, Trustee: {trustee_price})",
        )

3. Timing

Detects settlement date and reporting period mismatches:

Rule ID Check Severity
TIME-001 Settlement date mismatch Error
TIME-002 Payment date mismatch (> 1 business day) Warning
TIME-003 Report date does not match determination date Info
TIME-004 Maturity date mismatch Error

4. Reference

Validates identifier consistency across sources:

Rule ID Check Severity
REF-001 CUSIP format invalid Warning
REF-002 ISIN checksum failure Warning
REF-003 Deal name mismatch (fuzzy match < 90%) Info
REF-004 Currency code inconsistency Error

5. Business

Domain-specific validations for CLO portfolios:

Rule ID Check Severity
BIZ-001 Rating downgrade not reflected in CalcBridge Critical
BIZ-002 Defaulted loan not flagged Critical
BIZ-003 Sector classification mismatch Warning
BIZ-004 Country of risk disagreement Warning
BIZ-005 Day count convention mismatch Error

Tolerance Configuration

Tolerances are configurable per tenant and can be overridden per deal:

src/reconciliation/detection/tolerance.py
from dataclasses import dataclass


@dataclass
class ToleranceConfig:
    """Configurable tolerance thresholds for reconciliation."""

    # Price tolerance (percentage)
    price_warning_pct: float = 0.5
    price_error_pct: float = 1.0
    price_critical_pct: float = 2.0

    # Spread tolerance (basis points)
    spread_warning_bps: float = 10.0
    spread_error_bps: float = 25.0
    spread_critical_bps: float = 50.0

    # Balance tolerance (absolute value)
    balance_warning_abs: float = 100.0
    balance_error_abs: float = 1_000.0
    balance_critical_abs: float = 10_000.0

    # Accrued interest tolerance (percentage)
    accrued_warning_pct: float = 0.1
    accrued_error_pct: float = 0.5
    accrued_critical_pct: float = 1.0

    # Coupon rate tolerance (basis points)
    coupon_warning_bps: float = 1.0
    coupon_error_bps: float = 5.0
    coupon_critical_bps: float = 10.0

    @classmethod
    def from_tenant_config(cls, config: dict) -> "ToleranceConfig":
        """Build tolerance config from tenant settings."""
        return cls(**{
            k: v for k, v in config.items()
            if k in cls.__dataclass_fields__
        })

Per-Deal Overrides

Some deals have known timing differences or use different pricing sources. Tolerance overrides can be configured at the deal level to suppress expected discrepancies without disabling detection globally.


Multi-Source Reconciliation

The MultiSourceReconciler handles cases where multiple trustee reports or data sources must be compared against each other and against CalcBridge:

src/reconciliation/multi_source.py
class MultiSourceReconciler:
    """Reconciles data across multiple sources simultaneously.

    Use cases:
    - Comparing CalcBridge vs. Trustee vs. Bloomberg
    - Cross-validating two trustee tapes for the same period
    - Verifying data migration accuracy
    """

    def reconcile(
        self,
        sources: dict[str, list[dict]],
        primary_source: str = "calcbridge",
        tolerance_config: ToleranceConfig | None = None,
    ) -> MultiSourceResult:
        """Compare all source pairs and aggregate discrepancies.

        Args:
            sources: Map of source name to loan records.
            primary_source: The source treated as "expected" values.
            tolerance_config: Threshold configuration.

        Returns:
            MultiSourceResult with per-pair and aggregated discrepancies.
        """
        detector = DiscrepancyDetector(
            tolerance_config or ToleranceConfig()
        )
        pair_results = {}

        for source_name, source_data in sources.items():
            if source_name == primary_source:
                continue

            discrepancies = detector.detect(
                calcbridge_data=sources[primary_source],
                trustee_data=source_data,
            )

            pair_results[f"{primary_source}_vs_{source_name}"] = discrepancies

        return MultiSourceResult(
            pairs=pair_results,
            summary=self._build_summary(pair_results),
            consensus=self._find_consensus_values(sources),
        )

Reconciliation Pipeline

Full Flowchart

flowchart TB
    subgraph Input["Input Sources"]
        TAPE["Trustee Tape<br/>(Upload)"]
        CB["CalcBridge<br/>Workbook Data"]
    end

    subgraph Parse["Tape Processing"]
        DETECT["Format Detection"]
        PARSE["Format-Specific<br/>Parser"]
        NORM["Normalizer"]
    end

    subgraph Match["Loan Matching"]
        INDEX["Build Identifier<br/>Index"]
        MATCH["Match Loans<br/>by CUSIP/ISIN"]
        UNMATCH["Identify<br/>Unmatched"]
    end

    subgraph Detect["Discrepancy Detection"]
        COMP["Completeness<br/>Rules"]
        CALC["Calculation<br/>Rules"]
        TIME["Timing<br/>Rules"]
        REF["Reference<br/>Rules"]
        BIZ["Business<br/>Rules"]
    end

    subgraph Classify["Classification"]
        SEV["Severity<br/>Assignment"]
        AGG["Aggregation"]
    end

    subgraph Output["Output"]
        ISSUES["Issue<br/>Creation"]
        REPORT["Reconciliation<br/>Report"]
        NOTIFY["Notifications"]
    end

    TAPE --> DETECT
    DETECT --> PARSE
    PARSE --> NORM

    CB --> INDEX
    NORM --> INDEX

    INDEX --> MATCH
    INDEX --> UNMATCH

    MATCH --> COMP
    MATCH --> CALC
    MATCH --> TIME
    MATCH --> REF
    MATCH --> BIZ
    UNMATCH --> COMP

    COMP --> SEV
    CALC --> SEV
    TIME --> SEV
    REF --> SEV
    BIZ --> SEV

    SEV --> AGG

    AGG --> ISSUES
    AGG --> REPORT
    AGG --> NOTIFY

    style DETECT fill:#FEF3C7,stroke:#F59E0B
    style SEV fill:#FEE2E2,stroke:#EF4444
    style ISSUES fill:#DBEAFE,stroke:#3B82F6

Issue Lifecycle

Discrepancies are tracked as issues with a defined lifecycle:

stateDiagram-v2
    [*] --> open : Discrepancy detected

    open --> acknowledged : Analyst reviews
    open --> auto_resolved : Re-reconciliation clears it

    acknowledged --> investigating : Root cause analysis
    acknowledged --> false_positive : Known difference

    investigating --> resolved : Fix applied
    investigating --> false_positive : Confirmed expected

    resolved --> [*]
    false_positive --> [*]
    auto_resolved --> [*]

Issue States

State Description SLA
open Newly detected discrepancy, awaiting review Review within 24 hours (Critical), 72 hours (Error)
acknowledged Analyst has reviewed and is aware Begin investigation within 48 hours
investigating Root cause analysis in progress Resolve within 5 business days
resolved Discrepancy corrected in source or CalcBridge Verify in next reconciliation cycle
false_positive Confirmed as expected difference Document reason, exclude from future runs (optional)
auto_resolved Cleared automatically by subsequent reconciliation No action required
src/reconciliation/issues/issue_manager.py
class IssueManager:
    """Manages the lifecycle of reconciliation issues."""

    def create_issues(
        self, discrepancies: list[Discrepancy], run_id: str
    ) -> list[Issue]:
        """Create or update issues from detected discrepancies.

        Existing open issues for the same loan+field combination
        are updated rather than duplicated.
        """
        issues = []
        for disc in discrepancies:
            existing = self._find_existing(
                disc.loan_identifier, disc.field, disc.rule_id
            )

            if existing and existing.status in ("open", "acknowledged"):
                existing.occurrence_count += 1
                existing.latest_run_id = run_id
                existing.latest_values = {
                    "calcbridge": disc.calcbridge_value,
                    "trustee": disc.trustee_value,
                }
                issues.append(existing)
            else:
                issue = Issue(
                    discrepancy=disc,
                    run_id=run_id,
                    status="open",
                    occurrence_count=1,
                )
                issues.append(issue)

        return issues

    def auto_resolve(self, run_id: str, current_discrepancies: list[Discrepancy]):
        """Auto-resolve issues that no longer appear in the latest run."""
        open_issues = self._get_open_issues()
        current_keys = {
            (d.loan_identifier, d.field, d.rule_id)
            for d in current_discrepancies
        }

        for issue in open_issues:
            key = (
                issue.loan_identifier,
                issue.field,
                issue.rule_id,
            )
            if key not in current_keys:
                issue.status = "auto_resolved"
                issue.resolved_run_id = run_id

Reconciliation Report

Each reconciliation run produces a structured report:

Report structure
{
    "run_id": "recon-2026-02-15-001",
    "deal_name": "CLO 2024-1",
    "report_date": "2026-02-15",
    "trustee": "US Bank",
    "summary": {
        "total_loans_calcbridge": 245,
        "total_loans_trustee": 247,
        "matched_loans": 243,
        "unmatched_calcbridge": 2,
        "unmatched_trustee": 4,
        "total_discrepancies": 18,
        "by_severity": {
            "critical": 1,
            "error": 4,
            "warning": 8,
            "info": 5
        },
        "by_category": {
            "completeness": 6,
            "calculation": 7,
            "timing": 2,
            "reference": 1,
            "business": 2
        }
    },
    "discrepancies": [ ... ],
    "match_rate": 98.4,
    "clean_rate": 92.7
}
Metric Description Target
Match Rate Percentage of loans present in both sources > 99%
Clean Rate Percentage of matched loans with no discrepancies > 95%
Critical Count Number of critical-severity discrepancies 0
Resolution Time Average time from detection to resolution < 3 business days

Celery Task Integration

src/reconciliation/tasks/reconciliation_tasks.py
@shared_task(
    bind=True,
    queue="default",
    max_retries=1,
    soft_time_limit=600,
    time_limit=900,
)
def execute_reconciliation(
    self,
    tenant_id: str,
    workbook_id: str,
    tape_upload_id: str,
    tolerance_overrides: dict | None = None,
):
    """Run reconciliation between a workbook and a trustee tape.

    Steps:
    1. Load and parse the trustee tape
    2. Extract CalcBridge calculated data
    3. Run discrepancy detection
    4. Create/update issues
    5. Generate reconciliation report
    6. Send notifications for critical findings
    """
    ...

Scheduled Reconciliation

Celery Beat can be configured to run reconciliation automatically when new trustee tapes are uploaded. The tape_upload_id is passed from the upload webhook to trigger the reconciliation pipeline.


Metrics and Monitoring

Metric Type Description
reconciliation_runs_total Counter Total reconciliation runs by deal
reconciliation_duration_seconds Histogram End-to-end reconciliation time
reconciliation_discrepancies_total Gauge Current open discrepancies by severity
reconciliation_match_rate Gauge Loan match rate per deal
reconciliation_clean_rate Gauge Clean (no discrepancy) rate per deal
reconciliation_issues_open Gauge Total open issues by category
reconciliation_resolution_days Histogram Time to resolve issues

Configuration Reference

Reconciliation configuration (environment variables)
# Reconciliation engine
RECON_ENABLED: true
RECON_AUTO_RUN_ON_UPLOAD: true        # Trigger on tape upload
RECON_DEFAULT_TOLERANCE_PROFILE: standard

# Issue management
RECON_ISSUE_SLA_CRITICAL_HOURS: 24
RECON_ISSUE_SLA_ERROR_HOURS: 72
RECON_ISSUE_AUTO_RESOLVE: true        # Auto-resolve cleared discrepancies

# Notifications
RECON_NOTIFY_ON_CRITICAL: true        # Send alerts for critical findings
RECON_NOTIFY_CHANNEL: email,websocket # Notification channels

# Retention
RECON_REPORT_RETENTION_DAYS: 365      # Keep reports for 1 year
RECON_TAPE_RETENTION_DAYS: 180        # Keep uploaded tapes for 6 months