WebSocket Architecture¶
CalcBridge uses WebSocket connections to deliver real-time events to clients: calculation progress updates, completion notifications, workbook change alerts, and tenant-wide broadcasts. This document covers the connection lifecycle, the room-based subscription model, event types, and the integration between Celery workers and WebSocket delivery via Valkey pub/sub.
Overview¶
The WebSocket layer connects asynchronous backend processing (Celery tasks) to the client's need for immediate feedback. Without WebSocket connections, clients would have to poll the API for status updates, which wastes requests and adds latency.
Key Design Decisions¶
| Decision | Rationale |
|---|---|
| Room-based subscriptions | Clients subscribe only to relevant events, reducing bandwidth and processing |
| Valkey pub/sub for cross-process delivery | Celery workers and API processes are separate; Valkey bridges them |
| JWT authentication on upgrade | Reuses existing auth infrastructure, prevents unauthorized connections |
| Automatic reconnection | Clients handle transient disconnects transparently |
| Event schema versioning | Forward-compatible event payloads with explicit version field |
Architecture Components¶
src/websocket/
├── manager.py # ConnectionManager: tracks connections and rooms
├── auth.py # JWT validation on WebSocket upgrade
├── events.py # Event type definitions and serialization
├── rooms.py # Room type definitions and subscription logic
├── handlers.py # Incoming message handlers
├── publisher.py # Valkey pub/sub publisher (used by workers)
└── subscriber.py # Valkey pub/sub subscriber (used by API)
src/workers/
└── tasks/
    └── websocket_events.py # Worker-side event publishing
System Diagram¶
flowchart TB
subgraph Clients["Client Layer"]
BROWSER["Browser<br/>(Next.js)"]
MOBILE["Mobile App"]
API_CLIENT["API Client"]
end
subgraph APILayer["API Layer"]
WS_ENDPOINT["WebSocket<br/>Endpoint"]
MANAGER["Connection<br/>Manager"]
AUTH["JWT<br/>Validator"]
SUBSCRIBER["Valkey<br/>Subscriber"]
end
subgraph MessageBus["Message Bus"]
VALKEY[("Valkey<br/>Pub/Sub")]
end
subgraph Workers["Worker Layer"]
CALC_WORKER["Calculation<br/>Worker"]
EXPORT_WORKER["Export<br/>Worker"]
RECON_WORKER["Reconciliation<br/>Worker"]
PUBLISHER["Event<br/>Publisher"]
end
BROWSER -->|"WSS"| WS_ENDPOINT
MOBILE -->|"WSS"| WS_ENDPOINT
API_CLIENT -->|"WSS"| WS_ENDPOINT
WS_ENDPOINT --> AUTH
AUTH --> MANAGER
MANAGER --> SUBSCRIBER
SUBSCRIBER -->|"subscribe"| VALKEY
CALC_WORKER --> PUBLISHER
EXPORT_WORKER --> PUBLISHER
RECON_WORKER --> PUBLISHER
PUBLISHER -->|"publish"| VALKEY
VALKEY -->|"events"| SUBSCRIBER
SUBSCRIBER --> MANAGER
MANAGER -->|"broadcast"| WS_ENDPOINT
style VALKEY fill:#FEF3C7,stroke:#F59E0B
style MANAGER fill:#DBEAFE,stroke:#3B82F6
style AUTH fill:#DCFCE7,stroke:#22C55E
Connection Lifecycle¶
Sequence Diagram¶
sequenceDiagram
participant Client
participant WS as WebSocket Endpoint
participant Auth as JWT Validator
participant CM as ConnectionManager
participant Valkey as Valkey Pub/Sub
Client->>WS: WSS upgrade request<br/>(Authorization: Bearer <JWT>)
WS->>Auth: Validate JWT token
Auth->>Auth: Verify signature, expiry, tenant
alt Token valid
Auth-->>WS: User context (user_id, tenant_id, roles)
WS->>CM: register_connection(conn, user_context)
CM->>CM: Add to user room
CM->>CM: Add to tenant room
CM->>Valkey: Subscribe to user + tenant channels
WS-->>Client: 101 Switching Protocols
loop Active Connection
Client->>WS: Subscribe to workbook room
WS->>CM: join_room(conn, "workbook:{id}")
CM->>Valkey: Subscribe to workbook channel
WS-->>Client: {"type": "subscribed", "room": "workbook:abc123"}
Note over Valkey,CM: Worker publishes event
Valkey-->>CM: Event on workbook channel
CM->>CM: Find connections in room
CM-->>Client: {"type": "calculation_progress", ...}
end
Client->>WS: Close connection
WS->>CM: unregister_connection(conn)
CM->>Valkey: Unsubscribe from channels
CM->>CM: Remove from all rooms
else Token invalid
Auth-->>WS: Authentication failed
WS-->>Client: 4001 Unauthorized (close)
end
Connection States¶
| State | Description | Transitions |
|---|---|---|
| connecting | WebSocket upgrade in progress | authenticated, rejected |
| authenticated | JWT validated, connection registered | subscribed |
| subscribed | Client joined one or more rooms | subscribed (more rooms), disconnecting |
| disconnecting | Client or server initiating close | disconnected |
| disconnected | Connection terminated, resources cleaned | (terminal) |
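The transitions in the table can be encoded as a small lookup so that illegal state changes are caught early. This is a minimal sketch, not CalcBridge code: `ConnState`, `ALLOWED`, and `can_transition` are hypothetical names, and `rejected` is modelled as a terminal state because the table names it only as a transition target.

```python
from enum import Enum


class ConnState(str, Enum):
    CONNECTING = "connecting"
    AUTHENTICATED = "authenticated"
    REJECTED = "rejected"  # appears in the table only as a transition target
    SUBSCRIBED = "subscribed"
    DISCONNECTING = "disconnecting"
    DISCONNECTED = "disconnected"


# Allowed transitions, copied row by row from the table above.
ALLOWED: dict[ConnState, set[ConnState]] = {
    ConnState.CONNECTING: {ConnState.AUTHENTICATED, ConnState.REJECTED},
    ConnState.AUTHENTICATED: {ConnState.SUBSCRIBED},
    ConnState.SUBSCRIBED: {ConnState.SUBSCRIBED, ConnState.DISCONNECTING},
    ConnState.DISCONNECTING: {ConnState.DISCONNECTED},
    ConnState.DISCONNECTED: set(),  # terminal
    ConnState.REJECTED: set(),      # assumed terminal
}


def can_transition(current: ConnState, target: ConnState) -> bool:
    """Return True if the table permits moving from current to target."""
    return target in ALLOWED[current]
```

A connection wrapper could call `can_transition` before updating its state and log or raise on an unexpected change.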
Room Types¶
Rooms are logical groupings of WebSocket connections that share a common interest. Events published to a room are delivered to all connections subscribed to that room.
Room Hierarchy¶
| Room Type | Pattern | Scope | Example Events |
|---|---|---|---|
| Workbook | workbook:{workbook_id} | All users viewing a workbook | calculation_progress, calculation_complete, workbook_updated |
| User | user:{user_id} | Single user's personal channel | notification, export_complete, task_status |
| Tenant | tenant:{tenant_id} | All users in a tenant | system_announcement, config_changed, maintenance_notice |
from enum import Enum


class RoomType(str, Enum):
    WORKBOOK = "workbook"
    USER = "user"
    TENANT = "tenant"


class Room:
    """Represents a WebSocket room with type-safe naming."""

    def __init__(self, room_type: RoomType, resource_id: str):
        self.room_type = room_type
        self.resource_id = resource_id

    @property
    def channel_name(self) -> str:
        """Valkey pub/sub channel name for this room."""
        return f"ws:{self.room_type.value}:{self.resource_id}"

    @classmethod
    def workbook(cls, workbook_id: str) -> "Room":
        return cls(RoomType.WORKBOOK, workbook_id)

    @classmethod
    def user(cls, user_id: str) -> "Room":
        return cls(RoomType.USER, user_id)

    @classmethod
    def tenant(cls, tenant_id: str) -> "Room":
        return cls(RoomType.TENANT, tenant_id)

    def __str__(self) -> str:
        return self.channel_name
Automatic Room Subscriptions
When a connection is established, the client is automatically subscribed to their user room and tenant room. Workbook rooms require explicit subscription via a subscribe message.
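As an illustration of the explicit subscription step, the sketch below validates an incoming subscribe message and maps it to a channel name. It assumes the client sends `{"action": "subscribe", "room": "workbook:<id>"}` (the shape used by the frontend hook and the sequence diagram); `parse_subscribe`, `CHANNEL_PREFIX`, and `CLIENT_JOINABLE` are hypothetical names, and the tenant permission check still belongs to `join_room`.

```python
import json
from typing import Optional

CHANNEL_PREFIX = "ws"  # mirrors WS_VALKEY_CHANNEL_PREFIX
CLIENT_JOINABLE = {"workbook"}  # user and tenant rooms are joined automatically


def parse_subscribe(raw: str) -> Optional[str]:
    """Return the channel name for a valid subscribe message, else None."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(msg, dict) or msg.get("action") != "subscribe":
        return None
    room = msg.get("room")
    if not isinstance(room, str):
        return None
    # Split "workbook:abc123" into its type and resource id
    room_type, sep, resource_id = room.partition(":")
    if not sep or room_type not in CLIENT_JOINABLE or not resource_id:
        return None
    return f"{CHANNEL_PREFIX}:{room_type}:{resource_id}"
```

A handler would pass the returned channel name to `join_room` and reply with a `subscribed` or `error` event depending on the result.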
Event Types¶
All events share a common envelope structure with type-specific payloads:
import json
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum


class EventType(str, Enum):
    # Calculation events
    CALCULATION_PROGRESS = "calculation_progress"
    CALCULATION_COMPLETE = "calculation_complete"
    CALCULATION_FAILED = "calculation_failed"
    # Workbook events
    WORKBOOK_UPDATED = "workbook_updated"
    WORKBOOK_DELETED = "workbook_deleted"
    # Export events
    EXPORT_PROGRESS = "export_progress"
    EXPORT_COMPLETE = "export_complete"
    EXPORT_FAILED = "export_failed"
    # Scenario events
    SCENARIO_COMPLETE = "scenario_complete"
    SCENARIO_FAILED = "scenario_failed"
    # Reconciliation events
    RECON_COMPLETE = "reconciliation_complete"
    RECON_DISCREPANCIES = "reconciliation_discrepancies"
    # Notification events
    NOTIFICATION = "notification"
    # System events
    SYSTEM_ANNOUNCEMENT = "system_announcement"
    MAINTENANCE_NOTICE = "maintenance_notice"
    # Connection events
    SUBSCRIBED = "subscribed"
    UNSUBSCRIBED = "unsubscribed"
    ERROR = "error"


@dataclass
class WebSocketEvent:
    """Standard event envelope for all WebSocket messages."""

    type: EventType
    payload: dict
    version: str = "1.0"
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    room: str = ""

    def to_json(self) -> str:
        """Serialize event to JSON string for transmission."""
        return json.dumps({
            "type": self.type.value,
            "version": self.version,
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "room": self.room,
            "payload": self.payload,
        })
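The explicit `version` field lets a consumer decide whether it understands an event before acting on it. The sketch below shows one way a receiver might decode the envelope; `DecodedEvent`, `decode_event`, and the rule "accept any minor version within major version 1, ignore unknown top-level fields" are assumptions for illustration, not documented CalcBridge behavior.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional

SUPPORTED_MAJOR = 1  # assumption: events stay compatible within a major version


@dataclass
class DecodedEvent:
    type: str
    version: str
    event_id: str
    timestamp: float
    room: str
    payload: dict[str, Any]


def decode_event(raw: str) -> Optional[DecodedEvent]:
    """Decode an envelope; return None for an unsupported major version."""
    data = json.loads(raw)
    version = str(data.get("version", "1.0"))
    major = int(version.split(".")[0])
    if major != SUPPORTED_MAJOR:
        return None
    # Only known envelope fields are read, so extra fields are ignored
    return DecodedEvent(
        type=data["type"],
        version=version,
        event_id=data["event_id"],
        timestamp=float(data["timestamp"]),
        room=data.get("room", ""),
        payload=data.get("payload", {}),
    )
```

Reading only the known fields means a newer producer can add envelope fields without breaking older consumers.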
Event Payload Examples¶
{
  "type": "calculation_progress",
  "version": "1.0",
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "timestamp": 1739577600.0,
  "room": "ws:workbook:abc-123",
  "payload": {
    "workbook_id": "abc-123",
    "job_id": "job-456",
    "progress_pct": 45,
    "current_sheet": "Holdings",
    "cells_processed": 4500,
    "cells_total": 10000,
    "eta_seconds": 12
  }
}

{
  "type": "calculation_complete",
  "version": "1.0",
  "event_id": "550e8400-e29b-41d4-a716-446655440001",
  "timestamp": 1739577612.0,
  "room": "ws:workbook:abc-123",
  "payload": {
    "workbook_id": "abc-123",
    "job_id": "job-456",
    "duration_seconds": 24.5,
    "cells_calculated": 10000,
    "errors": 0,
    "warnings": 3,
    "snapshot_id": "snap-789"
  }
}

{
  "type": "notification",
  "version": "1.0",
  "event_id": "550e8400-e29b-41d4-a716-446655440002",
  "timestamp": 1739577620.0,
  "room": "ws:user:user-001",
  "payload": {
    "title": "Export Ready",
    "message": "Your XLSX export of 'Q4 Holdings' is ready for download.",
    "severity": "info",
    "action_url": "/exports/export-789/download",
    "dismissible": true
  }
}

{
  "type": "scenario_complete",
  "version": "1.0",
  "event_id": "550e8400-e29b-41d4-a716-446655440003",
  "timestamp": 1739577650.0,
  "room": "ws:workbook:abc-123",
  "payload": {
    "workbook_id": "abc-123",
    "scenario_id": "scenario-101",
    "scenario_name": "Rate Shock +200bps",
    "duration_seconds": 8.3,
    "impact_summary": {
      "nav_change_pct": -2.4,
      "wal_change": 0.3,
      "tests_affected": ["OC_Senior", "IC_Mezzanine"]
    }
  }
}
ConnectionManager¶
The ConnectionManager is the central coordinator for all active WebSocket connections:
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass, field

from fastapi import WebSocket

from src.websocket.events import WebSocketEvent


@dataclass
class ConnectionState:
    """Tracks state for a single WebSocket connection."""

    websocket: WebSocket
    user_id: str
    tenant_id: str
    roles: list[str]
    rooms: set[str] = field(default_factory=set)
    connected_at: float = 0.0


class ConnectionManager:
    """Manages WebSocket connections, room subscriptions,
    and event delivery.

    Safe for concurrent use by coroutines on one event loop
    (guarded by an asyncio.Lock). Uses in-memory structures
    for connection tracking (not persisted, since connections
    are ephemeral by nature).
    """

    def __init__(self):
        self._connections: dict[str, ConnectionState] = {}
        self._rooms: dict[str, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def register(
        self,
        connection_id: str,
        websocket: WebSocket,
        user_id: str,
        tenant_id: str,
        roles: list[str],
    ) -> ConnectionState:
        """Register a new authenticated connection.

        Automatically subscribes to user and tenant rooms.
        """
        async with self._lock:
            state = ConnectionState(
                websocket=websocket,
                user_id=user_id,
                tenant_id=tenant_id,
                roles=roles,
                connected_at=time.time(),
            )
            self._connections[connection_id] = state
            # Auto-subscribe to user and tenant rooms
            user_room = f"ws:user:{user_id}"
            tenant_room = f"ws:tenant:{tenant_id}"
            state.rooms.add(user_room)
            state.rooms.add(tenant_room)
            self._rooms[user_room].add(connection_id)
            self._rooms[tenant_room].add(connection_id)
            return state

    async def unregister(self, connection_id: str) -> None:
        """Remove a connection and clean up all room subscriptions."""
        async with self._lock:
            state = self._connections.pop(connection_id, None)
            if state:
                for room in state.rooms:
                    self._discard_member(room, connection_id)

    async def join_room(
        self, connection_id: str, room: str
    ) -> bool:
        """Subscribe a connection to a room.

        Returns False if the connection does not have permission
        to join the requested room (e.g., workbook belongs to
        a different tenant).
        """
        async with self._lock:
            state = self._connections.get(connection_id)
            if not state:
                return False
            # Permission check: workbook rooms require tenant match
            # (_verify_workbook_access is not shown in this document)
            if room.startswith("ws:workbook:"):
                if not await self._verify_workbook_access(
                    state.tenant_id, room
                ):
                    return False
            state.rooms.add(room)
            self._rooms[room].add(connection_id)
            return True

    async def leave_room(
        self, connection_id: str, room: str
    ) -> None:
        """Unsubscribe a connection from a room."""
        async with self._lock:
            state = self._connections.get(connection_id)
            if state:
                state.rooms.discard(room)
                self._discard_member(room, connection_id)

    def _discard_member(self, room: str, connection_id: str) -> None:
        """Remove a member and drop the room once it is empty.

        Uses .get() so the defaultdict does not create empty rooms,
        which would inflate active_rooms.
        """
        members = self._rooms.get(room)
        if members is None:
            return
        members.discard(connection_id)
        if not members:
            del self._rooms[room]

    async def broadcast_to_room(
        self, room: str, event: WebSocketEvent
    ) -> int:
        """Send an event to all connections in a room.

        Returns the number of connections that received the event.
        Failed sends (disconnected clients) are cleaned up.
        """
        event.room = room
        message = event.to_json()
        delivered = 0
        failed = []
        # Copy so the set can change while sends are awaited
        connection_ids = self._rooms.get(room, set()).copy()
        for conn_id in connection_ids:
            state = self._connections.get(conn_id)
            if state:
                try:
                    await state.websocket.send_text(message)
                    delivered += 1
                except Exception:
                    failed.append(conn_id)
        # Clean up failed connections
        for conn_id in failed:
            await self.unregister(conn_id)
        return delivered

    @property
    def active_connections(self) -> int:
        """Total number of active connections."""
        return len(self._connections)

    @property
    def active_rooms(self) -> int:
        """Total number of rooms with at least one subscriber."""
        return len(self._rooms)
Celery Worker Integration¶
Celery workers publish events to Valkey pub/sub channels. The API process subscribes to these channels and forwards events to connected WebSocket clients.
Publisher (Worker Side)¶
from valkey import Valkey

from src.websocket.events import EventType, WebSocketEvent
from src.websocket.rooms import Room


class EventPublisher:
    """Publishes WebSocket events from Celery workers via Valkey pub/sub.

    Workers do not maintain WebSocket connections directly.
    Instead, they publish events to Valkey channels, and the
    API process's subscriber forwards them to connected clients.
    """

    def __init__(self, valkey_client: Valkey):
        self._valkey = valkey_client

    def publish(self, room: str, event: WebSocketEvent) -> int:
        """Publish an event to a Valkey channel.

        Returns the number of subscribers that received the message
        (Valkey pub/sub subscriber count, not WebSocket clients).
        """
        event.room = room
        return self._valkey.publish(room, event.to_json())

    def publish_calculation_progress(
        self,
        workbook_id: str,
        job_id: str,
        progress_pct: int,
        current_sheet: str,
        cells_processed: int,
        cells_total: int,
    ) -> None:
        """Convenience method for calculation progress events."""
        event = WebSocketEvent(
            type=EventType.CALCULATION_PROGRESS,
            payload={
                "workbook_id": workbook_id,
                "job_id": job_id,
                "progress_pct": progress_pct,
                "current_sheet": current_sheet,
                "cells_processed": cells_processed,
                "cells_total": cells_total,
                # _estimate_eta is not shown in this document
                "eta_seconds": self._estimate_eta(
                    progress_pct, cells_processed, cells_total
                ),
            },
        )
        room = Room.workbook(workbook_id).channel_name
        self.publish(room, event)
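The publisher calls `_estimate_eta`, whose body is not shown here. As a rough illustration of how `eta_seconds` could be derived, the sketch below extrapolates linearly from elapsed time and cells processed. `EtaEstimator` is a hypothetical helper, and the linear-rate assumption is mine; the real estimator may use a different signature and method.

```python
import time
from typing import Optional


class EtaEstimator:
    """Linear ETA estimate from elapsed time and cells processed."""

    def __init__(self, started_at: Optional[float] = None):
        # monotonic() is unaffected by wall-clock adjustments
        self._started_at = time.monotonic() if started_at is None else started_at

    def estimate(
        self,
        cells_processed: int,
        cells_total: int,
        now: Optional[float] = None,
    ) -> Optional[int]:
        """Return remaining whole seconds, or None if no rate is known yet."""
        if cells_processed <= 0 or cells_total <= 0:
            return None
        now = time.monotonic() if now is None else now
        elapsed = now - self._started_at
        if elapsed <= 0:
            return None
        rate = cells_processed / elapsed  # cells per second so far
        remaining = max(cells_total - cells_processed, 0)
        return round(remaining / rate)
```

With 4,500 of 10,000 cells processed after 10 seconds, the estimate is 12 seconds, which matches the `eta_seconds` value in the `calculation_progress` payload example.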
Subscriber (API Side)¶
import asyncio
import json
import logging

from src.websocket.events import EventType, WebSocketEvent
from src.websocket.manager import ConnectionManager

logger = logging.getLogger(__name__)


class EventSubscriber:
    """Subscribes to Valkey pub/sub channels and forwards events
    to the ConnectionManager for WebSocket delivery.

    Runs as a background asyncio task within the FastAPI process.
    """

    def __init__(
        self,
        valkey_client,  # must be an async Valkey client
        connection_manager: ConnectionManager,
    ):
        self._valkey = valkey_client
        self._manager = connection_manager
        self._pubsub = None
        self._running = False
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        """Start the pub/sub listener as a background task."""
        self._pubsub = self._valkey.pubsub()
        self._running = True
        # Keep a reference so the task is not garbage-collected
        self._task = asyncio.create_task(self._listen())

    async def stop(self) -> None:
        """Stop the pub/sub listener and clean up."""
        self._running = False
        if self._task:
            self._task.cancel()
        if self._pubsub:
            await self._pubsub.unsubscribe()
            await self._pubsub.close()

    async def subscribe_to_room(self, room: str) -> None:
        """Subscribe to a Valkey channel for a room."""
        if self._pubsub:
            await self._pubsub.subscribe(room)

    async def unsubscribe_from_room(self, room: str) -> None:
        """Unsubscribe from a Valkey channel."""
        if self._pubsub:
            await self._pubsub.unsubscribe(room)

    async def _listen(self) -> None:
        """Main listener loop: forwards Valkey messages
        to WebSocket clients via ConnectionManager.
        """
        while self._running:
            try:
                message = await self._pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0,
                )
                if message and message["type"] == "message":
                    channel = message["channel"]
                    # Channel is bytes unless the client decodes responses
                    if isinstance(channel, bytes):
                        channel = channel.decode()
                    event_data = json.loads(message["data"])
                    # The wire format carries the type as a plain string;
                    # to_json() needs an EventType
                    event_data["type"] = EventType(event_data["type"])
                    event = WebSocketEvent(**event_data)
                    await self._manager.broadcast_to_room(
                        channel, event
                    )
            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Pub/sub listener error")
                await asyncio.sleep(1)  # Back off on error
Worker-to-Client Event Flow¶
sequenceDiagram
participant Worker as Celery Worker
participant Publisher as EventPublisher
participant Valkey as Valkey Pub/Sub
participant Subscriber as EventSubscriber
participant CM as ConnectionManager
participant Client as Browser (WebSocket)
Worker->>Worker: Processing calculation...
Worker->>Publisher: publish_calculation_progress(45%)
Publisher->>Valkey: PUBLISH ws:workbook:abc-123
Valkey-->>Subscriber: Message on ws:workbook:abc-123
Subscriber->>CM: broadcast_to_room("ws:workbook:abc-123", event)
CM->>CM: Find all connections in room
CM-->>Client: {"type": "calculation_progress", "payload": {...}}
Worker->>Worker: Calculation complete
Worker->>Publisher: publish_calculation_complete()
Publisher->>Valkey: PUBLISH ws:workbook:abc-123
Valkey-->>Subscriber: Message on ws:workbook:abc-123
Subscriber->>CM: broadcast_to_room("ws:workbook:abc-123", event)
CM-->>Client: {"type": "calculation_complete", "payload": {...}}
Authentication¶
WebSocket connections authenticate using the same JWT tokens as REST API requests. The token is validated during the WebSocket upgrade handshake:
from fastapi import WebSocket, status

from src.core.security import verify_jwt_token


async def authenticate_websocket(
    websocket: WebSocket,
) -> dict | None:
    """Authenticate a WebSocket connection via JWT.

    The token can be provided in two ways:
    1. Authorization header: "Bearer <token>"
    2. Query parameter: "?token=<token>"

    Returns user context dict or None if authentication fails.
    """
    # Try Authorization header first
    token = None
    auth_header = websocket.headers.get("authorization")
    if auth_header and auth_header.startswith("Bearer "):
        token = auth_header[7:]
    # Fall back to query parameter
    if not token:
        token = websocket.query_params.get("token")
    if not token:
        await websocket.close(
            code=4001, reason="Missing authentication token"
        )
        return None
    try:
        payload = verify_jwt_token(token)
        return {
            "user_id": payload["sub"],
            "tenant_id": payload["tenant_id"],
            "roles": payload.get("roles", []),
        }
    except Exception:
        await websocket.close(
            code=4001, reason="Invalid or expired token"
        )
        return None
Token Refresh
JWT tokens used for WebSocket connections may expire during long-lived sessions. The client should monitor for 4001 close codes and re-establish the connection with a refreshed token. The Next.js frontend handles this automatically via the useWebSocket hook.
Close Codes¶
| Code | Meaning | Client Action |
|---|---|---|
| 1000 | Normal closure | No action needed |
| 1001 | Server going away (deployment) | Reconnect with backoff |
| 1011 | Server error | Reconnect with backoff |
| 4001 | Unauthorized (JWT invalid/expired) | Refresh token, reconnect |
| 4003 | Forbidden (insufficient permissions) | Display error to user |
| 4009 | Connection limit exceeded | Wait and retry |
| 4029 | Rate limited | Wait for cooldown, retry |
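For non-browser clients, the table can be turned into a lookup that decides what to do when the socket closes. This is a minimal sketch: `ClientAction`, `CLOSE_CODE_ACTIONS`, and `action_for_close_code` are hypothetical names, and treating unlisted codes as "reconnect with backoff" is an assumption.

```python
from enum import Enum


class ClientAction(Enum):
    NONE = "none"
    RECONNECT_WITH_BACKOFF = "reconnect_with_backoff"
    REFRESH_TOKEN_AND_RECONNECT = "refresh_token_and_reconnect"
    SHOW_ERROR = "show_error"
    WAIT_AND_RETRY = "wait_and_retry"


# One entry per row of the close code table above.
CLOSE_CODE_ACTIONS = {
    1000: ClientAction.NONE,
    1001: ClientAction.RECONNECT_WITH_BACKOFF,
    1011: ClientAction.RECONNECT_WITH_BACKOFF,
    4001: ClientAction.REFRESH_TOKEN_AND_RECONNECT,
    4003: ClientAction.SHOW_ERROR,
    4009: ClientAction.WAIT_AND_RETRY,
    4029: ClientAction.WAIT_AND_RETRY,
}


def action_for_close_code(code: int) -> ClientAction:
    """Map a close code to a client action.

    Unlisted codes fall back to reconnecting with backoff (assumption).
    """
    return CLOSE_CODE_ACTIONS.get(code, ClientAction.RECONNECT_WITH_BACKOFF)
```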
Client Integration¶
Next.js WebSocket Hook¶
The frontend uses a custom React hook for WebSocket management:
interface UseWebSocketOptions {
  rooms?: string[];
  onEvent?: (event: WebSocketEvent) => void;
  reconnectAttempts?: number;
  reconnectInterval?: number;
}

function useWebSocket(options: UseWebSocketOptions) {
  const { token, refreshToken } = useAuth();
  const [status, setStatus] = useState<ConnectionStatus>("disconnected");
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    // Encode the token so it is safe to place in a query string
    const wsUrl = `${WS_BASE_URL}/ws?token=${encodeURIComponent(token)}`;
    const ws = new WebSocket(wsUrl);

    ws.onopen = () => {
      setStatus("connected");
      // Subscribe to requested rooms
      options.rooms?.forEach((room) => {
        ws.send(JSON.stringify({ action: "subscribe", room }));
      });
    };

    ws.onmessage = (msg) => {
      const event = JSON.parse(msg.data);
      options.onEvent?.(event);
    };

    ws.onclose = (e) => {
      // reconnect() and scheduleReconnect() are helpers not shown here
      if (e.code === 4001) {
        // Token expired: refresh and reconnect
        refreshToken().then(() => reconnect());
      } else {
        scheduleReconnect();
      }
    };

    wsRef.current = ws;
    return () => ws.close();
  }, [token]);

  // Wrap send so it is always called on the current socket
  return { status, send: (data: string) => wsRef.current?.send(data) };
}
Scaling Considerations¶
Horizontal Scaling¶
When running multiple API instances behind a load balancer, each instance runs its own EventSubscriber listening to Valkey pub/sub. Valkey pub/sub delivers each message to every subscriber of its channel, so every API instance subscribed to a room's channel receives that room's events and delivers them to its locally connected clients.
| Topology | WebSocket Behavior |
|---|---|
| Single API instance | All connections on one process, direct delivery |
| Multiple API instances | Each instance subscribes to Valkey, delivers to its own clients |
| Sticky sessions (recommended) | Load balancer routes WebSocket upgrades to same instance |
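The fan-out across instances can be illustrated without a running Valkey server. In the sketch below, `FakeBus` is an in-memory stand-in for Valkey pub/sub and `ApiInstance` is a stripped-down stand-in for an API process; both are hypothetical and exist only to show that each instance delivers to its own local clients.

```python
from collections import defaultdict


class FakeBus:
    """In-memory stand-in for Valkey pub/sub (illustration only)."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # channel -> callbacks

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        callbacks = self._subscribers.get(channel, [])
        for callback in callbacks:
            callback(channel, message)
        return len(callbacks)  # subscriber count, like PUBLISH


class ApiInstance:
    """One API process holding only its locally connected clients."""

    def __init__(self, name, bus):
        self.name = name
        self._bus = bus
        self._local_rooms = defaultdict(list)  # channel -> client inboxes

    def connect(self, channel, inbox):
        # Subscribe to the bus once per channel, on first local join
        if channel not in self._local_rooms:
            self._bus.subscribe(channel, self._on_message)
        self._local_rooms[channel].append(inbox)

    def _on_message(self, channel, message):
        for inbox in self._local_rooms[channel]:
            inbox.append(message)
```

An instance with no local clients in a room never subscribes to that channel in this sketch, so it does no work for that room's events.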
Connection Limits
Each API instance can handle approximately 10,000 concurrent WebSocket connections. For larger deployments, use sticky sessions with a load balancer to distribute connections evenly across instances.
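One way to enforce the global and per-tenant limits from the configuration reference (`WS_MAX_CONNECTIONS_TOTAL`, `WS_MAX_CONNECTIONS_PER_TENANT`) is a small counter checked before a connection is registered. `ConnectionLimiter` is a hypothetical sketch of that idea; how CalcBridge actually enforces the limits is not described in this document.

```python
from collections import Counter

CLOSE_CONNECTION_LIMIT = 4009  # "Connection limit exceeded" close code


class ConnectionLimiter:
    """Tracks connection counts against global and per-tenant caps."""

    def __init__(self, max_total: int = 10_000, max_per_tenant: int = 500):
        self._max_total = max_total
        self._max_per_tenant = max_per_tenant
        self._per_tenant = Counter()
        self._total = 0

    def try_acquire(self, tenant_id: str) -> bool:
        """Reserve a slot; return False if either limit is reached."""
        if self._total >= self._max_total:
            return False
        if self._per_tenant[tenant_id] >= self._max_per_tenant:
            return False
        self._per_tenant[tenant_id] += 1
        self._total += 1
        return True

    def release(self, tenant_id: str) -> None:
        """Free a slot when a connection closes."""
        if self._per_tenant[tenant_id] > 0:
            self._per_tenant[tenant_id] -= 1
            self._total -= 1
```

When `try_acquire` returns False, the endpoint would close the socket with code 4009 instead of registering it, and it would call `release` whenever a registered connection is unregistered.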
Backpressure Handling¶
When a client cannot consume events fast enough (slow network, overwhelmed browser), the ConnectionManager detects send failures and disconnects the client to prevent memory buildup:
async def _send_with_timeout(
    self, websocket: WebSocket, message: str, timeout: float = 5.0
) -> bool:
    """Send a message with timeout to detect slow clients."""
    try:
        await asyncio.wait_for(
            websocket.send_text(message), timeout=timeout
        )
        return True
    except asyncio.TimeoutError:
        logger.warning("Client send timeout, disconnecting")
        return False
    except Exception:
        return False
Metrics and Monitoring¶
| Metric | Type | Description |
|---|---|---|
| ws_connections_active | Gauge | Current active WebSocket connections |
| ws_connections_total | Counter | Total connections established |
| ws_rooms_active | Gauge | Current rooms with subscribers |
| ws_events_published_total | Counter | Events published to Valkey by workers |
| ws_events_delivered_total | Counter | Events delivered to WebSocket clients |
| ws_events_failed_total | Counter | Failed event deliveries (disconnected clients) |
| ws_auth_failures_total | Counter | Authentication failures on upgrade |
| ws_reconnections_total | Counter | Client reconnections |
| ws_message_latency_seconds | Histogram | Valkey publish to client delivery latency |
Configuration Reference¶
# WebSocket server
WS_ENABLED: true
WS_PATH: /ws # WebSocket endpoint path
WS_MAX_CONNECTIONS_PER_TENANT: 500 # Per-tenant connection limit
WS_MAX_CONNECTIONS_TOTAL: 10000 # Global connection limit
WS_PING_INTERVAL: 30 # Keepalive ping interval (seconds)
WS_PING_TIMEOUT: 10 # Pong response timeout (seconds)
WS_SEND_TIMEOUT: 5 # Client send timeout (seconds)
# Valkey pub/sub
WS_VALKEY_HOST: localhost
WS_VALKEY_PORT: 6379
WS_VALKEY_DB: 1 # Separate DB from cache
WS_VALKEY_CHANNEL_PREFIX: ws # Channel name prefix
# Rate limiting
WS_RATE_LIMIT_MESSAGES: 100 # Max incoming messages per minute
WS_RATE_LIMIT_SUBSCRIPTIONS: 20 # Max room subscriptions per connection
# Reconnection
WS_CLIENT_RECONNECT_MAX_ATTEMPTS: 10
WS_CLIENT_RECONNECT_INTERVAL_MS: 1000 # Initial backoff interval
WS_CLIENT_RECONNECT_MAX_INTERVAL_MS: 30000 # Max backoff interval
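The three reconnection settings describe a capped backoff schedule. The sketch below computes the delay before each attempt, assuming the interval doubles after every failure; the doubling factor is an assumption, since the configuration names only the initial interval, the cap, and the attempt count. `backoff_schedule_ms` is a hypothetical helper.

```python
def backoff_schedule_ms(
    initial_ms: int = 1000,
    max_ms: int = 30000,
    max_attempts: int = 10,
) -> list[int]:
    """Delay before each reconnect attempt, doubling up to the cap."""
    return [
        min(initial_ms * 2 ** attempt, max_ms)
        for attempt in range(max_attempts)
    ]


schedule = backoff_schedule_ms()
# [1000, 2000, 4000, 8000, 16000, 30000, 30000, 30000, 30000, 30000]
```

With the default values the cap is reached on the sixth attempt. Real clients often add random jitter to each delay so that many clients do not reconnect at the same moment after a deployment; that is omitted here to keep the schedule deterministic.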
Error Handling¶
| Error | Behavior | Client Response |
|---|---|---|
| JWT expired during session | Server sends close code 4001 | Refresh token, reconnect |
| Valkey pub/sub disconnection | Subscriber auto-reconnects with backoff | Events may be missed during gap |
| Client send timeout | Connection terminated by server | Client auto-reconnects |
| Room subscription denied | Error event sent to client | Display permission error |
| Rate limit exceeded | Close code 4029 sent | Wait, reconnect |
| Server shutdown (deploy) | Close code 1001 sent to all | Auto-reconnect to new instance |
Missed Events During Reconnection
WebSocket connections are ephemeral. Events published while a client is disconnected are not queued. For critical events (e.g., calculation_complete), clients should poll the REST API after reconnection to catch up on any missed state changes.
Related Documentation¶
- System Design -- Overall architecture and component interactions
- Security Architecture -- JWT authentication details
- Data Flow & Processing Pipeline -- Celery task architecture
- Export Pipeline -- Export completion events