Data Flow & Processing Pipeline

This document describes how data flows through CalcBridge, from Excel file upload through parsing and calculation execution to database storage.

Excel Upload Flow

flowchart TB
    subgraph Upload["1. Upload"]
        FILE["Excel File<br/>(xlsx, xls, xlsm)"]
        VALIDATE["File Validation"]
        STORE["Store to S3/Local"]
    end

    subgraph Parse["2. Parse (Async)"]
        QUEUE["Celery Queue"]
        WORKER["Parse Worker"]
        OPENPYXL["openpyxl Parser"]
    end

    subgraph Process["3. Process"]
        TYPE_CONV["Type Conversion"]
        FORMULA_EXT["Formula Extraction"]
        CALC_SVC["Calculation Service"]
    end

    subgraph Store["4. Store"]
        PG["PostgreSQL<br/>(JSONB)"]
        CACHE["Valkey Cache"]
    end

    FILE --> VALIDATE
    VALIDATE --> STORE
    STORE --> QUEUE
    QUEUE --> WORKER
    WORKER --> OPENPYXL
    OPENPYXL --> TYPE_CONV
    TYPE_CONV --> FORMULA_EXT
    FORMULA_EXT --> CALC_SVC
    CALC_SVC --> PG
    CALC_SVC --> CACHE

    style VALIDATE fill:#FEF3C7,stroke:#F59E0B
    style CALC_SVC fill:#DCFCE7,stroke:#22C55E
    style PG fill:#DBEAFE,stroke:#3B82F6
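
The hand-off from the synchronous upload endpoint to the asynchronous parse worker could look roughly like the sketch below. The route path, task name, broker URL, and the store_file/create_workbook_record helpers are illustrative assumptions, not the actual CalcBridge API; validate_upload is shown in Stage 1 below.

# Hypothetical sketch of the upload hand-off; route, task, and helper names are assumptions.
from celery import Celery
from fastapi import APIRouter, UploadFile

router = APIRouter()
celery_app = Celery("calcbridge", broker="redis://localhost:6379/0")  # broker URL is an assumption

@celery_app.task(name="imports.parse_workbook")
def parse_workbook_task(storage_key: str, workbook_id: str) -> None:
    """Parse worker entry point: fetch the stored file, then parse, convert, calculate, store."""
    ...

@router.post("/workbooks")
async def upload_workbook(file: UploadFile):
    validate_upload(file)                                # Stage 1: validation (see below)
    storage_key = await store_file(file)                 # Stage 1: persist to S3 or local storage
    workbook_id = await create_workbook_record(file.filename)
    parse_workbook_task.delay(storage_key, workbook_id)  # Stage 2: enqueue asynchronous parsing
    return {"workbook_id": workbook_id, "status": "queued"}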

Detailed Processing Stages

Stage 1: File Validation

Before processing, files undergo validation checks:

| Check | Description | Action on Failure |
| --- | --- | --- |
| File Size | Max 100MB | Reject with 413 |
| File Extension | xlsx, xls, xlsm | Reject with 400 |
| MIME Type | Verify actual type | Reject with 400 |
| Malware Scan | Virus check (optional) | Reject with 422 |

# src/imports/validation.py
from pathlib import Path

import magic  # python-magic, content-based MIME detection
from fastapi import UploadFile

ALLOWED_EXTENSIONS = {".xlsx", ".xls", ".xlsm"}
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB

def validate_upload(file: UploadFile) -> ValidationResult:
    """Validate an uploaded Excel file, raising ValidationError on failure."""
    # Check extension
    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise ValidationError(f"Unsupported format: {ext}")

    # Check size
    if file.size > MAX_FILE_SIZE:
        raise ValidationError("File too large")

    # Verify MIME type matches extension (sniff the first 2 KB)
    mime_type = magic.from_buffer(file.file.read(2048), mime=True)
    file.file.seek(0)  # rewind so later stages can read the full file
    if not is_valid_excel_mime(mime_type, ext):
        raise ValidationError("Invalid file content")

    return ValidationResult(valid=True)

Stage 2: Excel Parsing

The parser extracts all data from the workbook:

flowchart LR
    subgraph Workbook
        WB["Workbook"]
    end

    subgraph Sheets
        S1["Sheet 1"]
        S2["Sheet 2"]
        SN["Sheet N"]
    end

    subgraph Data
        META["Metadata"]
        CELLS["Cell Data"]
        FORMULAS["Formulas"]
        STYLES["Styles"]
    end

    WB --> S1
    WB --> S2
    WB --> SN

    S1 --> META
    S1 --> CELLS
    S1 --> FORMULAS
    S1 --> STYLES

Extracted Information:

| Component | Data Extracted |
| --- | --- |
| Metadata | Sheet names, dimensions, defined names |
| Cell Data | Values, types, formatting |
| Formulas | Raw formula strings, dependencies |
| Styles | Number formats (for type inference) |
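
A minimal sketch of this extraction step with openpyxl is shown below. The function name and return shape are illustrative; loading with data_only=False preserves raw formula strings, while a second load with data_only=True would be needed to read cached formula results.

# Hypothetical parse sketch using openpyxl; data_only=False keeps formula strings.
from openpyxl import load_workbook

def parse_workbook(path: str) -> dict:
    """Extract sheet metadata, cell values, and raw formula strings."""
    wb = load_workbook(path, data_only=False)
    sheets = {}
    for ws in wb.worksheets:
        cells, formulas = {}, {}
        for row in ws.iter_rows():
            for cell in row:
                if cell.value is None:
                    continue
                if isinstance(cell.value, str) and cell.value.startswith("="):
                    formulas[cell.coordinate] = cell.value  # raw formula string
                else:
                    cells[cell.coordinate] = cell.value
        sheets[ws.title] = {"dimensions": ws.dimensions, "cells": cells, "formulas": formulas}
    # Defined names are available via wb.defined_names for the metadata record
    return {"sheet_names": wb.sheetnames, "sheets": sheets}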

Stage 3: Type Conversion

Excel types are converted to appropriate Python/database types:

flowchart LR
    subgraph Excel["Excel Types"]
        E_NUM["Number"]
        E_STR["String"]
        E_BOOL["Boolean"]
        E_DATE["Date"]
        E_ERR["Error"]
        E_EMPTY["Empty"]
    end

    subgraph Python["Python Types"]
        P_FLOAT["float / Decimal"]
        P_STR["str"]
        P_BOOL["bool"]
        P_DATE["datetime"]
        P_NONE["None"]
        P_ERR["ErrorValue"]
    end

    subgraph DB["Database (JSONB)"]
        DB_NUM["number"]
        DB_STR["string"]
        DB_BOOL["boolean"]
        DB_DATE["ISO 8601"]
        DB_NULL["null"]
        DB_ERR["error object"]
    end

    E_NUM --> P_FLOAT --> DB_NUM
    E_STR --> P_STR --> DB_STR
    E_BOOL --> P_BOOL --> DB_BOOL
    E_DATE --> P_DATE --> DB_DATE
    E_EMPTY --> P_NONE --> DB_NULL
    E_ERR --> P_ERR --> DB_ERR

Type Conversion Rules:

| Excel Type | Python Type | JSONB Type | Notes |
| --- | --- | --- | --- |
| Integer | int | number | Preserved exactly |
| Decimal | Decimal | number | Financial precision |
| Percentage | Percentage | number | Stored as decimal (0.05 for 5%) |
| Currency | Money | number | 2 decimal places |
| Date | datetime | string | ISO 8601 format |
| Time | datetime | string | ISO 8601 format |
| Boolean | bool | boolean | TRUE/FALSE |
| String | str | string | UTF-8 encoded |
| Error | ErrorValue | object | {"error": "#DIV/0!"} |
| Empty | None | null | NULL value |

Code Example:

# src/imports/type_converter.py
from datetime import datetime
from decimal import Decimal
from typing import Any

from src.calculations.types import FinancialRate, Money, Percentage

class ExcelTypeConverter:
    """Convert Excel values to appropriate Python types."""

    def convert(self, value: Any, cell_format: str) -> Any:
        """Convert a cell value based on its format."""
        if value is None:
            return None

        # Check for date formats
        if self._is_date_format(cell_format):
            return self._convert_date(value)

        # Check for percentage
        if self._is_percentage_format(cell_format):
            return Percentage(value)

        # Check for currency
        if self._is_currency_format(cell_format):
            return Money(value, currency=self._extract_currency(cell_format))

        # Numeric values
        if isinstance(value, (int, float)):
            # Use Decimal for financial precision
            return Decimal(str(value))

        return value
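
Assuming the internal format-detection helpers recognize common Excel number-format strings, usage would look roughly like this (the exact format strings matched are an assumption):

converter = ExcelTypeConverter()

converter.convert(0.05, "0.00%")       # -> Percentage(0.05), rendered as 5%
converter.convert(1234.5, "$#,##0.00") # -> Money(1234.5, currency=...), currency inferred from the format
converter.convert(42, "General")       # -> Decimal("42")
converter.convert(None, "General")     # -> None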

Stage 4: Calculation Execution

Formulas are processed using vectorized operations:

flowchart TB
    subgraph Input["Formula Input"]
        RAW["=IF(A1>0, A1*[@Rate], 0)"]
    end

    subgraph Parse["Parse Phase"]
        TOKEN["Tokenize"]
        AST["Build AST"]
    end

    subgraph Translate["Translate Phase"]
        PANDAS["To Pandas Ops"]
        DEPS["Resolve Dependencies"]
    end

    subgraph Execute["Execute Phase"]
        VEC["Vectorized Calc"]
        RESULT["Result Array"]
    end

    RAW --> TOKEN
    TOKEN --> AST
    AST --> PANDAS
    PANDAS --> DEPS
    DEPS --> VEC
    VEC --> RESULT

    style VEC fill:#DCFCE7,stroke:#22C55E

Vectorized Operations:

# src/calculations/functions/vectorized.py
from typing import Any

import numpy as np
import pandas as pd

def vectorized_if(
    condition: pd.Series,
    value_if_true: Any,
    value_if_false: Any
) -> pd.Series:
    """Vectorized IF function - evaluates whole columns at once instead of iterating rows."""
    return pd.Series(
        np.where(condition, value_if_true, value_if_false),
        index=condition.index,
    )

def vectorized_xlookup(
    lookup_series: pd.Series,
    lookup_dict: dict,
    default: Any = None
) -> pd.Series:
    """Vectorized XLOOKUP using dictionary mapping."""
    mapped = lookup_series.map(lookup_dict)
    # Only backfill misses when a default is supplied; fillna(None) is not valid in pandas
    return mapped if default is None else mapped.fillna(default)

def vectorized_sumifs(
    sum_range: pd.Series,
    criteria_range: pd.Series,
    criteria: Any
) -> float:
    """Vectorized SUMIFS aggregation."""
    mask = criteria_range == criteria
    return sum_range[mask].sum()
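
For instance, the example formula from the diagram, =IF(A1>0, A1*[@Rate], 0), reduces to a single vectorized call over whole columns once references are resolved (the column names here are placeholders):

import pandas as pd

df = pd.DataFrame({"A": [100, -20, 50], "Rate": [0.05, 0.05, 0.10]})

# =IF(A1>0, A1*[@Rate], 0) evaluated for every row at once
df["result"] = vectorized_if(df["A"] > 0, df["A"] * df["Rate"], 0)
# df["result"] -> [5.0, 0.0, 5.0]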

Stage 5: Database Storage

Data is stored in PostgreSQL using JSONB for flexibility:

erDiagram
    WORKBOOK ||--o{ SHEET : contains
    SHEET ||--|| CELL_DATA : stores

    WORKBOOK {
        uuid id PK
        uuid tenant_id FK
        string name
        jsonb metadata
        timestamp created_at
    }

    SHEET {
        uuid id PK
        uuid workbook_id FK
        string name
        integer row_count
        integer col_count
        jsonb settings
    }

    CELL_DATA {
        uuid id PK
        uuid sheet_id FK
        jsonb data
        jsonb formulas
        jsonb calculated
    }

JSONB Storage Structure:

{
  "cells": {
    "A1": {"value": 100, "type": "number"},
    "A2": {"value": "2024-01-15", "type": "date"},
    "A3": {"value": true, "type": "boolean"},
    "B1": {"formula": "=A1*2", "value": 200, "type": "number"},
    "B2": {"formula": "=XLOOKUP(A1, Data!A:A, Data!B:B)", "value": "Found", "type": "string"}
  },
  "row_count": 1000,
  "col_count": 26,
  "named_ranges": {
    "TotalAssets": "Sheet1!$A$1:$A$100"
  }
}
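
A sketch of how the CELL_DATA table above could be declared with SQLAlchemy's PostgreSQL JSONB support follows; the model and table names simply mirror the ER diagram, and the actual ORM layer may differ.

# Hypothetical SQLAlchemy model for CELL_DATA; mirrors the ER diagram above.
import uuid

from sqlalchemy import Column, ForeignKey
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class CellData(Base):
    __tablename__ = "cell_data"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    sheet_id = Column(UUID(as_uuid=True), ForeignKey("sheet.id"), nullable=False)
    data = Column(JSONB, nullable=False, default=dict)        # raw cell values
    formulas = Column(JSONB, nullable=False, default=dict)    # formula strings per cell
    calculated = Column(JSONB, nullable=False, default=dict)  # computed results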

Cross-Sheet Reference Handling

When formulas reference other sheets, the system resolves dependencies:

flowchart TB
    subgraph Sheet1["Sheet1"]
        F1["=XLOOKUP(A1, 'Key Data'!A:A, 'Key Data'!B:B)"]
    end

    subgraph KeyData["Key Data Sheet"]
        D1["Reference Data"]
    end

    subgraph Resolver["Dependency Resolver"]
        LOAD["Load Referenced Sheet"]
        BUILD["Build Lookup Dict"]
        CACHE["Cache for Reuse"]
    end

    subgraph Execute["Execute"]
        APPLY["Apply Lookup"]
        RESULT["Result"]
    end

    F1 --> LOAD
    KeyData --> LOAD
    LOAD --> BUILD
    BUILD --> CACHE
    CACHE --> APPLY
    APPLY --> RESULT

Cross-Sheet Loader:

# src/imports/cross_sheet_loader.py
import pandas as pd

class CrossSheetLoader:
    """Load and cache data from referenced sheets."""

    def __init__(self, workbook_id: str):
        self.workbook_id = workbook_id
        self._cache: dict[str, pd.DataFrame] = {}

    async def load_sheet(self, sheet_name: str) -> pd.DataFrame:
        """Load sheet data, using cache if available."""
        if sheet_name not in self._cache:
            data = await self._fetch_sheet_data(sheet_name)
            self._cache[sheet_name] = data
        return self._cache[sheet_name]

    def build_lookup_dict(
        self,
        sheet_name: str,
        key_col: str,
        value_col: str
    ) -> dict:
        """Build lookup dictionary for XLOOKUP operations."""
        df = self._cache.get(sheet_name)
        if df is None:
            raise ValueError(f"Sheet not loaded: {sheet_name}")
        return dict(zip(df[key_col], df[value_col]))
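
Combined with the vectorized XLOOKUP from Stage 4, resolving the reference above might look like this (the sheet and column names are illustrative):

# Illustrative resolution of =XLOOKUP(A1, 'Key Data'!A:A, 'Key Data'!B:B)
loader = CrossSheetLoader(workbook_id=workbook_id)
await loader.load_sheet("Key Data")
lookup = loader.build_lookup_dict("Key Data", key_col="A", value_col="B")
sheet1_df["result"] = vectorized_xlookup(sheet1_df["A"], lookup, default=None)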

Batch Processing

For large workbooks, processing is batched:

flowchart LR
    subgraph Input
        LARGE["Large Dataset<br/>(100K rows)"]
    end

    subgraph Batching
        B1["Batch 1<br/>(10K rows)"]
        B2["Batch 2<br/>(10K rows)"]
        BN["Batch N<br/>(10K rows)"]
    end

    subgraph Process
        P1["Process"]
        P2["Process"]
        PN["Process"]
    end

    subgraph Aggregate
        AGG["Combine Results"]
    end

    LARGE --> B1
    LARGE --> B2
    LARGE --> BN

    B1 --> P1 --> AGG
    B2 --> P2 --> AGG
    BN --> PN --> AGG

Batch Configuration:

| Setting | Default | Description |
| --- | --- | --- |
| BATCH_SIZE | 10,000 | Rows per batch |
| MAX_WORKERS | 4 | Parallel batch workers |
| CHUNK_SIZE | 1,000 | DB insert chunk size |
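
A minimal sketch of the batching step, assuming a thread pool sized by MAX_WORKERS and reusing the per-batch calculation shown in the memory-management example further below:

# Hypothetical batching sketch; calculate_chunk stands in for the per-batch calculation work.
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

BATCH_SIZE = 10_000
MAX_WORKERS = 4

def process_in_batches(sheet_data: pd.DataFrame) -> pd.DataFrame:
    """Split a large sheet into row batches and process them in parallel."""
    n_batches = max(1, -(-len(sheet_data) // BATCH_SIZE))  # ceiling division
    batches = np.array_split(sheet_data, n_batches)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        results = list(pool.map(calculate_chunk, batches))
    # Re-assemble per-batch results into a single frame
    return pd.concat(results, ignore_index=True)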

Data Integrity

Validation Checkpoints

| Stage | Validation |
| --- | --- |
| Upload | File size, type, structure |
| Parse | Cell count, sheet names |
| Convert | Type validity, range checks |
| Calculate | Formula syntax, dependency resolution |
| Store | JSONB schema, RLS policy |

Error Handling

# Error propagation through the pipeline
class ProcessingError(Exception):
    """Base class for processing errors."""

    def __init__(self, stage: str, message: str, details: dict):
        super().__init__(message)  # keep standard Exception behaviour (str(), traceback message)
        self.stage = stage
        self.message = message
        self.details = details

class ParsingError(ProcessingError):
    """Error during Excel parsing."""
    pass

class ConversionError(ProcessingError):
    """Error during type conversion."""
    pass

class CalculationError(ProcessingError):
    """Error during formula calculation."""
    pass
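
A stage wrapper can attach the failing stage name when re-raising, so failures surface with pipeline context (a sketch, not the actual pipeline code):

# Illustrative propagation: tag any unexpected failure with the stage it came from.
async def run_stage(stage: str, func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except ProcessingError:
        raise  # already carries its stage
    except Exception as exc:
        raise ProcessingError(stage=stage, message=str(exc), details={"exception": type(exc).__name__}) from exc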

Performance Optimization

Caching Strategy

| Cache Level | TTL | Use Case |
| --- | --- | --- |
| Formula AST | 1 hour | Parsed formula trees |
| Lookup Tables | 5 min | Cross-sheet lookups |
| Calculation Results | 5 min | Computed values |
| Sheet Data | Request | Active sheet data |
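
Since Valkey speaks the Redis protocol, the lookup-table cache could be a thin wrapper over redis-py with the TTLs above (a sketch; the key naming scheme and connection details are assumptions):

# Hypothetical cache helper for cross-sheet lookup tables (5-minute TTL).
import json

import redis.asyncio as redis

LOOKUP_TTL_SECONDS = 300  # "Lookup Tables" row above

client = redis.Redis(host="localhost", port=6379, decode_responses=True)

async def get_lookup_table(workbook_id: str, sheet: str) -> dict | None:
    raw = await client.get(f"lookup:{workbook_id}:{sheet}")
    return json.loads(raw) if raw else None

async def set_lookup_table(workbook_id: str, sheet: str, table: dict) -> None:
    await client.set(f"lookup:{workbook_id}:{sheet}", json.dumps(table), ex=LOOKUP_TTL_SECONDS)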

Memory Management

# Efficient DataFrame processing
import numpy as np
import pandas as pd

def process_large_sheet(sheet_data: pd.DataFrame) -> pd.DataFrame:
    """Process sheet with memory efficiency."""
    # Downcast to cheaper dtypes before processing
    sheet_data = sheet_data.astype({
        'amount': 'float32',     # half the memory of float64
        'category': 'category',  # far smaller than object for repeated strings
    })

    # Process in chunks to bound peak memory
    results = []
    for chunk in np.array_split(sheet_data, 10):
        processed = calculate_chunk(chunk)
        results.append(processed)

    return pd.concat(results, ignore_index=True)

Monitoring & Observability

Processing Metrics

| Metric | Type | Description |
| --- | --- | --- |
| upload_duration_seconds | Histogram | File upload time |
| parse_duration_seconds | Histogram | Parsing time |
| calculation_duration_seconds | Histogram | Calculation time |
| rows_processed_total | Counter | Total rows processed |
| processing_errors_total | Counter | Processing failures |
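
Assuming the metrics are exposed via prometheus_client, they could be declared and recorded along these lines (metric names match the table; the surrounding calls are placeholders):

# Hypothetical metric definitions matching the table above.
from prometheus_client import Counter, Histogram

PARSE_DURATION = Histogram("parse_duration_seconds", "Parsing time")
ROWS_PROCESSED = Counter("rows_processed_total", "Total rows processed")
PROCESSING_ERRORS = Counter("processing_errors_total", "Processing failures", ["stage"])

# Usage inside the parse stage:
# with PARSE_DURATION.time():          # observes elapsed seconds on exit
#     data = parse_sheet(...)          # placeholder for the parse step
# ROWS_PROCESSED.inc(row_count)
# PROCESSING_ERRORS.labels(stage="parse").inc()   # on failure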

Tracing

# OpenTelemetry instrumentation
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def process_workbook(workbook_id: str):
    with tracer.start_as_current_span("process_workbook") as span:
        span.set_attribute("workbook_id", workbook_id)

        with tracer.start_as_current_span("parse"):
            data = await parse_excel(workbook_id)

        with tracer.start_as_current_span("calculate"):
            results = await calculate_formulas(data)

        with tracer.start_as_current_span("store"):
            await store_results(results)