# Backend Architecture
The backend is a FastAPI application with async SQLAlchemy, structured as a Python package: `webmacs_backend`.
## Package Structure
```text
backend/src/webmacs_backend/
├── __init__.py
├── main.py              # Application factory + lifespan
├── config.py            # pydantic-settings Settings
├── database.py          # Engine, session, init_db
├── models.py            # SQLAlchemy ORM models (15 tables)
├── schemas.py           # Pydantic v2 request/response schemas
├── enums.py             # StrEnum definitions (EventType, StatusType, etc.)
├── security.py          # JWT + bcrypt helpers
├── dependencies.py      # FastAPI Depends() factories
├── repository.py        # Generic async CRUD helpers (paginate, get_or_404)
├── api/
│   └── v1/
│       ├── auth.py          # POST /auth/login, /logout, GET /me
│       ├── sso.py           # OIDC SSO: /auth/sso/config, /authorize, /callback, /exchange
│       ├── tokens.py        # API token management: CRUD /tokens
│       ├── users.py         # CRUD /users
│       ├── experiments.py   # CRUD /experiments + CSV export
│       ├── events.py        # CRUD /events
│       ├── datapoints.py    # CRUD /datapoints + /series + /latest
│       ├── logging.py       # CRUD /logs
│       ├── rules.py         # CRUD /rules (automation engine)
│       ├── webhooks.py      # CRUD /webhooks + deliveries
│       ├── ota.py           # CRUD /ota + apply/rollback/check/upload
│       ├── dashboards.py    # CRUD /dashboards + widgets
│       ├── plugins.py       # Plugin discovery, instances, channels, packages
│       └── health.py        # GET /health
├── services/
│   ├── ingestion.py             # Shared datapoint ingestion pipeline
│   ├── webhook_dispatcher.py    # HMAC-signed webhook delivery
│   ├── rule_evaluator.py        # Threshold-based rule engine
│   ├── ota_service.py           # Firmware update management
│   ├── updater.py               # System update application
│   ├── log_service.py           # Logging service
│   └── wheel_validator.py       # Plugin wheel upload validation
├── middleware/
│   ├── rate_limit.py    # Request rate limiting
│   └── request_id.py    # X-Request-ID header injection
└── ws/
    ├── connection_manager.py    # Pub/sub hub for WS groups
    └── endpoints.py             # /ws/controller/telemetry, /ws/datapoints/stream
```
## Application Lifecycle
```python
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    await init_db()         # Create tables
    await _seed_admin()     # Seed admin if no users exist
    yield
    await engine.dispose()  # Clean shutdown
```
The lifespan context manager runs once at startup and once at shutdown. It replaces the deprecated `on_event` hooks.
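The ordering guarantee is what matters here: everything before `yield` runs before the first request is served, everything after it runs during shutdown. A minimal stdlib sketch (with hypothetical stand-ins for `init_db`, `_seed_admin`, and `engine.dispose`) shows the mechanics without FastAPI:

```python
import asyncio
from contextlib import asynccontextmanager

log = []  # records the order in which lifecycle steps run

async def init_db():          # stand-in for the real init_db()
    log.append("init_db")

async def seed_admin():       # stand-in for _seed_admin()
    log.append("seed_admin")

async def dispose_engine():   # stand-in for engine.dispose()
    log.append("dispose")

@asynccontextmanager
async def lifespan():
    await init_db()           # startup: create tables
    await seed_admin()        # startup: seed admin if no users exist
    yield                     # the application serves requests here
    await dispose_engine()    # shutdown: clean teardown

async def main():
    async with lifespan():
        log.append("serving")

asyncio.run(main())
```

After `main()` completes, `log` reads `["init_db", "seed_admin", "serving", "dispose"]`: startup steps, then the serving phase, then teardown.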
## Configuration
All settings are loaded from environment variables via pydantic-settings:
```python
class Settings(BaseSettings):
    database_url: str
    secret_key: str
    env: str = "development"
    rate_limit_per_minute: int = 300
    ws_heartbeat_interval: int = 30
    github_repo: str = "stefanposs/webmacs"
    # ...
```
See Environment Variables for the complete list.
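Conceptually, `pydantic-settings` reads each field from an upper-cased environment variable, applies the declared default when the variable is absent, and coerces the string value to the annotated type. A stdlib sketch of that behaviour (field names mirror the `Settings` class above; the values are illustrative):

```python
import os
from dataclasses import dataclass

@dataclass
class Settings:
    database_url: str
    secret_key: str
    env: str = "development"
    rate_limit_per_minute: int = 300

    @classmethod
    def from_env(cls) -> "Settings":
        """Read env vars, falling back to defaults and coercing types."""
        return cls(
            database_url=os.environ["DATABASE_URL"],   # required: KeyError if missing
            secret_key=os.environ["SECRET_KEY"],       # required
            env=os.environ.get("ENV", "development"),
            rate_limit_per_minute=int(os.environ.get("RATE_LIMIT_PER_MINUTE", "300")),
        )

# Illustrative values only; in deployment these come from the real environment.
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./dev.db"
os.environ["SECRET_KEY"] = "dev-only-secret"
settings = Settings.from_env()
```

The real `BaseSettings` additionally supports `.env` files and validation errors that name the offending variable, but the required-vs-default split is the same.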
## API Routers
All thirteen routers are mounted under `/api/v1/`:
| Router | Prefix | Auth | Key Endpoints |
|---|---|---|---|
| `auth` | `/auth` | Public / JWT | POST /login, POST /logout, GET /me |
| `sso` | `/auth/sso` | Public | GET /config, GET /authorize, GET /callback, POST /exchange |
| `tokens` | `/tokens` | JWT | GET /, POST /, DELETE /{id} |
| `users` | `/users` | Admin | CRUD |
| `experiments` | `/experiments` | JWT | CRUD, PUT /{id}/stop, GET /{id}/export/csv |
| `events` | `/events` | JWT | CRUD |
| `datapoints` | `/datapoints` | JWT | CRUD, POST /batch, POST /series, GET /latest |
| `logging` | `/logs` | JWT | CRUD, mark read |
| `rules` | `/rules` | Admin | CRUD (threshold-based automation) |
| `webhooks` | `/webhooks` | Admin | CRUD + GET /{id}/deliveries |
| `ota` | `/ota` | Admin | CRUD, POST /{id}/apply, POST /{id}/rollback, GET /check, POST /upload |
| `dashboards` | `/dashboards` | JWT | CRUD + widget CRUD (nested) |
| `plugins` | `/plugins` | JWT / Admin | Discovery, instance CRUD, channel mappings, package upload |
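The full URL of any endpoint is the concatenation of the API prefix, the router prefix from the table, and the route path declared inside the router. A small sketch of that composition (route paths here are examples from the table, not an exhaustive map):

```python
API_PREFIX = "/api/v1"

# Router prefixes as listed in the table above (subset for illustration).
ROUTER_PREFIXES = {
    "auth": "/auth",
    "datapoints": "/datapoints",
    "ota": "/ota",
}

def full_path(router: str, route: str) -> str:
    """Compose the mounted URL, e.g. /api/v1/auth/login."""
    return f"{API_PREFIX}{ROUTER_PREFIXES[router]}{route}"

assert full_path("auth", "/login") == "/api/v1/auth/login"
assert full_path("ota", "/check") == "/api/v1/ota/check"
```

In the application itself this composition is done by FastAPI's `include_router(router, prefix=...)`; the table's Prefix column corresponds to each router's own prefix.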
## Generic Repository Layer
The backend uses a generic repository module (repository.py) to eliminate CRUD boilerplate:
```python
async def paginate[M, S](db, model, schema, page, page_size, base_query=None) -> PaginatedResponse[S]:
    """Generic paginated query with automatic count."""

async def get_or_404[M](db, model, public_id, entity_name) -> M:
    """Fetch by public_id or raise 404."""

async def delete_by_public_id(db, model, public_id, entity_name) -> StatusResponse:
    """Delete by public_id or raise 404."""
```
All routers share these helpers, keeping endpoint code focused on business logic.
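The pagination contract can be illustrated without a database: the real helper issues a `SELECT COUNT(*)` plus an `OFFSET`/`LIMIT` query, but the page arithmetic is the same. A stdlib sketch, assuming a `PaginatedResponse` with `items`/`total`/`page`/`pages` fields (the field names here are illustrative, not the project's exact schema):

```python
import math
from dataclasses import dataclass

@dataclass
class PaginatedResponse:
    items: list
    total: int
    page: int
    pages: int

def paginate(rows: list, page: int, page_size: int) -> PaginatedResponse:
    """Same math the async SQLAlchemy helper maps to SQL."""
    total = len(rows)                        # SELECT COUNT(*) in the real helper
    offset = (page - 1) * page_size          # OFFSET
    items = rows[offset:offset + page_size]  # LIMIT
    pages = max(1, math.ceil(total / page_size))
    return PaginatedResponse(items, total, page, pages)

resp = paginate(list(range(25)), page=2, page_size=10)
# resp.items is [10..19], resp.total is 25, resp.pages is 3
```

Page numbers are 1-based, and the final page may be short, which is why `pages` is computed with a ceiling division.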
## Datapoint Ingestion Pipeline
All incoming datapoints — whether via REST (POST /datapoints) or WebSocket (/ws/controller/telemetry) — pass through a single shared pipeline in services/ingestion.py:
```mermaid
flowchart LR
    REST["REST API<br/>POST /datapoints"] --> Ingest["ingest_datapoints()"]
    WS["WebSocket<br/>/ws/controller/telemetry"] --> Ingest
    Ingest --> Filter["Filter by<br/>active plugin events"]
    Filter --> DB["Persist to DB"]
    DB --> Webhooks["Fire webhooks"]
    DB --> Rules["Evaluate rules"]
    DB --> Broadcast["WS broadcast<br/>to dashboards"]
```
This eliminates code duplication and guarantees consistent behaviour across both ingestion paths. Key components:
| Function | Purpose |
|---|---|
| `active_plugin_event_ids(db)` | Returns event IDs that belong to an enabled, connected plugin |
| `active_experiment_id(db)` | Returns the currently running experiment ID (if any) |
| `ingest_datapoints(db, items)` | Validates, persists, and triggers side-effects for a batch of datapoints |
The pipeline also guards against phantom data — datapoints submitted for events whose plugin is disabled or disconnected are silently filtered out.
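The guard itself is a straightforward set-membership filter: the pipeline computes the active event IDs once per batch and drops anything else before touching the database. A pure-Python sketch of that step (field names are illustrative, not the project's exact schema):

```python
def filter_active(datapoints: list[dict], active_event_ids: set[int]) -> list[dict]:
    """Drop datapoints whose event has no enabled, connected plugin behind it."""
    return [dp for dp in datapoints if dp["event_id"] in active_event_ids]

incoming = [
    {"event_id": 1, "value": 21.5},  # plugin enabled and connected -> kept
    {"event_id": 9, "value": 0.0},   # plugin disabled or disconnected -> dropped
]
kept = filter_active(incoming, active_event_ids={1, 2, 3})
# kept contains only the event_id=1 datapoint
```

Because the filter runs before persistence, phantom datapoints never reach the DB, webhooks, rules, or the dashboard broadcast; the sender receives no error, matching the "silently filtered" behaviour described above.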
## Security

- bcrypt for password hashing (via the `bcrypt` package directly)
- JWT tokens with `python-jose` (HS256, 24h expiry)
- `get_current_user` dependency validates the token on every protected route
- Admin-only routes check `user.admin == True`
- Token blacklisting on logout via the `blacklist_tokens` table
- Rate-limiting middleware (configurable per-minute limit)
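An HS256 JWT, as produced by `python-jose`'s `jwt.encode`, is just an HMAC-SHA256 over `base64url(header).base64url(payload)` with the shared secret key. A stdlib sketch of the signing and verification scheme (this illustrates the format only; the backend uses `python-jose`, which additionally handles claims like `exp`):

```python
import base64
import hashlib
import hmac
import json

def _b64url(data: bytes) -> str:
    """Base64url without padding, as required by the JWT format."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(claims: dict, key: str) -> str:
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = hmac.new(key.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(sig)}"

def verify_hs256(token: str, key: str) -> bool:
    header, payload, sig = token.split(".")
    expected = hmac.new(key.encode(), f"{header}.{payload}".encode(),
                        hashlib.sha256).digest()
    # Constant-time comparison to avoid timing side channels.
    return hmac.compare_digest(_b64url(expected), sig)

token = sign_hs256({"sub": "admin"}, "secret")
```

A token signed with `"secret"` verifies against `"secret"` and fails against any other key, which is why `secret_key` in the Settings must stay private: anyone holding it can mint valid tokens.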
## Next Steps
- WebSocket Design — real-time data streaming
- Database Layer — ORM models and repository helpers
- REST API Reference — endpoint documentation
- User Guide: Rules — automation engine for engineers