Security Architecture¶
CalcBridge implements a defense-in-depth security architecture designed for financial services compliance, including SOC 2 Type II, SOX, and GDPR requirements. This document details authentication, authorization, encryption, and security controls following OWASP best practices.
Security Overview¶
Defense in Depth¶
flowchart TB
subgraph External["External Layer"]
WAF["WAF / DDoS Protection"]
TLS["TLS 1.3"]
end
subgraph Edge["Edge Layer"]
RATE["Rate Limiting"]
HEADERS["Security Headers"]
SIZE["Request Size Limits"]
end
subgraph Application["Application Layer"]
AUTH["JWT / API Key Auth"]
RBAC["Role-Based Access"]
VALID["Input Validation"]
SAFE["Safe Formula Execution"]
end
subgraph Data["Data Layer"]
RLS["Row-Level Security"]
ENCRYPT["Field Encryption"]
AUDIT["Audit Logging"]
end
External --> Edge
Edge --> Application
Application --> Data
style WAF fill:#FEF3C7,stroke:#F59E0B
style AUTH fill:#DCFCE7,stroke:#22C55E
style RLS fill:#DBEAFE,stroke:#3B82F6
style ENCRYPT fill:#FCE7F3,stroke:#EC4899 Security Principles¶
| Principle | Implementation |
|---|---|
| Least Privilege | Role-based permissions with minimal default access |
| Defense in Depth | Multiple security layers (network, application, data) |
| Zero Trust | Validate every request, assume breach |
| Fail Secure | Deny access on errors, log anomalies |
| Audit Everything | Comprehensive logging for compliance |
| Secure by Default | Safe configurations out of the box |
Authentication¶
Authentication Methods¶
| Method | Use Case | Token Lifetime |
|---|---|---|
| JWT | Web app, user sessions | 15 min access, 7 day refresh |
| API Key | Service integrations | Configurable (manual revocation) |
JWT Authentication Flow¶
sequenceDiagram
participant C as Client
participant API as FastAPI
participant AUTH as Auth Service
participant DB as Database
participant CACHE as Valkey
C->>API: POST /auth/login (email, password)
API->>DB: Verify credentials (Argon2)
DB-->>API: User record
alt Credentials valid
API->>AUTH: Generate tokens
AUTH->>AUTH: Create JWT (15min) with JTI
AUTH->>AUTH: Create refresh token (7d)
AUTH-->>API: Tokens generated
API-->>C: {access_token, refresh_token, token_type}
else Invalid
API->>AUDIT: Log failed attempt
API-->>C: 401 Unauthorized
end
Note over C,API: Subsequent API requests
C->>API: GET /api/resource
Note right of C: Authorization: Bearer {token}
API->>AUTH: Validate JWT signature
AUTH->>AUTH: Check expiration
AUTH->>CACHE: Check blocklist (JTI)
CACHE-->>AUTH: Not blocked
API->>API: Set tenant context (RLS)
API-->>C: 200 OK + data
Note over C,API: Token refresh
C->>API: POST /auth/refresh
API->>AUTH: Validate refresh token
AUTH->>AUTH: Verify type = "refresh"
API-->>C: {access_token} Token Configuration¶
# src/core/security.py
from passlib.context import CryptContext
import jwt
import secrets
from datetime import datetime, timedelta, timezone
# Password hashing using Argon2 (no 72-byte limit like bcrypt)
pwd_context = CryptContext(
schemes=["argon2"],
deprecated="auto",
)
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
"""Create JWT access token with claims."""
to_encode = data.copy()
now = datetime.now(timezone.utc)
expire = now + (expires_delta or timedelta(minutes=15))
to_encode.update({
"exp": expire,
"iat": now,
"jti": secrets.token_hex(16), # JWT ID for revocation
"type": "access",
})
return jwt.encode(to_encode, settings.jwt.secret_key, algorithm="HS256")
def create_refresh_token(data: dict, expires_delta: timedelta = None) -> str:
"""Create JWT refresh token."""
to_encode = data.copy()
now = datetime.now(timezone.utc)
expire = now + (expires_delta or timedelta(days=7))
to_encode.update({
"exp": expire,
"iat": now,
"jti": secrets.token_hex(16),
"type": "refresh",
})
return jwt.encode(to_encode, settings.jwt.secret_key, algorithm="HS256")
Token Lifecycle¶
| Token Type | Expiration | Purpose | Revocation |
|---|---|---|---|
| Access Token | 15 minutes | API authentication | JTI blocklist in Valkey |
| Refresh Token | 7 days | Obtain new access tokens | JTI blocklist in Valkey |
| API Key | Never (manual) | Service-to-service auth | Database flag + cache |
Token Revocation¶
Tokens can be revoked before expiration using a JTI (JWT ID) blocklist:
# src/core/security.py
async def revoke_token(
jti: str,
valkey_client,
ttl_seconds: int = None,
user_id: str = None,
) -> bool:
"""Add token to blocklist."""
key = f"calcbridge:blocklist:{jti}"
# Default TTL to access token expiration
if ttl_seconds is None:
ttl_seconds = settings.jwt.access_token_expire_minutes * 60
await valkey_client.set(key, "1", ex=ttl_seconds)
# Track per-user revoked tokens
if user_id:
user_key = f"calcbridge:user:{user_id}:revoked"
await valkey_client.sadd(user_key, jti)
await valkey_client.expire(user_key, ttl_seconds)
return True
async def is_token_revoked(jti: str, valkey_client) -> bool:
"""Check if token is blocklisted."""
return await valkey_client.exists(f"calcbridge:blocklist:{jti}") > 0
async def revoke_all_user_tokens(user_id: str, valkey_client, active_jtis: list) -> int:
"""Revoke all tokens for a user (security incident response)."""
ttl = settings.jwt.refresh_token_expire_days * 24 * 60 * 60
for jti in active_jtis:
await revoke_token(jti, valkey_client, ttl, user_id)
return len(active_jtis)
API Key Authentication¶
For service-to-service communication:
# src/core/security.py
def generate_api_key() -> tuple[str, str, str]:
"""Generate API key with prefix and hash.
Returns:
Tuple of (full_key, key_prefix, key_hash)
"""
random_part = secrets.token_urlsafe(32)
full_key = f"cb_live_{random_part}"
key_prefix = full_key[:16] # For identification
# Use SHA-256 for API keys (bcrypt has 72-byte limit)
key_hash = hashlib.sha256(full_key.encode()).hexdigest()
return full_key, key_prefix, key_hash
def verify_api_key(api_key: str, key_hash: str) -> bool:
"""Verify API key against stored hash (constant-time)."""
computed_hash = hashlib.sha256(api_key.encode()).hexdigest()
return secrets.compare_digest(computed_hash, key_hash)
Password Requirements¶
| Requirement | Value |
|---|---|
| Minimum length | 12 characters |
| Uppercase | At least 1 |
| Lowercase | At least 1 |
| Numbers | At least 1 |
| Special characters | At least 1 |
| Hashing algorithm | Argon2id |
Authorization¶
Role-Based Access Control (RBAC)¶
CalcBridge implements a hierarchical role system with four levels:
graph TB
OWNER["Owner"]
ADMIN["Admin"]
ANALYST["Analyst"]
VIEWER["Viewer"]
OWNER --> ADMIN
ADMIN --> ANALYST
ANALYST --> VIEWER
subgraph Permissions
P1["Tenant Settings"]
P2["User Management"]
P3["API Key Management"]
P4["Workbook CRUD"]
P5["Run Calculations"]
P6["View Data"]
P7["Export Data"]
end
OWNER --- P1
OWNER --- P2
ADMIN --- P3
ADMIN --- P4
ANALYST --- P5
VIEWER --- P6 Permission Matrix¶
| Permission | Owner | Admin | Analyst | Viewer |
|---|---|---|---|---|
| Manage Tenant Settings | Yes | - | - | - |
| Manage Users | Yes | Yes | - | - |
| Create/Revoke API Keys | Yes | Yes | - | - |
| Create Workbooks | Yes | Yes | Yes | - |
| Delete Workbooks | Yes | Yes | - | - |
| Run Calculations | Yes | Yes | Yes | - |
| Create Scenarios | Yes | Yes | Yes | - |
| View Data | Yes | Yes | Yes | Yes |
| Export Data | Yes | Yes | Yes | - |
Permission Enforcement¶
# src/api/dependencies.py
from enum import Enum
from fastapi import Depends, HTTPException, status
class Permission(str, Enum):
MANAGE_TENANT = "manage_tenant"
MANAGE_USERS = "manage_users"
MANAGE_WORKBOOKS = "manage_workbooks"
RUN_CALCULATIONS = "run_calculations"
VIEW_DATA = "view_data"
ROLE_PERMISSIONS = {
"owner": {Permission.MANAGE_TENANT, Permission.MANAGE_USERS,
Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
Permission.VIEW_DATA},
"admin": {Permission.MANAGE_USERS, Permission.MANAGE_WORKBOOKS,
Permission.RUN_CALCULATIONS, Permission.VIEW_DATA},
"analyst": {Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
Permission.VIEW_DATA},
"viewer": {Permission.VIEW_DATA},
}
def require_permission(permission: Permission):
"""Dependency that checks user has required permission."""
async def check_permission(current_user: User = Depends(get_current_user)):
user_permissions = ROLE_PERMISSIONS.get(current_user.role, set())
if permission not in user_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {permission.value} required"
)
return current_user
return check_permission
# Usage in routes
@router.delete("/workbooks/{workbook_id}")
async def delete_workbook(
workbook_id: UUID,
current_user: User = Depends(require_permission(Permission.MANAGE_WORKBOOKS))
):
pass
Tenant Isolation¶
Row-Level Security (RLS) enforces data isolation at the database level. See Multi-Tenancy for complete details.
Key isolation layers:
| Layer | Protection |
|---|---|
| Application | Tenant middleware validates JWT/API key |
| ORM | Repositories filter by tenant_id |
| Database | RLS policies enforce isolation |
| Network | SSL/TLS encryption in transit |
Encryption¶
Password Hashing (Argon2)¶
CalcBridge uses Argon2id for password hashing, the winner of the Password Hashing Competition:
# src/core/security.py
from passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["argon2"],
deprecated="auto",
)
def hash_password(password: str) -> str:
"""Hash password using Argon2id."""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return pwd_context.verify(plain_password, hashed_password)
Encryption at Rest (AES-256-GCM)¶
For PII and sensitive financial data, CalcBridge implements field-level encryption:
flowchart LR
subgraph Input
PLAIN["Plaintext<br/>(SSN: 123-45-6789)"]
end
subgraph KeyDerivation["Key Derivation"]
MASTER["Master Key"]
HKDF["HKDF-SHA256"]
CONTEXT["Context<br/>(tenant:field)"]
end
subgraph Encryption
AES["AES-256-GCM"]
NONCE["96-bit Nonce"]
end
subgraph Output
CIPHER["Base64 Ciphertext<br/>(version + nonce + ciphertext + tag)"]
end
PLAIN --> AES
MASTER --> HKDF
CONTEXT --> HKDF
HKDF --> AES
NONCE --> AES
AES --> CIPHER
style AES fill:#DCFCE7,stroke:#22C55E
style HKDF fill:#DBEAFE,stroke:#3B82F6 Key Features:
| Feature | Implementation |
|---|---|
| Algorithm | AES-256-GCM (authenticated encryption) |
| Key Derivation | HKDF-SHA256 with tenant+field context |
| Nonce | 96-bit cryptographically random (never reused) |
| Encoding | URL-safe Base64 for storage |
| Key Rotation | Dual-key support for zero-downtime rotation |
# src/services/encryption/encryption_service.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
import secrets
import struct
import base64
NONCE_SIZE = 12 # 96 bits for AES-GCM
KEY_SIZE = 32 # 256 bits
ENCRYPTION_VERSION = 1
class EncryptionService:
"""AES-256-GCM field-level encryption with key derivation."""
def __init__(self, master_key: str, previous_master_key: str = None):
if len(master_key) < 32:
raise ValueError("Master key must be at least 32 characters")
self._master_key = master_key.encode("utf-8")
self._previous_master_key = (
previous_master_key.encode("utf-8") if previous_master_key else None
)
def _derive_key(self, context: str, master_key: bytes) -> bytes:
"""Derive context-specific key using HKDF."""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=KEY_SIZE,
salt=b"calcbridge-encryption-v1",
info=context.encode("utf-8"),
)
return hkdf.derive(master_key)
def encrypt(self, plaintext: str, context: str = "") -> str:
"""Encrypt with context-based key derivation."""
key = self._derive_key(context, self._master_key)
aesgcm = AESGCM(key)
nonce = secrets.token_bytes(NONCE_SIZE)
ciphertext = aesgcm.encrypt(
nonce,
plaintext.encode("utf-8"),
context.encode("utf-8") if context else None, # AAD
)
# Pack: version (1 byte) + nonce (12 bytes) + ciphertext + tag
packed = struct.pack("B", ENCRYPTION_VERSION) + nonce + ciphertext
return base64.urlsafe_b64encode(packed).decode("ascii")
def decrypt(self, ciphertext: str, context: str = "") -> str:
"""Decrypt with automatic old key fallback."""
packed = base64.urlsafe_b64decode(ciphertext.encode("ascii"))
version = struct.unpack("B", packed[:1])[0]
nonce = packed[1:13]
encrypted_data = packed[13:]
# Try current key first
key = self._derive_key(context, self._master_key)
try:
aesgcm = AESGCM(key)
plaintext = aesgcm.decrypt(
nonce, encrypted_data,
context.encode("utf-8") if context else None
)
return plaintext.decode("utf-8")
except Exception:
# Fall back to previous key if available
if self._previous_master_key:
old_key = self._derive_key(context, self._previous_master_key)
aesgcm_old = AESGCM(old_key)
plaintext = aesgcm_old.decrypt(
nonce, encrypted_data,
context.encode("utf-8") if context else None
)
return plaintext.decode("utf-8")
raise
TLS Requirements¶
| Environment | Minimum TLS | Certificate |
|---|---|---|
| Production | TLS 1.3 | Valid CA-signed |
| Staging | TLS 1.2 | Valid CA-signed |
| Development | TLS 1.2 | Self-signed allowed |
Database SSL:
Valkey SSL:
Security Headers¶
CalcBridge applies comprehensive security headers to all responses:
# src/api/middleware/security.py
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add security headers to all responses."""
async def dispatch(self, request: Request, call_next) -> Response:
response = await call_next(request)
# Prevent MIME-type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# XSS protection (legacy browsers)
response.headers["X-XSS-Protection"] = "1; mode=block"
# Control referrer information
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Restrict browser features
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
# HSTS (production only)
if settings.is_production:
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data:; "
"font-src 'self'; "
"frame-ancestors 'none'; "
"base-uri 'self'"
)
return response
Security Headers Summary¶
| Header | Value | Protection |
|---|---|---|
X-Content-Type-Options | nosniff | MIME-type sniffing |
X-Frame-Options | DENY | Clickjacking |
X-XSS-Protection | 1; mode=block | XSS (legacy browsers) |
Strict-Transport-Security | max-age=31536000; includeSubDomains; preload | Protocol downgrade |
Content-Security-Policy | See above | XSS, injection attacks |
Referrer-Policy | strict-origin-when-cross-origin | Information leakage |
Permissions-Policy | Restrictive | Browser feature abuse |
Input Validation¶
Pydantic Validation¶
All API inputs are validated using Pydantic models:
from pydantic import BaseModel, Field, EmailStr, SecretStr
class UserCreate(BaseModel):
"""User creation request with validation."""
email: EmailStr
password: SecretStr = Field(min_length=12)
full_name: str = Field(min_length=1, max_length=255)
organization_name: str = Field(min_length=1, max_length=255)
class Config:
extra = "forbid" # Reject extra fields (injection prevention)
SQL Injection Prevention¶
CalcBridge uses SQLAlchemy ORM with parameterized queries:
# SAFE - Parameterized query
result = await session.execute(
select(Workbook).where(Workbook.id == workbook_id)
)
# SAFE - Text with parameters
await session.execute(
text("SELECT * FROM workbooks WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id}
)
File Upload Validation¶
# src/imports/validation.py
ALLOWED_EXTENSIONS = {".xlsx", ".xls", ".xlsm"}
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100MB
def validate_upload(file: UploadFile) -> ValidationResult:
"""Validate uploaded Excel file."""
# Check extension
ext = Path(file.filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise ValidationError(f"Unsupported format: {ext}")
# Check size
if file.size > MAX_FILE_SIZE:
raise ValidationError("File too large")
# Verify MIME type matches extension
mime_type = magic.from_buffer(file.file.read(2048), mime=True)
if not is_valid_excel_mime(mime_type, ext):
raise ValidationError("Invalid file content")
return ValidationResult(valid=True)
Formula Injection Prevention¶
The formula engine uses AST-based parsing with explicit whitelisting:
# src/calculations/safe_evaluator.py
class SafeExpressionEvaluator:
"""Safe expression evaluator - NO dynamic code execution."""
MAX_EXPRESSION_LENGTH = 10000
MAX_DEPTH = 50
# Dangerous patterns - BLOCKED
DANGEROUS_PATTERNS = [
r'__\w+__', # Dunder methods
r'\bimport\b', # Import statements
r'\bexec\b', # Dynamic execution
r'\bcompile\b', # Code compilation
r'\bopen\b', # File operations
r'\bgetattr\b', # Attribute access
r'\bglobals\b', # Globals access
r'\blocals\b', # Locals access
r'\bos\.\w+', # OS module
r'\bsys\.\w+', # Sys module
]
# Whitelisted functions ONLY
SAFE_FUNCTIONS = {
'abs': abs,
'round': round,
'min': min,
'max': max,
'sum': sum,
'SUM': lambda *args: sum(args),
'AVERAGE': lambda *args: sum(args) / len(args),
# ... other safe Excel functions
}
Audit Logging¶
What is Logged¶
CalcBridge maintains comprehensive audit logs for SOC 2/SOX compliance:
# src/services/audit/audit_service.py
class AuditAction(str, Enum):
# Authentication
USER_LOGIN = "user.login"
USER_LOGOUT = "user.logout"
USER_LOGIN_FAILED = "user.login_failed"
USER_PASSWORD_CHANGED = "user.password_changed"
# User Management
USER_CREATE = "user.create"
USER_UPDATE = "user.update"
USER_DELETE = "user.delete"
# API Keys
API_KEY_CREATE = "api_key.create"
API_KEY_REVOKE = "api_key.revoke"
# Workbook Operations
WORKBOOK_CREATE = "workbook.create"
WORKBOOK_UPDATE = "workbook.update"
WORKBOOK_DELETE = "workbook.delete"
WORKBOOK_UPLOAD = "workbook.upload"
WORKBOOK_EXPORT = "workbook.export"
# Calculations
CALCULATION_RUN = "calculation.run"
CALCULATION_COMPLETE = "calculation.complete"
CALCULATION_FAILED = "calculation.failed"
Audit Event Structure¶
@dataclass
class AuditEvent:
"""Immutable audit event record."""
id: UUID
tenant_id: UUID
user_id: Optional[UUID]
action: AuditAction
entity_type: str
entity_id: Optional[UUID]
old_value: Optional[Dict] # Previous state
new_value: Optional[Dict] # New state
metadata: Dict
ip_address: Optional[str]
user_agent: Optional[str]
request_id: Optional[str]
timestamp: datetime
Sensitive Field Redaction¶
Audit logs automatically redact sensitive data:
SENSITIVE_FIELDS = {
"password", "password_hash", "api_key", "secret", "secret_key",
"ssn", "social_security", "credit_card", "card_number",
"cvv", "pin", "token", "access_token", "refresh_token",
"private_key", "encryption_key",
}
def _redact_sensitive(self, data: Dict) -> Dict:
"""Redact sensitive fields before logging."""
redacted = {}
for key, value in data.items():
if any(s in key.lower() for s in self.SENSITIVE_FIELDS):
redacted[key] = "[REDACTED]"
elif isinstance(value, dict):
redacted[key] = self._redact_sensitive(value)
else:
redacted[key] = value
return redacted
Log Retention¶
| Tier | Retention | Storage |
|---|---|---|
| Free | 30 days | Database only |
| Standard | 90 days | Database only |
| Premium | 1 year | Database + S3 |
| Enterprise | 7 years | Database + S3 (WORM) |
SOC 2/SOX Compliance¶
For compliance, audit logs are dual-written to S3 with Object Lock (WORM):
async def _write_to_s3(self, event: AuditEvent) -> None:
"""Write to S3 with Object Lock for 7-year retention."""
retention_date = datetime(
event.timestamp.year + 7,
event.timestamp.month,
event.timestamp.day,
tzinfo=timezone.utc,
)
await self.s3_client.put_object(
Bucket=self.s3_bucket,
Key=f"audit/{event.tenant_id}/{event.timestamp.year}/{event.id}.json",
Body=json.dumps(event.to_dict()),
ObjectLockMode="COMPLIANCE",
ObjectLockRetainUntilDate=retention_date,
ServerSideEncryption="aws:kms",
)
Security Best Practices¶
Key Rotation Schedule¶
| Secret | Rotation Frequency | Method |
|---|---|---|
| JWT Secret | 90 days | Rolling deployment |
| Encryption Master Key | Annual | Dual-key rotation |
| API Keys | On compromise | User-initiated revocation |
| Database Credentials | 90 days | Secrets manager |
Dependency Scanning¶
CalcBridge uses automated security scanning:
# CI/CD security checks
- name: Dependency Audit
run: pip-audit --strict --vulnerability-service osv
- name: SAST Scan
uses: returntocorp/semgrep-action@v1
with:
config: p/security-audit
Security Testing¶
| Test Type | Tool | Frequency |
|---|---|---|
| SAST | Semgrep | Every PR |
| Dependency Scan | pip-audit | Daily |
| Container Scan | Trivy | Every build |
| Penetration Test | External | Annual |
Security Checklist¶
Production Deployment¶
- JWT_SECRET_KEY is unique and securely generated (32+ chars)
- ENCRYPTION_MASTER_KEY is unique and securely stored
- TLS 1.3 configured with valid certificates
- Database SSL mode is
verify-full - Valkey SSL is enabled
- Rate limiting is enabled
- Security headers middleware is active
- Audit logging is enabled with S3 backup
- WAF/DDoS protection is configured
- Monitoring and alerting are operational
Code Review Checklist¶
- No hardcoded secrets or credentials
- All inputs are validated (Pydantic models)
- SQL queries use parameterization
- Sensitive data is encrypted at rest
- Authentication required for protected endpoints
- Authorization checks enforce permissions
- Error messages do not leak sensitive information
- Audit events logged for security-relevant operations
Incident Response¶
| Severity | Examples | Response Time |
|---|---|---|
| Critical | Data breach, RCE | Immediate |
| High | Auth bypass, SQLi | 1 hour |
| Medium | XSS, CSRF | 24 hours |
| Low | Info disclosure | 1 week |
Alerting Thresholds¶
| Metric | Threshold | Alert Channel |
|---|---|---|
| Failed logins | >10/min per user | PagerDuty |
| Rate limit violations | >100/min | Slack |
| Invalid API keys | >50/hour | |
| RLS policy violations | Any | PagerDuty |
| Token revocations | >10/min | Slack |