
WebSocket Architecture

CalcBridge uses WebSocket connections to deliver real-time events to clients -- calculation progress updates, completion notifications, workbook change alerts, and tenant-wide broadcasts. This document covers the connection lifecycle, room-based subscription model, event types, and the integration between Celery workers and WebSocket delivery via Valkey pub/sub.


Overview

The WebSocket layer bridges the gap between asynchronous backend processing (Celery tasks) and the client's need for immediate feedback. Without WebSocket, clients would have to poll the API for status updates, which wastes requests on unchanged state and delays each update by up to one polling interval.

Key Design Decisions

| Decision | Rationale |
|---|---|
| Room-based subscriptions | Clients subscribe only to relevant events, reducing bandwidth and processing |
| Valkey pub/sub for cross-process delivery | Celery workers and API processes are separate; Valkey bridges them |
| JWT authentication on upgrade | Reuses existing auth infrastructure, prevents unauthorized connections |
| Automatic reconnection | Clients handle transient disconnects transparently |
| Event schema versioning | Forward-compatible event payloads with explicit version field |

Architecture Components

src/websocket/
├── manager.py                   # ConnectionManager: tracks connections and rooms
├── auth.py                      # JWT validation on WebSocket upgrade
├── events.py                    # Event type definitions and serialization
├── rooms.py                     # Room type definitions and subscription logic
├── handlers.py                  # Incoming message handlers
├── publisher.py                 # Valkey pub/sub publisher (used by workers)
└── subscriber.py                # Valkey pub/sub subscriber (used by API)

src/workers/
└── tasks/
    └── websocket_events.py      # Worker-side event publishing

System Diagram

flowchart TB
    subgraph Clients["Client Layer"]
        BROWSER["Browser<br/>(Next.js)"]
        MOBILE["Mobile App"]
        API_CLIENT["API Client"]
    end

    subgraph APILayer["API Layer"]
        WS_ENDPOINT["WebSocket<br/>Endpoint"]
        MANAGER["Connection<br/>Manager"]
        AUTH["JWT<br/>Validator"]
        SUBSCRIBER["Valkey<br/>Subscriber"]
    end

    subgraph MessageBus["Message Bus"]
        VALKEY[("Valkey<br/>Pub/Sub")]
    end

    subgraph Workers["Worker Layer"]
        CALC_WORKER["Calculation<br/>Worker"]
        EXPORT_WORKER["Export<br/>Worker"]
        RECON_WORKER["Reconciliation<br/>Worker"]
        PUBLISHER["Event<br/>Publisher"]
    end

    BROWSER -->|"WSS"| WS_ENDPOINT
    MOBILE -->|"WSS"| WS_ENDPOINT
    API_CLIENT -->|"WSS"| WS_ENDPOINT

    WS_ENDPOINT --> AUTH
    AUTH --> MANAGER
    MANAGER --> SUBSCRIBER
    SUBSCRIBER -->|"subscribe"| VALKEY

    CALC_WORKER --> PUBLISHER
    EXPORT_WORKER --> PUBLISHER
    RECON_WORKER --> PUBLISHER
    PUBLISHER -->|"publish"| VALKEY

    VALKEY -->|"events"| SUBSCRIBER
    SUBSCRIBER --> MANAGER
    MANAGER -->|"broadcast"| WS_ENDPOINT

    style VALKEY fill:#FEF3C7,stroke:#F59E0B
    style MANAGER fill:#DBEAFE,stroke:#3B82F6
    style AUTH fill:#DCFCE7,stroke:#22C55E

Connection Lifecycle

Sequence Diagram

sequenceDiagram
    participant Client
    participant WS as WebSocket Endpoint
    participant Auth as JWT Validator
    participant CM as ConnectionManager
    participant Valkey as Valkey Pub/Sub

    Client->>WS: WSS upgrade request<br/>(Authorization: Bearer <JWT>)
    WS->>Auth: Validate JWT token
    Auth->>Auth: Verify signature, expiry, tenant

    alt Token valid
        Auth-->>WS: User context (user_id, tenant_id, roles)
        WS->>CM: register_connection(conn, user_context)
        CM->>CM: Add to user room
        CM->>CM: Add to tenant room
        CM->>Valkey: Subscribe to user + tenant channels
        WS-->>Client: 101 Switching Protocols

        loop Active Connection
            Client->>WS: Subscribe to workbook room
            WS->>CM: join_room(conn, "ws:workbook:{id}")
            CM->>Valkey: Subscribe to workbook channel
            WS-->>Client: {"type": "subscribed", "room": "ws:workbook:abc-123"}

            Note over Valkey,CM: Worker publishes event
            Valkey-->>CM: Event on workbook channel
            CM->>CM: Find connections in room
            CM-->>Client: {"type": "calculation_progress", ...}
        end

        Client->>WS: Close connection
        WS->>CM: unregister_connection(conn)
        CM->>Valkey: Unsubscribe from channels
        CM->>CM: Remove from all rooms

    else Token invalid
        Auth-->>WS: Authentication failed
        WS-->>Client: 4001 Unauthorized (close)
    end

Connection States

| State | Description | Transitions to |
|---|---|---|
| connecting | WebSocket upgrade in progress | authenticated, rejected |
| authenticated | JWT validated, connection registered | subscribed |
| subscribed | Client joined one or more rooms | subscribed (more rooms), disconnecting |
| disconnecting | Client or server initiating close | disconnected |
| disconnected | Connection terminated, resources cleaned | (terminal) |
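The transition column can be enforced with a lookup table so that illegal jumps (for example, authenticated directly to disconnected) surface as errors instead of silently corrupting bookkeeping. This is a hypothetical sketch: ConnState, TRANSITIONS and transition() are illustrative names, not part of src/websocket/, and "rejected", which the table lists only as a target, is modeled here as a terminal state by assumption.

```python
from enum import Enum


class ConnState(str, Enum):
    CONNECTING = "connecting"
    AUTHENTICATED = "authenticated"
    SUBSCRIBED = "subscribed"
    DISCONNECTING = "disconnecting"
    DISCONNECTED = "disconnected"
    REJECTED = "rejected"  # assumption: terminal, like disconnected


# Allowed transitions, copied from the Connection States table.
TRANSITIONS: dict[ConnState, frozenset[ConnState]] = {
    ConnState.CONNECTING: frozenset({ConnState.AUTHENTICATED, ConnState.REJECTED}),
    ConnState.AUTHENTICATED: frozenset({ConnState.SUBSCRIBED}),
    ConnState.SUBSCRIBED: frozenset({ConnState.SUBSCRIBED, ConnState.DISCONNECTING}),
    ConnState.DISCONNECTING: frozenset({ConnState.DISCONNECTED}),
    ConnState.DISCONNECTED: frozenset(),
    ConnState.REJECTED: frozenset(),
}


def transition(current: ConnState, new: ConnState) -> ConnState:
    """Return the new state, or raise ValueError if the table forbids the move."""
    if new not in TRANSITIONS[current]:
        raise ValueError(f"invalid transition {current.value} -> {new.value}")
    return new
```

Keeping the rules in data rather than scattered if-statements makes the table above and the code easy to diff against each other.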

Room Types

Rooms are logical groupings of WebSocket connections that share a common interest. Events published to a room are delivered to all connections subscribed to that room.

Room Hierarchy

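| Room Type | Pattern | Scope | Example Events |
|---|---|---|---|
| Workbook | workbook:{workbook_id} | All users viewing a workbook | calculation_progress, calculation_complete, workbook_updated |
| User | user:{user_id} | Single user's personal channel | notification, export_complete, task_status |
| Tenant | tenant:{tenant_id} | All users in a tenant | system_announcement, config_changed, maintenance_notice |

Internally, ConnectionManager and Valkey key each room by its channel name, which adds a ws: prefix (for example, ws:workbook:abc-123; see Room.channel_name).
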
src/websocket/rooms.py
from enum import Enum


class RoomType(str, Enum):
    WORKBOOK = "workbook"
    USER = "user"
    TENANT = "tenant"


class Room:
    """Represents a WebSocket room with type-safe naming."""

    def __init__(self, room_type: RoomType, resource_id: str):
        self.room_type = room_type
        self.resource_id = resource_id

    @property
    def channel_name(self) -> str:
        """Valkey pub/sub channel name for this room."""
        return f"ws:{self.room_type.value}:{self.resource_id}"

    @classmethod
    def workbook(cls, workbook_id: str) -> "Room":
        return cls(RoomType.WORKBOOK, workbook_id)

    @classmethod
    def user(cls, user_id: str) -> "Room":
        return cls(RoomType.USER, user_id)

    @classmethod
    def tenant(cls, tenant_id: str) -> "Room":
        return cls(RoomType.TENANT, tenant_id)

    def __str__(self) -> str:
        return self.channel_name

Automatic Room Subscriptions

When a connection is established, the client is automatically subscribed to their user room and tenant room. Workbook rooms require explicit subscription via a subscribe message.
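A handler for the explicit subscribe message could validate it before calling ConnectionManager.join_room(). The following is a hypothetical sketch of what handlers.py might do, not its actual contents. It assumes clients send {"action": "subscribe" | "unsubscribe", "room": ...} using full channel names (the form implied by join_room()'s "ws:workbook:" check), that room IDs contain only letters, digits, "-" and "_", and that the per-connection cap is WS_RATE_LIMIT_SUBSCRIPTIONS (20).

```python
import json
import re

# Assumed room-name format, mirroring Room.channel_name ("ws:<type>:<id>").
_ROOM_RE = re.compile(r"ws:(workbook|user|tenant):[A-Za-z0-9_-]+")
ALLOWED_ACTIONS = {"subscribe", "unsubscribe"}  # "unsubscribe" is an assumption


def parse_client_message(raw: str, rooms_joined: int, max_rooms: int = 20) -> tuple[str, str]:
    """Validate an incoming client message and return (action, room).

    Raises ValueError with a client-safe reason for malformed input.
    Tenant/permission checks still happen later, in join_room().
    """
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("malformed JSON") from exc
    if not isinstance(msg, dict):
        raise ValueError("message must be a JSON object")

    action, room = msg.get("action"), msg.get("room")
    if not isinstance(action, str) or action not in ALLOWED_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    if not isinstance(room, str) or not _ROOM_RE.fullmatch(room):
        raise ValueError("invalid room name")
    # User and tenant rooms are joined automatically on connect.
    if not room.startswith("ws:workbook:"):
        raise ValueError("only workbook rooms can be joined explicitly")
    if action == "subscribe" and rooms_joined >= max_rooms:
        raise ValueError("subscription limit reached")
    return action, room
```

Rejecting malformed input this early means the (potentially slower) permission check in join_room() only runs for well-formed requests.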


Event Types

All events share a common envelope structure with type-specific payloads:

src/websocket/events.py
from dataclasses import dataclass, field
from enum import Enum
import time
import uuid


class EventType(str, Enum):
    # Calculation events
    CALCULATION_PROGRESS = "calculation_progress"
    CALCULATION_COMPLETE = "calculation_complete"
    CALCULATION_FAILED = "calculation_failed"

    # Workbook events
    WORKBOOK_UPDATED = "workbook_updated"
    WORKBOOK_DELETED = "workbook_deleted"

    # Export events
    EXPORT_PROGRESS = "export_progress"
    EXPORT_COMPLETE = "export_complete"
    EXPORT_FAILED = "export_failed"

    # Scenario events
    SCENARIO_COMPLETE = "scenario_complete"
    SCENARIO_FAILED = "scenario_failed"

    # Reconciliation events
    RECON_COMPLETE = "reconciliation_complete"
    RECON_DISCREPANCIES = "reconciliation_discrepancies"

    # Notification events
    NOTIFICATION = "notification"

    # System events
    SYSTEM_ANNOUNCEMENT = "system_announcement"
    MAINTENANCE_NOTICE = "maintenance_notice"

    # Connection events
    SUBSCRIBED = "subscribed"
    UNSUBSCRIBED = "unsubscribed"
    ERROR = "error"


@dataclass
class WebSocketEvent:
    """Standard event envelope for all WebSocket messages."""

    type: EventType
    payload: dict
    version: str = "1.0"
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    room: str = ""

    def to_json(self) -> str:
        """Serialize event to JSON string for transmission."""
        import json
        return json.dumps({
            "type": self.type.value,
            "version": self.version,
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "room": self.room,
            "payload": self.payload,
        })
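The version field only helps if readers tolerate additions. Below is a minimal sketch of a forward-compatible reader, assuming (this is not stated above) that the major part of version marks breaking changes while minor bumps only add fields. parse_envelope, SUPPORTED_MAJOR and ENVELOPE_KEYS are hypothetical names.

```python
import json
from typing import Any, Optional

# Assumption: "1.x" envelopes are all readable by a 1.0 client.
SUPPORTED_MAJOR = 1
ENVELOPE_KEYS = ("type", "version", "event_id", "timestamp", "room", "payload")


def parse_envelope(raw: str) -> Optional[dict[str, Any]]:
    """Decode an event envelope, tolerating unknown fields.

    Returns None for an unsupported major version so the caller can
    skip the event instead of crashing on an unexpected shape.
    """
    data = json.loads(raw)
    major = int(str(data.get("version", "1.0")).split(".", 1)[0])
    if major != SUPPORTED_MAJOR:
        return None
    # Keep the known envelope keys; extra top-level keys added by a
    # newer minor version are ignored rather than rejected.
    return {key: data.get(key) for key in ENVELOPE_KEYS}
```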

Event Payload Examples

{
    "type": "calculation_progress",
    "version": "1.0",
    "event_id": "550e8400-e29b-41d4-a716-446655440000",
    "timestamp": 1739577600.0,
    "room": "ws:workbook:abc-123",
    "payload": {
        "workbook_id": "abc-123",
        "job_id": "job-456",
        "progress_pct": 45,
        "current_sheet": "Holdings",
        "cells_processed": 4500,
        "cells_total": 10000,
        "eta_seconds": 12
    }
}
{
    "type": "calculation_complete",
    "version": "1.0",
    "event_id": "550e8400-e29b-41d4-a716-446655440001",
    "timestamp": 1739577612.0,
    "room": "ws:workbook:abc-123",
    "payload": {
        "workbook_id": "abc-123",
        "job_id": "job-456",
        "duration_seconds": 24.5,
        "cells_calculated": 10000,
        "errors": 0,
        "warnings": 3,
        "snapshot_id": "snap-789"
    }
}
{
    "type": "notification",
    "version": "1.0",
    "event_id": "550e8400-e29b-41d4-a716-446655440002",
    "timestamp": 1739577620.0,
    "room": "ws:user:user-001",
    "payload": {
        "title": "Export Ready",
        "message": "Your XLSX export of 'Q4 Holdings' is ready for download.",
        "severity": "info",
        "action_url": "/exports/export-789/download",
        "dismissible": true
    }
}
{
    "type": "scenario_complete",
    "version": "1.0",
    "event_id": "550e8400-e29b-41d4-a716-446655440003",
    "timestamp": 1739577650.0,
    "room": "ws:workbook:abc-123",
    "payload": {
        "workbook_id": "abc-123",
        "scenario_id": "scenario-101",
        "scenario_name": "Rate Shock +200bps",
        "duration_seconds": 8.3,
        "impact_summary": {
            "nav_change_pct": -2.4,
            "wal_change": 0.3,
            "tests_affected": ["OC_Senior", "IC_Mezzanine"]
        }
    }
}

ConnectionManager

The ConnectionManager is the central coordinator for all active WebSocket connections:

src/websocket/manager.py
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass, field

from fastapi import WebSocket

from src.websocket.events import WebSocketEvent


@dataclass
class ConnectionState:
    """Tracks state for a single WebSocket connection."""

    websocket: WebSocket
    user_id: str
    tenant_id: str
    roles: list[str]
    rooms: set[str] = field(default_factory=set)
    connected_at: float = 0.0


class ConnectionManager:
    """Manages WebSocket connections, room subscriptions,
    and event delivery.

    Safe for concurrent use by coroutines on a single event loop
    (asyncio.Lock is not thread-safe). Uses in-memory structures
    for connection tracking (not persisted, since connections are
    ephemeral by nature).
    """

    def __init__(self):
        self._connections: dict[str, ConnectionState] = {}
        self._rooms: dict[str, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def register(
        self,
        connection_id: str,
        websocket: WebSocket,
        user_id: str,
        tenant_id: str,
        roles: list[str],
    ) -> ConnectionState:
        """Register a new authenticated connection.

        Automatically subscribes to user and tenant rooms.
        """
        async with self._lock:
            state = ConnectionState(
                websocket=websocket,
                user_id=user_id,
                tenant_id=tenant_id,
                roles=roles,
                connected_at=time.time(),
            )
            self._connections[connection_id] = state

            # Auto-subscribe to user and tenant rooms
            user_room = f"ws:user:{user_id}"
            tenant_room = f"ws:tenant:{tenant_id}"

            state.rooms.add(user_room)
            state.rooms.add(tenant_room)
            self._rooms[user_room].add(connection_id)
            self._rooms[tenant_room].add(connection_id)

            return state

    async def unregister(self, connection_id: str) -> None:
        """Remove a connection and clean up all room subscriptions."""
        async with self._lock:
            state = self._connections.pop(connection_id, None)
            if state:
                for room in state.rooms:
                    self._rooms[room].discard(connection_id)
                    if not self._rooms[room]:
                        del self._rooms[room]

    async def join_room(
        self, connection_id: str, room: str
    ) -> bool:
        """Subscribe a connection to a room.

        Returns False if the connection is unknown or does not have
        permission to join the requested room (e.g., workbook belongs
        to a different tenant).
        """
        state = self._connections.get(connection_id)
        if not state:
            return False

        # Permission check: workbook rooms require tenant match.
        # Done outside the lock so a slow lookup in
        # _verify_workbook_access (not shown here) does not block
        # every other register/unregister call.
        if room.startswith("ws:workbook:"):
            if not await self._verify_workbook_access(
                state.tenant_id, room
            ):
                return False

        async with self._lock:
            # The connection may have closed while the check was awaited
            if connection_id not in self._connections:
                return False
            state.rooms.add(room)
            self._rooms[room].add(connection_id)
            return True

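    async def leave_room(
        self, connection_id: str, room: str
    ) -> None:
        """Unsubscribe a connection from a room."""
        async with self._lock:
            state = self._connections.get(connection_id)
            if state:
                state.rooms.discard(room)
                members = self._rooms.get(room)
                if members is not None:
                    members.discard(connection_id)
                    # Drop empty rooms so active_rooms stays accurate
                    if not members:
                        del self._rooms[room]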

    async def broadcast_to_room(
        self, room: str, event: WebSocketEvent
    ) -> int:
        """Send an event to all connections in a room.

        Returns the number of connections that received the event.
        Failed sends (disconnected clients) are cleaned up.
        """
        event.room = room
        message = event.to_json()
        delivered = 0
        failed = []

        connection_ids = self._rooms.get(room, set()).copy()

        for conn_id in connection_ids:
            state = self._connections.get(conn_id)
            if state:
                try:
                    await state.websocket.send_text(message)
                    delivered += 1
                except Exception:
                    failed.append(conn_id)

        # Clean up failed connections
        for conn_id in failed:
            await self.unregister(conn_id)

        return delivered

    @property
    def active_connections(self) -> int:
        """Total number of active connections."""
        return len(self._connections)

    @property
    def active_rooms(self) -> int:
        """Total number of rooms with at least one subscriber."""
        return len(self._rooms)

Celery Worker Integration

Celery workers publish events to Valkey pub/sub channels. The API process subscribes to these channels and forwards events to connected WebSocket clients.

Publisher (Worker Side)

src/websocket/publisher.py
from valkey import Valkey

from src.websocket.events import EventType, WebSocketEvent
from src.websocket.rooms import Room


class EventPublisher:
    """Publishes WebSocket events from Celery workers via Valkey pub/sub.

    Workers do not maintain WebSocket connections directly.
    Instead, they publish events to Valkey channels, and the
    API process's subscriber forwards them to connected clients.
    """

    def __init__(self, valkey_client: Valkey):
        self._valkey = valkey_client

    def publish(self, room: str, event: WebSocketEvent) -> int:
        """Publish an event to a Valkey channel.

        Returns the number of subscribers that received the message
        (Valkey pub/sub subscriber count, not WebSocket clients).
        """
        event.room = room
        return self._valkey.publish(room, event.to_json())

    def publish_calculation_progress(
        self,
        workbook_id: str,
        job_id: str,
        progress_pct: int,
        current_sheet: str,
        cells_processed: int,
        cells_total: int,
    ) -> None:
        """Convenience method for calculation progress events."""
        event = WebSocketEvent(
            type=EventType.CALCULATION_PROGRESS,
            payload={
                "workbook_id": workbook_id,
                "job_id": job_id,
                "progress_pct": progress_pct,
                "current_sheet": current_sheet,
                "cells_processed": cells_processed,
                "cells_total": cells_total,
                # _estimate_eta is a helper not shown in this excerpt
                "eta_seconds": self._estimate_eta(
                    progress_pct, cells_processed, cells_total
                ),
            },
        )
        room = Room.workbook(workbook_id).channel_name
        self.publish(room, event)
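The publisher calls self._estimate_eta(progress_pct, cells_processed, cells_total), which is not shown, and those inputs alone carry no timing information. A common approach is a throughput-based estimate; the sketch below is a hypothetical standalone alternative that additionally needs the elapsed time since the job started, and it assumes throughput stays roughly constant.

```python
import math
from typing import Optional


def estimate_eta_seconds(
    cells_processed: int, cells_total: int, elapsed_seconds: float
) -> Optional[int]:
    """Estimate remaining seconds from the throughput observed so far.

    Returns None until a rate can be measured (nothing processed yet).
    """
    if cells_processed <= 0 or elapsed_seconds <= 0:
        return None
    rate = cells_processed / elapsed_seconds  # cells per second so far
    remaining = max(cells_total - cells_processed, 0)
    return math.ceil(remaining / rate)
```

For example, 4,500 of 10,000 cells after 9 seconds is 500 cells/s, leaving 5,500 cells, or 11 seconds. Rounding up avoids reporting an ETA of 0 while work remains.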

Subscriber (API Side)

src/websocket/subscriber.py
import asyncio
import json
import logging

from src.websocket.events import EventType, WebSocketEvent
from src.websocket.manager import ConnectionManager

logger = logging.getLogger(__name__)


class EventSubscriber:
    """Subscribes to Valkey pub/sub channels and forwards events
    to the ConnectionManager for WebSocket delivery.

    Runs as a background asyncio task within the FastAPI process.
    Requires an asyncio client (valkey.asyncio.Valkey), since every
    pub/sub call below is awaited.
    """

    def __init__(
        self,
        valkey_client,
        connection_manager: ConnectionManager,
    ):
        self._valkey = valkey_client
        self._manager = connection_manager
        self._pubsub = None
        self._running = False
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        """Start the pub/sub listener as a background task."""
        self._pubsub = self._valkey.pubsub()
        self._running = True
        # Keep a reference so the task is not garbage-collected
        # and can be cancelled in stop()
        self._task = asyncio.create_task(self._listen())

    async def stop(self) -> None:
        """Stop the pub/sub listener and clean up."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        if self._pubsub:
            await self._pubsub.unsubscribe()
            await self._pubsub.close()

    async def subscribe_to_room(self, room: str) -> None:
        """Subscribe to a Valkey channel for a room."""
        if self._pubsub:
            await self._pubsub.subscribe(room)

    async def unsubscribe_from_room(self, room: str) -> None:
        """Unsubscribe from a Valkey channel."""
        if self._pubsub:
            await self._pubsub.unsubscribe(room)

    async def _listen(self) -> None:
        """Main listener loop: forwards Valkey messages
        to WebSocket clients via ConnectionManager.
        """
        while self._running:
            try:
                message = await self._pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0,
                )
                if message and message["type"] == "message":
                    channel = message["channel"]
                    # Without decode_responses=True the channel is bytes
                    if isinstance(channel, bytes):
                        channel = channel.decode()
                    event_data = json.loads(message["data"])
                    # Restore the enum: to_json() calls self.type.value,
                    # which fails on the plain string from JSON
                    event_data["type"] = EventType(event_data["type"])
                    event = WebSocketEvent(**event_data)

                    await self._manager.broadcast_to_room(
                        channel, event
                    )
            except asyncio.CancelledError:
                raise  # let stop() observe the cancellation
            except Exception:
                logger.exception("Pub/sub listener error")
                await asyncio.sleep(1)  # Back off on error

Worker-to-Client Event Flow

sequenceDiagram
    participant Worker as Celery Worker
    participant Publisher as EventPublisher
    participant Valkey as Valkey Pub/Sub
    participant Subscriber as EventSubscriber
    participant CM as ConnectionManager
    participant Client as Browser (WebSocket)

    Worker->>Worker: Processing calculation...
    Worker->>Publisher: publish_calculation_progress(45%)
    Publisher->>Valkey: PUBLISH ws:workbook:abc-123

    Valkey-->>Subscriber: Message on ws:workbook:abc-123
    Subscriber->>CM: broadcast_to_room("ws:workbook:abc-123", event)
    CM->>CM: Find all connections in room
    CM-->>Client: {"type": "calculation_progress", "payload": {...}}

    Worker->>Worker: Calculation complete
    Worker->>Publisher: publish_calculation_complete()
    Publisher->>Valkey: PUBLISH ws:workbook:abc-123

    Valkey-->>Subscriber: Message on ws:workbook:abc-123
    Subscriber->>CM: broadcast_to_room("ws:workbook:abc-123", event)
    CM-->>Client: {"type": "calculation_complete", "payload": {...}}

Authentication

WebSocket connections authenticate using the same JWT tokens as REST API requests. The token is validated during the WebSocket upgrade handshake:

src/websocket/auth.py
from fastapi import WebSocket
from src.core.security import verify_jwt_token


async def authenticate_websocket(
    websocket: WebSocket,
) -> dict | None:
    """Authenticate a WebSocket connection via JWT.

    The token can be provided in two ways:
    1. Authorization header: "Bearer <token>"
    2. Query parameter: "?token=<token>" (required for browsers,
       whose WebSocket API cannot set request headers)

    Returns user context dict or None if authentication fails.
    On failure the socket is accepted and then closed with 4001:
    per the ASGI spec, closing before accept() rejects the
    handshake with HTTP 403, and the client never sees 4001.
    """
    # Try Authorization header first
    token = None
    auth_header = websocket.headers.get("authorization")
    if auth_header and auth_header.startswith("Bearer "):
        token = auth_header[7:]

    # Fall back to query parameter
    if not token:
        token = websocket.query_params.get("token")

    if not token:
        await websocket.accept()
        await websocket.close(
            code=4001, reason="Missing authentication token"
        )
        return None

    try:
        payload = verify_jwt_token(token)
        return {
            "user_id": payload["sub"],
            "tenant_id": payload["tenant_id"],
            "roles": payload.get("roles", []),
        }
    except Exception:
        await websocket.accept()
        await websocket.close(
            code=4001, reason="Invalid or expired token"
        )
        return None

Token Refresh

JWT tokens used for WebSocket connections may expire during long-lived sessions. The client should monitor for 4001 close codes and re-establish the connection with a refreshed token. The Next.js frontend handles this automatically via the useWebSocket hook.

Close Codes

| Code | Meaning | Client Action |
|---|---|---|
| 1000 | Normal closure | No action needed |
| 1001 | Server going away (deployment) | Reconnect with backoff |
| 1011 | Server error | Reconnect with backoff |
| 4001 | Unauthorized (JWT invalid/expired) | Refresh token, reconnect |
| 4003 | Forbidden (insufficient permissions) | Display error to user |
| 4009 | Connection limit exceeded | Wait and retry |
| 4029 | Rate limited | Wait for cooldown, retry |
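A Python API client could encode this table as a lookup. Sketch only: ReconnectAction and action_for_close are hypothetical names, and falling back to backoff for unlisted codes is an assumption. That fallback matters in practice because 1006 (abnormal closure), which clients report when a connection drops without a close frame, is not in the table.

```python
from enum import Enum


class ReconnectAction(Enum):
    NONE = "none"
    BACKOFF = "reconnect_with_backoff"
    REFRESH_TOKEN = "refresh_token_then_reconnect"
    SHOW_ERROR = "show_error"
    WAIT_AND_RETRY = "wait_and_retry"


# One entry per row of the Close Codes table.
CLOSE_CODE_ACTIONS = {
    1000: ReconnectAction.NONE,
    1001: ReconnectAction.BACKOFF,
    1011: ReconnectAction.BACKOFF,
    4001: ReconnectAction.REFRESH_TOKEN,
    4003: ReconnectAction.SHOW_ERROR,
    4009: ReconnectAction.WAIT_AND_RETRY,
    4029: ReconnectAction.WAIT_AND_RETRY,
}


def action_for_close(code: int) -> ReconnectAction:
    """Map a close code to a client action; unknown codes back off (assumption)."""
    return CLOSE_CODE_ACTIONS.get(code, ReconnectAction.BACKOFF)
```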

Client Integration

Next.js WebSocket Hook

The frontend uses a custom React hook for WebSocket management:

frontend/src/hooks/useWebSocket.ts
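import { useCallback, useEffect, useRef, useState } from "react";
// useAuth, WS_BASE_URL, ConnectionStatus, WebSocketEvent and
// scheduleReconnect come from app modules not shown here.

interface UseWebSocketOptions {
  rooms?: string[];
  onEvent?: (event: WebSocketEvent) => void;
  reconnectAttempts?: number;
  reconnectInterval?: number;
}

function useWebSocket(options: UseWebSocketOptions) {
  const { token, refreshToken } = useAuth();
  const [status, setStatus] = useState<ConnectionStatus>("disconnected");
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    // Browsers cannot set headers on the handshake, so the token
    // travels as an (encoded) query parameter
    const wsUrl = `${WS_BASE_URL}/ws?token=${encodeURIComponent(token)}`;
    const ws = new WebSocket(wsUrl);

    ws.onopen = () => {
      setStatus("connected");
      // Subscribe to requested rooms
      options.rooms?.forEach((room) => {
        ws.send(JSON.stringify({ action: "subscribe", room }));
      });
    };

    ws.onmessage = (msg) => {
      const event: WebSocketEvent = JSON.parse(msg.data);
      options.onEvent?.(event);
    };

    ws.onclose = (e) => {
      setStatus("disconnected");
      if (e.code === 4001) {
        // Token expired: refreshing updates `token`, which re-runs
        // this effect and opens a new connection
        refreshToken();
      } else if (e.code !== 1000 && e.code !== 4003) {
        // 1000 needs no action; 4003 should be shown to the user
        scheduleReconnect();
      }
    };

    wsRef.current = ws;
    return () => {
      // Detach the handler so an intentional close does not reconnect
      ws.onclose = null;
      ws.close(1000);
    };
  }, [token]);

  // wsRef.current?.send would be unbound (and null on first render)
  const send = useCallback((data: string) => {
    const ws = wsRef.current;
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(data);
    }
  }, []);

  return { status, send };
}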

Scaling Considerations

Horizontal Scaling

When running multiple API instances behind a load balancer, each instance runs its own EventSubscriber listening to Valkey pub/sub. Because Valkey delivers each published message to every subscriber of that channel, every instance subscribed to a room's channel receives the event and delivers it to its locally connected clients.

| Topology | WebSocket Behavior |
|---|---|
| Single API instance | All connections on one process, direct delivery |
| Multiple API instances | Each instance subscribes to Valkey, delivers to its own clients |
| Sticky sessions (recommended) | Load balancer routes WebSocket upgrades to same instance |

Connection Limits

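Each API instance can handle approximately 10,000 concurrent WebSocket connections. For larger deployments, add instances and let the load balancer spread new connections across them. Sticky sessions pin a client to one instance but do not by themselves balance load, and they are not required for delivery, since every subscribed instance receives each event from Valkey.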
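Limits like WS_MAX_CONNECTIONS_PER_TENANT and WS_MAX_CONNECTIONS_TOTAL (see Configuration Reference) are typically checked at admission time, rejecting with close code 4009. The class below is a hypothetical sketch, not CalcBridge code. Its counters live in one process, so it enforces both limits per instance; a truly cluster-wide limit would need a shared counter (for example in Valkey), which this sketch does not attempt.

```python
from collections import Counter
from typing import Optional

CONNECTION_LIMIT_EXCEEDED = 4009  # close code from the Close Codes table


class ConnectionLimiter:
    """Per-process admission control for new WebSocket connections."""

    def __init__(self, max_total: int = 10_000, max_per_tenant: int = 500):
        self.max_total = max_total
        self.max_per_tenant = max_per_tenant
        self._per_tenant: Counter[str] = Counter()
        self._total = 0

    def try_acquire(self, tenant_id: str) -> Optional[int]:
        """Reserve a slot: None on success, or the close code to reject with."""
        if self._total >= self.max_total or self._per_tenant[tenant_id] >= self.max_per_tenant:
            return CONNECTION_LIMIT_EXCEEDED
        self._per_tenant[tenant_id] += 1
        self._total += 1
        return None

    def release(self, tenant_id: str) -> None:
        """Free a slot when a connection closes (no-op if none was held)."""
        if self._per_tenant[tenant_id] > 0:
            self._per_tenant[tenant_id] -= 1
            self._total -= 1
            if self._per_tenant[tenant_id] == 0:
                del self._per_tenant[tenant_id]
```

Calling release() from the same place as ConnectionManager.unregister() keeps the counts in step with the connection registry.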

Backpressure Handling

When a client cannot consume events fast enough (slow network, overwhelmed browser), the ConnectionManager treats a send that fails or exceeds the send timeout (WS_SEND_TIMEOUT, 5 seconds by default) as a dead client and disconnects it to prevent memory buildup:

Backpressure detection in ConnectionManager
async def _send_with_timeout(
    self, websocket: WebSocket, message: str, timeout: float = 5.0
) -> bool:
    """Send a message with timeout to detect slow clients."""
    try:
        await asyncio.wait_for(
            websocket.send_text(message), timeout=timeout
        )
        return True
    except asyncio.TimeoutError:
        logger.warning("Client send timeout, disconnecting")
        return False
    except Exception:
        return False
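broadcast_to_room() awaits each send in turn, so even with a per-send timeout one slow client can delay everyone after it in the room by up to the full timeout. A sketch of sending concurrently instead: it takes plain async callables (for example, each connection's websocket.send_text) to stay framework-independent, and broadcast_concurrently and Sender are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable

Sender = Callable[[str], Awaitable[None]]


async def _send_one(conn_id: str, send: Sender, message: str, timeout: float) -> tuple[str, bool]:
    try:
        await asyncio.wait_for(send(message), timeout=timeout)
        return conn_id, True
    except Exception:  # timeouts and closed sockets both count as failures
        return conn_id, False


async def broadcast_concurrently(
    senders: dict[str, Sender], message: str, timeout: float = 5.0
) -> tuple[int, list[str]]:
    """Send to all connections at once; return (delivered, failed_ids)."""
    results = await asyncio.gather(
        *(_send_one(cid, send, message, timeout) for cid, send in senders.items())
    )
    failed = [cid for cid, ok in results if not ok]
    return len(results) - len(failed), failed
```

With concurrent sends, a broadcast takes roughly one timeout in the worst case regardless of room size. For very large rooms, an asyncio.Semaphore around _send_one would cap how many sends are in flight at once.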

Metrics and Monitoring

| Metric | Type | Description |
|---|---|---|
| ws_connections_active | Gauge | Current active WebSocket connections |
| ws_connections_total | Counter | Total connections established |
| ws_rooms_active | Gauge | Current rooms with subscribers |
| ws_events_published_total | Counter | Events published to Valkey by workers |
| ws_events_delivered_total | Counter | Events delivered to WebSocket clients |
| ws_events_failed_total | Counter | Failed event deliveries (disconnected clients) |
| ws_auth_failures_total | Counter | Authentication failures on upgrade |
| ws_reconnections_total | Counter | Client reconnections |
| ws_message_latency_seconds | Histogram | Valkey publish to client delivery latency |
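ws_message_latency_seconds can be approximated from the envelope itself, because WebSocketEvent.timestamp is set when the worker constructs the event, just before publishing. The function name below is hypothetical. It clamps at zero because the timestamp comes from the worker host's clock, and clock skew between worker and API hosts can otherwise produce negative values. The result would be passed to whatever histogram the metrics library provides.

```python
import json
import time
from typing import Optional


def delivery_latency_seconds(event_json: str, now: Optional[float] = None) -> float:
    """Seconds between event creation (worker clock) and delivery (API clock).

    Clamped at zero to absorb small clock skew between hosts.
    """
    created = float(json.loads(event_json)["timestamp"])
    delivered_at = time.time() if now is None else now
    return max(delivered_at - created, 0.0)
```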

Configuration Reference

WebSocket configuration (environment variables)
# WebSocket server
WS_ENABLED: true
WS_PATH: /ws                           # WebSocket endpoint path
WS_MAX_CONNECTIONS_PER_TENANT: 500     # Per-tenant connection limit
WS_MAX_CONNECTIONS_TOTAL: 10000        # Global connection limit
WS_PING_INTERVAL: 30                   # Keepalive ping interval (seconds)
WS_PING_TIMEOUT: 10                    # Pong response timeout (seconds)
WS_SEND_TIMEOUT: 5                     # Client send timeout (seconds)

# Valkey pub/sub
WS_VALKEY_HOST: localhost
WS_VALKEY_PORT: 6379
WS_VALKEY_DB: 1                        # Separate DB from cache
WS_VALKEY_CHANNEL_PREFIX: ws           # Channel name prefix

# Rate limiting
WS_RATE_LIMIT_MESSAGES: 100            # Max incoming messages per minute
WS_RATE_LIMIT_SUBSCRIPTIONS: 20       # Max room subscriptions per connection

# Reconnection
WS_CLIENT_RECONNECT_MAX_ATTEMPTS: 10
WS_CLIENT_RECONNECT_INTERVAL_MS: 1000  # Initial backoff interval
WS_CLIENT_RECONNECT_MAX_INTERVAL_MS: 30000  # Max backoff interval
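The reconnection settings describe capped exponential backoff. Below is a sketch of the resulting schedule. It assumes the interval doubles on each attempt (the growth factor is not specified above) and adds optional "full jitter" so that clients disconnected together by a deploy (close code 1001) do not all reconnect in lockstep. Both function names are hypothetical.

```python
import random


def backoff_schedule(
    initial_ms: int = 1000, max_ms: int = 30000, max_attempts: int = 10
) -> list[int]:
    """Capped exponential backoff: one delay (ms) per reconnect attempt."""
    return [min(initial_ms * 2 ** attempt, max_ms) for attempt in range(max_attempts)]


def with_full_jitter(delay_ms: int, rng: random.Random) -> int:
    """Pick a uniformly random delay in [0, delay_ms] to spread reconnects out."""
    return rng.randint(0, delay_ms)
```

With the defaults above, the schedule is 1000, 2000, 4000, 8000 and 16000 ms, then 30000 ms for each of the remaining five attempts.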

Error Handling

| Error | Behavior | Client Response |
|---|---|---|
| JWT expired during session | Server sends close code 4001 | Refresh token, reconnect |
| Valkey pub/sub disconnection | Subscriber auto-reconnects with backoff | Events may be missed during gap |
| Client send timeout | Connection terminated by server | Client auto-reconnects |
| Room subscription denied | Error event sent to client | Display permission error |
| Rate limit exceeded | Close code 4029 sent | Wait, reconnect |
| Server shutdown (deploy) | Close code 1001 sent to all | Auto-reconnect to new instance |

Missed Events During Reconnection

WebSocket connections are ephemeral. Events published while a client is disconnected are not queued. For critical events (e.g., calculation_complete), clients should poll the REST API after reconnection to catch up on any missed state changes.
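A catch-up pass after reconnecting might look like the following sketch. It assumes the client tracks the job IDs it was waiting on and that some REST call, passed in as fetch_status, returns a job's current status. catch_up, fetch_status and the "complete"/"failed" status strings are all hypothetical.

```python
from typing import Callable, Iterable

TERMINAL_STATUSES = {"complete", "failed"}  # hypothetical status values


def catch_up(
    pending_job_ids: Iterable[str],
    fetch_status: Callable[[str], str],
) -> tuple[dict[str, str], set[str]]:
    """Poll each job the client was waiting on after a reconnect.

    Returns (finished, still_pending): jobs that reached a terminal
    status while disconnected, and jobs to keep waiting for via WebSocket.
    """
    finished: dict[str, str] = {}
    still_pending: set[str] = set()
    for job_id in pending_job_ids:
        status = fetch_status(job_id)
        if status in TERMINAL_STATUSES:
            finished[job_id] = status
        else:
            still_pending.add(job_id)
    return finished, still_pending
```

Order matters: re-subscribe to workbook rooms before running the catch-up pass. If the client polls first and subscribes afterwards, an event published between the two steps is missed by both paths.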