Skip to content

Multi-Tenancy & Isolation

CalcBridge implements a robust multi-tenancy architecture using PostgreSQL Row-Level Security (RLS) for complete data isolation between tenants. This document details the tenant model, isolation mechanisms, and access control patterns.

Tenant Model

Hierarchy

graph TB
    subgraph Tenant["Tenant (Organization)"]
        T["Tenant"]
    end

    subgraph Users["Users"]
        OWNER["Owner"]
        ADMIN["Admin"]
        ANALYST["Analyst"]
        VIEWER["Viewer"]
    end

    subgraph Resources["Resources"]
        WB["Workbooks"]
        KEYS["API Keys"]
        SETTINGS["Settings"]
    end

    T --> OWNER
    T --> ADMIN
    T --> ANALYST
    T --> VIEWER

    T --> WB
    T --> KEYS
    T --> SETTINGS

Tenant Entity

# src/db/models/tenant.py
class Tenant(Base):
    """Tenant model representing an organization."""

    __tablename__ = "tenants"

    id = Column(UUID, primary_key=True, default=uuid4)
    name = Column(String(255), nullable=False)
    slug = Column(String(100), unique=True, nullable=False)
    subscription_tier = Column(String(50), default="free")
    is_active = Column(Boolean, default=True)
    settings = Column(JSONB, default={})
    created_at = Column(DateTime, default=datetime.utcnow)
    deleted_at = Column(DateTime, nullable=True)

    # Relationships
    users = relationship("User", back_populates="tenant")
    workbooks = relationship("Workbook", back_populates="tenant")
    api_keys = relationship("ApiKey", back_populates="tenant")

Tenant Context

# src/core/tenant/context.py
from contextvars import ContextVar
from dataclasses import dataclass

@dataclass
class TenantContext:
    """Thread-safe tenant context for the request."""

    tenant_id: UUID
    tenant_slug: str
    subscription_tier: str
    user_id: Optional[UUID] = None
    user_role: Optional[str] = None
    request_id: Optional[str] = None

# Context variable (thread-safe)
_tenant_context: ContextVar[Optional[TenantContext]] = ContextVar(
    "tenant_context",
    default=None
)

def get_current_tenant() -> Optional[TenantContext]:
    """Get the current tenant context."""
    return _tenant_context.get()

def set_current_tenant(context: TenantContext) -> None:
    """Set the current tenant context."""
    _tenant_context.set(context)

def require_tenant() -> TenantContext:
    """Get tenant context, raising if not set."""
    context = get_current_tenant()
    if context is None:
        raise RuntimeError("No tenant context set")
    return context

Row-Level Security (RLS)

RLS Policy Architecture

flowchart TB
    subgraph Application["Application Layer"]
        APP["FastAPI"]
        MW["Tenant Middleware"]
    end

    subgraph Database["Database Layer"]
        SET["SET app.current_tenant_id"]
        QUERY["SELECT * FROM workbooks"]
        RLS["RLS Policy Check"]
        RESULT["Filtered Results"]
    end

    APP --> MW
    MW --> SET
    SET --> QUERY
    QUERY --> RLS
    RLS --> RESULT

    style RLS fill:#DCFCE7,stroke:#22C55E

RLS Policy Implementation

-- db/migrations/V002__enable_rls.sql

-- Enable RLS on tenant-scoped tables
ALTER TABLE workbooks ENABLE ROW LEVEL SECURITY;
ALTER TABLE sheets ENABLE ROW LEVEL SECURITY;
ALTER TABLE cell_data ENABLE ROW LEVEL SECURITY;
ALTER TABLE api_keys ENABLE ROW LEVEL SECURITY;
ALTER TABLE users ENABLE ROW LEVEL SECURITY;

-- Force RLS for table owners too (defense in depth)
ALTER TABLE workbooks FORCE ROW LEVEL SECURITY;
ALTER TABLE sheets FORCE ROW LEVEL SECURITY;

-- Create RLS policies
CREATE POLICY tenant_isolation_workbooks ON workbooks
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

CREATE POLICY tenant_isolation_sheets ON sheets
    USING (
        workbook_id IN (
            SELECT id FROM workbooks
            WHERE tenant_id = current_setting('app.current_tenant_id')::uuid
        )
    );

CREATE POLICY tenant_isolation_users ON users
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

CREATE POLICY tenant_isolation_api_keys ON api_keys
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

Setting Tenant Context

# src/core/tenant/middleware.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class TenantMiddleware(BaseHTTPMiddleware):
    """Middleware that sets tenant context for each request."""

    async def dispatch(self, request: Request, call_next):
        # Extract tenant from JWT or API key
        tenant_context = await self._extract_tenant(request)

        if tenant_context:
            # Set Python context
            set_current_tenant(tenant_context)

            # Set PostgreSQL context for RLS
            async with get_db_session() as session:
                await session.execute(
                    text("SET app.current_tenant_id = :tenant_id"),
                    {"tenant_id": str(tenant_context.tenant_id)}
                )

        try:
            response = await call_next(request)
            return response
        finally:
            clear_current_tenant()

Defense in Depth

Multiple layers protect tenant data:

Layer Protection
Application Tenant middleware validates JWT/API key
ORM Repositories filter by tenant_id
Database RLS policies enforce isolation
Network SSL/TLS encryption in transit
# src/db/repositories/workbook.py
class WorkbookRepository(BaseRepository[Workbook]):
    """Repository with automatic tenant filtering."""

    async def list_all(
        self,
        skip: int = 0,
        limit: int = 100
    ) -> List[Workbook]:
        """List workbooks - automatically filtered by RLS."""
        # Even without explicit WHERE clause,
        # RLS policy ensures tenant isolation
        result = await self.session.execute(
            select(Workbook)
            .order_by(Workbook.created_at.desc())
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

User Roles and Permissions

Role Hierarchy

graph TB
    OWNER["Owner"]
    ADMIN["Admin"]
    ANALYST["Analyst"]
    VIEWER["Viewer"]

    OWNER --> ADMIN
    ADMIN --> ANALYST
    ANALYST --> VIEWER

    subgraph Permissions
        P_TENANT["Manage Tenant"]
        P_USERS["Manage Users"]
        P_WB["Manage Workbooks"]
        P_CALC["Run Calculations"]
        P_VIEW["View Data"]
    end

    OWNER --- P_TENANT
    OWNER --- P_USERS
    ADMIN --- P_WB
    ANALYST --- P_CALC
    VIEWER --- P_VIEW

Role Permissions Matrix

Permission Owner Admin Analyst Viewer
Tenant Settings Yes No No No
Manage Users Yes Yes No No
Manage API Keys Yes Yes No No
Create Workbooks Yes Yes Yes No
Delete Workbooks Yes Yes No No
Run Calculations Yes Yes Yes No
Create Scenarios Yes Yes Yes No
View Data Yes Yes Yes Yes
Export Data Yes Yes Yes No

Permission Enforcement

# src/api/dependencies.py
from enum import Enum
from fastapi import Depends, HTTPException, status

class Permission(str, Enum):
    MANAGE_TENANT = "manage_tenant"
    MANAGE_USERS = "manage_users"
    MANAGE_WORKBOOKS = "manage_workbooks"
    RUN_CALCULATIONS = "run_calculations"
    VIEW_DATA = "view_data"

ROLE_PERMISSIONS = {
    "owner": {Permission.MANAGE_TENANT, Permission.MANAGE_USERS,
              Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
              Permission.VIEW_DATA},
    "admin": {Permission.MANAGE_USERS, Permission.MANAGE_WORKBOOKS,
              Permission.RUN_CALCULATIONS, Permission.VIEW_DATA},
    "analyst": {Permission.MANAGE_WORKBOOKS, Permission.RUN_CALCULATIONS,
                Permission.VIEW_DATA},
    "viewer": {Permission.VIEW_DATA},
}

def require_permission(permission: Permission):
    """Dependency that checks user has required permission."""

    async def check_permission(
        current_user: User = Depends(get_current_user)
    ) -> User:
        user_permissions = ROLE_PERMISSIONS.get(current_user.role, set())

        if permission not in user_permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {permission.value} required"
            )
        return current_user

    return check_permission

# Usage in routes
@router.delete("/workbooks/{workbook_id}")
async def delete_workbook(
    workbook_id: UUID,
    current_user: User = Depends(
        require_permission(Permission.MANAGE_WORKBOOKS)
    )
):
    pass

Subscription Tiers

Tier Configuration

Tier Users Workbooks API Calls/min Features
Free 3 5 60 Basic calculations
Standard 10 50 300 + Compliance testing
Premium 50 500 1000 + What-if scenarios
Enterprise Unlimited Unlimited 5000 + Custom integrations

Tier Enforcement

# src/core/config.py
class SubscriptionTierConfig(BaseModel):
    """Configuration for subscription tiers."""

    max_users: int
    max_workbooks: int
    rate_limit_per_minute: int
    features: set[str]

TIER_CONFIGS = {
    "free": SubscriptionTierConfig(
        max_users=3,
        max_workbooks=5,
        rate_limit_per_minute=60,
        features={"basic_calculations"}
    ),
    "standard": SubscriptionTierConfig(
        max_users=10,
        max_workbooks=50,
        rate_limit_per_minute=300,
        features={"basic_calculations", "compliance_testing"}
    ),
    "premium": SubscriptionTierConfig(
        max_users=50,
        max_workbooks=500,
        rate_limit_per_minute=1000,
        features={"basic_calculations", "compliance_testing", "what_if_scenarios"}
    ),
    "enterprise": SubscriptionTierConfig(
        max_users=999999,
        max_workbooks=999999,
        rate_limit_per_minute=5000,
        features={"basic_calculations", "compliance_testing",
                  "what_if_scenarios", "custom_integrations", "sso"}
    ),
}

Feature Gating

# src/api/dependencies.py
def require_feature(feature: str):
    """Dependency that checks tenant has access to feature."""

    async def check_feature(
        tenant: TenantContext = Depends(get_current_tenant)
    ) -> TenantContext:
        tier_config = TIER_CONFIGS.get(tenant.subscription_tier)

        if not tier_config or feature not in tier_config.features:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Feature '{feature}' requires upgrade"
            )
        return tenant

    return check_feature

# Usage
@router.post("/scenarios")
async def create_scenario(
    tenant: TenantContext = Depends(require_feature("what_if_scenarios"))
):
    pass

Rate Limiting per Tenant

Sliding Window Algorithm

flowchart LR
    subgraph Request["Incoming Request"]
        REQ["API Request"]
    end

    subgraph RateLimit["Rate Limiter"]
        GET["Get Counter"]
        CHECK["Check Limit"]
        INCR["Increment"]
    end

    subgraph Cache["Valkey"]
        KEY["tenant:{id}:rate"]
        SORTED["Sorted Set<br/>(timestamps)"]
    end

    subgraph Response["Response"]
        OK["200 OK"]
        LIMITED["429 Too Many"]
    end

    REQ --> GET
    GET --> KEY
    KEY --> CHECK

    CHECK -->|Under limit| INCR
    CHECK -->|Over limit| LIMITED

    INCR --> SORTED
    INCR --> OK

Implementation

# src/api/middleware/rate_limit.py
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Rate limiting with tenant-specific limits."""

    async def _sliding_window_check(
        self,
        key: str,
        now_ms: int,
        window_start_ms: int,
        limit: int
    ) -> Tuple[int, int]:
        """Sliding window rate limit check."""
        async with self._valkey.async_connection() as client:
            pipe = client.pipeline()

            # Remove expired entries
            pipe.zremrangebyscore(key, 0, window_start_ms)

            # Count current entries
            pipe.zcard(key)

            results = await pipe.execute()
            current_count = results[1]

            if current_count < limit:
                # Under limit - add request
                pipe2 = client.pipeline()
                pipe2.zadd(key, {str(now_ms): now_ms})
                pipe2.expire(key, self._window_seconds + 60)
                await pipe2.execute()
                remaining = limit - current_count - 1
            else:
                remaining = -1

            return remaining, reset_timestamp

Rate Limit Headers

Header Description
X-RateLimit-Limit Maximum requests in window
X-RateLimit-Remaining Requests remaining
X-RateLimit-Reset Window reset timestamp
Retry-After Seconds until retry (on 429)

Audit Logging

Audit Event Types

Event Data Captured
user.login User ID, IP, timestamp
user.logout User ID, session duration
workbook.create User, workbook ID, name
workbook.delete User, workbook ID
calculation.run User, formula count, duration
api_key.create User, key prefix
api_key.revoke User, key ID
settings.update User, changed fields

Audit Service

# src/services/audit/audit_service.py
from enum import Enum
from dataclasses import dataclass
from datetime import datetime

class AuditAction(str, Enum):
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    LOGIN = "login"
    LOGOUT = "logout"

@dataclass
class AuditEvent:
    """Audit event for compliance logging."""
    tenant_id: UUID
    user_id: Optional[UUID]
    action: AuditAction
    resource_type: str
    resource_id: Optional[str]
    details: dict
    ip_address: Optional[str]
    user_agent: Optional[str]
    timestamp: datetime

class AuditService:
    """Service for audit logging (SOC 2 compliance)."""

    async def log(self, event: AuditEvent) -> None:
        """Log an audit event."""
        # Store in database
        await self._store_event(event)

        # Also log to structured logging
        logger.info(
            "audit_event",
            extra={
                "tenant_id": str(event.tenant_id),
                "user_id": str(event.user_id),
                "action": event.action.value,
                "resource_type": event.resource_type,
                "resource_id": event.resource_id,
                "details": event.details,
            }
        )

Audit Log Retention

Tier Retention Period
Free 30 days
Standard 90 days
Premium 1 year
Enterprise 7 years (configurable)

Tenant Lifecycle

Tenant Creation

sequenceDiagram
    participant USER as User
    participant API as API
    participant DB as Database
    participant EMAIL as Email Service

    USER->>API: POST /auth/register
    API->>API: Validate input
    API->>DB: Create tenant
    API->>DB: Create owner user
    API->>EMAIL: Send welcome email
    API-->>USER: 201 Created

Tenant Deletion (Soft Delete)

# src/db/repositories/tenant.py
async def soft_delete(self, tenant_id: str) -> Optional[Tenant]:
    """Soft delete a tenant (GDPR-compliant)."""
    return await self.update(
        tenant_id,
        is_active=False,
        deleted_at=datetime.now(timezone.utc)
    )

# RLS policy respects soft delete
CREATE POLICY tenant_isolation ON workbooks
    USING (
        tenant_id = current_setting('app.current_tenant_id')::uuid
        AND tenant_id IN (
            SELECT id FROM tenants
            WHERE deleted_at IS NULL
        )
    );