Adding API Endpoints¶
This guide walks through the process of adding new API endpoints to CalcBridge. The API follows FastAPI patterns with dependency injection, Pydantic models, and repository-based data access.
Architecture Overview¶
CalcBridge API follows a layered architecture:
flowchart TD
A["Client Request"] --> B["FastAPI Router<br/>src/api/routes/"]
B --> C["Dependencies<br/>Auth, DB Session"]
C --> D["Request Validation<br/>Pydantic Models"]
D --> E["Business Logic<br/>Services"]
E --> F["Data Access<br/>Repositories"]
F --> G["Database<br/>SQLAlchemy + RLS"]
G --> H["Response"]
H --> A

| Layer | Location | Purpose |
|---|---|---|
| Routes | src/api/routes/ | HTTP endpoints, request handling |
| Dependencies | src/api/dependencies.py | Auth, DB session injection |
| Models | In route files or src/models/ | Request/response schemas |
| Repositories | src/db/repositories/ | Data access layer |
| Services | src/services/ | Business logic (optional) |
| ORM Models | src/db/models/ | SQLAlchemy models |
Step-by-Step Guide¶
Step 1: Create the Route File¶
Create a new route file in src/api/routes/:
"""Report management endpoints."""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, HTTPException, Query, status
from pydantic import BaseModel, Field
from src.api.dependencies import CurrentUser, DatabaseSession
# Every route declared on this router is served under the /reports prefix;
# the "Reports" tag groups them together in the OpenAPI documentation.
router = APIRouter(prefix="/reports", tags=["Reports"])
Naming Conventions
- Use plural nouns for resource names (/reports, /workbooks)
- File names should match the resource (reports.py)
- Tags help organize the OpenAPI documentation
Step 2: Define Request/Response Models¶
Create Pydantic models for request validation and response serialization:
from datetime import datetime
from pydantic import BaseModel, Field
# Request Models
class ReportCreate(BaseModel):
    """Payload a client sends to create a report.

    ``name``, ``report_type`` and ``workbook_id`` are required.
    ``report_type`` must be exactly ``compliance``, ``portfolio`` or ``risk``.
    """

    # Required, 1-255 characters.
    name: str = Field(min_length=1, max_length=255)
    # Optional free text, at most 1000 characters.
    description: Optional[str] = Field(default=None, max_length=1000)
    # Required; restricted to the three known report kinds by the regex.
    report_type: str = Field(pattern="^(compliance|portfolio|risk)$")
    workbook_id: UUID
    # Falls back to a fresh empty dict when the client omits it.
    parameters: Optional[dict] = Field(default_factory=dict)
class ReportUpdate(BaseModel):
    """Payload for a partial update of a report.

    Every field is optional and defaults to ``None``; the length limits
    mirror those of the create payload.
    """

    # 1-255 characters when provided.
    name: Optional[str] = Field(default=None, min_length=1, max_length=255)
    # At most 1000 characters when provided.
    description: Optional[str] = Field(default=None, max_length=1000)
    parameters: Optional[dict] = None
# Response Models
class ReportResponse(BaseModel):
    """Response model for report data.

    Instances can be built directly from objects exposing these fields as
    attributes (for example ORM rows) via ``ReportResponse.model_validate``.
    """

    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2; a plain dict is accepted here, so no extra import
    # is needed.
    model_config = {"from_attributes": True}

    id: str
    tenant_id: str
    name: str
    description: Optional[str]
    report_type: str
    workbook_id: str
    status: str
    parameters: dict
    created_at: datetime
    updated_at: datetime
    created_by: Optional[str]
class ReportListResponse(BaseModel):
    """One page of reports together with its pagination metadata."""

    # Reports on the requested page.
    items: List[ReportResponse]
    # Pagination metadata reported alongside the page.
    total: int
    page: int
    page_size: int
    total_pages: int
Model Best Practices¶
| Practice | Description |
|---|---|
| Field validation | Use Field() for constraints like min_length, max_length, pattern |
| Optional fields | Use Optional[Type] with default None for optional fields |
| Descriptions | Add docstrings to models for OpenAPI documentation |
| Config.from_attributes | Enable ORM mode for automatic serialization from SQLAlchemy models |
Step 3: Create the Repository¶
Add a repository for data access in src/db/repositories/:
"""Report repository for database operations."""
from typing import Any, List, Optional, cast
from sqlalchemy import delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.models import Report
from src.db.repositories.base import TenantScopedRepository
class ReportRepository(TenantScopedRepository[Report]):
    """Data-access object for ``Report`` rows.

    All queries are automatically filtered by tenant_id via RLS policies.
    """

    model = Report

    async def create_report(
        self,
        *,
        tenant_id: str,
        name: str,
        report_type: str,
        workbook_id: str,
        created_by: str,
        description: Optional[str] = None,
        parameters: Optional[dict] = None,
    ) -> Report:
        """Insert a report with status ``"pending"`` and return it.

        A missing (or empty) ``parameters`` value is stored as ``{}``.
        """
        fields = {
            "tenant_id": tenant_id,
            "name": name,
            "report_type": report_type,
            "workbook_id": workbook_id,
            "created_by": created_by,
            "description": description,
            "parameters": parameters or {},
            "status": "pending",
        }
        return await self.create(**fields)

    async def get_by_name(
        self,
        tenant_id: str,
        name: str,
    ) -> Optional[Report]:
        """Return the tenant's report called ``name``, or ``None`` if absent."""
        stmt = select(Report).where(
            Report.tenant_id == tenant_id,
            Report.name == name,
        )
        rows = await self.session.execute(stmt)
        return rows.scalar_one_or_none()

    async def get_reports_paginated(
        self,
        tenant_id: str,
        *,
        skip: int = 0,
        limit: int = 20,
        report_type: Optional[str] = None,
        status: Optional[str] = None,
    ) -> List[Report]:
        """Return one page of the tenant's reports, newest first.

        ``report_type`` and ``status`` narrow the result only when they are
        truthy; ``skip``/``limit`` select the page.
        """
        conditions = [Report.tenant_id == tenant_id]
        if report_type:
            conditions.append(Report.report_type == report_type)
        if status:
            conditions.append(Report.status == status)

        stmt = (
            select(Report)
            .where(*conditions)
            .order_by(Report.created_at.desc())
            .offset(skip)
            .limit(limit)
        )
        rows = await self.session.execute(stmt)
        return list(rows.scalars().all())

    async def update_status(self, report_id: str, status: str) -> None:
        """Set the status of the report identified by ``report_id``."""
        stmt = update(Report).where(Report.id == report_id).values(status=status)
        await self.session.execute(stmt)
        # Push the UPDATE to the database within the current transaction.
        await self.session.flush()
Repository Patterns¶
# Illustrative calls only: `repo` stands for a repository instance, and each
# `await` must run inside an async function.
# Base repository provides these methods:
await repo.get_by_id(id) # Get single record
await repo.get_all(skip, limit) # Get paginated list
await repo.create(**kwargs) # Create record
await repo.update(id, **kwargs) # Update record
await repo.delete(id) # Delete record
await repo.exists(id) # Check existence
await repo.count() # Count records
# TenantScopedRepository adds:
await repo.get_by_tenant(tenant_id, skip, limit) # Tenant-filtered list
await repo.count_by_tenant(tenant_id) # Tenant-filtered count
await repo.create_for_tenant(tenant_id, **kwargs) # Create with tenant
Step 4: Implement the Endpoints¶
Now implement the route handlers:
import math
from src.db.repositories.report import ReportRepository
async def _count_reports(
    db: DatabaseSession,
    tenant_id: str,
    report_type: Optional[str],
    status_filter: Optional[str],
) -> int:
    """Count the tenant's reports that match the listing filters.

    The filters are applied with the same truthiness rules as
    ``ReportRepository.get_reports_paginated`` so the count always describes
    the same result set as the returned page.
    """
    # Local imports keep this helper self-contained in the route module.
    from sqlalchemy import func, select

    from src.db.models import Report

    query = (
        select(func.count())
        .select_from(Report)
        .where(Report.tenant_id == tenant_id)
    )
    if report_type:
        query = query.where(Report.report_type == report_type)
    if status_filter:
        query = query.where(Report.status == status_filter)

    result = await db.execute(query)
    return result.scalar_one()


@router.get("", response_model=ReportListResponse)
async def list_reports(
    current_user: CurrentUser,
    db: DatabaseSession,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    report_type: Optional[str] = Query(None),
    status_filter: Optional[str] = Query(None, alias="status"),
) -> ReportListResponse:
    """List reports for the current tenant.

    Supports pagination and filtering by type and status. ``total`` and
    ``total_pages`` describe the filtered result set, not every report the
    tenant owns.

    Args:
        current_user: Authenticated user; supplies the tenant to list for.
        db: Database session used for both the page and the count query.
        page: 1-based page number.
        page_size: Number of items per page (1-100).
        report_type: Optional report type to filter by.
        status_filter: Optional status to filter by (query parameter ``status``).

    Returns:
        The requested page plus pagination metadata.
    """
    repo = ReportRepository(db)
    skip = (page - 1) * page_size
    reports = await repo.get_reports_paginated(
        tenant_id=current_user.tenant_id,
        skip=skip,
        limit=page_size,
        report_type=report_type,
        status=status_filter,
    )

    # Count with the same filters as the page; an unfiltered tenant count
    # would overstate total/total_pages whenever a filter is active.
    total = await _count_reports(
        db, current_user.tenant_id, report_type, status_filter
    )
    # An empty result is still reported as a single (empty) page.
    total_pages = math.ceil(total / page_size) if total > 0 else 1

    return ReportListResponse(
        items=[ReportResponse.model_validate(r) for r in reports],
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages,
    )
@router.post("", response_model=ReportResponse, status_code=status.HTTP_201_CREATED)
async def create_report(
    data: ReportCreate,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> ReportResponse:
    """Create a report for the caller's tenant.

    Responds with 400 when the tenant already has a report with the same name.
    """
    tenant_id = current_user.tenant_id
    repo = ReportRepository(db)

    # Report names must be unique within a tenant.
    if await repo.get_by_name(tenant_id, data.name):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A report with this name already exists",
        )

    created = await repo.create_report(
        tenant_id=tenant_id,
        name=data.name,
        description=data.description,
        report_type=data.report_type,
        # The repository takes the workbook id as a string.
        workbook_id=str(data.workbook_id),
        parameters=data.parameters or {},
        created_by=current_user.id,
    )
    return ReportResponse.model_validate(created)
@router.get("/{report_id}", response_model=ReportResponse)
async def get_report(
    report_id: UUID,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> ReportResponse:
    """Return one report by ID, or 404 if it is missing or not the caller's."""
    found = await ReportRepository(db).get_by_id(str(report_id))

    # A report owned by another tenant is reported exactly like a missing
    # one. RLS should already hide it; the explicit comparison is a second
    # line of defence.
    if not found or found.tenant_id != current_user.tenant_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )
    return ReportResponse.model_validate(found)
@router.patch("/{report_id}", response_model=ReportResponse)
async def update_report(
    report_id: UUID,
    data: ReportUpdate,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> ReportResponse:
    """Partially update a report.

    Only fields present in the request body are changed. An explicit
    ``null`` clears ``description`` but is ignored for ``name`` and
    ``parameters``, which the response model declares as non-optional.

    Raises:
        HTTPException: 404 if the report does not exist or belongs to
            another tenant; 400 if the new name is already taken.
    """
    repo = ReportRepository(db)
    report = await repo.get_by_id(str(report_id))
    if not report or report.tenant_id != current_user.tenant_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )

    # Check name conflict if updating name
    if data.name and data.name != report.name:
        existing = await repo.get_by_name(current_user.tenant_id, data.name)
        if existing:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="A report with this name already exists",
            )

    # exclude_unset keeps explicitly sent nulls; drop them for fields that
    # must never be None so a body like {"name": null} cannot blank them.
    non_nullable = {"name", "parameters"}
    update_data = {
        field: value
        for field, value in data.model_dump(exclude_unset=True).items()
        if value is not None or field not in non_nullable
    }
    if update_data:
        report = await repo.update(str(report_id), **update_data)

    return ReportResponse.model_validate(report)
@router.delete("/{report_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_report(
    report_id: UUID,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> None:
    """Delete one of the caller's reports; respond 404 if it is not theirs."""
    report_key = str(report_id)
    repo = ReportRepository(db)

    target = await repo.get_by_id(report_key)
    # Missing reports and reports of other tenants are indistinguishable.
    if not target or target.tenant_id != current_user.tenant_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report not found",
        )

    await repo.delete(report_key)
Step 5: Register the Router¶
Add your router to the API in src/api/routes/__init__.py:
from fastapi import APIRouter
from src.api.routes import (
auth,
calculations,
health,
metrics,
reports, # Add your new router
tenants,
workbooks,
)
# Root router that collects the router of each route module.
api_router = APIRouter()
# Register all routers
api_router.include_router(auth.router)
api_router.include_router(workbooks.router)
api_router.include_router(calculations.router)
api_router.include_router(reports.router) # Add this line
api_router.include_router(tenants.router)
api_router.include_router(health.router)
api_router.include_router(metrics.router)
Step 6: Add Authentication¶
CalcBridge uses dependency injection for authentication:
from src.api.dependencies import CurrentUser, DatabaseSession
# CurrentUser - Requires valid JWT token
# Returns UserContext with: id, email, tenant_id, role
@router.get("/me")
async def get_my_data(
    current_user: CurrentUser,  # Resolved from a valid JWT
    db: DatabaseSession,  # Async DB session
) -> dict:
    """Return the caller's user id and tenant id."""
    identity = {
        "user_id": current_user.id,
        "tenant_id": current_user.tenant_id,
    }
    return identity
Authentication Options¶
| Dependency | Type | Description |
|---|---|---|
| CurrentUser | Required auth | Validates JWT, returns user context |
| OptionalUser | Optional auth | Returns user context or None |
| DatabaseSession | DB access | Async SQLAlchemy session with RLS |
| AdminUser | Admin only | Requires admin role |
API Key Authentication¶
For service-to-service communication:
from fastapi import Depends

from src.core.auth import require_api_key


@router.get("/internal/data")
async def get_internal_data(
    # Parameters without a default must come first: Python rejects a
    # non-default parameter after a defaulted one. FastAPI resolves
    # dependencies by name, so the order does not affect the HTTP interface.
    db: DatabaseSession,
    api_key: str = Depends(require_api_key),
) -> dict:
    """Internal endpoint protected by an API key instead of a user JWT."""
    # Reaching this point means the require_api_key dependency accepted
    # the request.
    return {"status": "ok"}
Step 7: Write Tests¶
Create comprehensive tests for your endpoints:
import pytest
from httpx import AsyncClient
class TestReportsAPI:
    """End-to-end checks for the ``/api/v1/reports`` endpoints."""

    @pytest.mark.asyncio
    async def test_create_report(self, authenticated_client: AsyncClient):
        """A valid payload yields 201 and a report in ``pending`` status."""
        request_body = {
            "name": "Q4 Compliance Report",
            "description": "Quarterly compliance analysis",
            "report_type": "compliance",
            "workbook_id": "550e8400-e29b-41d4-a716-446655440000",
        }

        response = await authenticated_client.post(
            "/api/v1/reports", json=request_body
        )

        assert response.status_code == 201
        body = response.json()
        assert body["name"] == "Q4 Compliance Report"
        assert body["status"] == "pending"
        assert "id" in body

    @pytest.mark.asyncio
    async def test_create_report_duplicate_name(
        self,
        authenticated_client: AsyncClient,
        existing_report,  # Fixture providing an already stored report
    ):
        """Re-using an existing report name is rejected with 400."""
        request_body = {
            "name": existing_report.name,  # Deliberately the same name
            "report_type": "compliance",
            "workbook_id": str(existing_report.workbook_id),
        }

        response = await authenticated_client.post(
            "/api/v1/reports", json=request_body
        )

        assert response.status_code == 400
        assert "already exists" in response.json()["detail"]

    @pytest.mark.asyncio
    async def test_list_reports_pagination(
        self,
        authenticated_client: AsyncClient,
        create_reports,  # Fixture that stores several reports
    ):
        """The listing returns the pagination envelope for page 1."""
        query = {"page": 1, "page_size": 10}

        response = await authenticated_client.get("/api/v1/reports", params=query)

        assert response.status_code == 200
        body = response.json()
        assert "items" in body
        assert "total" in body
        assert "page" in body
        assert body["page"] == 1

    @pytest.mark.asyncio
    async def test_get_report_not_found(self, authenticated_client: AsyncClient):
        """An unknown report ID yields 404."""
        missing_id = "00000000-0000-0000-0000-000000000000"

        response = await authenticated_client.get(f"/api/v1/reports/{missing_id}")

        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_unauthorized_access(self, client: AsyncClient):
        """Requests without credentials are answered with 401."""
        response = await client.get("/api/v1/reports")

        assert response.status_code == 401
Advanced Patterns¶
Background Tasks¶
For long-running operations, queue a Celery task:
from typing import Any, cast
@router.post("/{report_id}/generate")
async def generate_report(
    report_id: UUID,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> dict:
    """Queue report generation as a background task.

    Marks the report as ``generating`` and hands the work to a Celery task.

    Raises:
        HTTPException: 404 if the report does not exist or belongs to
            another tenant.

    Returns:
        ``{"status": "queued", "report_id": <id>}``.
    """
    repo = ReportRepository(db)
    report = await repo.get_by_id(str(report_id))
    # Same ownership rule as the other by-id handlers: a report owned by
    # another tenant is reported as missing rather than acted upon.
    if not report or report.tenant_id != current_user.tenant_id:
        raise HTTPException(status_code=404, detail="Report not found")

    # Update status
    await repo.update_status(str(report_id), "generating")

    # Queue Celery task
    from src.workers.tasks.reports import generate_report_task

    # cast(Any, ...) only keeps type checkers from complaining about
    # ``.delay``; it has no runtime effect.
    cast(Any, generate_report_task).delay(
        tenant_id=current_user.tenant_id,
        report_id=str(report_id),
    )
    return {"status": "queued", "report_id": str(report_id)}
File Uploads¶
Handle file uploads with validation:
from fastapi import UploadFile
# Upper bound on accepted uploads. Example value; adjust to your deployment.
MAX_UPLOAD_BYTES = 50 * 1024 * 1024


@router.post("/upload")
async def upload_data(
    file: UploadFile,
    current_user: CurrentUser,
    db: DatabaseSession,
) -> dict:
    """Upload a data file for processing.

    Only ``.xlsx`` and ``.csv`` files (matched case-insensitively) of at
    most ``MAX_UPLOAD_BYTES`` bytes are accepted.

    Raises:
        HTTPException: 400 if the filename is missing or has an unsupported
            extension; 413 if the file exceeds the size limit.

    Returns:
        The uploaded filename and its size in bytes.
    """
    # Validate file type
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    valid_extensions = (".xlsx", ".csv")
    # str.endswith accepts a tuple, so one call covers every extension.
    if not file.filename.lower().endswith(valid_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed: {', '.join(valid_extensions)}",
        )

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without buffering an arbitrarily large body in memory.
    content = await file.read(MAX_UPLOAD_BYTES + 1)
    file_size = len(content)
    if file_size > MAX_UPLOAD_BYTES:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Maximum size: {MAX_UPLOAD_BYTES} bytes",
        )

    # Process file...
    return {"filename": file.filename, "size": file_size}
WebSocket Endpoints¶
For real-time updates:
import asyncio

from fastapi import WebSocket, WebSocketDisconnect


@router.websocket("/ws/{report_id}")
async def report_status_websocket(
    websocket: WebSocket,
    report_id: str,
):
    """WebSocket for real-time report status updates.

    Sends ``{"status": ...}`` once per second until the report reaches
    ``completed`` or ``failed``, then closes the connection.

    NOTE(review): this handler declares no authentication dependency and
    does no tenant check - confirm whether that is intended.
    """
    await websocket.accept()
    client_gone = False
    try:
        while True:
            # Named report_status (not status) so it cannot shadow the
            # fastapi ``status`` module used elsewhere in the route file.
            report_status = await get_report_status(report_id)
            await websocket.send_json({"status": report_status})
            if report_status in ("completed", "failed"):
                break
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        # The client closed the connection; nothing is left to close.
        client_gone = True
    finally:
        # Closing after a disconnect would raise, so close only when the
        # connection is still ours to close.
        if not client_gone:
            await websocket.close()
Checklist¶
Before submitting your endpoint:
- Route file created in src/api/routes/
- Request/response Pydantic models defined
- Repository created with needed queries
- Router registered in __init__.py
- Authentication dependency added
- Input validation with Field constraints
- Error handling with appropriate HTTP status codes
- Unit tests covering success and error cases
- Integration tests with database
- OpenAPI documentation (docstrings on endpoints)
- Tenant isolation verified (RLS + explicit checks)
Common HTTP Status Codes¶
| Code | Meaning | Usage |
|---|---|---|
| 200 | OK | Successful GET, PATCH |
| 201 | Created | Successful POST |
| 204 | No Content | Successful DELETE |
| 400 | Bad Request | Validation error, duplicate name |
| 401 | Unauthorized | Missing or invalid auth |
| 403 | Forbidden | Insufficient permissions |
| 404 | Not Found | Resource doesn't exist |
| 409 | Conflict | Resource state conflict |
| 422 | Unprocessable Entity | Invalid request body |
| 500 | Internal Error | Unexpected server error |