Multi-Tenancy & Isolation
CalcBridge implements a robust multi-tenancy architecture using PostgreSQL Row-Level Security (RLS) for complete data isolation between tenants. This document details the tenant model, isolation mechanisms, and access control patterns.
Tenant Model
Hierarchy
graph TB
subgraph Tenant["Tenant (Organization)"]
T["Tenant"]
end
subgraph Users["Users"]
OWNER["Owner"]
ADMIN["Admin"]
ANALYST["Analyst"]
VIEWER["Viewer"]
end
subgraph Resources["Resources"]
WB["Workbooks"]
KEYS["API Keys"]
SETTINGS["Settings"]
end
T --> OWNER
T --> ADMIN
T --> ANALYST
T --> VIEWER
T --> WB
T --> KEYS
T --> SETTINGS
Tenant Entity
# src/db/models/tenant.py
class Tenant(Base):
"""Tenant model representing an organization."""
__tablename__ = "tenants"
id = Column(UUID, primary_key=True, default=uuid4)
name = Column(String(255), nullable=False)
slug = Column(String(100), unique=True, nullable=False)
subscription_tier = Column(String(50), default="free")
is_active = Column(Boolean, default=True)
settings = Column(JSONB, default={})
created_at = Column(DateTime, default=datetime.utcnow)
deleted_at = Column(DateTime, nullable=True)
# Relationships
users = relationship("User", back_populates="tenant")
workbooks = relationship("Workbook", back_populates="tenant")
api_keys = relationship("ApiKey", back_populates="tenant")
Tenant Context
# src/core/tenant/context.py
from contextvars import ContextVar
from dataclasses import dataclass
@dataclass
class TenantContext:
"""Thread-safe tenant context for the request."""
tenant_id: UUID
tenant_slug: str
subscription_tier: str
user_id: Optional[UUID] = None
user_role: Optional[str] = None
request_id: Optional[str] = None
# Context variable (thread-safe)
_tenant_context: ContextVar[Optional[TenantContext]] = ContextVar(
"tenant_context",
default=None
)
def get_current_tenant() -> Optional[TenantContext]:
"""Get the current tenant context."""
return _tenant_context.get()
def set_current_tenant(context: TenantContext) -> None:
"""Set the current tenant context."""
_tenant_context.set(context)
def require_tenant() -> TenantContext:
"""Get tenant context, raising if not set."""
context = get_current_tenant()
if context is None:
raise RuntimeError("No tenant context set")
return context
Row-Level Security (RLS)
RLS Policy Architecture
flowchart TB
subgraph Application["Application Layer"]
APP["FastAPI"]
MW["Tenant Middleware"]
end
subgraph Database["Database Layer"]
SET["SET app.current_tenant_id"]
QUERY["SELECT * FROM workbooks"]
RLS["RLS Policy Check"]
RESULT["Filtered Results"]
end
APP --> MW
MW --> SET
SET --> QUERY
QUERY --> RLS
RLS --> RESULT
style RLS fill:#DCFCE7,stroke:#22C55E
RLS Policy Implementation
-- db/migrations/V002__enable_rls.sql
-- Enable RLS on tenant-scoped tables
ALTER TABLE workbooks ENABLE ROW LEVEL SECURITY;
ALTER TABLE sheets ENABLE ROW LEVEL SECURITY;
ALTER TABLE cell_data ENABLE ROW LEVEL SECURITY;
ALTER TABLE api_keys ENABLE ROW LEVEL SECURITY;
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
-- Force RLS for table owners too (defense in depth)
ALTER TABLE workbooks FORCE ROW LEVEL SECURITY;
ALTER TABLE sheets FORCE ROW LEVEL SECURITY;
-- Create RLS policies
CREATE POLICY tenant_isolation_workbooks ON workbooks
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
CREATE POLICY tenant_isolation_sheets ON sheets
USING (
workbook_id IN (
SELECT id FROM workbooks
WHERE tenant_id = current_setting('app.current_tenant_id')::uuid
)
);
CREATE POLICY tenant_isolation_users ON users
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
CREATE POLICY tenant_isolation_api_keys ON api_keys
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
Setting Tenant Context
# src/core/tenant/middleware.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
class TenantMiddleware(BaseHTTPMiddleware):
"""Middleware that sets tenant context for each request."""
async def dispatch(self, request: Request, call_next):
# Extract tenant from JWT or API key
tenant_context = await self._extract_tenant(request)
if tenant_context:
# Set Python context
set_current_tenant(tenant_context)
# Set PostgreSQL context for RLS
async with get_db_session() as session:
await session.execute(
text("SET app.current_tenant_id = :tenant_id"),
{"tenant_id": str(tenant_context.tenant_id)}
)
try:
response = await call_next(request)
return response
finally:
clear_current_tenant()
Defense in Depth
Multiple layers protect tenant data:
| Layer | Protection |
| Application | Tenant middleware validates JWT/API key |
| ORM | Repositories filter by tenant_id |
| Database | RLS policies enforce isolation |
| Network | SSL/TLS encryption in transit |
# src/db/repositories/workbook.py
class WorkbookRepository(BaseRepository[Workbook]):
"""Repository with automatic tenant filtering."""
async def list_all(
self,
skip: int = 0,
limit: int = 100
) -> List[Workbook]:
"""List workbooks - automatically filtered by RLS."""
# Even without explicit WHERE clause,
# RLS policy ensures tenant isolation
result = await self.session.execute(
select(Workbook)
.order_by(Workbook.created_at.desc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
User Roles and Permissions
Role Hierarchy
graph TB
OWNER["Owner"]
ADMIN["Admin"]
ANALYST["Analyst"]
VIEWER["Viewer"]
OWNER --> ADMIN
ADMIN --> ANALYST
ANALYST --> VIEWER
subgraph Permissions
P_TENANT["Manage Tenant"]
P_USERS["Manage Users"]
P_WB["Manage Workbooks"]
P_CALC["Run Calculations"]
P_VIEW["View Data"]
end
OWNER --- P_TENANT
OWNER --- P_USERS
ADMIN --- P_WB
ANALYST --- P_CALC
VIEWER --- P_VIEW
Role Permissions Matrix
| Permission | Owner | Admin | Analyst | Viewer |
| Tenant Settings | Yes | No | No | No |
| Manage Users | Yes | Yes | No | No |
| Manage API Keys | Yes | Yes | No | No |
| Create Workbooks | Yes | Yes | Yes | No |
| Delete Workbooks | Yes | Yes | No | No |
| Run Calculations | Yes | Yes | Yes | No |
| Create Scenarios | Yes | Yes | Yes | No |
| View Data | Yes | Yes | Yes | Yes |
| Export Data | Yes | Yes | Yes | No |
Permission Enforcement
# src/api/dependencies.py
from enum import Enum
from fastapi import Depends, HTTPException, status
class Permission(str, Enum):
MANAGE_TENANT = "manage_tenant"
MANAGE_USERS = "manage_users"
MANAGE_WORKBOOKS = "manage_workbooks"
RUN_CALCULATIONS = "run_calculations"
VIEW_DATA = "view_data"
ROLE_PERMISSIONS = {
"owner": {Permission.MANAGE_TENANT, Permission.MANAGE_USERS,
Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
Permission.VIEW_DATA},
"admin": {Permission.MANAGE_USERS, Permission.MANAGE_WORKBOOKS,
Permission.RUN_CALCULATIONS, Permission.VIEW_DATA},
"analyst": {Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
Permission.VIEW_DATA},
"viewer": {Permission.VIEW_DATA},
}
def require_permission(permission: Permission):
"""Dependency that checks user has required permission."""
async def check_permission(
current_user: User = Depends(get_current_user)
) -> User:
user_permissions = ROLE_PERMISSIONS.get(current_user.role, set())
if permission not in user_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {permission.value} required"
)
return current_user
return check_permission
# Usage in routes
@router.delete("/workbooks/{workbook_id}")
async def delete_workbook(
workbook_id: UUID,
current_user: User = Depends(
require_permission(Permission.MANAGE_WORKBOOKS)
)
):
pass
Subscription Tiers
Tier Configuration
| Tier | Users | Workbooks | API Calls/min | Features |
| Free | 3 | 5 | 60 | Basic calculations |
| Standard | 10 | 50 | 300 | + Compliance testing |
| Premium | 50 | 500 | 1000 | + What-if scenarios |
| Enterprise | Unlimited | Unlimited | 5000 | + Custom integrations |
Tier Enforcement
# src/core/config.py
class SubscriptionTierConfig(BaseModel):
"""Configuration for subscription tiers."""
max_users: int
max_workbooks: int
rate_limit_per_minute: int
features: set[str]
TIER_CONFIGS = {
"free": SubscriptionTierConfig(
max_users=3,
max_workbooks=5,
rate_limit_per_minute=60,
features={"basic_calculations"}
),
"standard": SubscriptionTierConfig(
max_users=10,
max_workbooks=50,
rate_limit_per_minute=300,
features={"basic_calculations", "compliance_testing"}
),
"premium": SubscriptionTierConfig(
max_users=50,
max_workbooks=500,
rate_limit_per_minute=1000,
features={"basic_calculations", "compliance_testing", "what_if_scenarios"}
),
"enterprise": SubscriptionTierConfig(
max_users=999999,
max_workbooks=999999,
rate_limit_per_minute=5000,
features={"basic_calculations", "compliance_testing",
"what_if_scenarios", "custom_integrations", "sso"}
),
}
Feature Gating
# src/api/dependencies.py
def require_feature(feature: str):
"""Dependency that checks tenant has access to feature."""
async def check_feature(
tenant: TenantContext = Depends(get_current_tenant)
) -> TenantContext:
tier_config = TIER_CONFIGS.get(tenant.subscription_tier)
if not tier_config or feature not in tier_config.features:
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail=f"Feature '{feature}' requires upgrade"
)
return tenant
return check_feature
# Usage
@router.post("/scenarios")
async def create_scenario(
tenant: TenantContext = Depends(require_feature("what_if_scenarios"))
):
pass
Rate Limiting per Tenant
Sliding Window Algorithm
flowchart LR
subgraph Request["Incoming Request"]
REQ["API Request"]
end
subgraph RateLimit["Rate Limiter"]
GET["Get Counter"]
CHECK["Check Limit"]
INCR["Increment"]
end
subgraph Cache["Valkey"]
KEY["tenant:{id}:rate"]
SORTED["Sorted Set<br/>(timestamps)"]
end
subgraph Response["Response"]
OK["200 OK"]
LIMITED["429 Too Many"]
end
REQ --> GET
GET --> KEY
KEY --> CHECK
CHECK -->|Under limit| INCR
CHECK -->|Over limit| LIMITED
INCR --> SORTED
INCR --> OK
Implementation
# src/api/middleware/rate_limit.py
class RateLimitMiddleware(BaseHTTPMiddleware):
"""Rate limiting with tenant-specific limits."""
async def _sliding_window_check(
self,
key: str,
now_ms: int,
window_start_ms: int,
limit: int
) -> Tuple[int, int]:
"""Sliding window rate limit check."""
async with self._valkey.async_connection() as client:
pipe = client.pipeline()
# Remove expired entries
pipe.zremrangebyscore(key, 0, window_start_ms)
# Count current entries
pipe.zcard(key)
results = await pipe.execute()
current_count = results[1]
if current_count < limit:
# Under limit - add request
pipe2 = client.pipeline()
pipe2.zadd(key, {str(now_ms): now_ms})
pipe2.expire(key, self._window_seconds + 60)
await pipe2.execute()
remaining = limit - current_count - 1
else:
remaining = -1
return remaining, reset_timestamp
| Header | Description |
X-RateLimit-Limit | Maximum requests in window |
X-RateLimit-Remaining | Requests remaining |
X-RateLimit-Reset | Window reset timestamp |
Retry-After | Seconds until retry (on 429) |
Audit Logging
Audit Event Types
| Event | Data Captured |
user.login | User ID, IP, timestamp |
user.logout | User ID, session duration |
workbook.create | User, workbook ID, name |
workbook.delete | User, workbook ID |
calculation.run | User, formula count, duration |
api_key.create | User, key prefix |
api_key.revoke | User, key ID |
settings.update | User, changed fields |
Audit Service
# src/services/audit/audit_service.py
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
class AuditAction(str, Enum):
CREATE = "create"
READ = "read"
UPDATE = "update"
DELETE = "delete"
LOGIN = "login"
LOGOUT = "logout"
@dataclass
class AuditEvent:
"""Audit event for compliance logging."""
tenant_id: UUID
user_id: Optional[UUID]
action: AuditAction
resource_type: str
resource_id: Optional[str]
details: dict
ip_address: Optional[str]
user_agent: Optional[str]
timestamp: datetime
class AuditService:
"""Service for audit logging (SOC 2 compliance)."""
async def log(self, event: AuditEvent) -> None:
"""Log an audit event."""
# Store in database
await self._store_event(event)
# Also log to structured logging
logger.info(
"audit_event",
extra={
"tenant_id": str(event.tenant_id),
"user_id": str(event.user_id),
"action": event.action.value,
"resource_type": event.resource_type,
"resource_id": event.resource_id,
"details": event.details,
}
)
Audit Log Retention
| Tier | Retention Period |
| Free | 30 days |
| Standard | 90 days |
| Premium | 1 year |
| Enterprise | 7 years (configurable) |
Tenant Lifecycle
Tenant Creation
sequenceDiagram
participant USER as User
participant API as API
participant DB as Database
participant EMAIL as Email Service
USER->>API: POST /auth/register
API->>API: Validate input
API->>DB: Create tenant
API->>DB: Create owner user
API->>EMAIL: Send welcome email
API-->>USER: 201 Created
Tenant Deletion (Soft Delete)
# src/db/repositories/tenant.py
async def soft_delete(self, tenant_id: str) -> Optional[Tenant]:
"""Soft delete a tenant (GDPR-compliant)."""
return await self.update(
tenant_id,
is_active=False,
deleted_at=datetime.now(timezone.utc)
)
# RLS policy respects soft delete
CREATE POLICY tenant_isolation ON workbooks
USING (
tenant_id = current_setting('app.current_tenant_id')::uuid
AND tenant_id IN (
SELECT id FROM tenants
WHERE deleted_at IS NULL
)
);