Export Pipeline Architecture¶
CalcBridge's export pipeline transforms calculated workbook data into distributable formats while maintaining cryptographic provenance, enforcing PII redaction policies, and ensuring secure delivery. This document covers the end-to-end architecture from export request through signed artifact delivery.
Overview¶
The export system supports four output formats, each with format-specific provenance embedding and security controls:
| Format | Use Case | Provenance Method | Max Size |
|---|---|---|---|
| XLSX | Full workbook distribution | Custom XML Parts | 100 MB |
| CSV | Data feeds, ETL pipelines | Metadata headers (comment rows) | 50 MB |
| PDF | Compliance reports, audit packages | Document properties + watermark | 25 MB |
| JSON | API integrations, downstream systems | Top-level _provenance object | 50 MB |
Format Selection
Format selection is determined by the Accept header or explicit format query parameter on the export endpoint. When omitted, CalcBridge defaults to XLSX to preserve formula metadata and cell formatting.
Architecture Components¶
Core Services¶
src/exports/
├── export_service.py # Orchestrator: format routing, task dispatch
├── provenance/
│ ├── canonical_hasher.py # Deterministic cell-level serialization
│ ├── hmac_signer.py # HMAC-SHA256 signing with tenant keys
│ ├── manifest_builder.py # Manifest generation (JSON structure)
│ └── embedder.py # Format-specific provenance embedding
├── renderers/
│ ├── xlsx_renderer.py # openpyxl-based XLSX generation
│ ├── csv_renderer.py # Streaming CSV writer
│ ├── pdf_renderer.py # ReportLab PDF generation
│ └── json_renderer.py # Structured JSON serializer
├── security/
│ ├── pii_redaction_service.py # Field-level PII masking
│ ├── antivirus_scanner.py # ClamAV integration
│ └── zip_bomb_detector.py # Compression ratio analysis
└── tasks/
└── export_tasks.py # Celery task definitions
Service Responsibilities¶
| Service | Responsibility |
|---|---|
| ExportService | Orchestrates the full pipeline, dispatches Celery tasks, manages export state |
| ExportProvenanceService | Computes canonical hashes, signs manifests, embeds provenance into output |
| ExportVerificationService | Verifies signed exports, validates manifest integrity, detects tampering |
| PIIRedactionService | Applies tenant-configured redaction rules before rendering |
| AntivirusScanner | Scans generated files before delivery to prevent malware propagation |
Provenance Signing Pipeline¶
Provenance signing ensures that every exported file carries a cryptographic proof of its origin, contents, and the exact calculation state that produced it. This is critical for regulatory compliance and audit trails.
Step 1: Canonical Hashing¶
Cell-level deterministic serialization produces a stable hash regardless of export format or ordering:
import hashlib
import json
from decimal import Decimal
from typing import Any
class CanonicalHasher:
"""Produces deterministic hashes from workbook cell data.
Cells are serialized in row-major order (sheet -> row -> column)
with type-tagged values to prevent hash collisions between
different types that share string representations.
"""
@staticmethod
def serialize_cell(value: Any, cell_type: str) -> str:
"""Serialize a single cell value deterministically.
Type tagging prevents collisions: the string "42" and
the number 42 produce different serializations.
"""
if value is None:
return "null:"
if isinstance(value, Decimal):
# Normalize Decimal to remove trailing zeros
value = value.normalize()
return f"{cell_type}:{value}"
@classmethod
def compute_sheet_hash(cls, sheet_name: str, rows: list[dict]) -> str:
"""Compute SHA-256 hash for a single sheet."""
hasher = hashlib.sha256()
hasher.update(f"sheet:{sheet_name}\n".encode("utf-8"))
for row_idx, row in enumerate(rows):
for col_key in sorted(row.keys()):
cell = row[col_key]
serialized = cls.serialize_cell(
cell.get("value"), cell.get("type", "string")
)
entry = f"{row_idx}|{col_key}|{serialized}\n"
hasher.update(entry.encode("utf-8"))
return hasher.hexdigest()
@classmethod
def compute_workbook_hash(
cls, sheets: dict[str, list[dict]]
) -> str:
"""Compute canonical hash across all exported sheets."""
hasher = hashlib.sha256()
for sheet_name in sorted(sheets.keys()):
sheet_hash = cls.compute_sheet_hash(
sheet_name, sheets[sheet_name]
)
hasher.update(f"{sheet_name}:{sheet_hash}\n".encode("utf-8"))
return hasher.hexdigest()
Determinism Requirements
Canonical hashing must be completely deterministic. Floating-point values are normalized via Decimal, dictionary keys are sorted, and sheets are processed in alphabetical order. Any deviation produces a different hash, invalidating provenance.
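To see why the type tagging in `serialize_cell` matters, consider the string `"42"` and the number `42`. The following is a condensed illustration using the same `type:value` scheme as the class above:

```python
import hashlib

# Condensed illustration of the type-tagging scheme: without the tag,
# the string "42" and the number 42 would serialize (and hash) identically.

def tagged(value, cell_type: str) -> str:
    return "null:" if value is None else f"{cell_type}:{value}"

def cell_digest(value, cell_type: str) -> str:
    return hashlib.sha256(tagged(value, cell_type).encode("utf-8")).hexdigest()

# Without tagging: both serialize to "42" and collide.
untagged_str = hashlib.sha256(str("42").encode()).hexdigest()
untagged_num = hashlib.sha256(str(42).encode()).hexdigest()
assert untagged_str == untagged_num

# With tagging: "string:42" vs "number:42" produce distinct digests.
assert cell_digest("42", "string") != cell_digest(42, "number")
```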
Step 2: HMAC-SHA256 Signing¶
The canonical hash is signed using the tenant's HMAC key, binding the export to a specific tenant and preventing cross-tenant forgery:
import hashlib
import hmac
import time
from dataclasses import dataclass
@dataclass
class SignatureResult:
signature: str
algorithm: str
timestamp: int
key_id: str
class HMACSigner:
"""Signs canonical hashes with tenant-specific HMAC keys.
Keys are stored encrypted in the database and retrieved
via the EncryptionService at signing time.
"""
ALGORITHM = "hmac-sha256"
def __init__(self, encryption_service):
self._encryption_service = encryption_service
def sign(
self,
canonical_hash: str,
tenant_id: str,
key_version: int = 1,
) -> SignatureResult:
"""Produce an HMAC-SHA256 signature over the canonical hash."""
key_id = f"export-signing-{tenant_id}-v{key_version}"
signing_key = self._encryption_service.get_hmac_key(
tenant_id, key_version
)
timestamp = int(time.time())
message = f"{canonical_hash}|{tenant_id}|{timestamp}".encode("utf-8")
signature = hmac.new(
signing_key, message, hashlib.sha256
).hexdigest()
return SignatureResult(
signature=signature,
algorithm=self.ALGORITHM,
timestamp=timestamp,
key_id=key_id,
)
Step 3: Manifest Generation¶
The manifest aggregates provenance metadata into a structured document:
from dataclasses import dataclass, field
@dataclass
class ExportManifest:
"""Complete provenance manifest for an export."""
version: str = "1.0"
export_id: str = ""
workbook_id: str = ""
tenant_id: str = ""
exported_by: str = ""
exported_at: str = ""
format: str = ""
canonical_hash: str = ""
signature: str = ""
algorithm: str = ""
key_id: str = ""
sheets_included: list[str] = field(default_factory=list)
row_count: int = 0
redaction_applied: bool = False
redacted_fields: list[str] = field(default_factory=list)
calculation_snapshot_id: str = ""
def to_dict(self) -> dict:
"""Serialize manifest to dictionary for embedding."""
return {
"provenance": {
"version": self.version,
"export_id": self.export_id,
"workbook_id": self.workbook_id,
"tenant_id": self.tenant_id,
"exported_by": self.exported_by,
"exported_at": self.exported_at,
"format": self.format,
},
"integrity": {
"canonical_hash": self.canonical_hash,
"signature": self.signature,
"algorithm": self.algorithm,
"key_id": self.key_id,
},
"content": {
"sheets": self.sheets_included,
"row_count": self.row_count,
},
"security": {
"redaction_applied": self.redaction_applied,
"redacted_fields": self.redacted_fields,
},
"traceability": {
"calculation_snapshot_id": self.calculation_snapshot_id,
},
}
Step 4: Format-Specific Embedding¶
Each output format embeds the provenance manifest differently:
| Format | Embedding Location | Extraction Method |
|---|---|---|
| XLSX | Custom XML Part (/docProps/provenance.xml) | Parse XML from ZIP entry |
| CSV | Comment header rows (lines prefixed with #) | Read lines starting with # PROVENANCE: |
| PDF | Document properties (XMP metadata) + visible watermark | Extract XMP or read watermark |
| JSON | Top-level _provenance key | Direct JSON access |
import json

# manifest_to_xml and add_provenance_watermark are format helpers
# defined elsewhere in the provenance package.
class ProvenanceEmbedder:
"""Embeds provenance manifests into export files."""
@staticmethod
def embed_xlsx(workbook, manifest: ExportManifest):
"""Embed manifest as a Custom XML Part in XLSX."""
xml_content = manifest_to_xml(manifest.to_dict())
workbook.custom_xml_parts.append(xml_content)
@staticmethod
def embed_csv(output_stream, manifest: ExportManifest):
"""Prepend provenance as comment headers in CSV."""
manifest_json = json.dumps(manifest.to_dict(), indent=None)
output_stream.write(f"# PROVENANCE: {manifest_json}\n")
output_stream.write(
f"# SIGNATURE: {manifest.signature}\n"
)
output_stream.write(
f"# HASH: {manifest.canonical_hash}\n"
)
@staticmethod
def embed_pdf(pdf_document, manifest: ExportManifest):
"""Embed manifest in PDF document properties."""
pdf_document.set_metadata(
"provenance", json.dumps(manifest.to_dict())
)
pdf_document.set_metadata("signature", manifest.signature)
# Add visible watermark with export ID
add_provenance_watermark(
pdf_document, manifest.export_id
)
@staticmethod
def embed_json(data: dict, manifest: ExportManifest) -> dict:
"""Add provenance as top-level key in JSON output."""
data["_provenance"] = manifest.to_dict()
return data
Export Verification¶
The ExportVerificationService allows recipients to verify that an export has not been tampered with:
class ExportVerificationService:
"""Verifies the integrity and authenticity of exported files."""
def verify(self, file_path: str, format: str) -> VerificationResult:
"""Full verification pipeline.
1. Extract embedded manifest
2. Re-compute canonical hash from file data
3. Verify HMAC signature against stored key
4. Check timestamp is within acceptable window
"""
manifest = self._extract_manifest(file_path, format)
recomputed_hash = self._recompute_hash(file_path, format)
hash_valid = manifest.canonical_hash == recomputed_hash
signature_valid = self._verify_signature(manifest)
        timestamp_valid = self._check_timestamp(
            manifest.exported_at, max_age_days=365
        )
return VerificationResult(
is_valid=hash_valid and signature_valid and timestamp_valid,
hash_match=hash_valid,
signature_match=signature_valid,
timestamp_valid=timestamp_valid,
manifest=manifest,
)
Verification API
The verification endpoint POST /api/v1/exports/verify accepts an uploaded file and returns the verification result, enabling automated compliance checks in downstream systems.
Security Measures¶
PII Redaction Service¶
Before any data leaves CalcBridge, the PII redaction service applies tenant-configured masking rules:
import hashlib

import pandas as pd

class PIIRedactionService:
"""Applies field-level PII redaction before export rendering.
Redaction rules are configured per tenant and stored in the
database. Rules can target specific columns by name or pattern.
"""
REDACTION_STRATEGIES = {
"mask": lambda v: "***REDACTED***",
"partial_mask": lambda v: v[:2] + "*" * (len(str(v)) - 2) if v else "",
"hash": lambda v: hashlib.sha256(str(v).encode()).hexdigest()[:12],
"truncate": lambda v: str(v)[:4] + "...",
"remove": lambda v: None,
}
def redact_dataframe(
self, df: pd.DataFrame, rules: list[RedactionRule]
) -> tuple[pd.DataFrame, list[str]]:
"""Apply redaction rules to a DataFrame.
Returns the redacted DataFrame and a list of affected field names.
"""
redacted_fields = []
df = df.copy()
for rule in rules:
matching_cols = self._match_columns(df.columns, rule.pattern)
strategy = self.REDACTION_STRATEGIES[rule.strategy]
for col in matching_cols:
df[col] = df[col].apply(strategy)
redacted_fields.append(col)
return df, redacted_fields
| Redaction Strategy | Description | Example |
|---|---|---|
| mask | Full replacement | John Smith -> ***REDACTED*** |
| partial_mask | Preserve prefix | john@example.com -> jo************** |
| hash | One-way hash (12 chars) | SSN-123-45-6789 -> a1b2c3d4e5f6 |
| truncate | Keep first 4 chars | ACCT-9876543 -> ACCT... |
| remove | Set to null | Sensitive Data -> null |
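The strategies above can be exercised directly on sample values. This is a condensed copy of the strategy table for illustration:

```python
import hashlib

# Condensed copies of the redaction strategies, applied to sample values.
strategies = {
    "mask": lambda v: "***REDACTED***",
    "partial_mask": lambda v: str(v)[:2] + "*" * (len(str(v)) - 2) if v else "",
    "hash": lambda v: hashlib.sha256(str(v).encode()).hexdigest()[:12],
    "truncate": lambda v: str(v)[:4] + "...",
    "remove": lambda v: None,
}

assert strategies["mask"]("John Smith") == "***REDACTED***"
assert strategies["partial_mask"]("john@example.com") == "jo**************"
assert strategies["truncate"]("ACCT-9876543") == "ACCT..."
assert strategies["remove"]("Sensitive Data") is None
assert len(strategies["hash"]("SSN-123-45-6789")) == 12  # 12-char digest prefix
```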
Antivirus Scanning¶
Generated files are scanned before delivery to prevent embedded malware:
import pyclamd

class AntivirusScanner:
"""ClamAV integration for scanning generated export files."""
def __init__(self, clamd_socket: str = "/var/run/clamav/clamd.ctl"):
self._socket = clamd_socket
def scan_file(self, file_path: str) -> ScanResult:
"""Scan a file for malware signatures.
Returns ScanResult with status (clean/infected/error)
and optional threat name.
"""
cd = pyclamd.ClamdUnixSocket(self._socket)
result = cd.scan_file(file_path)
if result is None:
return ScanResult(status="clean")
status, threat = result[file_path]
return ScanResult(
status="infected" if status == "FOUND" else "error",
threat_name=threat,
)
Zip Bomb Detection¶
XLSX files are ZIP archives internally. The detector prevents decompression attacks:
import zipfile

class ZipBombDetector:
"""Detects potential zip bombs by analyzing compression ratios."""
MAX_COMPRESSION_RATIO = 100 # 100:1 ratio threshold
MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024 # 500 MB
MAX_NESTED_DEPTH = 3
def check(self, file_path: str) -> SafetyResult:
"""Analyze a ZIP/XLSX file for zip bomb indicators."""
with zipfile.ZipFile(file_path, "r") as zf:
total_compressed = sum(i.compress_size for i in zf.infolist())
total_uncompressed = sum(i.file_size for i in zf.infolist())
if total_compressed == 0:
return SafetyResult(safe=False, reason="Empty archive")
ratio = total_uncompressed / total_compressed
if ratio > self.MAX_COMPRESSION_RATIO:
return SafetyResult(
safe=False,
reason=f"Compression ratio {ratio:.1f}:1 exceeds limit",
)
if total_uncompressed > self.MAX_UNCOMPRESSED_SIZE:
return SafetyResult(
safe=False,
reason=f"Uncompressed size {total_uncompressed} exceeds limit",
)
return SafetyResult(safe=True)
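The ratio check can be demonstrated against a synthetic in-memory archive of highly compressible data, the classic zip-bomb indicator. A condensed sketch of the detector's ratio logic:

```python
import io
import zipfile

# Condensed version of the compression-ratio check, run against an
# in-memory archive of zeros (which compress far beyond 100:1).

MAX_COMPRESSION_RATIO = 100

def ratio_safe(zf: zipfile.ZipFile) -> bool:
    compressed = sum(i.compress_size for i in zf.infolist())
    uncompressed = sum(i.file_size for i in zf.infolist())
    if compressed == 0:
        return False  # empty archive is rejected outright
    return (uncompressed / compressed) <= MAX_COMPRESSION_RATIO

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("payload.bin", b"\x00" * 10_000_000)  # 10 MB of zeros

with zipfile.ZipFile(buf, "r") as zf:
    assert not ratio_safe(zf)  # deflated zeros trip the ratio threshold
```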
File Size Limits¶
| Format | Max File Size | Max Row Count | Max Sheet Count |
|---|---|---|---|
| XLSX | 100 MB | 1,048,576 | 50 |
| CSV | 50 MB | 5,000,000 | 1 (single sheet) |
| PDF | 25 MB | 100,000 | 50 |
| JSON | 50 MB | 1,000,000 | unlimited |
Export Flow¶
Sequence Diagram¶
sequenceDiagram
participant Client
participant API as FastAPI
participant Queue as Celery Queue
participant Worker as Export Worker
participant PII as PII Redaction
participant Renderer as Format Renderer
participant Provenance as Provenance Service
participant AV as Antivirus Scanner
participant Storage as File Storage
Client->>API: POST /api/v1/exports
API->>API: Validate request & permissions
API->>Queue: Dispatch export_task
API-->>Client: 202 Accepted (export_id)
Queue->>Worker: Pick up task
Worker->>Worker: Load workbook data
Worker->>PII: Apply redaction rules
PII-->>Worker: Redacted data + field list
Worker->>Renderer: Render to target format
Renderer-->>Worker: Raw output file
Worker->>Provenance: Compute canonical hash
Provenance-->>Worker: SHA-256 hash
Worker->>Provenance: HMAC-SHA256 sign
Provenance-->>Worker: Signature + key_id
Worker->>Provenance: Build manifest
Provenance-->>Worker: ExportManifest
Worker->>Provenance: Embed in output
Provenance-->>Worker: Signed file
Worker->>AV: Scan generated file
AV-->>Worker: Clean / Infected
alt File is clean
Worker->>Storage: Store signed file
Worker->>API: Update export status (complete)
Client->>API: GET /api/v1/exports/{id}/download
API-->>Client: 200 OK (signed file)
else File is infected
Worker->>API: Update export status (failed)
Worker->>Worker: Log security event
end

Request Lifecycle States¶
| State | Description | Transitions To |
|---|---|---|
| pending | Export task queued | processing |
| processing | Worker is generating the export | redacting, failed |
| redacting | PII redaction rules are being applied | signing, failed |
| signing | Provenance hash and HMAC are being computed | scanning, failed |
| scanning | Antivirus scan in progress | complete, quarantined |
| complete | File ready for download | expired |
| quarantined | AV scan detected a threat | (terminal) |
| failed | Error during processing | (terminal) |
| expired | Download link expired (configurable TTL) | (terminal) |
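A lifecycle like this is typically enforced with a transition map that rejects illegal status jumps. The sketch below is hypothetical (it also includes the `redacting` status that the Celery task sets between `processing` and `signing`):

```python
# Hypothetical guard for the export lifecycle: a transition map rejects
# illegal status jumps (e.g. complete -> processing).

TRANSITIONS: dict[str, set[str]] = {
    "pending": {"processing"},
    "processing": {"redacting", "signing", "failed"},
    "redacting": {"signing", "failed"},
    "signing": {"scanning", "failed"},
    "scanning": {"complete", "quarantined"},
    "complete": {"expired"},
    "quarantined": set(),   # terminal
    "failed": set(),        # terminal
    "expired": set(),       # terminal
}

def assert_transition(current: str, new: str) -> None:
    """Raise ValueError if the status change is not permitted."""
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal export state transition: {current} -> {new}")

assert_transition("pending", "processing")       # ok
try:
    assert_transition("complete", "processing")  # illegal: terminal-ish jump
except ValueError:
    pass
```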
Celery Task Integration¶
Export jobs are processed asynchronously through Celery to avoid blocking API requests during large workbook exports:
from celery import shared_task
@shared_task(
bind=True,
queue="export",
max_retries=2,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True,
soft_time_limit=300,
time_limit=600,
acks_late=True,
)
def execute_export(
self,
tenant_id: str,
export_id: str,
workbook_id: str,
format: str,
options: dict,
):
"""Execute a workbook export with provenance signing.
This task runs in the 'export' queue, separate from
calculation and parsing queues to prevent resource contention.
Args:
tenant_id: Tenant owning the workbook.
export_id: Unique export job identifier.
workbook_id: Source workbook to export.
format: Target format (xlsx, csv, pdf, json).
options: Export options (sheets, redaction, etc.).
"""
update_export_status(export_id, "processing")
try:
# 1. Load workbook data with tenant context
data = load_workbook_data(tenant_id, workbook_id, options)
# 2. Apply PII redaction
update_export_status(export_id, "redacting")
redacted_data, redacted_fields = apply_redaction(
tenant_id, data, options.get("redaction_profile")
)
# 3. Render to target format
output_path = render_export(format, redacted_data, options)
# 4. Sign with provenance
update_export_status(export_id, "signing")
signed_path = sign_export(
output_path, format, tenant_id, export_id,
workbook_id, redacted_fields
)
# 5. Antivirus scan
update_export_status(export_id, "scanning")
scan_result = scan_file(signed_path)
if scan_result.status != "clean":
update_export_status(export_id, "quarantined")
raise SecurityError(f"AV threat: {scan_result.threat_name}")
# 6. Store and mark complete
store_export(export_id, signed_path)
update_export_status(export_id, "complete")
except SecurityError:
raise # Do not retry security failures
except Exception as exc:
update_export_status(export_id, "failed", error=str(exc))
raise self.retry(exc=exc)
Queue Configuration¶
| Queue | Concurrency | Purpose | Priority |
|---|---|---|---|
| export | 4 workers | Standard export jobs | Normal |
| priority | 2 workers | Urgent exports (compliance deadlines) | High |
| default | 8 workers | Parsing, general tasks | Normal |
Export TTL
Completed exports are stored for 7 days by default (configurable via EXPORT_TTL_DAYS). A periodic Celery beat task (cleanup_expired_exports) removes expired files and transitions their status to expired.
Streaming Exports¶
For large datasets that exceed memory limits, the pipeline supports streaming export:
import csv

class StreamingCSVRenderer:
"""Streams CSV output in chunks to avoid loading
entire datasets into memory.
"""
CHUNK_SIZE = 10_000 # rows per chunk
def render_streaming(
self,
data_iterator,
output_path: str,
columns: list[str],
):
"""Write CSV in chunks from a database cursor iterator."""
with open(output_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=columns)
writer.writeheader()
for chunk in self._chunked(data_iterator, self.CHUNK_SIZE):
writer.writerows(chunk)
@staticmethod
def _chunked(iterator, size):
"""Yield successive chunks from an iterator."""
chunk = []
for item in iterator:
chunk.append(item)
if len(chunk) >= size:
yield chunk
chunk = []
if chunk:
yield chunk
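The chunked streaming pattern above can be demonstrated with a generator source and an in-memory stream, condensed for illustration:

```python
import csv
import io

# Condensed demonstration of the chunked streaming pattern: rows are
# consumed lazily from a generator and written in fixed-size batches,
# never materializing the full dataset.

def chunked(iterator, size):
    chunk = []
    for item in iterator:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # final partial chunk

rows = ({"id": i, "value": i * 2} for i in range(7))  # lazy row source
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=["id", "value"])
writer.writeheader()
for chunk in chunked(rows, 3):  # batches of 3, 3, 1
    writer.writerows(chunk)

lines = out.getvalue().strip().splitlines()
assert len(lines) == 8  # header + 7 rows
assert lines[1] == "0,0"
```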
Metrics and Monitoring¶
The export pipeline exposes Prometheus metrics for observability:
| Metric | Type | Description |
|---|---|---|
| export_requests_total | Counter | Total exports requested, labeled by format |
| export_duration_seconds | Histogram | End-to-end export time |
| export_file_size_bytes | Histogram | Generated file sizes |
| export_redaction_fields_total | Counter | Number of fields redacted |
| export_av_scan_duration_seconds | Histogram | Antivirus scan time |
| export_failures_total | Counter | Failed exports by reason |
| export_quarantined_total | Counter | Files quarantined by AV |
Configuration Reference¶
# Export limits
EXPORT_MAX_FILE_SIZE_MB: 100 # Maximum output file size
EXPORT_TTL_DAYS: 7 # Days before expired exports are cleaned
EXPORT_MAX_CONCURRENT: 10 # Max concurrent export tasks per tenant
# Provenance
EXPORT_SIGNING_ENABLED: true # Enable/disable HMAC signing
EXPORT_HASH_ALGORITHM: sha256 # Canonical hash algorithm
# Security
EXPORT_AV_ENABLED: true # Enable/disable antivirus scanning
EXPORT_AV_SOCKET: /var/run/clamav/clamd.ctl
EXPORT_PII_REDACTION_ENABLED: true # Enable/disable PII redaction
# Storage
EXPORT_STORAGE_BACKEND: s3 # s3 or local
EXPORT_S3_BUCKET: calcbridge-exports
EXPORT_LOCAL_PATH: /tmp/exports
Error Handling¶
| Error Type | Behavior | Retry |
|---|---|---|
| ConnectionError | Database/storage connectivity issue | Yes (up to 2 retries, exponential backoff) |
| TimeoutError | Export exceeded time limit | Yes (up to 2 retries) |
| SecurityError | AV threat detected | No (terminal, quarantined) |
| ValidationError | Invalid export parameters | No (terminal, 400 response) |
| PermissionError | Tenant lacks export permission | No (terminal, 403 response) |
| FileSizeExceeded | Output exceeds format limit | No (terminal, suggest filtering) |
Security Failures Are Never Retried
When an antivirus scan detects a threat, the export is immediately quarantined. The task does not retry, and a security event is logged to the audit trail. Operations teams receive an alert via Alertmanager.
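The retry policy in the table reduces to a small predicate. This is an illustrative sketch; `SecurityError`, `ValidationError`, and `FileSizeExceeded` stand in for CalcBridge's own exception classes:

```python
# Hypothetical sketch of the retry policy: transient infrastructure
# errors retry (up to max_retries); security and validation failures
# are terminal and never retried.

class SecurityError(Exception): ...
class ValidationError(Exception): ...
class FileSizeExceeded(Exception): ...

RETRYABLE = (ConnectionError, TimeoutError)
TERMINAL = (SecurityError, ValidationError, PermissionError, FileSizeExceeded)

def should_retry(exc: Exception, attempts: int, max_retries: int = 2) -> bool:
    """Retry only transient errors, and only while attempts remain."""
    if isinstance(exc, TERMINAL):
        return False
    return isinstance(exc, RETRYABLE) and attempts < max_retries

assert should_retry(ConnectionError(), attempts=0)
assert not should_retry(ConnectionError(), attempts=2)  # retries exhausted
assert not should_retry(SecurityError(), attempts=0)    # never retried
```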
Related Documentation¶
- Data Flow & Processing Pipeline -- Upstream data ingestion
- Security Architecture -- Encryption and tenant isolation
- System Design -- Component overview and deployment