Domain Modules Architecture¶
CalcBridge organizes financial data processing into domain modules -- self-contained units that encapsulate the ingestion rules, column mappings, and validation logic for a specific class of financial data. This document covers the registry pattern, alias profile system, column normalization pipeline, and schema drift detection.
Overview¶
CalcBridge currently supports two primary domains, each with multiple alias profiles for handling data from different providers:
| Domain | Alias Profiles | Primary Use Case |
|---|---|---|
| CLO | 15 profiles | Structured finance CLO holdings and analytics |
| Servicer | 14 profiles | Loan servicer snapshots, regulatory templates, capital sheets |
Each domain registers with the central DomainRegistry at startup, declaring its alias profiles, validation rules, and normalization pipeline.
Registry Pattern¶
DomainRegistry¶
The DomainRegistry is a singleton that manages domain module lifecycle -- discovery, registration, and lookup:
flowchart TB
subgraph Startup["Application Startup"]
SCAN["Scan Domain<br/>Modules"]
REG["DomainRegistry<br/>(Singleton)"]
end
subgraph Domains["Registered Domains"]
CLO["CLO Domain"]
SVC["Servicer Domain"]
CUSTOM["Custom Domains<br/>(Tenant-specific)"]
end
subgraph Profiles["Alias Profiles"]
CLO_P["15 CLO Profiles"]
SVC_P["14 Servicer Profiles"]
CUSTOM_P["Custom Profiles"]
end
subgraph Ingest["Ingestion Pipeline"]
DETECT["Profile<br/>Detection"]
NORM["Column<br/>Normalization"]
VALID["Schema<br/>Validation"]
DIAG["Drift<br/>Detection"]
end
SCAN --> REG
REG --> CLO
REG --> SVC
REG --> CUSTOM
CLO --> CLO_P
SVC --> SVC_P
CUSTOM --> CUSTOM_P
CLO_P --> DETECT
SVC_P --> DETECT
CUSTOM_P --> DETECT
DETECT --> NORM
NORM --> VALID
VALID --> DIAG
style REG fill:#DBEAFE,stroke:#3B82F6
style DETECT fill:#FEF3C7,stroke:#F59E0B
style DIAG fill:#FEE2E2,stroke:#EF4444
import pandas as pd
from typing import Protocol
class DomainModule(Protocol):
"""Protocol that all domain modules must implement."""
@property
def name(self) -> str:
"""Unique domain identifier (e.g., 'clo', 'servicer')."""
...
@property
def alias_profiles(self) -> dict[str, AliasProfile]:
"""Map of profile name to alias profile configuration."""
...
def detect_profile(self, headers: list[str]) -> str | None:
"""Auto-detect which alias profile matches the given headers."""
...
def normalize(
self, df: pd.DataFrame, profile_name: str
) -> pd.DataFrame:
"""Normalize column names using the specified profile."""
...
def validate(self, df: pd.DataFrame) -> ValidationResult:
"""Validate the normalized DataFrame against domain schema."""
...
class DomainRegistry:
"""Singleton registry for domain modules.
Domains register themselves at application startup.
The registry provides profile lookup and auto-detection
for incoming data files.
"""
_instance: "DomainRegistry | None" = None
_domains: dict[str, DomainModule]
def __new__(cls) -> "DomainRegistry":
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._domains = {}
return cls._instance
def register(self, domain: DomainModule) -> None:
"""Register a domain module."""
if domain.name in self._domains:
raise ValueError(
f"Domain '{domain.name}' is already registered"
)
self._domains[domain.name] = domain
def get_domain(self, name: str) -> DomainModule:
"""Retrieve a registered domain by name."""
if name not in self._domains:
raise KeyError(f"Domain '{name}' is not registered")
return self._domains[name]
def detect_domain_and_profile(
self, headers: list[str]
) -> tuple[str, str] | None:
"""Auto-detect domain and profile from file headers.
Iterates through all registered domains and their
detection logic. Returns (domain_name, profile_name)
or None if no match.
"""
for name, domain in self._domains.items():
profile = domain.detect_profile(headers)
if profile is not None:
return (name, profile)
return None
def list_domains(self) -> list[str]:
"""Return all registered domain names."""
return list(self._domains.keys())
def list_profiles(self, domain_name: str) -> list[str]:
"""Return all alias profile names for a domain."""
domain = self.get_domain(domain_name)
return list(domain.alias_profiles.keys())
Domain Module Registration¶
Domains register during application startup:
from src.domains.registry import DomainRegistry
from src.domains.clo import CLODomain
from src.domains.servicer import ServicerDomain
def register_domains():
"""Register all domain modules with the global registry."""
registry = DomainRegistry()
registry.register(CLODomain())
registry.register(ServicerDomain())
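The detection round-trip can be sketched end to end with stand-ins for the real classes. `StubDomain` and `MiniRegistry` below are illustrative stubs, not CalcBridge code; the real domains load their profiles from JSON as described above:

```python
# Minimal, self-contained sketch of the registry round-trip.
# StubDomain / MiniRegistry are illustrative stand-ins for the
# real CLODomain / ServicerDomain and the DomainRegistry singleton.

class StubDomain:
    """Toy domain with one hard-coded profile."""
    name = "stub"

    def detect_profile(self, headers):
        # Match when the anchor header is present
        return "stub_profile" if "CUSIP" in headers else None

class MiniRegistry:
    """Dict-backed stand-in for DomainRegistry."""
    def __init__(self):
        self._domains = {}

    def register(self, domain):
        self._domains[domain.name] = domain

    def detect_domain_and_profile(self, headers):
        # Same iteration logic as DomainRegistry above
        for name, domain in self._domains.items():
            profile = domain.detect_profile(headers)
            if profile is not None:
                return (name, profile)
        return None

registry = MiniRegistry()
registry.register(StubDomain())
match = registry.detect_domain_and_profile(["CUSIP", "Par Amount"])
# match == ("stub", "stub_profile")
```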
CLO Domain¶
The CLO domain handles structured finance Collateralized Loan Obligation data from 15 different provider formats.
Alias Profiles¶
| Profile Name | Provider/Source | Key Fields |
|---|---|---|
| structured_finance_clo | Standard CLO holdings | WAL, WAS, ratings, spreads, prices |
| everest_extended | Everest platform export | Extended analytics, OC/IC tests |
| ratings_export | Rating agency data feeds | Moody's, S&P, Fitch ratings history |
| etfcom_clo_holdings | ETF.com CLO data | Holdings weights, NAV attribution |
| invesco_clo_holdings | Invesco CLO portfolio | Position-level detail, sector codes |
| ft_tearsheet_holdings | FT tearsheet format | Summary-level CLO positions |
| hartford_trpa_holdings | Hartford TRPA reports | TRPA-specific fields, compliance data |
| stockanalysis_clox_holdings | StockAnalysis CLOX | Market data, trading analytics |
| eagle_point_clo | Eagle Point Credit | Fund-level CLO allocations |
| pgim_clo_holdings | PGIM Fixed Income | Institutional CLO positions |
| ares_clo_portfolio | Ares Management | Portfolio construction data |
| blackrock_clo | BlackRock Aladdin export | Risk analytics, attribution |
| carlyle_clo_holdings | Carlyle Group | Direct lending CLO data |
| neuberger_clo | Neuberger Berman | Multi-strategy CLO data |
| generic_clo | Fallback/custom format | Minimal required fields only |
Profile Configuration¶
Each alias profile is defined as a JSON configuration file:
{
"profile_name": "structured_finance_clo",
"version": "2.1",
"description": "Standard CLO holdings format for structured finance data",
"detection_headers": [
"CUSIP", "Security Description", "Par Amount",
"Market Value", "WAL", "WAS"
],
"detection_threshold": 0.7,
"column_mappings": {
"CUSIP": "cusip",
"ISIN": "isin",
"Security Description": "security_description",
"Security Name": "security_description",
"Par Amount": "par_amount",
"Par Value": "par_amount",
"Current Face": "par_amount",
"Market Value": "market_value",
"MV": "market_value",
"Price": "price",
"Clean Price": "price",
"Bid Price": "bid_price",
"Ask Price": "ask_price",
"Mid Price": "price",
"Spread": "spread",
"DM": "spread",
"Discount Margin": "spread",
"WAL": "wal",
"Weighted Avg Life": "wal",
"WAS": "was",
"Weighted Avg Spread": "was",
"Moody's": "rating_moody",
"Moodys": "rating_moody",
"Moody Rating": "rating_moody",
"S&P": "rating_sp",
"SP": "rating_sp",
"S&P Rating": "rating_sp",
"Fitch": "rating_fitch",
"Fitch Rating": "rating_fitch",
"Coupon": "coupon",
"Coupon Rate": "coupon",
"Maturity": "maturity_date",
"Maturity Date": "maturity_date",
"Sector": "sector",
"Industry": "sector",
"Country": "country",
"Country of Risk": "country"
},
"required_fields": [
"cusip", "par_amount", "price"
],
"optional_fields": [
"isin", "security_description", "market_value",
"spread", "wal", "was", "rating_moody", "rating_sp",
"rating_fitch", "coupon", "maturity_date", "sector", "country"
],
"type_coercions": {
"par_amount": "float",
"market_value": "float",
"price": "float",
"spread": "float",
"wal": "float",
"was": "float",
"coupon": "float"
}
}
Detection Threshold
The detection_threshold (0.0 to 1.0) controls how many of the detection_headers must match for auto-detection. A threshold of 0.7 means 70% of the detection headers must be present in the file's column headers. Lower thresholds increase recall but risk false positives.
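The threshold check can be sketched as a simple presence ratio. This is an illustrative implementation; the real `compute_header_match_score` (referenced later in this document) may normalize headers differently:

```python
# Illustrative sketch of the detection_threshold check.
def compute_header_match_score(headers, detection_headers):
    """Fraction of detection headers found in the file's headers."""
    def norm(h):
        # Case/whitespace-insensitive comparison (an assumption here)
        return h.strip().lower()
    present = {norm(h) for h in headers}
    hits = sum(1 for d in detection_headers if norm(d) in present)
    return hits / len(detection_headers) if detection_headers else 0.0

detection = ["CUSIP", "Security Description", "Par Amount",
             "Market Value", "WAL", "WAS"]
file_headers = ["CUSIP", "Security Description", "Par Amount",
                "Price", "WAL"]

score = compute_header_match_score(file_headers, detection)
# 4 of 6 detection headers present -> score ~= 0.667 < 0.7,
# so this profile would NOT auto-match at the default threshold.
```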
Servicer Domain¶
The Servicer domain processes loan-level data from servicer operational snapshots and regulatory reporting templates.
Alias Profiles¶
| Profile Name | Purpose | Key Characteristics |
|---|---|---|
| key_data | Master loan reference data | Agency mappings, CUSIP lookups |
| risk_restrictions | Compliance and risk limits | Concentration limits, eligibility |
| capital_sheet | Capital structure data | Tranche details, subordination |
| abc_snapshot | ABC servicer format | CA-CI calculated columns |
| atlys_snapshot | Atlys servicer format | CB-CI calculated columns |
| pme_snapshot | PME data format | BZ-CB calculated columns |
| boe_sme | Bank of England SME template | UK regulatory loan-level |
| boe_corporate | BoE Corporate template | UK corporate lending |
| boe_asset_finance | BoE Asset Finance template | UK asset-backed |
| rba_rmbs | RBA RMBS template | Australian RMBS reporting |
| ecb_rmbs | ECB RMBS template | European RMBS securitization |
| ecb_decc_public | ECB DECC Public Sector | Public sector ABS |
| ecb_decc_sme | ECB DECC SME | SME securitization data |
| generic_servicer | Fallback format | Minimal required fields |

For example, the key_data profile configuration:
{
"profile_name": "key_data",
"version": "1.3",
"description": "Key Data reference sheet with agency mappings",
"detection_headers": [
"Y-Cusip-A", "Agency", "Senior Loan",
"Senior Principal Balance"
],
"detection_threshold": 0.75,
"column_mappings": {
"Y-Cusip-A": "y_cusip_a",
"Y Cusip A": "y_cusip_a",
"Agency": "agency",
"Agency Name": "agency",
"Senior Loan": "senior_loan",
"Senior Loan Amount": "senior_loan",
"Senior Principal Balance": "senior_principal_balance",
"Sr Principal Bal": "senior_principal_balance",
"Invoice Number": "invoice_number",
"Invoice #": "invoice_number",
"Days Open": "days_open",
"Settlement Date": "settlement_date"
},
"required_fields": [
"y_cusip_a", "agency", "senior_loan"
],
"type_coercions": {
"senior_loan": "float",
"senior_principal_balance": "float",
"days_open": "int",
"invoice_number": "string"
}
}
Invoice Number Type Coercion
The invoice_number field must always be coerced to string. Excel stores invoice numbers as floats (e.g., 200803.0), and without explicit string conversion, downstream lookups and comparisons will fail silently.
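The failure mode is easy to reproduce without pandas. A minimal sketch, where `coerce_invoice` is an illustrative helper (not part of CalcBridge) showing one defensive conversion:

```python
# Excel delivers the invoice number as a float, and a naive str()
# keeps the trailing ".0" -- so lookups against "200803" miss.
raw = 200803.0
assert str(raw) == "200803.0"   # naive conversion: wrong key

def coerce_invoice(value):
    """Render whole-number floats without the spurious '.0'."""
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)

coerce_invoice(200803.0)  # -> "200803"
coerce_invoice("INV-17")  # -> "INV-17" (non-numeric IDs pass through)
```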
Column Normalization Pipeline¶
When a file is ingested, the normalization pipeline transforms raw headers into the canonical internal schema:
flowchart LR
subgraph Input["Raw Input"]
HEADERS["Raw Column<br/>Headers"]
end
subgraph Detect["Detection"]
AUTO["Auto-Detect<br/>Profile"]
OVERRIDE["Mapping Config<br/>Override"]
end
subgraph Normalize["Normalization"]
ALIAS["Apply Alias<br/>Mappings"]
CLEAN["Clean &<br/>Standardize"]
COERCE["Type<br/>Coercion"]
end
subgraph Validate["Validation"]
REQUIRED["Check Required<br/>Fields"]
DRIFT["Schema Drift<br/>Detection"]
DIAG["Generate<br/>Diagnostics"]
end
subgraph Output["Normalized Output"]
DF["Canonical<br/>DataFrame"]
end
HEADERS --> AUTO
AUTO --> ALIAS
OVERRIDE --> ALIAS
ALIAS --> CLEAN
CLEAN --> COERCE
COERCE --> REQUIRED
REQUIRED --> DRIFT
DRIFT --> DIAG
DIAG --> DF
style AUTO fill:#FEF3C7,stroke:#F59E0B
style DRIFT fill:#FEE2E2,stroke:#EF4444
style DF fill:#DCFCE7,stroke:#22C55E
Normalization Steps¶
class ColumnNormalizer:
"""Transforms raw file headers into canonical column names
using alias profile mappings.
"""
def normalize(
self,
df: pd.DataFrame,
profile: AliasProfile,
mapping_overrides: dict[str, str] | None = None,
) -> NormalizationResult:
"""Apply column normalization pipeline.
Priority order:
1. Mapping config overrides (user-defined)
2. Alias profile mappings (JSON config)
3. Fuzzy matching fallback (optional)
"""
# Step 1: Build effective mapping
effective_mapping = dict(profile.column_mappings)
if mapping_overrides:
effective_mapping.update(mapping_overrides)
# Step 2: Apply mappings
rename_map = {}
unmapped_columns = []
for col in df.columns:
normalized = self._normalize_header(col)
if normalized in effective_mapping:
rename_map[col] = effective_mapping[normalized]
elif col in effective_mapping:
rename_map[col] = effective_mapping[col]
else:
unmapped_columns.append(col)
df = df.rename(columns=rename_map)
# Step 3: Type coercion
for field, target_type in profile.type_coercions.items():
if field in df.columns:
df[field] = self._coerce_type(df[field], target_type)
# Step 4: Validate required fields
missing_required = [
f for f in profile.required_fields if f not in df.columns
]
return NormalizationResult(
dataframe=df,
mapped_columns=rename_map,
unmapped_columns=unmapped_columns,
missing_required=missing_required,
)
@staticmethod
def _normalize_header(header: str) -> str:
"""Standardize header for matching.
- Strip whitespace
- Lowercase
- Replace special chars with underscores
"""
import re
header = str(header).strip().lower()
header = re.sub(r"[^a-z0-9]+", "_", header)
return header.strip("_")
    @staticmethod
    def _coerce_type(series: pd.Series, target_type: str) -> pd.Series:
        """Coerce a pandas Series to the target type."""

        def to_string(s: pd.Series) -> pd.Series:
            # Excel often delivers identifiers as floats (e.g. 200803.0);
            # strip the spurious ".0" so string lookups still match.
            return s.astype(str).str.replace(r"\.0$", "", regex=True)

        coercion_map = {
            "float": lambda s: pd.to_numeric(s, errors="coerce"),
            "int": lambda s: pd.to_numeric(s, errors="coerce").astype("Int64"),
            "string": to_string,
            "date": lambda s: pd.to_datetime(s, errors="coerce"),
            "boolean": lambda s: s.astype(bool),
        }
        coercer = coercion_map.get(target_type)
        if coercer:
            return coercer(series)
        return series
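The mapping and coercion steps can be exercised on a toy frame. A minimal sketch assuming pandas; the thousands-separator cleanup is an extra cleaning step shown for illustration, not part of the profile JSON:

```python
import pandas as pd

# Toy frame with provider-style headers
df = pd.DataFrame({
    "CUSIP": ["12345AB67"],
    "Par Value": ["1,000,000"],
    "Clean Price": ["99.5"],
})

# Alias mappings as in the structured_finance_clo profile
mapping = {"CUSIP": "cusip", "Par Value": "par_amount", "Clean Price": "price"}
df = df.rename(columns=mapping)

# Type coercion mirrors the "float" branch of _coerce_type;
# thousands separators are stripped first (illustrative extra step)
for col in ("par_amount", "price"):
    df[col] = pd.to_numeric(df[col].str.replace(",", ""), errors="coerce")
# df.columns -> ["cusip", "par_amount", "price"], values numeric
```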
Mapping Config Overrides¶
Users can override alias profile mappings through the Mappings API. Overrides take precedence over profile defaults:
| Priority | Source | Scope |
|---|---|---|
| 1 (highest) | Mapping Config API | Per-tenant, per-workbook |
| 2 | Alias Profile JSON | Global, per-profile |
| 3 (lowest) | Fuzzy matching | Automatic fallback |
Mapping Hub UI
The Mapping Hub UI at /mappings provides a visual interface for creating and managing mapping overrides. Users can drag-and-drop source columns to target fields, preview normalization results, and save reusable mapping configurations.
Schema Drift Detection¶
Schema drift occurs when a data provider changes their file format -- adding, removing, or renaming columns. The drift detector identifies these changes and generates diagnostics:
from dataclasses import dataclass, field
from enum import Enum
class DriftType(str, Enum):
COLUMN_ADDED = "column_added"
COLUMN_REMOVED = "column_removed"
COLUMN_RENAMED = "column_renamed"
TYPE_CHANGED = "type_changed"
ORDER_CHANGED = "order_changed"
@dataclass
class DriftEvent:
"""A single schema drift event."""
drift_type: DriftType
column_name: str
previous_value: str | None
current_value: str | None
confidence: float # 0.0-1.0 for rename detection
@dataclass
class DriftReport:
"""Complete schema drift analysis for a single ingestion."""
profile_name: str
drift_events: list[DriftEvent] = field(default_factory=list)
is_breaking: bool = False
severity: str = "info" # info, warning, error
recommendation: str = ""
@property
def has_drift(self) -> bool:
return len(self.drift_events) > 0
class SchemaDriftDetector:
"""Detects schema changes between ingestion runs.
Compares the current file's column structure against
the historical schema fingerprint stored from previous
successful ingestions.
"""
RENAME_SIMILARITY_THRESHOLD = 0.85
def detect(
self,
current_headers: list[str],
historical_fingerprint: SchemaFingerprint,
) -> DriftReport:
"""Compare current headers against historical schema."""
events = []
current_set = set(current_headers)
historical_set = set(historical_fingerprint.columns)
        # Partition headers into removed and added, then check the
        # added columns for renames before emitting removal events
        # (otherwise a rename would surface as a remove plus an add).
        removed = historical_set - current_set
        added = current_set - historical_set
        for col in added:
            rename_match = self._find_rename_candidate(
                col, removed, historical_fingerprint
            )
            if rename_match:
                events.append(DriftEvent(
                    drift_type=DriftType.COLUMN_RENAMED,
                    column_name=rename_match.original,
                    previous_value=rename_match.original,
                    current_value=col,
                    confidence=rename_match.similarity,
                ))
                removed.discard(rename_match.original)
            else:
                events.append(DriftEvent(
                    drift_type=DriftType.COLUMN_ADDED,
                    column_name=col,
                    previous_value=None,
                    current_value=col,
                    confidence=1.0,
                ))
        # Whatever remains in `removed` is a genuine removal
        for col in removed:
            events.append(DriftEvent(
                drift_type=DriftType.COLUMN_REMOVED,
                column_name=col,
                previous_value=col,
                current_value=None,
                confidence=1.0,
            ))
# Detect order changes
if current_headers != historical_fingerprint.ordered_columns:
preserved = [
h for h in current_headers if h in historical_set
]
expected = [
h for h in historical_fingerprint.ordered_columns
if h in current_set
]
if preserved != expected:
events.append(DriftEvent(
drift_type=DriftType.ORDER_CHANGED,
column_name="*",
previous_value=str(expected[:5]),
current_value=str(preserved[:5]),
confidence=1.0,
))
# Classify severity
is_breaking = any(
e.drift_type == DriftType.COLUMN_REMOVED
and e.column_name in historical_fingerprint.required_columns
for e in events
)
return DriftReport(
profile_name=historical_fingerprint.profile_name,
drift_events=events,
is_breaking=is_breaking,
severity="error" if is_breaking else (
"warning" if events else "info"
),
recommendation=self._generate_recommendation(events, is_breaking),
)
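The remove/add/rename partition can be demonstrated with the standard library. A self-contained sketch using `difflib` for the fuzzy rename match; the real detector also tracks order and type changes, and its similarity metric may differ:

```python
import difflib

RENAME_SIMILARITY_THRESHOLD = 0.85

def classify_drift(current, historical):
    """Partition header changes into added / removed / renamed."""
    removed = set(historical) - set(current)
    added = set(current) - set(historical)
    renames = {}
    for col in sorted(added):
        # Best fuzzy match among removed columns above the threshold
        match = difflib.get_close_matches(
            col, sorted(removed), n=1,
            cutoff=RENAME_SIMILARITY_THRESHOLD,
        )
        if match:
            renames[match[0]] = col   # old name -> new name
            removed.discard(match[0])
    added -= set(renames.values())
    return sorted(added), sorted(removed), renames

historical = ["CUSIP", "Market Value", "Par Amount"]
current = ["CUSIP", "Mkt Value", "Par Amount", "ESG Score"]
added, removed, renames = classify_drift(current, historical)
# "Mkt Value" is close enough to "Market Value" to count as a
# rename; "ESG Score" has no plausible ancestor, so it is an add.
```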
Ingest Diagnostics¶
Every ingestion produces a diagnostics payload stored alongside the workbook metadata:
{
"clo": {
"ingest_diagnostics": {
"profile_used": "structured_finance_clo",
"profile_version": "2.1",
"detection_confidence": 0.85,
"schema_drift": {
"has_drift": true,
"events": [
{
"type": "column_added",
"column": "ESG Score",
"confidence": 1.0
}
],
"severity": "info"
},
"mapping_audit": {
"total_columns": 42,
"mapped_columns": 38,
"unmapped_columns": ["Custom Field 1", "Notes", "Flag", "ESG Score"],
"mapping_coverage": 0.905
},
"input_checksums": {
"file_sha256": "a1b2c3...",
"row_count": 1247,
"column_count": 42
},
"formula_risk": {
"formulas_detected": 12,
"circular_references": 0,
"external_references": 0
},
"locale_risk": {
"decimal_separator": ".",
"date_format": "MM/DD/YYYY",
"encoding": "utf-8"
}
}
}
}
| Diagnostic | Purpose | Action When Abnormal |
|---|---|---|
| Schema Drift | Detect format changes from providers | Alert data ops team if breaking |
| Mapping Audit | Track mapping coverage and gaps | Update alias profile or add overrides |
| Input Checksums | File integrity and deduplication | Reject duplicate uploads |
| Formula Risk | Identify potentially unsafe formulas | Flag for review before evaluation |
| Locale Risk | Detect regional formatting differences | Apply locale-specific parsing |
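A consumer of the diagnostics payload might triage ingestions along these lines. The `triage` helper and its 0.8 coverage cutoff are illustrative assumptions, not CalcBridge API:

```python
# Illustrative triage over the ingest_diagnostics payload.
def triage(diag):
    drift = diag["schema_drift"]
    audit = diag["mapping_audit"]
    if drift["severity"] == "error":
        return "block"    # breaking drift: halt and alert data ops
    if audit["mapping_coverage"] < 0.8:
        return "review"   # poor coverage: update profile or overrides
    return "accept"

diag = {
    "schema_drift": {"has_drift": True, "severity": "info"},
    "mapping_audit": {"total_columns": 42, "mapped_columns": 38,
                      "mapping_coverage": round(38 / 42, 3)},
}
triage(diag)  # -> "accept"  (0.905 coverage, non-breaking drift)
```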
Domain Module Lifecycle¶
sequenceDiagram
participant App as Application Startup
participant Reg as DomainRegistry
participant CLO as CLO Domain
participant SVC as Servicer Domain
participant Ingest as Ingestion Pipeline
App->>Reg: Initialize singleton
App->>CLO: Load alias profiles from JSON
CLO->>Reg: register(CLODomain)
App->>SVC: Load alias profiles from JSON
SVC->>Reg: register(ServicerDomain)
Note over Reg: Registry ready
Ingest->>Reg: detect_domain_and_profile(headers)
Reg->>CLO: detect_profile(headers)
CLO-->>Reg: "structured_finance_clo"
Reg-->>Ingest: ("clo", "structured_finance_clo")
Ingest->>CLO: normalize(df, profile)
CLO-->>Ingest: NormalizationResult
Ingest->>CLO: validate(df)
CLO-->>Ingest: ValidationResult
Adding a New Domain¶
To add a new domain module:
- Create the domain class implementing the DomainModule protocol:
class NewDomain:
"""New domain module implementation."""
@property
def name(self) -> str:
return "new_domain"
@property
def alias_profiles(self) -> dict[str, AliasProfile]:
return self._profiles
def __init__(self):
self._profiles = load_profiles_from_json(
"config/new_domain_aliases/"
)
def detect_profile(self, headers: list[str]) -> str | None:
for name, profile in self._profiles.items():
score = compute_header_match_score(
headers, profile.detection_headers
)
if score >= profile.detection_threshold:
return name
return None
    def normalize(self, df, profile_name):
        profile = self._profiles[profile_name]
        # ColumnNormalizer returns a NormalizationResult; the
        # DomainModule protocol expects the normalized DataFrame.
        return ColumnNormalizer().normalize(df, profile).dataframe
def validate(self, df):
return SchemaValidator().validate(df, self._profiles)
- Create alias profile JSON files in config/new_domain_aliases/.
- Register the domain in src/domains/__init__.py.
- Write tests covering profile detection, normalization, and validation.
Metrics and Monitoring¶
| Metric | Type | Description |
|---|---|---|
| domain_ingest_total | Counter | Total ingestions by domain and profile |
| domain_detection_duration_seconds | Histogram | Profile auto-detection time |
| domain_normalization_coverage | Gauge | Column mapping coverage ratio |
| domain_drift_events_total | Counter | Schema drift events by type |
| domain_unmapped_columns_total | Gauge | Unmapped columns per ingestion |
| domain_validation_failures_total | Counter | Validation failures by domain |
Configuration Reference¶
# Domain detection
DOMAIN_AUTO_DETECT: true # Enable automatic profile detection
DOMAIN_DETECTION_THRESHOLD: 0.7 # Default detection threshold
DOMAIN_FUZZY_MATCHING: true # Enable fuzzy column matching
# Schema drift
DOMAIN_DRIFT_DETECTION: true # Enable schema drift detection
DOMAIN_DRIFT_ALERT_ON_BREAKING: true # Alert on breaking schema changes
DOMAIN_DRIFT_HISTORY_DEPTH: 10 # Number of historical schemas to keep
# Alias profiles
CLO_ALIAS_DIR: config/clo_aliases # CLO alias profile directory
SERVICER_ALIAS_DIR: config/servicer_aliases # Servicer alias profile directory
Related Documentation¶
- Data Flow & Processing Pipeline -- How ingested data flows through the system
- Reconciliation Engine -- Using domain-normalized data for reconciliation
- Formula Engine -- Formula evaluation on normalized data
- System Design -- Component overview and service interactions