Domain Modules Architecture

CalcBridge organizes financial data processing into domain modules -- self-contained units that encapsulate the ingestion rules, column mappings, and validation logic for a specific class of financial data. This document covers the registry pattern, alias profile system, column normalization pipeline, and schema drift detection.


Overview

CalcBridge currently supports two primary domains, each with multiple alias profiles for handling data from different providers:

Domain   | Alias Profiles | Primary Use Case
CLO      | 15 profiles    | Structured finance CLO holdings and analytics
Servicer | 14 profiles    | Loan servicer snapshots, regulatory templates, capital sheets

Each domain registers with the central DomainRegistry at startup, declaring its alias profiles, validation rules, and normalization pipeline.


Registry Pattern

DomainRegistry

The DomainRegistry is a singleton that manages domain module lifecycle -- discovery, registration, and lookup:

flowchart TB
    subgraph Startup["Application Startup"]
        SCAN["Scan Domain<br/>Modules"]
        REG["DomainRegistry<br/>(Singleton)"]
    end

    subgraph Domains["Registered Domains"]
        CLO["CLO Domain"]
        SVC["Servicer Domain"]
        CUSTOM["Custom Domains<br/>(Tenant-specific)"]
    end

    subgraph Profiles["Alias Profiles"]
        CLO_P["15 CLO Profiles"]
        SVC_P["14 Servicer Profiles"]
        CUSTOM_P["Custom Profiles"]
    end

    subgraph Ingest["Ingestion Pipeline"]
        DETECT["Profile<br/>Detection"]
        NORM["Column<br/>Normalization"]
        VALID["Schema<br/>Validation"]
        DIAG["Drift<br/>Detection"]
    end

    SCAN --> REG
    REG --> CLO
    REG --> SVC
    REG --> CUSTOM

    CLO --> CLO_P
    SVC --> SVC_P
    CUSTOM --> CUSTOM_P

    CLO_P --> DETECT
    SVC_P --> DETECT
    CUSTOM_P --> DETECT

    DETECT --> NORM
    NORM --> VALID
    VALID --> DIAG

    style REG fill:#DBEAFE,stroke:#3B82F6
    style DETECT fill:#FEF3C7,stroke:#F59E0B
    style DIAG fill:#FEE2E2,stroke:#EF4444
src/domains/registry.py
from typing import Protocol


class DomainModule(Protocol):
    """Protocol that all domain modules must implement."""

    @property
    def name(self) -> str:
        """Unique domain identifier (e.g., 'clo', 'servicer')."""
        ...

    @property
    def alias_profiles(self) -> dict[str, AliasProfile]:
        """Map of profile name to alias profile configuration."""
        ...

    def detect_profile(self, headers: list[str]) -> str | None:
        """Auto-detect which alias profile matches the given headers."""
        ...

    def normalize(
        self, df: pd.DataFrame, profile_name: str
    ) -> pd.DataFrame:
        """Normalize column names using the specified profile."""
        ...

    def validate(self, df: pd.DataFrame) -> ValidationResult:
        """Validate the normalized DataFrame against domain schema."""
        ...


class DomainRegistry:
    """Singleton registry for domain modules.

    Domains register themselves at application startup.
    The registry provides profile lookup and auto-detection
    for incoming data files.
    """

    _instance: "DomainRegistry | None" = None
    _domains: dict[str, DomainModule]

    def __new__(cls) -> "DomainRegistry":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._domains = {}
        return cls._instance

    def register(self, domain: DomainModule) -> None:
        """Register a domain module."""
        if domain.name in self._domains:
            raise ValueError(
                f"Domain '{domain.name}' is already registered"
            )
        self._domains[domain.name] = domain

    def get_domain(self, name: str) -> DomainModule:
        """Retrieve a registered domain by name."""
        if name not in self._domains:
            raise KeyError(f"Domain '{name}' is not registered")
        return self._domains[name]

    def detect_domain_and_profile(
        self, headers: list[str]
    ) -> tuple[str, str] | None:
        """Auto-detect domain and profile from file headers.

        Iterates through all registered domains and their
        detection logic. Returns (domain_name, profile_name)
        or None if no match.
        """
        for name, domain in self._domains.items():
            profile = domain.detect_profile(headers)
            if profile is not None:
                return (name, profile)
        return None

    def list_domains(self) -> list[str]:
        """Return all registered domain names."""
        return list(self._domains.keys())

    def list_profiles(self, domain_name: str) -> list[str]:
        """Return all alias profile names for a domain."""
        domain = self.get_domain(domain_name)
        return list(domain.alias_profiles.keys())

Domain Module Registration

Domains register during application startup:

src/domains/__init__.py
from src.domains.registry import DomainRegistry
from src.domains.clo import CLODomain
from src.domains.servicer import ServicerDomain


def register_domains():
    """Register all domain modules with the global registry."""
    registry = DomainRegistry()
    registry.register(CLODomain())
    registry.register(ServicerDomain())
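
Because the registry is a singleton, registration in register_domains() and profile lookup in the ingestion pipeline both hit the same instance. A minimal sketch of that flow, using a toy Registry rather than the real classes:

```python
class Registry:
    """Toy analogue of DomainRegistry: one shared instance."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._domains = {}
        return cls._instance

    def register(self, name, detect):
        self._domains[name] = detect

    def detect(self, headers):
        # Mirrors detect_domain_and_profile: first matching domain wins
        for name, detect in self._domains.items():
            profile = detect(headers)
            if profile is not None:
                return (name, profile)
        return None


# Register through one "instance", look up through another --
# both calls resolve to the same object.
Registry().register("clo", lambda h: "generic_clo" if "CUSIP" in h else None)
match = Registry().detect(["CUSIP", "Price"])
```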

CLO Domain

The CLO domain handles structured finance Collateralized Loan Obligation data from 15 different provider formats.

Alias Profiles

Profile Name                | Provider/Source          | Key Fields
structured_finance_clo      | Standard CLO holdings    | WAL, WAS, ratings, spreads, prices
everest_extended            | Everest platform export  | Extended analytics, OC/IC tests
ratings_export              | Rating agency data feeds | Moody's, S&P, Fitch ratings history
etfcom_clo_holdings         | ETF.com CLO data         | Holdings weights, NAV attribution
invesco_clo_holdings        | Invesco CLO portfolio    | Position-level detail, sector codes
ft_tearsheet_holdings       | FT tearsheet format      | Summary-level CLO positions
hartford_trpa_holdings      | Hartford TRPA reports    | TRPA-specific fields, compliance data
stockanalysis_clox_holdings | StockAnalysis CLOX       | Market data, trading analytics
eagle_point_clo             | Eagle Point Credit       | Fund-level CLO allocations
pgim_clo_holdings           | PGIM Fixed Income        | Institutional CLO positions
ares_clo_portfolio          | Ares Management          | Portfolio construction data
blackrock_clo               | BlackRock Aladdin export | Risk analytics, attribution
carlyle_clo_holdings        | Carlyle Group            | Direct lending CLO data
neuberger_clo               | Neuberger Berman         | Multi-strategy CLO data
generic_clo                 | Fallback/custom format   | Minimal required fields only

Profile Configuration

Each alias profile is defined as a JSON configuration file:

config/clo_aliases/structured_finance_clo.json
{
    "profile_name": "structured_finance_clo",
    "version": "2.1",
    "description": "Standard CLO holdings format for structured finance data",
    "detection_headers": [
        "CUSIP", "Security Description", "Par Amount",
        "Market Value", "WAL", "WAS"
    ],
    "detection_threshold": 0.7,
    "column_mappings": {
        "CUSIP": "cusip",
        "ISIN": "isin",
        "Security Description": "security_description",
        "Security Name": "security_description",
        "Par Amount": "par_amount",
        "Par Value": "par_amount",
        "Current Face": "par_amount",
        "Market Value": "market_value",
        "MV": "market_value",
        "Price": "price",
        "Clean Price": "price",
        "Bid Price": "bid_price",
        "Ask Price": "ask_price",
        "Mid Price": "price",
        "Spread": "spread",
        "DM": "spread",
        "Discount Margin": "spread",
        "WAL": "wal",
        "Weighted Avg Life": "wal",
        "WAS": "was",
        "Weighted Avg Spread": "was",
        "Moody's": "rating_moody",
        "Moodys": "rating_moody",
        "Moody Rating": "rating_moody",
        "S&P": "rating_sp",
        "SP": "rating_sp",
        "S&P Rating": "rating_sp",
        "Fitch": "rating_fitch",
        "Fitch Rating": "rating_fitch",
        "Coupon": "coupon",
        "Coupon Rate": "coupon",
        "Maturity": "maturity_date",
        "Maturity Date": "maturity_date",
        "Sector": "sector",
        "Industry": "sector",
        "Country": "country",
        "Country of Risk": "country"
    },
    "required_fields": [
        "cusip", "par_amount", "price"
    ],
    "optional_fields": [
        "isin", "security_description", "market_value",
        "spread", "wal", "was", "rating_moody", "rating_sp",
        "rating_fitch", "coupon", "maturity_date", "sector", "country"
    ],
    "type_coercions": {
        "par_amount": "float",
        "market_value": "float",
        "price": "float",
        "spread": "float",
        "wal": "float",
        "was": "float",
        "coupon": "float"
    }
}

Detection Threshold

The detection_threshold (0.0 to 1.0) controls how many of the detection_headers must match for auto-detection. A threshold of 0.7 means 70% of the detection headers must be present in the file's column headers. Lower thresholds increase recall but risk false positives.
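
One plausible implementation of the score behind this check (the real compute_header_match_score may differ, e.g. by using fuzzy matching):

```python
def header_match_score(file_headers: list[str],
                       detection_headers: list[str]) -> float:
    """Fraction of detection headers found in the file, case-insensitively."""
    seen = {h.strip().lower() for h in file_headers}
    hits = sum(1 for h in detection_headers if h.strip().lower() in seen)
    return hits / len(detection_headers) if detection_headers else 0.0


detection = ["CUSIP", "Security Description", "Par Amount",
             "Market Value", "WAL", "WAS"]

# 5 of 6 detection headers present -> score 0.833, clearing the
# structured_finance_clo threshold of 0.7 despite the missing "WAS"
score = header_match_score(
    ["cusip", "Security Description", "Par Amount",
     "Market Value", "WAL", "Coupon"],
    detection,
)
```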


Servicer Domain

The Servicer domain processes loan-level data from servicer operational snapshots and regulatory reporting templates.

Alias Profiles

Profile Name      | Purpose                      | Key Characteristics
key_data          | Master loan reference data   | Agency mappings, CUSIP lookups
risk_restrictions | Compliance and risk limits   | Concentration limits, eligibility
capital_sheet     | Capital structure data       | Tranche details, subordination
abc_snapshot      | ABC servicer format          | CA-CI calculated columns
atlys_snapshot    | Atlys servicer format        | CB-CI calculated columns
pme_snapshot      | PME data format              | BZ-CB calculated columns
boe_sme           | Bank of England SME template | UK regulatory loan-level
boe_corporate     | BoE Corporate template       | UK corporate lending
boe_asset_finance | BoE Asset Finance template   | UK asset-backed
rba_rmbs          | RBA RMBS template            | Australian RMBS reporting
ecb_rmbs          | ECB RMBS template            | European RMBS securitization
ecb_decc_public   | ECB DECC Public Sector       | Public sector ABS
ecb_decc_sme      | ECB DECC SME                 | SME securitization data
generic_servicer  | Fallback format              | Minimal required fields

config/servicer_aliases/key_data.json
{
    "profile_name": "key_data",
    "version": "1.3",
    "description": "Key Data reference sheet with agency mappings",
    "detection_headers": [
        "Y-Cusip-A", "Agency", "Senior Loan",
        "Senior Principal Balance"
    ],
    "detection_threshold": 0.75,
    "column_mappings": {
        "Y-Cusip-A": "y_cusip_a",
        "Y Cusip A": "y_cusip_a",
        "Agency": "agency",
        "Agency Name": "agency",
        "Senior Loan": "senior_loan",
        "Senior Loan Amount": "senior_loan",
        "Senior Principal Balance": "senior_principal_balance",
        "Sr Principal Bal": "senior_principal_balance",
        "Invoice Number": "invoice_number",
        "Invoice #": "invoice_number",
        "Days Open": "days_open",
        "Settlement Date": "settlement_date"
    },
    "required_fields": [
        "y_cusip_a", "agency", "senior_loan"
    ],
    "type_coercions": {
        "senior_loan": "float",
        "senior_principal_balance": "float",
        "days_open": "int",
        "invoice_number": "string"
    }
}

Invoice Number Type Coercion

The invoice_number field must always be coerced to string. Excel stores invoice numbers as floats (e.g., 200803.0), and without explicit string conversion, downstream lookups and comparisons will fail silently.
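
The failure mode is easy to reproduce. A minimal coercion helper (illustrative, not the production code) that strips the float artifact:

```python
def coerce_invoice(value) -> str:
    # Excel round-trips the invoice number 200803 as the float
    # 200803.0; a naive str() keeps the ".0" and breaks exact-match
    # lookups downstream. Integral floats are rendered without it.
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


str(200803.0)             # the silent-failure form: "200803.0"
coerce_invoice(200803.0)  # safe for lookups: "200803"
```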


Column Normalization Pipeline

When a file is ingested, the normalization pipeline transforms raw headers into the canonical internal schema:

flowchart LR
    subgraph Input["Raw Input"]
        HEADERS["Raw Column<br/>Headers"]
    end

    subgraph Detect["Detection"]
        AUTO["Auto-Detect<br/>Profile"]
        OVERRIDE["Mapping Config<br/>Override"]
    end

    subgraph Normalize["Normalization"]
        ALIAS["Apply Alias<br/>Mappings"]
        CLEAN["Clean &<br/>Standardize"]
        COERCE["Type<br/>Coercion"]
    end

    subgraph Validate["Validation"]
        REQUIRED["Check Required<br/>Fields"]
        DRIFT["Schema Drift<br/>Detection"]
        DIAG["Generate<br/>Diagnostics"]
    end

    subgraph Output["Normalized Output"]
        DF["Canonical<br/>DataFrame"]
    end

    HEADERS --> AUTO
    AUTO --> ALIAS
    OVERRIDE --> ALIAS
    ALIAS --> CLEAN
    CLEAN --> COERCE
    COERCE --> REQUIRED
    REQUIRED --> DRIFT
    DRIFT --> DIAG
    DIAG --> DF

    style AUTO fill:#FEF3C7,stroke:#F59E0B
    style DRIFT fill:#FEE2E2,stroke:#EF4444
    style DF fill:#DCFCE7,stroke:#22C55E

Normalization Steps

src/domains/normalization.py
class ColumnNormalizer:
    """Transforms raw file headers into canonical column names
    using alias profile mappings.
    """

    def normalize(
        self,
        df: pd.DataFrame,
        profile: AliasProfile,
        mapping_overrides: dict[str, str] | None = None,
    ) -> NormalizationResult:
        """Apply column normalization pipeline.

        Priority order:
        1. Mapping config overrides (user-defined)
        2. Alias profile mappings (JSON config)
        3. Fuzzy matching fallback (optional)
        """
        # Step 1: Build effective mapping (overrides win on conflict)
        effective_mapping = dict(profile.column_mappings)
        if mapping_overrides:
            effective_mapping.update(mapping_overrides)

        # Index the mapping by normalized key as well, so headers that
        # differ only in case, whitespace, or punctuation still match
        normalized_mapping = {
            self._normalize_header(k): v
            for k, v in effective_mapping.items()
        }

        # Step 2: Apply mappings (exact match takes precedence)
        rename_map = {}
        unmapped_columns = []
        for col in df.columns:
            normalized = self._normalize_header(col)
            if col in effective_mapping:
                rename_map[col] = effective_mapping[col]
            elif normalized in normalized_mapping:
                rename_map[col] = normalized_mapping[normalized]
            else:
                unmapped_columns.append(col)

        df = df.rename(columns=rename_map)

        # Step 3: Type coercion
        for field, target_type in profile.type_coercions.items():
            if field in df.columns:
                df[field] = self._coerce_type(df[field], target_type)

        # Step 4: Validate required fields
        missing_required = [
            f for f in profile.required_fields if f not in df.columns
        ]

        return NormalizationResult(
            dataframe=df,
            mapped_columns=rename_map,
            unmapped_columns=unmapped_columns,
            missing_required=missing_required,
        )

    @staticmethod
    def _normalize_header(header: str) -> str:
        """Standardize header for matching.

        - Strip whitespace
        - Lowercase
        - Replace special chars with underscores
        """
        import re
        header = str(header).strip().lower()
        header = re.sub(r"[^a-z0-9]+", "_", header)
        return header.strip("_")

    @staticmethod
    def _coerce_type(series: pd.Series, target_type: str) -> pd.Series:
        """Coerce a pandas Series to the target type."""

        def to_clean_string(value) -> str:
            # Excel surfaces numeric identifiers as floats (200803.0);
            # drop the trailing ".0" so string lookups match
            if isinstance(value, float) and value.is_integer():
                return str(int(value))
            return str(value)

        coercion_map = {
            "float": lambda s: pd.to_numeric(s, errors="coerce"),
            "int": lambda s: pd.to_numeric(s, errors="coerce").astype("Int64"),
            "string": lambda s: s.map(to_clean_string),
            "date": lambda s: pd.to_datetime(s, errors="coerce"),
            "boolean": lambda s: s.astype(bool),
        }
        coercer = coercion_map.get(target_type)
        if coercer:
            return coercer(series)
        return series
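
The _normalize_header step is what makes matching tolerant of cosmetic header differences. A standalone copy of the same three steps, for illustration:

```python
import re


def normalize_header(header: str) -> str:
    # Same steps as _normalize_header above: strip, lowercase,
    # collapse runs of non-alphanumerics to underscores
    header = str(header).strip().lower()
    header = re.sub(r"[^a-z0-9]+", "_", header)
    return header.strip("_")


# Cosmetic variants collapse to one canonical key
variants = [normalize_header(h) for h in ("WAL", " wal ", "Wal")]
```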

Mapping Config Overrides

Users can override alias profile mappings through the Mappings API. Overrides take precedence over profile defaults:

Priority    | Source             | Scope
1 (highest) | Mapping Config API | Per-tenant, per-workbook
2           | Alias Profile JSON | Global, per-profile
3 (lowest)  | Fuzzy matching     | Automatic fallback
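
This precedence falls out of plain dict merge order in the normalizer's Step 1, sketched here with hypothetical mappings:

```python
# Profile defaults (from the alias profile JSON)
profile_mappings = {"MV": "market_value", "DM": "spread"}

# Hypothetical per-tenant override from the Mappings API
tenant_overrides = {"DM": "discount_margin"}

# Later update() calls win on key conflicts, so the override
# replaces the profile default for "DM" but leaves "MV" intact
effective = dict(profile_mappings)
effective.update(tenant_overrides)
```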

Mapping Hub UI

The Mapping Hub UI at /mappings provides a visual interface for creating and managing mapping overrides. Users can drag-and-drop source columns to target fields, preview normalization results, and save reusable mapping configurations.


Schema Drift Detection

Schema drift occurs when a data provider changes their file format -- adding, removing, or renaming columns. The drift detector identifies these changes and generates diagnostics:

src/domains/drift_detector.py
from dataclasses import dataclass, field
from enum import Enum


class DriftType(str, Enum):
    COLUMN_ADDED = "column_added"
    COLUMN_REMOVED = "column_removed"
    COLUMN_RENAMED = "column_renamed"
    TYPE_CHANGED = "type_changed"
    ORDER_CHANGED = "order_changed"


@dataclass
class DriftEvent:
    """A single schema drift event."""

    drift_type: DriftType
    column_name: str
    previous_value: str | None
    current_value: str | None
    confidence: float  # 0.0-1.0 for rename detection


@dataclass
class DriftReport:
    """Complete schema drift analysis for a single ingestion."""

    profile_name: str
    drift_events: list[DriftEvent] = field(default_factory=list)
    is_breaking: bool = False
    severity: str = "info"  # info, warning, error
    recommendation: str = ""

    @property
    def has_drift(self) -> bool:
        return len(self.drift_events) > 0


class SchemaDriftDetector:
    """Detects schema changes between ingestion runs.

    Compares the current file's column structure against
    the historical schema fingerprint stored from previous
    successful ingestions.
    """

    RENAME_SIMILARITY_THRESHOLD = 0.85

    def detect(
        self,
        current_headers: list[str],
        historical_fingerprint: SchemaFingerprint,
    ) -> DriftReport:
        """Compare current headers against historical schema."""
        events = []

        current_set = set(current_headers)
        historical_set = set(historical_fingerprint.columns)

        # Compute candidate sets first so a rename is not reported
        # as both a removal and an addition
        removed = historical_set - current_set
        added = current_set - historical_set

        # Detect renamed columns among the additions
        for col in added:
            rename_match = self._find_rename_candidate(
                col, removed, historical_fingerprint
            )
            if rename_match is not None:
                events.append(DriftEvent(
                    drift_type=DriftType.COLUMN_RENAMED,
                    column_name=rename_match.original,
                    previous_value=rename_match.original,
                    current_value=col,
                    confidence=rename_match.similarity,
                ))
                # A rename consumes its removed-column counterpart
                removed.discard(rename_match.original)
            else:
                events.append(DriftEvent(
                    drift_type=DriftType.COLUMN_ADDED,
                    column_name=col,
                    previous_value=None,
                    current_value=col,
                    confidence=1.0,
                ))

        # Detect removed columns (rename counterparts excluded above)
        for col in removed:
            events.append(DriftEvent(
                drift_type=DriftType.COLUMN_REMOVED,
                column_name=col,
                previous_value=col,
                current_value=None,
                confidence=1.0,
            ))

        # Detect order changes
        if current_headers != historical_fingerprint.ordered_columns:
            preserved = [
                h for h in current_headers if h in historical_set
            ]
            expected = [
                h for h in historical_fingerprint.ordered_columns
                if h in current_set
            ]
            if preserved != expected:
                events.append(DriftEvent(
                    drift_type=DriftType.ORDER_CHANGED,
                    column_name="*",
                    previous_value=str(expected[:5]),
                    current_value=str(preserved[:5]),
                    confidence=1.0,
                ))

        # Classify severity
        is_breaking = any(
            e.drift_type == DriftType.COLUMN_REMOVED
            and e.column_name in historical_fingerprint.required_columns
            for e in events
        )

        return DriftReport(
            profile_name=historical_fingerprint.profile_name,
            drift_events=events,
            is_breaking=is_breaking,
            severity="error" if is_breaking else (
                "warning" if events else "info"
            ),
            recommendation=self._generate_recommendation(events, is_breaking),
        )
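
The core of detect is two set differences over header names. A worked example with hypothetical headers:

```python
historical = ["CUSIP", "Par Amount", "Price", "WAL"]
current = ["CUSIP", "Par Amount", "Clean Price", "ESG Score", "WAL"]

# Candidates for COLUMN_REMOVED and COLUMN_ADDED events
removed = set(historical) - set(current)
added = set(current) - set(historical)

# "Clean Price" would then be scored as a rename candidate for
# "Price" (similarity threshold 0.85) before either event is
# finalized; a confirmed rename is reported once, not twice.
```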

Ingest Diagnostics

Every ingestion produces a diagnostics payload stored alongside the workbook metadata:

Diagnostics structure in aggregated_metrics
{
    "clo": {
        "ingest_diagnostics": {
            "profile_used": "structured_finance_clo",
            "profile_version": "2.1",
            "detection_confidence": 0.85,
            "schema_drift": {
                "has_drift": true,
                "events": [
                    {
                        "type": "column_added",
                        "column": "ESG Score",
                        "confidence": 1.0
                    }
                ],
                "severity": "info"
            },
            "mapping_audit": {
                "total_columns": 42,
                "mapped_columns": 38,
                "unmapped_columns": ["Custom Field 1", "Notes", "Flag", "ESG Score"],
                "mapping_coverage": 0.905
            },
            "input_checksums": {
                "file_sha256": "a1b2c3...",
                "row_count": 1247,
                "column_count": 42
            },
            "formula_risk": {
                "formulas_detected": 12,
                "circular_references": 0,
                "external_references": 0
            },
            "locale_risk": {
                "decimal_separator": ".",
                "date_format": "MM/DD/YYYY",
                "encoding": "utf-8"
            }
        }
    }
}
Diagnostic      | Purpose                                | Action When Abnormal
Schema Drift    | Detect format changes from providers   | Alert data ops team if breaking
Mapping Audit   | Track mapping coverage and gaps        | Update alias profile or add overrides
Input Checksums | File integrity and deduplication       | Reject duplicate uploads
Formula Risk    | Identify potentially unsafe formulas   | Flag for review before evaluation
Locale Risk     | Detect regional formatting differences | Apply locale-specific parsing
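
The mapping_coverage figure in the payload is simply mapped columns over total columns, rounded. For the sample diagnostics above:

```python
total_columns = 42
mapped_columns = 38

# 38 / 42 = 0.90476..., reported as 0.905 in the payload
mapping_coverage = round(mapped_columns / total_columns, 3)
```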

Domain Module Lifecycle

sequenceDiagram
    participant App as Application Startup
    participant Reg as DomainRegistry
    participant CLO as CLO Domain
    participant SVC as Servicer Domain
    participant Ingest as Ingestion Pipeline

    App->>Reg: Initialize singleton
    App->>CLO: Load alias profiles from JSON
    CLO->>Reg: register(CLODomain)
    App->>SVC: Load alias profiles from JSON
    SVC->>Reg: register(ServicerDomain)

    Note over Reg: Registry ready

    Ingest->>Reg: detect_domain_and_profile(headers)
    Reg->>CLO: detect_profile(headers)
    CLO-->>Reg: "structured_finance_clo"
    Reg-->>Ingest: ("clo", "structured_finance_clo")

    Ingest->>CLO: normalize(df, profile)
    CLO-->>Ingest: NormalizationResult

    Ingest->>CLO: validate(df)
    CLO-->>Ingest: ValidationResult

Adding a New Domain

To add a new domain module:

  1. Create the domain class implementing the DomainModule protocol:
src/domains/new_domain.py
class NewDomain:
    """New domain module implementation."""

    def __init__(self):
        self._profiles = load_profiles_from_json(
            "config/new_domain_aliases/"
        )

    @property
    def name(self) -> str:
        return "new_domain"

    @property
    def alias_profiles(self) -> dict[str, AliasProfile]:
        return self._profiles

    def detect_profile(self, headers: list[str]) -> str | None:
        # Return the best-scoring profile that clears its threshold,
        # not just the first match encountered
        best_name, best_score = None, 0.0
        for name, profile in self._profiles.items():
            score = compute_header_match_score(
                headers, profile.detection_headers
            )
            if score >= profile.detection_threshold and score > best_score:
                best_name, best_score = name, score
        return best_name

    def normalize(self, df, profile_name):
        profile = self._profiles[profile_name]
        return ColumnNormalizer().normalize(df, profile)

    def validate(self, df):
        return SchemaValidator().validate(df, self._profiles)
  2. Create alias profile JSON files in config/new_domain_aliases/.

  3. Register the domain in src/domains/__init__.py.

  4. Write tests covering profile detection, normalization, and validation.


Metrics and Monitoring

Metric                             | Type      | Description
domain_ingest_total                | Counter   | Total ingestions by domain and profile
domain_detection_duration_seconds  | Histogram | Profile auto-detection time
domain_normalization_coverage      | Gauge     | Column mapping coverage ratio
domain_drift_events_total          | Counter   | Schema drift events by type
domain_unmapped_columns_total      | Gauge     | Unmapped columns per ingestion
domain_validation_failures_total   | Counter   | Validation failures by domain

Configuration Reference

Domain module configuration (environment variables)
# Domain detection
DOMAIN_AUTO_DETECT: true                # Enable automatic profile detection
DOMAIN_DETECTION_THRESHOLD: 0.7        # Default detection threshold
DOMAIN_FUZZY_MATCHING: true            # Enable fuzzy column matching

# Schema drift
DOMAIN_DRIFT_DETECTION: true           # Enable schema drift detection
DOMAIN_DRIFT_ALERT_ON_BREAKING: true   # Alert on breaking schema changes
DOMAIN_DRIFT_HISTORY_DEPTH: 10         # Number of historical schemas to keep

# Alias profiles
CLO_ALIAS_DIR: config/clo_aliases      # CLO alias profile directory
SERVICER_ALIAS_DIR: config/servicer_aliases  # Servicer alias profile directory