Data Flow & Processing Pipeline¶
This document describes how data flows through CalcBridge, from Excel file upload through parsing and calculation to database storage.
Excel Upload Flow¶
flowchart TB
subgraph Upload["1. Upload"]
FILE["Excel File<br/>(xlsx, xls, xlsm)"]
VALIDATE["File Validation"]
STORE["Store to S3/Local"]
end
subgraph Parse["2. Parse (Async)"]
QUEUE["Celery Queue"]
WORKER["Parse Worker"]
OPENPYXL["openpyxl Parser"]
end
subgraph Process["3. Process"]
TYPE_CONV["Type Conversion"]
FORMULA_EXT["Formula Extraction"]
CALC_SVC["Calculation Service"]
end
subgraph Store["4. Store"]
PG["PostgreSQL<br/>(JSONB)"]
CACHE["Valkey Cache"]
end
FILE --> VALIDATE
VALIDATE --> STORE
STORE --> QUEUE
QUEUE --> WORKER
WORKER --> OPENPYXL
OPENPYXL --> TYPE_CONV
TYPE_CONV --> FORMULA_EXT
FORMULA_EXT --> CALC_SVC
CALC_SVC --> PG
CALC_SVC --> CACHE
style VALIDATE fill:#FEF3C7,stroke:#F59E0B
style CALC_SVC fill:#DCFCE7,stroke:#22C55E
style PG fill:#DBEAFE,stroke:#3B82F6
Detailed Processing Stages¶
Stage 1: File Validation¶
Before processing, files undergo validation checks:
| Check | Description | Action on Failure |
|---|---|---|
| File Size | Max 100MB | Reject with 413 |
| File Extension | xlsx, xls, xlsm | Reject with 400 |
| MIME Type | Verify actual type | Reject with 400 |
| Malware Scan | Virus check (optional) | Reject with 422 |
# src/imports/validation.py
from pathlib import Path
import magic  # python-magic, for MIME sniffing
from fastapi import UploadFile
ALLOWED_EXTENSIONS = {".xlsx", ".xls", ".xlsm"}
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
def validate_upload(file: UploadFile) -> ValidationResult:
    """Validate uploaded Excel file."""
    # Check extension
    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise ValidationError(f"Unsupported format: {ext}")
    # Check size
    if file.size > MAX_FILE_SIZE:
        raise ValidationError("File too large")
    # Verify MIME type matches extension, then rewind for downstream readers
    mime_type = magic.from_buffer(file.file.read(2048), mime=True)
    file.file.seek(0)
    if not is_valid_excel_mime(mime_type, ext):
        raise ValidationError("Invalid file content")
    return ValidationResult(valid=True)
Stage 2: Excel Parsing¶
The parser extracts all data from the workbook:
flowchart LR
subgraph Workbook
WB["Workbook"]
end
subgraph Sheets
S1["Sheet 1"]
S2["Sheet 2"]
SN["Sheet N"]
end
subgraph Data
META["Metadata"]
CELLS["Cell Data"]
FORMULAS["Formulas"]
STYLES["Styles"]
end
WB --> S1
WB --> S2
WB --> SN
S1 --> META
S1 --> CELLS
S1 --> FORMULAS
S1 --> STYLES
Extracted Information:
| Component | Data Extracted |
|---|---|
| Metadata | Sheet names, dimensions, defined names |
| Cell Data | Values, types, formatting |
| Formulas | Raw formula strings, dependencies |
| Styles | Number formats (for type inference) |
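Parsing sketch (illustrative): the production parse worker is not reproduced here, but the components in the table above map onto openpyxl roughly as follows. The extract_sheet helper and its return shape are assumptions, not the actual implementation:
# Illustrative sketch only - not the production parse worker
from openpyxl import load_workbook
def extract_sheet(path: str, sheet_name: str) -> dict:
    """Pull metadata, cell values, and raw formulas from one sheet."""
    # data_only=False keeps raw formula strings instead of cached results
    wb = load_workbook(path, data_only=False)
    ws = wb[sheet_name]
    cells: dict[str, object] = {}
    formulas: dict[str, str] = {}
    for row in ws.iter_rows():
        for cell in row:
            if cell.value is None:
                continue
            if cell.data_type == "f":  # formula cell, e.g. "=A1*2"
                formulas[cell.coordinate] = cell.value
            else:
                cells[cell.coordinate] = cell.value
    return {
        "metadata": {
            "sheet": ws.title,
            "dimensions": ws.dimensions,
            "defined_names": list(wb.defined_names),  # name keys (openpyxl >= 3.1)
        },
        "cells": cells,
        "formulas": formulas,
    }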
Stage 3: Type Conversion¶
Excel types are converted to appropriate Python/database types:
flowchart LR
subgraph Excel["Excel Types"]
E_NUM["Number"]
E_STR["String"]
E_BOOL["Boolean"]
E_DATE["Date"]
E_ERR["Error"]
E_EMPTY["Empty"]
end
subgraph Python["Python Types"]
P_FLOAT["float / Decimal"]
P_STR["str"]
P_BOOL["bool"]
P_DATE["datetime"]
P_NONE["None"]
P_ERR["ErrorValue"]
end
subgraph DB["Database (JSONB)"]
DB_NUM["number"]
DB_STR["string"]
DB_BOOL["boolean"]
DB_DATE["ISO 8601"]
DB_NULL["null"]
DB_ERR["error object"]
end
E_NUM --> P_FLOAT --> DB_NUM
E_STR --> P_STR --> DB_STR
E_BOOL --> P_BOOL --> DB_BOOL
E_DATE --> P_DATE --> DB_DATE
E_EMPTY --> P_NONE --> DB_NULL
E_ERR --> P_ERR --> DB_ERR
Type Conversion Rules:
| Excel Type | Python Type | JSONB Type | Notes |
|---|---|---|---|
| Integer | int | number | Preserved exactly |
| Decimal | Decimal | number | Financial precision |
| Percentage | Percentage | number | Stored as decimal (0.05 for 5%) |
| Currency | Money | number | 2 decimal places |
| Date | datetime | string | ISO 8601 format |
| Time | datetime | string | ISO 8601 format |
| Boolean | bool | boolean | TRUE/FALSE |
| String | str | string | UTF-8 encoded |
| Error | ErrorValue | object | {"error": "#DIV/0!"} |
| Empty | None | null | NULL value |
Code Example:
# src/imports/type_converter.py
from decimal import Decimal
from datetime import datetime
from typing import Any
from src.calculations.types import Money, Percentage, FinancialRate
class ExcelTypeConverter:
"""Convert Excel values to appropriate Python types."""
def convert(self, value: Any, cell_format: str) -> Any:
"""Convert a cell value based on its format."""
if value is None:
return None
# Check for date formats
if self._is_date_format(cell_format):
return self._convert_date(value)
# Check for percentage
if self._is_percentage_format(cell_format):
return Percentage(value)
# Check for currency
if self._is_currency_format(cell_format):
return Money(value, currency=self._extract_currency(cell_format))
# Numeric values
if isinstance(value, (int, float)):
# Use Decimal for financial precision
return Decimal(str(value))
return value
Stage 4: Calculation Execution¶
Formulas are processed using vectorized operations:
flowchart TB
subgraph Input["Formula Input"]
RAW["=IF(A1>0, A1*[@Rate], 0)"]
end
subgraph Parse["Parse Phase"]
TOKEN["Tokenize"]
AST["Build AST"]
end
subgraph Translate["Translate Phase"]
PANDAS["To Pandas Ops"]
DEPS["Resolve Dependencies"]
end
subgraph Execute["Execute Phase"]
VEC["Vectorized Calc"]
RESULT["Result Array"]
end
RAW --> TOKEN
TOKEN --> AST
AST --> PANDAS
PANDAS --> DEPS
DEPS --> VEC
VEC --> RESULT
style VEC fill:#DCFCE7,stroke:#22C55E
Vectorized Operations:
# src/calculations/functions/vectorized.py
from typing import Any
import numpy as np
import pandas as pd
def vectorized_if(
    condition: pd.Series,
    value_if_true: Any,
    value_if_false: Any
) -> pd.Series:
    """Vectorized IF function - much faster than iterating with iterrows."""
    # np.where returns an ndarray; re-wrap as a Series aligned to the input index
    return pd.Series(
        np.where(condition, value_if_true, value_if_false),
        index=condition.index,
    )
def vectorized_xlookup(
lookup_series: pd.Series,
lookup_dict: dict,
default: Any = None
) -> pd.Series:
"""Vectorized XLOOKUP using dictionary mapping."""
return lookup_series.map(lookup_dict).fillna(default)
def vectorized_sumifs(
sum_range: pd.Series,
criteria_range: pd.Series,
criteria: Any
) -> float:
"""Vectorized SUMIFS aggregation."""
mask = criteria_range == criteria
return sum_range[mask].sum()
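How the translate phase wires a formula onto these helpers is not shown here; as a hypothetical illustration (column names and data are invented), the example formula from the diagram could execute roughly like this once translated:
# Hypothetical translated form of =IF(A1>0, A1*[@Rate], 0), applied column-wise
import pandas as pd
df = pd.DataFrame({
    "amount": [100.0, -20.0, 50.0],
    "rate": [0.05, 0.05, 0.05],
})
# IF(amount > 0, amount * rate, 0) becomes a single vectorized call
df["result"] = vectorized_if(df["amount"] > 0, df["amount"] * df["rate"], 0)
# df["result"] -> [5.0, 0.0, 2.5]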
Stage 5: Database Storage¶
Data is stored in PostgreSQL using JSONB for flexibility:
erDiagram
WORKBOOK ||--o{ SHEET : contains
SHEET ||--|| CELL_DATA : stores
WORKBOOK {
uuid id PK
uuid tenant_id FK
string name
jsonb metadata
timestamp created_at
}
SHEET {
uuid id PK
uuid workbook_id FK
string name
integer row_count
integer col_count
jsonb settings
}
CELL_DATA {
uuid id PK
uuid sheet_id FK
jsonb data
jsonb formulas
jsonb calculated
}
JSONB Storage Structure:
{
"cells": {
"A1": {"value": 100, "type": "number"},
"A2": {"value": "2024-01-15", "type": "date"},
"A3": {"value": true, "type": "boolean"},
"B1": {"formula": "=A1*2", "value": 200, "type": "number"},
"B2": {"formula": "=XLOOKUP(A1, Data!A:A, Data!B:B)", "value": "Found", "type": "string"}
},
"row_count": 1000,
"col_count": 26,
"named_ranges": {
"TotalAssets": "Sheet1!$A$1:$A$100"
}
}
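Reading a cell back is then a JSONB path lookup. The query below is only a sketch, using asyncpg against the cell_data table from the ER diagram; it is not necessarily how the service layer issues queries:
# Sketch: fetch one stored cell via JSONB path operators (asyncpg assumed)
import asyncpg
async def fetch_cell(conn: asyncpg.Connection, sheet_id: str, ref: str):
    """Return the stored JSON fragment for a single cell, e.g. ref="B1"."""
    row = await conn.fetchrow(
        """
        SELECT data -> 'cells' -> $2::text AS cell
        FROM cell_data
        WHERE sheet_id = $1
        """,
        sheet_id,
        ref,
    )
    return row["cell"] if row else None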
Cross-Sheet Reference Handling¶
When formulas reference other sheets, the system resolves dependencies:
flowchart TB
subgraph Sheet1["Sheet1"]
F1["=XLOOKUP(A1, 'Key Data'!A:A, 'Key Data'!B:B)"]
end
subgraph KeyData["Key Data Sheet"]
D1["Reference Data"]
end
subgraph Resolver["Dependency Resolver"]
LOAD["Load Referenced Sheet"]
BUILD["Build Lookup Dict"]
CACHE["Cache for Reuse"]
end
subgraph Execute["Execute"]
APPLY["Apply Lookup"]
RESULT["Result"]
end
F1 --> LOAD
KeyData --> LOAD
LOAD --> BUILD
BUILD --> CACHE
CACHE --> APPLY
APPLY --> RESULT
Cross-Sheet Loader:
# src/imports/cross_sheet_loader.py
import pandas as pd
class CrossSheetLoader:
    """Load and cache data from referenced sheets."""
    def __init__(self, workbook_id: str):
        self.workbook_id = workbook_id
        self._cache: dict[str, pd.DataFrame] = {}
async def load_sheet(self, sheet_name: str) -> pd.DataFrame:
"""Load sheet data, using cache if available."""
if sheet_name not in self._cache:
data = await self._fetch_sheet_data(sheet_name)
self._cache[sheet_name] = data
return self._cache[sheet_name]
def build_lookup_dict(
self,
sheet_name: str,
key_col: str,
value_col: str
) -> dict:
"""Build lookup dictionary for XLOOKUP operations."""
df = self._cache.get(sheet_name)
if df is None:
raise ValueError(f"Sheet not loaded: {sheet_name}")
return dict(zip(df[key_col], df[value_col]))
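Usage is roughly as follows, combining the loader with vectorized_xlookup from Stage 4 (inside an async context; sheet, column, and DataFrame names are illustrative):
# Illustrative wiring of CrossSheetLoader with vectorized_xlookup
loader = CrossSheetLoader(workbook_id=workbook_id)
await loader.load_sheet("Key Data")                     # fetch once, then cached
rates = loader.build_lookup_dict("Key Data", "A", "B")  # key column -> value column
# =XLOOKUP(A1, 'Key Data'!A:A, 'Key Data'!B:B) over an entire column
df["rate"] = vectorized_xlookup(df["account_id"], rates, default=0)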
Batch Processing¶
For large workbooks, processing is batched:
flowchart LR
subgraph Input
LARGE["Large Dataset<br/>(100K rows)"]
end
subgraph Batching
B1["Batch 1<br/>(10K rows)"]
B2["Batch 2<br/>(10K rows)"]
BN["Batch N<br/>(10K rows)"]
end
subgraph Process
P1["Process"]
P2["Process"]
PN["Process"]
end
subgraph Aggregate
AGG["Combine Results"]
end
LARGE --> B1
LARGE --> B2
LARGE --> BN
B1 --> P1 --> AGG
B2 --> P2 --> AGG
BN --> PN --> AGG
Batch Configuration:
| Setting | Default | Description |
|---|---|---|
| BATCH_SIZE | 10,000 | Rows per batch |
| MAX_WORKERS | 4 | Parallel batch workers |
| CHUNK_SIZE | 1,000 | DB insert chunk size |
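A rough sketch of how these settings could drive the batching step; the split_into_batches and process_in_batches helpers are illustrative, not the actual worker code:
# Illustrative batching helpers driven by the settings above
import numpy as np
import pandas as pd
BATCH_SIZE = 10_000
def split_into_batches(df: pd.DataFrame, batch_size: int = BATCH_SIZE) -> list[pd.DataFrame]:
    """Split a large sheet into row batches; each batch can go to a parallel worker."""
    n_batches = max(1, -(-len(df) // batch_size))  # ceiling division
    return np.array_split(df, n_batches)
def process_in_batches(df: pd.DataFrame) -> pd.DataFrame:
    """Process each batch independently, then combine the results."""
    processed = [calculate_chunk(batch) for batch in split_into_batches(df)]
    return pd.concat(processed, ignore_index=True)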
Data Integrity¶
Validation Checkpoints¶
| Stage | Validation |
|---|---|
| Upload | File size, type, structure |
| Parse | Cell count, sheet names |
| Convert | Type validity, range checks |
| Calculate | Formula syntax, dependency resolution |
| Store | JSONB schema, RLS policy |
Error Handling¶
# Error propagation through pipeline
class ProcessingError(Exception):
    """Base class for processing errors."""
    def __init__(self, stage: str, message: str, details: dict):
        super().__init__(message)
        self.stage = stage
        self.message = message
        self.details = details
class ParsingError(ProcessingError):
"""Error during Excel parsing."""
pass
class ConversionError(ProcessingError):
"""Error during type conversion."""
pass
class CalculationError(ProcessingError):
"""Error during formula calculation."""
pass
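Each stage wraps lower-level exceptions so a failure carries its stage name. A minimal sketch (the convert_cells helper is hypothetical):
# Illustrative: wrap a pipeline stage so failures carry stage context
def run_conversion_stage(raw_cells: dict) -> dict:
    try:
        return convert_cells(raw_cells)  # hypothetical stage function
    except (ValueError, TypeError) as exc:
        raise ConversionError(
            stage="convert",
            message="Type conversion failed",
            details={"cause": str(exc)},
        ) from exc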
Performance Optimization¶
Caching Strategy¶
| Cache Level | TTL | Use Case |
|---|---|---|
| Formula AST | 1 hour | Parsed formula trees |
| Lookup Tables | 5 min | Cross-sheet lookups |
| Calculation Results | 5 min | Computed values |
| Sheet Data | Per request | Active sheet data |
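A sketch of the Valkey side of this table using the redis-py asyncio client (Valkey is wire-compatible with Redis); the key naming scheme and client setup are assumptions:
# Sketch: TTL-based caching of lookup tables in Valkey
import json
import redis.asyncio as redis
cache = redis.Redis(host="valkey", port=6379)
LOOKUP_TTL = 300  # 5 minutes, per the table above
async def get_or_build_lookup(workbook_id: str, sheet: str, builder) -> dict:
    """Return a cached lookup table, rebuilding and re-caching it on a miss."""
    key = f"lookup:{workbook_id}:{sheet}"
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)
    table = builder()  # e.g. CrossSheetLoader.build_lookup_dict(...)
    await cache.set(key, json.dumps(table), ex=LOOKUP_TTL)
    return table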
Memory Management¶
# Efficient DataFrame processing
import numpy as np
import pandas as pd
def process_large_sheet(sheet_data: pd.DataFrame) -> pd.DataFrame:
"""Process sheet with memory efficiency."""
# Use appropriate dtypes
sheet_data = sheet_data.astype({
'amount': 'float32', # vs float64
'category': 'category', # vs object
})
# Process in chunks
results = []
for chunk in np.array_split(sheet_data, 10):
processed = calculate_chunk(chunk)
results.append(processed)
return pd.concat(results, ignore_index=True)
Monitoring & Observability¶
Processing Metrics¶
| Metric | Type | Description |
|---|---|---|
| upload_duration_seconds | Histogram | File upload time |
| parse_duration_seconds | Histogram | Parsing time |
| calculation_duration_seconds | Histogram | Calculation time |
| rows_processed_total | Counter | Total rows processed |
| processing_errors_total | Counter | Processing failures |
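A sketch of how these metrics could be declared with prometheus_client; the label names are assumptions:
# Sketch: declaring the pipeline metrics with prometheus_client
from prometheus_client import Counter, Histogram
upload_duration_seconds = Histogram(
    "upload_duration_seconds", "File upload time", ["tenant_id"])
parse_duration_seconds = Histogram(
    "parse_duration_seconds", "Parsing time", ["tenant_id"])
calculation_duration_seconds = Histogram(
    "calculation_duration_seconds", "Calculation time", ["tenant_id"])
rows_processed_total = Counter(
    "rows_processed_total", "Total rows processed", ["tenant_id"])
processing_errors_total = Counter(
    "processing_errors_total", "Processing failures", ["stage"])
# Illustrative usage inside the parse worker:
# with parse_duration_seconds.labels(tenant_id=tenant).time():
#     data = await parse_excel(workbook_id)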
Tracing¶
# OpenTelemetry instrumentation
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
async def process_workbook(workbook_id: str):
with tracer.start_as_current_span("process_workbook") as span:
span.set_attribute("workbook_id", workbook_id)
with tracer.start_as_current_span("parse"):
data = await parse_excel(workbook_id)
with tracer.start_as_current_span("calculate"):
results = await calculate_formulas(data)
with tracer.start_as_current_span("store"):
await store_results(results)