Export Pipeline Architecture

CalcBridge's export pipeline transforms calculated workbook data into distributable formats while maintaining cryptographic provenance, enforcing PII redaction policies, and ensuring secure delivery. This document covers the end-to-end architecture from export request through signed artifact delivery.


Overview

The export system supports four output formats, each with format-specific provenance embedding and security controls:

| Format | Use Case | Provenance Method | Max Size |
|--------|----------|-------------------|----------|
| XLSX | Full workbook distribution | Custom XML Parts | 100 MB |
| CSV | Data feeds, ETL pipelines | Metadata headers (comment rows) | 50 MB |
| PDF | Compliance reports, audit packages | Document properties + watermark | 25 MB |
| JSON | API integrations, downstream systems | Top-level _provenance object | 50 MB |

Format Selection

Format selection is determined by the Accept header or explicit format query parameter on the export endpoint. When omitted, CalcBridge defaults to XLSX to preserve formula metadata and cell formatting.


Architecture Components

Core Services

src/exports/
├── export_service.py              # Orchestrator: format routing, task dispatch
├── provenance/
│   ├── canonical_hasher.py        # Deterministic cell-level serialization
│   ├── hmac_signer.py             # HMAC-SHA256 signing with tenant keys
│   ├── manifest_builder.py        # Manifest generation (JSON structure)
│   └── embedder.py                # Format-specific provenance embedding
├── renderers/
│   ├── xlsx_renderer.py           # openpyxl-based XLSX generation
│   ├── csv_renderer.py            # Streaming CSV writer
│   ├── pdf_renderer.py            # ReportLab PDF generation
│   └── json_renderer.py           # Structured JSON serializer
├── security/
│   ├── pii_redaction_service.py   # Field-level PII masking
│   ├── antivirus_scanner.py       # ClamAV integration
│   └── zip_bomb_detector.py       # Compression ratio analysis
└── tasks/
    └── export_tasks.py            # Celery task definitions

Service Responsibilities

| Service | Responsibility |
|---------|----------------|
| ExportService | Orchestrates the full pipeline, dispatches Celery tasks, manages export state |
| ExportProvenanceService | Computes canonical hashes, signs manifests, embeds provenance into output |
| ExportVerificationService | Verifies signed exports, validates manifest integrity, detects tampering |
| PIIRedactionService | Applies tenant-configured redaction rules before rendering |
| AntivirusScanner | Scans generated files before delivery to prevent malware propagation |

Provenance Signing Pipeline

Provenance signing ensures that every exported file carries a cryptographic proof of its origin, contents, and the exact calculation state that produced it. This is critical for regulatory compliance and audit trails.

Step 1: Canonical Hashing

Cell-level deterministic serialization produces a stable hash regardless of export format or ordering:

src/exports/provenance/canonical_hasher.py
import hashlib
from decimal import Decimal
from typing import Any


class CanonicalHasher:
    """Produces deterministic hashes from workbook cell data.

    Cells are serialized in row-major order (sheet -> row -> column)
    with type-tagged values to prevent hash collisions between
    different types that share string representations.
    """

    @staticmethod
    def serialize_cell(value: Any, cell_type: str) -> str:
        """Serialize a single cell value deterministically.

        Type tagging prevents collisions: the string "42" and
        the number 42 produce different serializations.
        """
        if value is None:
            return "null:"
        if isinstance(value, Decimal):
            # Normalize Decimal to remove trailing zeros
            value = value.normalize()
        return f"{cell_type}:{value}"

    @classmethod
    def compute_sheet_hash(cls, sheet_name: str, rows: list[dict]) -> str:
        """Compute SHA-256 hash for a single sheet."""
        hasher = hashlib.sha256()
        hasher.update(f"sheet:{sheet_name}\n".encode("utf-8"))

        for row_idx, row in enumerate(rows):
            for col_key in sorted(row.keys()):
                cell = row[col_key]
                serialized = cls.serialize_cell(
                    cell.get("value"), cell.get("type", "string")
                )
                entry = f"{row_idx}|{col_key}|{serialized}\n"
                hasher.update(entry.encode("utf-8"))

        return hasher.hexdigest()

    @classmethod
    def compute_workbook_hash(
        cls, sheets: dict[str, list[dict]]
    ) -> str:
        """Compute canonical hash across all exported sheets."""
        hasher = hashlib.sha256()
        for sheet_name in sorted(sheets.keys()):
            sheet_hash = cls.compute_sheet_hash(
                sheet_name, sheets[sheet_name]
            )
            hasher.update(f"{sheet_name}:{sheet_hash}\n".encode("utf-8"))

        return hasher.hexdigest()

Determinism Requirements

Canonical hashing must be completely deterministic. Floating-point values are normalized via Decimal, dictionary keys are sorted, and sheets are processed in alphabetical order. Any deviation produces a different hash, invalidating provenance.
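A minimal standalone check of these guarantees, mirroring (not importing) the hasher's serialization rules:

```python
import hashlib


def sheet_hash(sheet_name: str, rows: list[dict]) -> str:
    # Mirrors CanonicalHasher: sorted column keys, type-tagged values.
    h = hashlib.sha256()
    h.update(f"sheet:{sheet_name}\n".encode("utf-8"))
    for row_idx, row in enumerate(rows):
        for col in sorted(row):
            cell = row[col]
            val = cell.get("value")
            tag = cell.get("type", "string")
            serialized = "null:" if val is None else f"{tag}:{val}"
            h.update(f"{row_idx}|{col}|{serialized}\n".encode("utf-8"))
    return h.hexdigest()


# Key insertion order does not affect the hash...
a = [{"A": {"value": 42, "type": "number"}, "B": {"value": "x", "type": "string"}}]
b = [{"B": {"value": "x", "type": "string"}, "A": {"value": 42, "type": "number"}}]
assert sheet_hash("Revenue", a) == sheet_hash("Revenue", b)

# ...but type tags do: the string "42" and the number 42 hash differently.
s = [{"A": {"value": "42", "type": "string"}}]
n = [{"A": {"value": "42", "type": "number"}}]
assert sheet_hash("Revenue", s) != sheet_hash("Revenue", n)
```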

Step 2: HMAC-SHA256 Signing

The canonical hash is signed using the tenant's HMAC key, binding the export to a specific tenant and preventing cross-tenant forgery:

src/exports/provenance/hmac_signer.py
import hashlib
import hmac
import time
from dataclasses import dataclass


@dataclass
class SignatureResult:
    signature: str
    algorithm: str
    timestamp: int
    key_id: str


class HMACSigner:
    """Signs canonical hashes with tenant-specific HMAC keys.

    Keys are stored encrypted in the database and retrieved
    via the EncryptionService at signing time.
    """

    ALGORITHM = "hmac-sha256"

    def __init__(self, encryption_service):
        self._encryption_service = encryption_service

    def sign(
        self,
        canonical_hash: str,
        tenant_id: str,
        key_version: int = 1,
    ) -> SignatureResult:
        """Produce an HMAC-SHA256 signature over the canonical hash."""
        key_id = f"export-signing-{tenant_id}-v{key_version}"
        signing_key = self._encryption_service.get_hmac_key(
            tenant_id, key_version
        )

        timestamp = int(time.time())
        message = f"{canonical_hash}|{tenant_id}|{timestamp}".encode("utf-8")

        signature = hmac.new(
            signing_key, message, hashlib.sha256
        ).hexdigest()

        return SignatureResult(
            signature=signature,
            algorithm=self.ALGORITHM,
            timestamp=timestamp,
            key_id=key_id,
        )
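The verification side recomputes the MAC over the same message layout and compares in constant time. A sketch under the assumption that the caller already holds the key bytes (the real lookup goes through EncryptionService):

```python
import hashlib
import hmac


def verify_signature(signing_key: bytes, canonical_hash: str,
                     tenant_id: str, timestamp: int, signature: str) -> bool:
    """Recompute the HMAC over hash|tenant|timestamp and compare with
    hmac.compare_digest to avoid timing side channels."""
    message = f"{canonical_hash}|{tenant_id}|{timestamp}".encode("utf-8")
    expected = hmac.new(signing_key, message, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)


key = b"demo-key"  # illustrative; real keys come from EncryptionService
sig = hmac.new(key, b"abc123|t-1|1700000000", hashlib.sha256).hexdigest()
assert verify_signature(key, "abc123", "t-1", 1700000000, sig)
assert not verify_signature(key, "abc123", "t-2", 1700000000, sig)  # wrong tenant
```

Because the tenant ID and timestamp are bound into the signed message, a signature cannot be replayed for another tenant or backdated without detection.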

Step 3: Manifest Generation

The manifest aggregates provenance metadata into a structured document:

src/exports/provenance/manifest_builder.py
from dataclasses import dataclass, field


@dataclass
class ExportManifest:
    """Complete provenance manifest for an export."""

    version: str = "1.0"
    export_id: str = ""
    workbook_id: str = ""
    tenant_id: str = ""
    exported_by: str = ""
    exported_at: str = ""
    format: str = ""
    canonical_hash: str = ""
    signature: str = ""
    algorithm: str = ""
    key_id: str = ""
    sheets_included: list[str] = field(default_factory=list)
    row_count: int = 0
    redaction_applied: bool = False
    redacted_fields: list[str] = field(default_factory=list)
    calculation_snapshot_id: str = ""

    def to_dict(self) -> dict:
        """Serialize manifest to dictionary for embedding."""
        return {
            "provenance": {
                "version": self.version,
                "export_id": self.export_id,
                "workbook_id": self.workbook_id,
                "tenant_id": self.tenant_id,
                "exported_by": self.exported_by,
                "exported_at": self.exported_at,
                "format": self.format,
            },
            "integrity": {
                "canonical_hash": self.canonical_hash,
                "signature": self.signature,
                "algorithm": self.algorithm,
                "key_id": self.key_id,
            },
            "content": {
                "sheets": self.sheets_included,
                "row_count": self.row_count,
            },
            "security": {
                "redaction_applied": self.redaction_applied,
                "redacted_fields": self.redacted_fields,
            },
            "traceability": {
                "calculation_snapshot_id": self.calculation_snapshot_id,
            },
        }

Step 4: Format-Specific Embedding

Each output format embeds the provenance manifest differently:

| Format | Embedding Location | Extraction Method |
|--------|--------------------|-------------------|
| XLSX | Custom XML Part (/docProps/provenance.xml) | Parse XML from ZIP entry |
| CSV | Comment header rows (lines prefixed with #) | Read lines starting with # PROVENANCE: |
| PDF | Document properties (XMP metadata) + visible watermark | Extract XMP or read watermark |
| JSON | Top-level _provenance key | Direct JSON access |

src/exports/provenance/embedder.py
import json

from .manifest_builder import ExportManifest


class ProvenanceEmbedder:
    """Embeds provenance manifests into export files."""

    @staticmethod
    def embed_xlsx(workbook, manifest: ExportManifest):
        """Embed manifest as a Custom XML Part in XLSX."""
        xml_content = manifest_to_xml(manifest.to_dict())
        workbook.custom_xml_parts.append(xml_content)

    @staticmethod
    def embed_csv(output_stream, manifest: ExportManifest):
        """Prepend provenance as comment headers in CSV."""
        manifest_json = json.dumps(manifest.to_dict(), indent=None)
        output_stream.write(f"# PROVENANCE: {manifest_json}\n")
        output_stream.write(
            f"# SIGNATURE: {manifest.signature}\n"
        )
        output_stream.write(
            f"# HASH: {manifest.canonical_hash}\n"
        )

    @staticmethod
    def embed_pdf(pdf_document, manifest: ExportManifest):
        """Embed manifest in PDF document properties."""
        pdf_document.set_metadata(
            "provenance", json.dumps(manifest.to_dict())
        )
        pdf_document.set_metadata("signature", manifest.signature)
        # Add visible watermark with export ID
        add_provenance_watermark(
            pdf_document, manifest.export_id
        )

    @staticmethod
    def embed_json(data: dict, manifest: ExportManifest) -> dict:
        """Add provenance as top-level key in JSON output."""
        data["_provenance"] = manifest.to_dict()
        return data
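Round-tripping the CSV embedding is straightforward on the consumer side. A sketch of the extraction half (the helper name is illustrative):

```python
import json

PREFIX = "# PROVENANCE: "


def extract_csv_provenance(lines):
    """Return the embedded manifest dict from a signed CSV, or None."""
    for line in lines:
        if not line.startswith("#"):
            break  # comment headers only appear before the data rows
        if line.startswith(PREFIX):
            return json.loads(line[len(PREFIX):])
    return None


csv_lines = [
    '# PROVENANCE: {"integrity": {"algorithm": "hmac-sha256"}}',
    "# SIGNATURE: deadbeef",
    "col_a,col_b",
    "1,2",
]
manifest = extract_csv_provenance(csv_lines)
assert manifest["integrity"]["algorithm"] == "hmac-sha256"
```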

Export Verification

The ExportVerificationService allows recipients to verify that an export has not been tampered with:

src/exports/verification_service.py
class ExportVerificationService:
    """Verifies the integrity and authenticity of exported files."""

    def verify(self, file_path: str, format: str) -> VerificationResult:
        """Full verification pipeline.

        1. Extract embedded manifest
        2. Re-compute canonical hash from file data
        3. Verify HMAC signature against stored key
        4. Check timestamp is within acceptable window
        """
        manifest = self._extract_manifest(file_path, format)
        recomputed_hash = self._recompute_hash(file_path, format)

        hash_valid = manifest.canonical_hash == recomputed_hash
        signature_valid = self._verify_signature(manifest)
        timestamp_valid = self._check_timestamp(
            manifest.exported_at, max_age_days=365
        )

        return VerificationResult(
            is_valid=hash_valid and signature_valid and timestamp_valid,
            hash_match=hash_valid,
            signature_match=signature_valid,
            timestamp_valid=timestamp_valid,
            manifest=manifest,
        )

Verification API

The verification endpoint POST /api/v1/exports/verify accepts an uploaded file and returns the verification result, enabling automated compliance checks in downstream systems.


Security Measures

PII Redaction Service

Before any data leaves CalcBridge, the PII redaction service applies tenant-configured masking rules:

src/exports/security/pii_redaction_service.py
import hashlib

import pandas as pd


class PIIRedactionService:
    """Applies field-level PII redaction before export rendering.

    Redaction rules are configured per tenant and stored in the
    database. Rules can target specific columns by name or pattern.
    """

    REDACTION_STRATEGIES = {
        "mask": lambda v: "***REDACTED***",
        "partial_mask": lambda v: v[:2] + "*" * (len(str(v)) - 2) if v else "",
        "hash": lambda v: hashlib.sha256(str(v).encode()).hexdigest()[:12],
        "truncate": lambda v: str(v)[:4] + "...",
        "remove": lambda v: None,
    }

    def redact_dataframe(
        self, df: pd.DataFrame, rules: list[RedactionRule]
    ) -> tuple[pd.DataFrame, list[str]]:
        """Apply redaction rules to a DataFrame.

        Returns the redacted DataFrame and a list of affected field names.
        """
        redacted_fields = []
        df = df.copy()

        for rule in rules:
            matching_cols = self._match_columns(df.columns, rule.pattern)
            strategy = self.REDACTION_STRATEGIES[rule.strategy]

            for col in matching_cols:
                df[col] = df[col].apply(strategy)
                redacted_fields.append(col)

        return df, redacted_fields

| Redaction Strategy | Description | Example |
|--------------------|-------------|---------|
| mask | Full replacement | John Smith -> ***REDACTED*** |
| partial_mask | Preserve prefix | john@example.com -> jo************** |
| hash | One-way hash (12 chars) | SSN-123-45-6789 -> a1b2c3d4e5f6 |
| truncate | Keep first 4 chars | ACCT-9876543 -> ACCT... |
| remove | Set to null | Sensitive Data -> null |
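Each strategy is a small value transformation; a standalone sketch reproducing three of them:

```python
import hashlib

# Mirrors the REDACTION_STRATEGIES mapping from PIIRedactionService.
STRATEGIES = {
    "mask": lambda v: "***REDACTED***",
    "partial_mask": lambda v: str(v)[:2] + "*" * (len(str(v)) - 2) if v else "",
    "hash": lambda v: hashlib.sha256(str(v).encode()).hexdigest()[:12],
}

assert STRATEGIES["partial_mask"]("john@example.com") == "jo" + "*" * 14
assert STRATEGIES["mask"]("John Smith") == "***REDACTED***"
assert len(STRATEGIES["hash"]("123-45-6789")) == 12
```

Note that partial_mask leaks value length; hash is deterministic, so identical inputs still correlate across exports. Strategy choice is a tenant policy decision.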

Antivirus Scanning

Generated files are scanned before delivery to prevent embedded malware:

src/exports/security/antivirus_scanner.py
import pyclamd


class AntivirusScanner:
    """ClamAV integration for scanning generated export files."""

    def __init__(self, clamd_socket: str = "/var/run/clamav/clamd.ctl"):
        self._socket = clamd_socket

    def scan_file(self, file_path: str) -> ScanResult:
        """Scan a file for malware signatures.

        Returns ScanResult with status (clean/infected/error)
        and optional threat name.
        """
        cd = pyclamd.ClamdUnixSocket(self._socket)
        result = cd.scan_file(file_path)

        if result is None:
            return ScanResult(status="clean")

        status, threat = result[file_path]
        return ScanResult(
            status="infected" if status == "FOUND" else "error",
            threat_name=threat,
        )

Zip Bomb Detection

XLSX files are ZIP archives internally. The detector prevents decompression attacks:

src/exports/security/zip_bomb_detector.py
import zipfile


class ZipBombDetector:
    """Detects potential zip bombs by analyzing compression ratios."""

    MAX_COMPRESSION_RATIO = 100  # 100:1 ratio threshold
    MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024  # 500 MB
    MAX_NESTED_DEPTH = 3

    def check(self, file_path: str) -> SafetyResult:
        """Analyze a ZIP/XLSX file for zip bomb indicators."""
        with zipfile.ZipFile(file_path, "r") as zf:
            total_compressed = sum(i.compress_size for i in zf.infolist())
            total_uncompressed = sum(i.file_size for i in zf.infolist())

            if total_compressed == 0:
                return SafetyResult(safe=False, reason="Empty archive")

            ratio = total_uncompressed / total_compressed

            if ratio > self.MAX_COMPRESSION_RATIO:
                return SafetyResult(
                    safe=False,
                    reason=f"Compression ratio {ratio:.1f}:1 exceeds limit",
                )

            if total_uncompressed > self.MAX_UNCOMPRESSED_SIZE:
                return SafetyResult(
                    safe=False,
                    reason=f"Uncompressed size {total_uncompressed} exceeds limit",
                )

        return SafetyResult(safe=True)
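The ratio check can be exercised against an in-memory archive; highly repetitive content easily exceeds the 100:1 threshold:

```python
import io
import zipfile

MAX_COMPRESSION_RATIO = 100


def compression_ratio(zip_bytes: bytes) -> float:
    """Total uncompressed size over total compressed size."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        compressed = sum(i.compress_size for i in zf.infolist())
        uncompressed = sum(i.file_size for i in zf.infolist())
        return uncompressed / compressed if compressed else float("inf")


buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("payload.txt", "0" * 1_000_000)  # 1 MB of a single character

ratio = compression_ratio(buf.getvalue())
assert ratio > MAX_COMPRESSION_RATIO  # this archive would be rejected
```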

File Size Limits

| Format | Max File Size | Max Row Count | Max Sheet Count |
|--------|---------------|---------------|-----------------|
| XLSX | 100 MB | 1,048,576 | 50 |
| CSV | 50 MB | 5,000,000 | 1 (single sheet) |
| PDF | 25 MB | 100,000 | 50 |
| JSON | 50 MB | 1,000,000 | unlimited |

Export Flow

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as FastAPI
    participant Queue as Celery Queue
    participant Worker as Export Worker
    participant PII as PII Redaction
    participant Renderer as Format Renderer
    participant Provenance as Provenance Service
    participant AV as Antivirus Scanner
    participant Storage as File Storage

    Client->>API: POST /api/v1/exports
    API->>API: Validate request & permissions
    API->>Queue: Dispatch export_task
    API-->>Client: 202 Accepted (export_id)

    Queue->>Worker: Pick up task
    Worker->>Worker: Load workbook data

    Worker->>PII: Apply redaction rules
    PII-->>Worker: Redacted data + field list

    Worker->>Renderer: Render to target format
    Renderer-->>Worker: Raw output file

    Worker->>Provenance: Compute canonical hash
    Provenance-->>Worker: SHA-256 hash

    Worker->>Provenance: HMAC-SHA256 sign
    Provenance-->>Worker: Signature + key_id

    Worker->>Provenance: Build manifest
    Provenance-->>Worker: ExportManifest

    Worker->>Provenance: Embed in output
    Provenance-->>Worker: Signed file

    Worker->>AV: Scan generated file
    AV-->>Worker: Clean / Infected

    alt File is clean
        Worker->>Storage: Store signed file
        Worker->>API: Update export status (complete)
        Client->>API: GET /api/v1/exports/{id}/download
        API-->>Client: 200 OK (signed file)
    else File is infected
        Worker->>API: Update export status (failed)
        Worker->>Worker: Log security event
    end

Request Lifecycle States

| State | Description | Transitions To |
|-------|-------------|----------------|
| pending | Export task queued | processing |
| processing | Worker is generating the export | redacting, signing, failed |
| redacting | PII redaction rules being applied | signing, failed |
| signing | Provenance hash and HMAC are being computed | scanning, failed |
| scanning | Antivirus scan in progress | complete, quarantined |
| complete | File ready for download | expired |
| quarantined | AV scan detected a threat | (terminal) |
| failed | Error during processing | (terminal) |
| expired | Download link expired (configurable TTL) | (terminal) |
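These lifecycle rules can be enforced with a simple allow-list. A sketch (the Celery task also reports a short-lived redacting status between processing and signing, which is included here):

```python
# Illustrative state-machine sketch for export lifecycle enforcement.
ALLOWED_TRANSITIONS = {
    "pending": {"processing"},
    "processing": {"redacting", "signing", "failed"},
    "redacting": {"signing", "failed"},
    "signing": {"scanning", "failed"},
    "scanning": {"complete", "quarantined"},
    "complete": {"expired"},
    # quarantined, failed, and expired are terminal: no outgoing edges.
}


def can_transition(current: str, nxt: str) -> bool:
    return nxt in ALLOWED_TRANSITIONS.get(current, set())


assert can_transition("scanning", "quarantined")
assert not can_transition("quarantined", "processing")  # terminal state
```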

Celery Task Integration

Export jobs are processed asynchronously through Celery to avoid blocking API requests during large workbook exports:

src/workers/tasks/export_tasks.py
from celery import shared_task


@shared_task(
    bind=True,
    queue="export",
    max_retries=2,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    soft_time_limit=300,
    time_limit=600,
    acks_late=True,
)
def execute_export(
    self,
    tenant_id: str,
    export_id: str,
    workbook_id: str,
    format: str,
    options: dict,
):
    """Execute a workbook export with provenance signing.

    This task runs in the 'export' queue, separate from
    calculation and parsing queues to prevent resource contention.

    Args:
        tenant_id: Tenant owning the workbook.
        export_id: Unique export job identifier.
        workbook_id: Source workbook to export.
        format: Target format (xlsx, csv, pdf, json).
        options: Export options (sheets, redaction, etc.).
    """
    update_export_status(export_id, "processing")

    try:
        # 1. Load workbook data with tenant context
        data = load_workbook_data(tenant_id, workbook_id, options)

        # 2. Apply PII redaction
        update_export_status(export_id, "redacting")
        redacted_data, redacted_fields = apply_redaction(
            tenant_id, data, options.get("redaction_profile")
        )

        # 3. Render to target format
        output_path = render_export(format, redacted_data, options)

        # 4. Sign with provenance
        update_export_status(export_id, "signing")
        signed_path = sign_export(
            output_path, format, tenant_id, export_id,
            workbook_id, redacted_fields
        )

        # 5. Antivirus scan
        update_export_status(export_id, "scanning")
        scan_result = scan_file(signed_path)

        if scan_result.status != "clean":
            update_export_status(export_id, "quarantined")
            raise SecurityError(f"AV threat: {scan_result.threat_name}")

        # 6. Store and mark complete
        store_export(export_id, signed_path)
        update_export_status(export_id, "complete")

    except SecurityError:
        raise  # Do not retry security failures
    except Exception as exc:
        update_export_status(export_id, "failed", error=str(exc))
        raise self.retry(exc=exc)

Queue Configuration

| Queue | Concurrency | Purpose | Priority |
|-------|-------------|---------|----------|
| export | 4 workers | Standard export jobs | Normal |
| priority | 2 workers | Urgent exports (compliance deadlines) | High |
| default | 8 workers | Parsing, general tasks | Normal |

Export TTL

Completed exports are stored for 7 days by default (configurable via EXPORT_TTL_DAYS). A periodic Celery beat task (cleanup_expired_exports) removes expired files and transitions their status to expired.
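The expiry check itself reduces to date arithmetic; a sketch with an illustrative function name:

```python
from datetime import datetime, timedelta, timezone

EXPORT_TTL_DAYS = 7  # default; overridable via the EXPORT_TTL_DAYS setting


def is_expired(completed_at: datetime, now: datetime,
               ttl_days: int = EXPORT_TTL_DAYS) -> bool:
    """True once an export's completion time is older than the TTL."""
    return now - completed_at > timedelta(days=ttl_days)


now = datetime(2024, 6, 10, tzinfo=timezone.utc)
assert is_expired(datetime(2024, 6, 1, tzinfo=timezone.utc), now)       # 9 days old
assert not is_expired(datetime(2024, 6, 5, tzinfo=timezone.utc), now)   # 5 days old
```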


Streaming Exports

For large datasets that exceed memory limits, the pipeline supports streaming export:

src/exports/renderers/csv_renderer.py
class StreamingCSVRenderer:
    """Streams CSV output in chunks to avoid loading
    entire datasets into memory.
    """

    CHUNK_SIZE = 10_000  # rows per chunk

    def render_streaming(
        self,
        data_iterator,
        output_path: str,
        columns: list[str],
    ):
        """Write CSV in chunks from a database cursor iterator."""
        with open(output_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=columns)
            writer.writeheader()

            for chunk in self._chunked(data_iterator, self.CHUNK_SIZE):
                writer.writerows(chunk)

    @staticmethod
    def _chunked(iterator, size):
        """Yield successive chunks from an iterator."""
        chunk = []
        for item in iterator:
            chunk.append(item)
            if len(chunk) >= size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
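The chunking helper is the piece that keeps memory bounded: at most one chunk of rows is materialized at a time. Standalone, it behaves like this:

```python
def chunked(iterator, size):
    """Yield lists of up to `size` items from any iterable."""
    chunk = []
    for item in iterator:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # final partial chunk


sizes = [len(c) for c in chunked(range(25_000), 10_000)]
assert sizes == [10_000, 10_000, 5_000]
```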

Metrics and Monitoring

The export pipeline exposes Prometheus metrics for observability:

| Metric | Type | Description |
|--------|------|-------------|
| export_requests_total | Counter | Total exports requested, labeled by format |
| export_duration_seconds | Histogram | End-to-end export time |
| export_file_size_bytes | Histogram | Generated file sizes |
| export_redaction_fields_total | Counter | Number of fields redacted |
| export_av_scan_duration_seconds | Histogram | Antivirus scan time |
| export_failures_total | Counter | Failed exports by reason |
| export_quarantined_total | Counter | Files quarantined by AV |

Configuration Reference

Export configuration (environment variables)
# Export limits
EXPORT_MAX_FILE_SIZE_MB: 100        # Maximum output file size
EXPORT_TTL_DAYS: 7                  # Days before expired exports are cleaned
EXPORT_MAX_CONCURRENT: 10           # Max concurrent export tasks per tenant

# Provenance
EXPORT_SIGNING_ENABLED: true        # Enable/disable HMAC signing
EXPORT_HASH_ALGORITHM: sha256       # Canonical hash algorithm

# Security
EXPORT_AV_ENABLED: true             # Enable/disable antivirus scanning
EXPORT_AV_SOCKET: /var/run/clamav/clamd.ctl
EXPORT_PII_REDACTION_ENABLED: true  # Enable/disable PII redaction

# Storage
EXPORT_STORAGE_BACKEND: s3          # s3 or local
EXPORT_S3_BUCKET: calcbridge-exports
EXPORT_LOCAL_PATH: /tmp/exports
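These values are typically parsed from the environment with explicit defaults; a hedged sketch (the helper names are illustrative, not CalcBridge's actual settings module):

```python
import os


def env_bool(name: str, default: bool) -> bool:
    """Parse a boolean env var, accepting 1/true/yes (case-insensitive)."""
    return os.environ.get(name, str(default)).lower() in ("1", "true", "yes")


def env_int(name: str, default: int) -> int:
    return int(os.environ.get(name, str(default)))


EXPORT_SIGNING_ENABLED = env_bool("EXPORT_SIGNING_ENABLED", True)
EXPORT_MAX_FILE_SIZE_MB = env_int("EXPORT_MAX_FILE_SIZE_MB", 100)
EXPORT_TTL_DAYS = env_int("EXPORT_TTL_DAYS", 7)

assert isinstance(EXPORT_SIGNING_ENABLED, bool)
```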

Error Handling

| Error Type | Behavior | Retry |
|------------|----------|-------|
| ConnectionError | Database/storage connectivity issue | Yes (up to 2 retries, exponential backoff) |
| TimeoutError | Export exceeded time limit | Yes (up to 2 retries) |
| SecurityError | AV threat detected | No (terminal, quarantined) |
| ValidationError | Invalid export parameters | No (terminal, 400 response) |
| PermissionError | Tenant lacks export permission | No (terminal, 403 response) |
| FileSizeExceeded | Output exceeds format limit | No (terminal, suggest filtering) |

Security Failures Are Never Retried

When an antivirus scan detects a threat, the export is immediately quarantined. The task does not retry, and a security event is logged to the audit trail. Operations teams receive an alert via Alertmanager.