Skip to content

Security Architecture

CalcBridge implements a defense-in-depth security architecture designed for financial services compliance, including SOC 2 Type II, SOX, and GDPR requirements. This document details authentication, authorization, encryption, and security controls following OWASP best practices.

Security Overview

Defense in Depth

flowchart TB
    subgraph External["External Layer"]
        WAF["WAF / DDoS Protection"]
        TLS["TLS 1.3"]
    end

    subgraph Edge["Edge Layer"]
        RATE["Rate Limiting"]
        HEADERS["Security Headers"]
        SIZE["Request Size Limits"]
    end

    subgraph Application["Application Layer"]
        AUTH["JWT / API Key Auth"]
        RBAC["Role-Based Access"]
        VALID["Input Validation"]
        SAFE["Safe Formula Execution"]
    end

    subgraph Data["Data Layer"]
        RLS["Row-Level Security"]
        ENCRYPT["Field Encryption"]
        AUDIT["Audit Logging"]
    end

    External --> Edge
    Edge --> Application
    Application --> Data

    style WAF fill:#FEF3C7,stroke:#F59E0B
    style AUTH fill:#DCFCE7,stroke:#22C55E
    style RLS fill:#DBEAFE,stroke:#3B82F6
    style ENCRYPT fill:#FCE7F3,stroke:#EC4899

Security Principles

Principle Implementation
Least Privilege Role-based permissions with minimal default access
Defense in Depth Multiple security layers (network, application, data)
Zero Trust Validate every request, assume breach
Fail Secure Deny access on errors, log anomalies
Audit Everything Comprehensive logging for compliance
Secure by Default Safe configurations out of the box

Authentication

Authentication Methods

Method Use Case Token Lifetime
JWT Web app, user sessions 15 min access, 7 day refresh
API Key Service integrations Configurable (manual revocation)

JWT Authentication Flow

sequenceDiagram
    participant C as Client
    participant API as FastAPI
    participant AUTH as Auth Service
    participant DB as Database
    participant CACHE as Valkey

    C->>API: POST /auth/login (email, password)
    API->>DB: Verify credentials (Argon2)
    DB-->>API: User record

    alt Credentials valid
        API->>AUTH: Generate tokens
        AUTH->>AUTH: Create JWT (15min) with JTI
        AUTH->>AUTH: Create refresh token (7d)
        AUTH-->>API: Tokens generated
        API-->>C: {access_token, refresh_token, token_type}
    else Invalid
        API->>AUDIT: Log failed attempt
        API-->>C: 401 Unauthorized
    end

    Note over C,API: Subsequent API requests

    C->>API: GET /api/resource
    Note right of C: Authorization: Bearer {token}
    API->>AUTH: Validate JWT signature
    AUTH->>AUTH: Check expiration
    AUTH->>CACHE: Check blocklist (JTI)
    CACHE-->>AUTH: Not blocked
    API->>API: Set tenant context (RLS)
    API-->>C: 200 OK + data

    Note over C,API: Token refresh

    C->>API: POST /auth/refresh
    API->>AUTH: Validate refresh token
    AUTH->>AUTH: Verify type = "refresh"
    API-->>C: {access_token}

Token Configuration

# src/core/security.py
from passlib.context import CryptContext
import jwt
import secrets
from datetime import datetime, timedelta, timezone

# Password hashing using Argon2 (no 72-byte limit like bcrypt)
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
)

def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
    """Create JWT access token with claims."""
    to_encode = data.copy()
    now = datetime.now(timezone.utc)
    expire = now + (expires_delta or timedelta(minutes=15))

    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": secrets.token_hex(16),  # JWT ID for revocation
        "type": "access",
    })
    return jwt.encode(to_encode, settings.jwt.secret_key, algorithm="HS256")

def create_refresh_token(data: dict, expires_delta: timedelta = None) -> str:
    """Create JWT refresh token."""
    to_encode = data.copy()
    now = datetime.now(timezone.utc)
    expire = now + (expires_delta or timedelta(days=7))

    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": secrets.token_hex(16),
        "type": "refresh",
    })
    return jwt.encode(to_encode, settings.jwt.secret_key, algorithm="HS256")

Token Lifecycle

Token Type Expiration Purpose Revocation
Access Token 15 minutes API authentication JTI blocklist in Valkey
Refresh Token 7 days Obtain new access tokens JTI blocklist in Valkey
API Key Never (manual) Service-to-service auth Database flag + cache

Token Revocation

Tokens can be revoked before expiration using a JTI (JWT ID) blocklist:

# src/core/security.py
async def revoke_token(
    jti: str,
    valkey_client,
    ttl_seconds: int = None,
    user_id: str = None,
) -> bool:
    """Add token to blocklist."""
    key = f"calcbridge:blocklist:{jti}"

    # Default TTL to access token expiration
    if ttl_seconds is None:
        ttl_seconds = settings.jwt.access_token_expire_minutes * 60

    await valkey_client.set(key, "1", ex=ttl_seconds)

    # Track per-user revoked tokens
    if user_id:
        user_key = f"calcbridge:user:{user_id}:revoked"
        await valkey_client.sadd(user_key, jti)
        await valkey_client.expire(user_key, ttl_seconds)

    return True

async def is_token_revoked(jti: str, valkey_client) -> bool:
    """Check if token is blocklisted."""
    return await valkey_client.exists(f"calcbridge:blocklist:{jti}") > 0

async def revoke_all_user_tokens(user_id: str, valkey_client, active_jtis: list) -> int:
    """Revoke all tokens for a user (security incident response)."""
    ttl = settings.jwt.refresh_token_expire_days * 24 * 60 * 60
    for jti in active_jtis:
        await revoke_token(jti, valkey_client, ttl, user_id)
    return len(active_jtis)

API Key Authentication

For service-to-service communication:

# src/core/security.py
def generate_api_key() -> tuple[str, str, str]:
    """Generate API key with prefix and hash.

    Returns:
        Tuple of (full_key, key_prefix, key_hash)
    """
    random_part = secrets.token_urlsafe(32)
    full_key = f"cb_live_{random_part}"
    key_prefix = full_key[:16]  # For identification

    # Use SHA-256 for API keys (bcrypt has 72-byte limit)
    key_hash = hashlib.sha256(full_key.encode()).hexdigest()

    return full_key, key_prefix, key_hash

def verify_api_key(api_key: str, key_hash: str) -> bool:
    """Verify API key against stored hash (constant-time)."""
    computed_hash = hashlib.sha256(api_key.encode()).hexdigest()
    return secrets.compare_digest(computed_hash, key_hash)

Password Requirements

Requirement Value
Minimum length 12 characters
Uppercase At least 1
Lowercase At least 1
Numbers At least 1
Special characters At least 1
Hashing algorithm Argon2id

Authorization

Role-Based Access Control (RBAC)

CalcBridge implements a hierarchical role system with four levels:

graph TB
    OWNER["Owner"]
    ADMIN["Admin"]
    ANALYST["Analyst"]
    VIEWER["Viewer"]

    OWNER --> ADMIN
    ADMIN --> ANALYST
    ANALYST --> VIEWER

    subgraph Permissions
        P1["Tenant Settings"]
        P2["User Management"]
        P3["API Key Management"]
        P4["Workbook CRUD"]
        P5["Run Calculations"]
        P6["View Data"]
        P7["Export Data"]
    end

    OWNER --- P1
    OWNER --- P2
    ADMIN --- P3
    ADMIN --- P4
    ANALYST --- P5
    VIEWER --- P6

Permission Matrix

Permission Owner Admin Analyst Viewer
Manage Tenant Settings Yes - - -
Manage Users Yes Yes - -
Create/Revoke API Keys Yes Yes - -
Create Workbooks Yes Yes Yes -
Delete Workbooks Yes Yes - -
Run Calculations Yes Yes Yes -
Create Scenarios Yes Yes Yes -
View Data Yes Yes Yes Yes
Export Data Yes Yes Yes -

Permission Enforcement

# src/api/dependencies.py
from enum import Enum
from fastapi import Depends, HTTPException, status

class Permission(str, Enum):
    MANAGE_TENANT = "manage_tenant"
    MANAGE_USERS = "manage_users"
    MANAGE_WORKBOOKS = "manage_workbooks"
    RUN_CALCULATIONS = "run_calculations"
    VIEW_DATA = "view_data"

ROLE_PERMISSIONS = {
    "owner": {Permission.MANAGE_TENANT, Permission.MANAGE_USERS,
              Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
              Permission.VIEW_DATA},
    "admin": {Permission.MANAGE_USERS, Permission.MANAGE_WORKBOOKS,
              Permission.RUN_CALCULATIONS, Permission.VIEW_DATA},
    "analyst": {Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
                Permission.VIEW_DATA},
    "viewer": {Permission.VIEW_DATA},
}

def require_permission(permission: Permission):
    """Dependency that checks user has required permission."""
    async def check_permission(current_user: User = Depends(get_current_user)):
        user_permissions = ROLE_PERMISSIONS.get(current_user.role, set())
        if permission not in user_permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {permission.value} required"
            )
        return current_user
    return check_permission

# Usage in routes
@router.delete("/workbooks/{workbook_id}")
async def delete_workbook(
    workbook_id: UUID,
    current_user: User = Depends(require_permission(Permission.MANAGE_WORKBOOKS))
):
    pass

Tenant Isolation

Row-Level Security (RLS) enforces data isolation at the database level. See Multi-Tenancy for complete details.

Key isolation layers:

Layer Protection
Application Tenant middleware validates JWT/API key
ORM Repositories filter by tenant_id
Database RLS policies enforce isolation
Network SSL/TLS encryption in transit

Encryption

Password Hashing (Argon2)

CalcBridge uses Argon2id for password hashing, the winner of the Password Hashing Competition:

# src/core/security.py
from passlib.context import CryptContext

pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
)

def hash_password(password: str) -> str:
    """Hash password using Argon2id."""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify password against hash."""
    return pwd_context.verify(plain_password, hashed_password)

Encryption at Rest (AES-256-GCM)

For PII and sensitive financial data, CalcBridge implements field-level encryption:

flowchart LR
    subgraph Input
        PLAIN["Plaintext<br/>(SSN: 123-45-6789)"]
    end

    subgraph KeyDerivation["Key Derivation"]
        MASTER["Master Key"]
        HKDF["HKDF-SHA256"]
        CONTEXT["Context<br/>(tenant:field)"]
    end

    subgraph Encryption
        AES["AES-256-GCM"]
        NONCE["96-bit Nonce"]
    end

    subgraph Output
        CIPHER["Base64 Ciphertext<br/>(version + nonce + ciphertext + tag)"]
    end

    PLAIN --> AES
    MASTER --> HKDF
    CONTEXT --> HKDF
    HKDF --> AES
    NONCE --> AES
    AES --> CIPHER

    style AES fill:#DCFCE7,stroke:#22C55E
    style HKDF fill:#DBEAFE,stroke:#3B82F6

Key Features:

Feature Implementation
Algorithm AES-256-GCM (authenticated encryption)
Key Derivation HKDF-SHA256 with tenant+field context
Nonce 96-bit cryptographically random (never reused)
Encoding URL-safe Base64 for storage
Key Rotation Dual-key support for zero-downtime rotation
# src/services/encryption/encryption_service.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
import secrets
import struct
import base64

NONCE_SIZE = 12  # 96 bits for AES-GCM
KEY_SIZE = 32    # 256 bits
ENCRYPTION_VERSION = 1

class EncryptionService:
    """AES-256-GCM field-level encryption with key derivation."""

    def __init__(self, master_key: str, previous_master_key: str = None):
        if len(master_key) < 32:
            raise ValueError("Master key must be at least 32 characters")
        self._master_key = master_key.encode("utf-8")
        self._previous_master_key = (
            previous_master_key.encode("utf-8") if previous_master_key else None
        )

    def _derive_key(self, context: str, master_key: bytes) -> bytes:
        """Derive context-specific key using HKDF."""
        hkdf = HKDF(
            algorithm=hashes.SHA256(),
            length=KEY_SIZE,
            salt=b"calcbridge-encryption-v1",
            info=context.encode("utf-8"),
        )
        return hkdf.derive(master_key)

    def encrypt(self, plaintext: str, context: str = "") -> str:
        """Encrypt with context-based key derivation."""
        key = self._derive_key(context, self._master_key)
        aesgcm = AESGCM(key)

        nonce = secrets.token_bytes(NONCE_SIZE)
        ciphertext = aesgcm.encrypt(
            nonce,
            plaintext.encode("utf-8"),
            context.encode("utf-8") if context else None,  # AAD
        )

        # Pack: version (1 byte) + nonce (12 bytes) + ciphertext + tag
        packed = struct.pack("B", ENCRYPTION_VERSION) + nonce + ciphertext
        return base64.urlsafe_b64encode(packed).decode("ascii")

    def decrypt(self, ciphertext: str, context: str = "") -> str:
        """Decrypt with automatic old key fallback."""
        packed = base64.urlsafe_b64decode(ciphertext.encode("ascii"))
        version = struct.unpack("B", packed[:1])[0]
        nonce = packed[1:13]
        encrypted_data = packed[13:]

        # Try current key first
        key = self._derive_key(context, self._master_key)
        try:
            aesgcm = AESGCM(key)
            plaintext = aesgcm.decrypt(
                nonce, encrypted_data,
                context.encode("utf-8") if context else None
            )
            return plaintext.decode("utf-8")
        except Exception:
            # Fall back to previous key if available
            if self._previous_master_key:
                old_key = self._derive_key(context, self._previous_master_key)
                aesgcm_old = AESGCM(old_key)
                plaintext = aesgcm_old.decrypt(
                    nonce, encrypted_data,
                    context.encode("utf-8") if context else None
                )
                return plaintext.decode("utf-8")
            raise

TLS Requirements

Environment Minimum TLS Certificate
Production TLS 1.3 Valid CA-signed
Staging TLS 1.2 Valid CA-signed
Development TLS 1.2 Self-signed allowed

Database SSL:

DATABASE_URL = (
    "postgresql+psycopg://user:pass@host:5432/calcbridge"
    "?sslmode=verify-full"
)

Valkey SSL:

VALKEY_URL = "rediss://host:6379/0?ssl_cert_reqs=required"

Security Headers

CalcBridge applies comprehensive security headers to all responses:

# src/api/middleware/security.py
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Add security headers to all responses."""

    async def dispatch(self, request: Request, call_next) -> Response:
        response = await call_next(request)

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"

        # XSS protection (legacy browsers)
        response.headers["X-XSS-Protection"] = "1; mode=block"

        # Control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Restrict browser features
        response.headers["Permissions-Policy"] = (
            "geolocation=(), microphone=(), camera=()"
        )

        # HSTS (production only)
        if settings.is_production:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains; preload"
            )

        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data:; "
            "font-src 'self'; "
            "frame-ancestors 'none'; "
            "base-uri 'self'"
        )

        return response

Security Headers Summary

Header Value Protection
X-Content-Type-Options nosniff MIME-type sniffing
X-Frame-Options DENY Clickjacking
X-XSS-Protection 1; mode=block XSS (legacy browsers)
Strict-Transport-Security max-age=31536000; includeSubDomains; preload Protocol downgrade
Content-Security-Policy See above XSS, injection attacks
Referrer-Policy strict-origin-when-cross-origin Information leakage
Permissions-Policy Restrictive Browser feature abuse

Input Validation

Pydantic Validation

All API inputs are validated using Pydantic models:

from pydantic import BaseModel, Field, EmailStr, SecretStr

class UserCreate(BaseModel):
    """User creation request with validation."""
    email: EmailStr
    password: SecretStr = Field(min_length=12)
    full_name: str = Field(min_length=1, max_length=255)
    organization_name: str = Field(min_length=1, max_length=255)

    class Config:
        extra = "forbid"  # Reject extra fields (injection prevention)

SQL Injection Prevention

CalcBridge uses SQLAlchemy ORM with parameterized queries:

# SAFE - Parameterized query
result = await session.execute(
    select(Workbook).where(Workbook.id == workbook_id)
)

# SAFE - Text with parameters
await session.execute(
    text("SELECT * FROM workbooks WHERE tenant_id = :tenant_id"),
    {"tenant_id": tenant_id}
)

File Upload Validation

# src/imports/validation.py
ALLOWED_EXTENSIONS = {".xlsx", ".xls", ".xlsm"}
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB

def validate_upload(file: UploadFile) -> ValidationResult:
    """Validate uploaded Excel file."""
    # Check extension
    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise ValidationError(f"Unsupported format: {ext}")

    # Check size
    if file.size > MAX_FILE_SIZE:
        raise ValidationError("File too large")

    # Verify MIME type matches extension
    mime_type = magic.from_buffer(file.file.read(2048), mime=True)
    if not is_valid_excel_mime(mime_type, ext):
        raise ValidationError("Invalid file content")

    return ValidationResult(valid=True)

Formula Injection Prevention

The formula engine uses AST-based parsing with explicit whitelisting:

# src/calculations/safe_evaluator.py
class SafeExpressionEvaluator:
    """Safe expression evaluator - NO dynamic code execution."""

    MAX_EXPRESSION_LENGTH = 10000
    MAX_DEPTH = 50

    # Dangerous patterns - BLOCKED
    DANGEROUS_PATTERNS = [
        r'__\w+__',      # Dunder methods
        r'\bimport\b',   # Import statements
        r'\bexec\b',     # Dynamic execution
        r'\bcompile\b',  # Code compilation
        r'\bopen\b',     # File operations
        r'\bgetattr\b',  # Attribute access
        r'\bglobals\b',  # Globals access
        r'\blocals\b',   # Locals access
        r'\bos\.\w+',    # OS module
        r'\bsys\.\w+',   # Sys module
    ]

    # Whitelisted functions ONLY
    SAFE_FUNCTIONS = {
        'abs': abs,
        'round': round,
        'min': min,
        'max': max,
        'sum': sum,
        'SUM': lambda *args: sum(args),
        'AVERAGE': lambda *args: sum(args) / len(args),
        # ... other safe Excel functions
    }

Audit Logging

What is Logged

CalcBridge maintains comprehensive audit logs for SOC 2/SOX compliance:

# src/services/audit/audit_service.py
class AuditAction(str, Enum):
    # Authentication
    USER_LOGIN = "user.login"
    USER_LOGOUT = "user.logout"
    USER_LOGIN_FAILED = "user.login_failed"
    USER_PASSWORD_CHANGED = "user.password_changed"

    # User Management
    USER_CREATE = "user.create"
    USER_UPDATE = "user.update"
    USER_DELETE = "user.delete"

    # API Keys
    API_KEY_CREATE = "api_key.create"
    API_KEY_REVOKE = "api_key.revoke"

    # Workbook Operations
    WORKBOOK_CREATE = "workbook.create"
    WORKBOOK_UPDATE = "workbook.update"
    WORKBOOK_DELETE = "workbook.delete"
    WORKBOOK_UPLOAD = "workbook.upload"
    WORKBOOK_EXPORT = "workbook.export"

    # Calculations
    CALCULATION_RUN = "calculation.run"
    CALCULATION_COMPLETE = "calculation.complete"
    CALCULATION_FAILED = "calculation.failed"

Audit Event Structure

@dataclass
class AuditEvent:
    """Immutable audit event record."""
    id: UUID
    tenant_id: UUID
    user_id: Optional[UUID]
    action: AuditAction
    entity_type: str
    entity_id: Optional[UUID]
    old_value: Optional[Dict]  # Previous state
    new_value: Optional[Dict]  # New state
    metadata: Dict
    ip_address: Optional[str]
    user_agent: Optional[str]
    request_id: Optional[str]
    timestamp: datetime

Sensitive Field Redaction

Audit logs automatically redact sensitive data:

SENSITIVE_FIELDS = {
    "password", "password_hash", "api_key", "secret", "secret_key",
    "ssn", "social_security", "credit_card", "card_number",
    "cvv", "pin", "token", "access_token", "refresh_token",
    "private_key", "encryption_key",
}

def _redact_sensitive(self, data: Dict) -> Dict:
    """Redact sensitive fields before logging."""
    redacted = {}
    for key, value in data.items():
        if any(s in key.lower() for s in self.SENSITIVE_FIELDS):
            redacted[key] = "[REDACTED]"
        elif isinstance(value, dict):
            redacted[key] = self._redact_sensitive(value)
        else:
            redacted[key] = value
    return redacted

Log Retention

Tier Retention Storage
Free 30 days Database only
Standard 90 days Database only
Premium 1 year Database + S3
Enterprise 7 years Database + S3 (WORM)

SOC 2/SOX Compliance

For compliance, audit logs are dual-written to S3 with Object Lock (WORM):

async def _write_to_s3(self, event: AuditEvent) -> None:
    """Write to S3 with Object Lock for 7-year retention."""
    retention_date = datetime(
        event.timestamp.year + 7,
        event.timestamp.month,
        event.timestamp.day,
        tzinfo=timezone.utc,
    )

    await self.s3_client.put_object(
        Bucket=self.s3_bucket,
        Key=f"audit/{event.tenant_id}/{event.timestamp.year}/{event.id}.json",
        Body=json.dumps(event.to_dict()),
        ObjectLockMode="COMPLIANCE",
        ObjectLockRetainUntilDate=retention_date,
        ServerSideEncryption="aws:kms",
    )

Security Best Practices

Key Rotation Schedule

Secret Rotation Frequency Method
JWT Secret 90 days Rolling deployment
Encryption Master Key Annual Dual-key rotation
API Keys On compromise User-initiated revocation
Database Credentials 90 days Secrets manager

Dependency Scanning

CalcBridge uses automated security scanning:

# CI/CD security checks
- name: Dependency Audit
  run: pip-audit --strict --vulnerability-service osv

- name: SAST Scan
  uses: returntocorp/semgrep-action@v1
  with:
    config: p/security-audit

Security Testing

Test Type Tool Frequency
SAST Semgrep Every PR
Dependency Scan pip-audit Daily
Container Scan Trivy Every build
Penetration Test External Annual

Security Checklist

Production Deployment

  • JWT_SECRET_KEY is unique and securely generated (32+ chars)
  • ENCRYPTION_MASTER_KEY is unique and securely stored
  • TLS 1.3 configured with valid certificates
  • Database SSL mode is verify-full
  • Valkey SSL is enabled
  • Rate limiting is enabled
  • Security headers middleware is active
  • Audit logging is enabled with S3 backup
  • WAF/DDoS protection is configured
  • Monitoring and alerting are operational

Code Review Checklist

  • No hardcoded secrets or credentials
  • All inputs are validated (Pydantic models)
  • SQL queries use parameterization
  • Sensitive data is encrypted at rest
  • Authentication required for protected endpoints
  • Authorization checks enforce permissions
  • Error messages do not leak sensitive information
  • Audit events logged for security-relevant operations

Incident Response

Severity Examples Response Time
Critical Data breach, RCE Immediate
High Auth bypass, SQLi 1 hour
Medium XSS, CSRF 24 hours
Low Info disclosure 1 week

Alerting Thresholds

Metric Threshold Alert Channel
Failed logins >10/min per user PagerDuty
Rate limit violations >100/min Slack
Invalid API keys >50/hour Email
RLS policy violations Any PagerDuty
Token revocations >10/min Slack