Adding API Endpoints

This guide walks through adding new API endpoints to CalcBridge. The API follows FastAPI patterns: dependency injection, Pydantic models, and repository-based data access.


Architecture Overview

CalcBridge API follows a layered architecture:

flowchart TD
    A["Client Request"] --> B["FastAPI Router<br/>src/api/routes/"]
    B --> C["Dependencies<br/>Auth, DB Session"]
    C --> D["Request Validation<br/>Pydantic Models"]
    D --> E["Business Logic<br/>Services"]
    E --> F["Data Access<br/>Repositories"]
    F --> G["Database<br/>SQLAlchemy + RLS"]
    G --> H["Response"]
    H --> A

| Layer | Location | Purpose |
|---|---|---|
| Routes | src/api/routes/ | HTTP endpoints, request handling |
| Dependencies | src/api/dependencies.py | Auth, DB session injection |
| Models | In route files or src/models/ | Request/response schemas |
| Repositories | src/db/repositories/ | Data access layer |
| Services | src/services/ | Business logic (optional) |
| ORM Models | src/db/models/ | SQLAlchemy models |

Step-by-Step Guide

Step 1: Create the Route File

Create a new route file in src/api/routes/:

src/api/routes/reports.py
"""Report management endpoints."""

from typing import List, Optional
from uuid import UUID

from fastapi import APIRouter, HTTPException, Query, status
from pydantic import BaseModel, Field

from src.api.dependencies import CurrentUser, DatabaseSession

router = APIRouter(prefix="/reports", tags=["Reports"])

Naming Conventions

  • Use plural nouns for resource names (/reports, /workbooks)
  • File names should match the resource (reports.py)
  • Tags help organize the OpenAPI documentation

Step 2: Define Request/Response Models

Create Pydantic models for request validation and response serialization:

src/api/routes/reports.py (continued)
from datetime import datetime
from pydantic import BaseModel, Field


# Request Models
class ReportCreate(BaseModel):
    """Request model for creating a new report."""

    name: str = Field(..., min_length=1, max_length=255)
    description: Optional[str] = Field(None, max_length=1000)
    report_type: str = Field(..., pattern="^(compliance|portfolio|risk)$")
    workbook_id: UUID
    parameters: Optional[dict] = Field(default_factory=dict)


class ReportUpdate(BaseModel):
    """Request model for updating a report."""

    name: Optional[str] = Field(None, min_length=1, max_length=255)
    description: Optional[str] = Field(None, max_length=1000)
    parameters: Optional[dict] = None


# Response Models
class ReportResponse(BaseModel):
    """Response model for report data."""

    id: str
    tenant_id: str
    name: str
    description: Optional[str]
    report_type: str
    workbook_id: str
    status: str
    parameters: dict
    created_at: datetime
    updated_at: datetime
    created_by: Optional[str]

    # Pydantic v2 style; the nested `class Config` form still works but is deprecated
    model_config = {"from_attributes": True}


class ReportListResponse(BaseModel):
    """Paginated list of reports."""

    items: List[ReportResponse]
    total: int
    page: int
    page_size: int
    total_pages: int

Model Best Practices

| Practice | Description |
|---|---|
| Field validation | Use Field() for constraints like min_length, max_length, pattern |
| Optional fields | Use Optional[Type] with default None for optional fields |
| Descriptions | Add docstrings to models for OpenAPI documentation |
| from_attributes | Enable ORM mode for automatic serialization from SQLAlchemy models |
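
To see what the `report_type` constraint from this step admits, the same regular expression can be tried with the standard library. This is only a sketch: `is_valid_report_type` is a hypothetical helper, and Pydantic applies the pattern with its own regex engine, so treat the result as an approximation of the model's behavior.

```python
import re

# Same pattern as ReportCreate.report_type; the ^ and $ anchors make it
# cover the whole value rather than a substring.
REPORT_TYPE_PATTERN = re.compile(r"^(compliance|portfolio|risk)$")


def is_valid_report_type(value: str) -> bool:
    """Return True if value is one of the allowed report types (hypothetical helper)."""
    return REPORT_TYPE_PATTERN.search(value) is not None


if __name__ == "__main__":
    for candidate in ("compliance", "risk", "Risk", "risk-summary"):
        print(candidate, is_valid_report_type(candidate))
```

The pattern is case-sensitive, so clients must send lowercase values.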

Step 3: Create the Repository

Add a repository for data access in src/db/repositories/:

src/db/repositories/report.py
"""Report repository for database operations."""

from typing import List, Optional

from sqlalchemy import select, update

from src.db.models import Report
from src.db.repositories.base import TenantScopedRepository


class ReportRepository(TenantScopedRepository[Report]):
    """Repository for Report CRUD operations.

    All queries are automatically filtered by tenant_id via RLS policies.
    """

    model = Report

    async def create_report(
        self,
        *,
        tenant_id: str,
        name: str,
        report_type: str,
        workbook_id: str,
        created_by: str,
        description: Optional[str] = None,
        parameters: Optional[dict] = None,
    ) -> Report:
        """Create a new report."""
        return await self.create(
            tenant_id=tenant_id,
            name=name,
            report_type=report_type,
            workbook_id=workbook_id,
            created_by=created_by,
            description=description,
            parameters=parameters or {},
            status="pending",
        )

    async def get_by_name(
        self,
        tenant_id: str,
        name: str,
    ) -> Optional[Report]:
        """Get a report by name within a tenant."""
        result = await self.session.execute(
            select(Report)
            .where(Report.tenant_id == tenant_id)
            .where(Report.name == name)
        )
        return result.scalar_one_or_none()

    async def get_reports_paginated(
        self,
        tenant_id: str,
        *,
        skip: int = 0,
        limit: int = 20,
        report_type: Optional[str] = None,
        status: Optional[str] = None,
    ) -> List[Report]:
        """Get paginated list of reports with optional filters."""
        query = (
            select(Report)
            .where(Report.tenant_id == tenant_id)
        )

        if report_type:
            query = query.where(Report.report_type == report_type)
        if status:
            query = query.where(Report.status == status)

        query = (
            query.order_by(Report.created_at.desc())
            .offset(skip)
            .limit(limit)
        )

        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def update_status(self, report_id: str, status: str) -> None:
        """Update report status."""
        await self.session.execute(
            update(Report)
            .where(Report.id == report_id)
            .values(status=status)
        )
        await self.session.flush()

Repository Patterns

# Base repository provides these methods:
await repo.get_by_id(id)           # Get single record
await repo.get_all(skip, limit)    # Get paginated list
await repo.create(**kwargs)        # Create record
await repo.update(id, **kwargs)    # Update record
await repo.delete(id)              # Delete record
await repo.exists(id)              # Check existence
await repo.count()                 # Count records

# TenantScopedRepository adds:
await repo.get_by_tenant(tenant_id, skip, limit)  # Tenant-filtered list
await repo.count_by_tenant(tenant_id)             # Tenant-filtered count
await repo.create_for_tenant(tenant_id, **kwargs) # Create with tenant
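
When unit-testing handlers without a database, an in-memory stand-in with the same method names can be handy. The class below is a hypothetical sketch, not CalcBridge's `TenantScopedRepository`: it stores plain dicts, is synchronous for brevity (the real methods are awaited), and assumes every record carries `id` and `tenant_id` keys.

```python
from typing import Any, Dict, List
from uuid import uuid4


class InMemoryTenantRepository:
    """Hypothetical test double mirroring a few tenant-scoped methods."""

    def __init__(self) -> None:
        # Rows keyed by id; dicts preserve insertion order.
        self._rows: Dict[str, Dict[str, Any]] = {}

    def create_for_tenant(self, tenant_id: str, **fields: Any) -> Dict[str, Any]:
        row = {"id": str(uuid4()), "tenant_id": tenant_id, **fields}
        self._rows[row["id"]] = row
        return row

    def get_by_tenant(
        self, tenant_id: str, skip: int = 0, limit: int = 20
    ) -> List[Dict[str, Any]]:
        rows = [r for r in self._rows.values() if r["tenant_id"] == tenant_id]
        return rows[skip : skip + limit]

    def count_by_tenant(self, tenant_id: str) -> int:
        return sum(1 for r in self._rows.values() if r["tenant_id"] == tenant_id)
```

Because every read filters on `tenant_id`, a test can create rows for two tenants and assert that neither sees the other's data.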

Step 4: Implement the Endpoints

Now implement the route handlers:

src/api/routes/reports.py (continued)
import math

from src.db.repositories.report import ReportRepository


@router.get("", response_model=ReportListResponse)
async def list_reports(
    current_user: CurrentUser,
    db: DatabaseSession,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    report_type: Optional[str] = Query(None),
    status_filter: Optional[str] = Query(None, alias="status"),
) -> ReportListResponse:
    """List reports for the current tenant.

    Supports pagination and filtering by type and status.
    """
    repo = ReportRepository(db)

    skip = (page - 1) * page_size
    reports = await repo.get_reports_paginated(
        tenant_id=current_user.tenant_id,
        skip=skip,
        limit=page_size,
        report_type=report_type,
        status=status_filter,
    )

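    # NOTE: count_by_tenant counts every report for the tenant and ignores the
    # report_type/status filters, so `total` can exceed the filtered result set.
    total = await repo.count_by_tenant(current_user.tenant_id)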
    total_pages = math.ceil(total / page_size) if total > 0 else 1

    return ReportListResponse(
        items=[ReportResponse.model_validate(r) for r in reports],
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages,
    )


@router.post("", response_model=ReportResponse, status_code=status.HTTP_201_CREATED)
async def create_report(
    data: ReportCreate,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> ReportResponse:
    """Create a new report."""
    repo = ReportRepository(db)

    # Check for duplicate name
    existing = await repo.get_by_name(current_user.tenant_id, data.name)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A report with this name already exists",
        )

    report = await repo.create_report(
        tenant_id=current_user.tenant_id,
        name=data.name,
        description=data.description,
        report_type=data.report_type,
        workbook_id=str(data.workbook_id),
        parameters=data.parameters or {},
        created_by=current_user.id,
    )

    return ReportResponse.model_validate(report)


@router.get("/{report_id}", response_model=ReportResponse)
async def get_report(
    report_id: UUID,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> ReportResponse:
    """Get a specific report by ID."""
    repo = ReportRepository(db)

    report = await repo.get_by_id(str(report_id))

    if not report:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )

    # Double-check tenant ownership (RLS should handle this)
    if report.tenant_id != current_user.tenant_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )

    return ReportResponse.model_validate(report)


@router.patch("/{report_id}", response_model=ReportResponse)
async def update_report(
    report_id: UUID,
    data: ReportUpdate,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> ReportResponse:
    """Update a report."""
    repo = ReportRepository(db)

    report = await repo.get_by_id(str(report_id))

    if not report or report.tenant_id != current_user.tenant_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )

    # Check name conflict if updating name
    if data.name and data.name != report.name:
        existing = await repo.get_by_name(current_user.tenant_id, data.name)
        if existing:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="A report with this name already exists",
            )

    update_data = data.model_dump(exclude_unset=True)
    if update_data:
        report = await repo.update(str(report_id), **update_data)

    return ReportResponse.model_validate(report)


@router.delete("/{report_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_report(
    report_id: UUID,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> None:
    """Delete a report."""
    repo = ReportRepository(db)

    report = await repo.get_by_id(str(report_id))

    if not report or report.tenant_id != current_user.tenant_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )

    await repo.delete(str(report_id))
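
The pagination arithmetic in `list_reports` can be isolated and checked on its own. The function below is a small sketch of that calculation; `page_window` is a hypothetical name and is not part of the CalcBridge codebase.

```python
import math
from typing import Tuple


def page_window(page: int, page_size: int, total: int) -> Tuple[int, int]:
    """Return (skip, total_pages) for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    skip = (page - 1) * page_size
    # An empty result set is still reported as one (empty) page.
    total_pages = math.ceil(total / page_size) if total > 0 else 1
    return skip, total_pages


print(page_window(page=3, page_size=20, total=45))  # (40, 3)
```

Page 3 of 45 items at 20 per page skips the first 40 rows and is the last of 3 pages.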

Step 5: Register the Router

Add your router to the API in src/api/routes/__init__.py:

src/api/routes/__init__.py
from fastapi import APIRouter

from src.api.routes import (
    auth,
    calculations,
    health,
    metrics,
    reports,  # Add your new router
    tenants,
    workbooks,
)

api_router = APIRouter()

# Register all routers
api_router.include_router(auth.router)
api_router.include_router(workbooks.router)
api_router.include_router(calculations.router)
api_router.include_router(reports.router)  # Add this line
api_router.include_router(tenants.router)
api_router.include_router(health.router)
api_router.include_router(metrics.router)

Step 6: Add Authentication

CalcBridge uses dependency injection for authentication:

Authentication Dependencies
from src.api.dependencies import CurrentUser, DatabaseSession

# CurrentUser - Requires valid JWT token
# Returns UserContext with: id, email, tenant_id, role

@router.get("/me")
async def get_my_data(
    current_user: CurrentUser,  # Automatically validates JWT
    db: DatabaseSession,         # Provides async DB session
) -> dict:
    return {"user_id": current_user.id, "tenant_id": current_user.tenant_id}

Authentication Options

| Dependency | Type | Description |
|---|---|---|
| CurrentUser | Required auth | Validates JWT, returns user context |
| OptionalUser | Optional auth | Returns user context or None |
| DatabaseSession | DB access | Async SQLAlchemy session with RLS |
| AdminUser | Admin only | Requires admin role |
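
As a rough illustration of what an admin-only dependency checks, the sketch below models the user context with the four fields listed above and rejects non-admin roles. Everything here is an assumption for illustration: `require_admin`, `PermissionDenied`, and the role value `"admin"` are hypothetical, and the real `AdminUser` dependency may work differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserContext:
    """Fields this guide lists for the user context."""

    id: str
    email: str
    tenant_id: str
    role: str


class PermissionDenied(Exception):
    """Raised when the user lacks the required role (would map to HTTP 403)."""


def require_admin(user: UserContext) -> UserContext:
    # The role name "admin" is an assumption; use the value the project defines.
    if user.role != "admin":
        raise PermissionDenied("Admin role required")
    return user
```

A failed role check corresponds to 403 Forbidden, while a missing or invalid token corresponds to 401 Unauthorized.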

API Key Authentication

For service-to-service communication:

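from fastapi import Depends

from src.core.auth import require_api_key

@router.get("/internal/data")
async def get_internal_data(
    db: DatabaseSession,
    # Parameters with defaults must come after those without
    api_key: str = Depends(require_api_key),
) -> dict:
    # Reaching this point means require_api_key accepted the key
    return {"status": "ok"}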
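
The guide does not show how `require_api_key` validates the key. One common approach, sketched here as an assumption rather than CalcBridge's implementation, is a constant-time comparison against the expected value. `api_key_matches` is a hypothetical helper.

```python
import hmac
from typing import Optional


def api_key_matches(presented: Optional[str], expected: str) -> bool:
    """Compare a presented key with the expected one in constant time."""
    if not presented:
        return False
    # hmac.compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))
```

A plain `==` comparison would also return the right answer, but its running time can depend on where the strings first differ.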

Step 7: Write Tests

Create comprehensive tests for your endpoints:

tests/unit/test_reports_routes.py
import pytest
from httpx import AsyncClient


class TestReportsAPI:
    """Tests for reports API endpoints."""

    @pytest.mark.asyncio
    async def test_create_report(self, authenticated_client: AsyncClient):
        """Test creating a new report."""
        # Arrange
        payload = {
            "name": "Q4 Compliance Report",
            "description": "Quarterly compliance analysis",
            "report_type": "compliance",
            "workbook_id": "550e8400-e29b-41d4-a716-446655440000",
        }

        # Act
        response = await authenticated_client.post(
            "/api/v1/reports",
            json=payload,
        )

        # Assert
        assert response.status_code == 201
        data = response.json()
        assert data["name"] == "Q4 Compliance Report"
        assert data["status"] == "pending"
        assert "id" in data

    @pytest.mark.asyncio
    async def test_create_report_duplicate_name(
        self,
        authenticated_client: AsyncClient,
        existing_report,  # Fixture that creates a report
    ):
        """Test that duplicate names are rejected."""
        response = await authenticated_client.post(
            "/api/v1/reports",
            json={
                "name": existing_report.name,  # Same name
                "report_type": "compliance",
                "workbook_id": str(existing_report.workbook_id),
            },
        )

        assert response.status_code == 400
        assert "already exists" in response.json()["detail"]

    @pytest.mark.asyncio
    async def test_list_reports_pagination(
        self,
        authenticated_client: AsyncClient,
        create_reports,  # Fixture that creates multiple reports
    ):
        """Test paginated report listing."""
        response = await authenticated_client.get(
            "/api/v1/reports",
            params={"page": 1, "page_size": 10},
        )

        assert response.status_code == 200
        data = response.json()
        assert "items" in data
        assert "total" in data
        assert "page" in data
        assert data["page"] == 1

    @pytest.mark.asyncio
    async def test_get_report_not_found(self, authenticated_client: AsyncClient):
        """Test 404 for non-existent report."""
        response = await authenticated_client.get(
            "/api/v1/reports/00000000-0000-0000-0000-000000000000"
        )

        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_unauthorized_access(self, client: AsyncClient):
        """Test that unauthenticated requests are rejected."""
        response = await client.get("/api/v1/reports")

        assert response.status_code == 401

Advanced Patterns

Background Tasks

For long-running operations, queue a Celery task:

from typing import Any, cast

@router.post("/{report_id}/generate")
async def generate_report(
    report_id: UUID,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> dict:
    """Queue report generation as background task."""
    repo = ReportRepository(db)
    report = await repo.get_by_id(str(report_id))

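    # Return 404 for other tenants' reports too, matching the CRUD handlers
    if not report or report.tenant_id != current_user.tenant_id:
        raise HTTPException(status_code=404, detail="Report not found")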

    # Update status
    await repo.update_status(str(report_id), "generating")

    # Queue Celery task
    from src.workers.tasks.reports import generate_report_task

    cast(Any, generate_report_task).delay(
        tenant_id=current_user.tenant_id,
        report_id=str(report_id),
    )

    return {"status": "queued", "report_id": str(report_id)}
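
Before queueing, it can help to check that the report is in a state from which generation makes sense, so the same report is not queued twice. The sketch below uses the status values that appear in this guide (`pending`, `generating`, `completed`, `failed`); the transition rules themselves are an assumption for illustration.

```python
from typing import Dict, FrozenSet

# Allowed moves between the status values used in this guide.
# Which transitions are legal is an assumption, not CalcBridge's rule set.
ALLOWED_TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "pending": frozenset({"generating"}),
    "generating": frozenset({"completed", "failed"}),
    "completed": frozenset({"generating"}),  # allow regeneration
    "failed": frozenset({"generating"}),  # allow retry
}


def can_transition(current: str, target: str) -> bool:
    """Return True if a report may move from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, frozenset())
```

A handler could respond with 409 Conflict when `can_transition(report.status, "generating")` is false.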

File Uploads

Handle file uploads with validation:

from fastapi import UploadFile

@router.post("/upload")
async def upload_data(
    file: UploadFile,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> dict:
    """Upload data file for processing."""
    # Validate file type
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    valid_extensions = [".xlsx", ".csv"]
    if not any(file.filename.lower().endswith(ext) for ext in valid_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed: {', '.join(valid_extensions)}",
        )

    # Read content
    content = await file.read()
    file_size = len(content)

    # Process file...
    return {"filename": file.filename, "size": file_size}
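
The handler above reads the whole file into memory without a size limit. A validation helper along the following lines could cover both the extension and the size check. This is a sketch: `check_upload` is hypothetical, and the 10 MiB cap is an assumed value, not a CalcBridge setting.

```python
from pathlib import PurePath

ALLOWED_EXTENSIONS = {".xlsx", ".csv"}
MAX_UPLOAD_BYTES = 10 * 1024 * 1024  # assumed 10 MiB cap


def check_upload(filename: str, size: int) -> None:
    """Raise ValueError with a user-facing message if the upload is not acceptable."""
    if not filename:
        raise ValueError("No filename provided")
    # Compare the final suffix case-insensitively, e.g. "DATA.CSV" -> ".csv".
    suffix = PurePath(filename).suffix.lower()
    if suffix not in ALLOWED_EXTENSIONS:
        allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
        raise ValueError(f"Invalid file type. Allowed: {allowed}")
    if size > MAX_UPLOAD_BYTES:
        raise ValueError("File too large")
```

In a route, the `ValueError` message would be passed to `HTTPException(status_code=400, detail=...)`.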

WebSocket Endpoints

For real-time updates:

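import asyncio

from fastapi import WebSocket, WebSocketDisconnect

@router.websocket("/ws/{report_id}")
async def report_status_websocket(
    websocket: WebSocket,
    report_id: str,
):
    """WebSocket for real-time report status updates."""
    await websocket.accept()

    try:
        while True:
            # get_report_status is assumed to be a helper defined elsewhere
            # that returns the report's current status string
            report_status = await get_report_status(report_id)
            await websocket.send_json({"status": report_status})

            if report_status in ("completed", "failed"):
                break

            await asyncio.sleep(1)
    except WebSocketDisconnect:
        # Client went away; the connection is already closed
        return

    # Close only after a normal finish; closing an already-disconnected
    # socket can raise
    await websocket.close()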

Checklist

Before submitting your endpoint:

  • Route file created in src/api/routes/
  • Request/response Pydantic models defined
  • Repository created with needed queries
  • Router registered in __init__.py
  • Authentication dependency added
  • Input validation with Field constraints
  • Error handling with appropriate HTTP status codes
  • Unit tests covering success and error cases
  • Integration tests with database
  • OpenAPI documentation (docstrings on endpoints)
  • Tenant isolation verified (RLS + explicit checks)

Common HTTP Status Codes

| Code | Meaning | Usage |
|---|---|---|
| 200 | OK | Successful GET, PATCH |
| 201 | Created | Successful POST |
| 204 | No Content | Successful DELETE |
| 400 | Bad Request | Request rejected by handler logic, e.g. duplicate name |
| 401 | Unauthorized | Missing or invalid auth |
| 403 | Forbidden | Insufficient permissions |
| 404 | Not Found | Resource doesn't exist |
| 409 | Conflict | Resource state conflict |
| 422 | Unprocessable Entity | Request fails Pydantic validation (FastAPI's default) |
| 500 | Internal Server Error | Unexpected server error |
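
One way to keep status codes consistent across handlers is to map domain errors to codes in a single place. The sketch below uses the standard library's `http.HTTPStatus`; the exception classes and `status_for` are hypothetical and only illustrate the idea.

```python
from http import HTTPStatus


class NotFoundError(Exception):
    """Resource does not exist or belongs to another tenant."""


class DuplicateNameError(Exception):
    """A resource with the same name already exists."""


class PermissionDeniedError(Exception):
    """The caller is authenticated but lacks the required role."""


# Hypothetical domain exceptions mapped to the codes listed in the table.
STATUS_FOR_ERROR = {
    NotFoundError: HTTPStatus.NOT_FOUND,
    DuplicateNameError: HTTPStatus.BAD_REQUEST,
    PermissionDeniedError: HTTPStatus.FORBIDDEN,
}


def status_for(exc: Exception) -> HTTPStatus:
    """Return the HTTP status for a domain error, defaulting to 500."""
    for exc_type, code in STATUS_FOR_ERROR.items():
        if isinstance(exc, exc_type):
            return code
    return HTTPStatus.INTERNAL_SERVER_ERROR
```

`HTTPStatus` members compare equal to their integer values, so `status_for(NotFoundError()) == 404` holds.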