Core Concepts
Orchestrix implements Domain-Driven Design (DDD), Event Sourcing, and CQRS patterns. This page covers the building blocks.
Messages
All communication uses Messages: immutable, CloudEvents-compatible dataclasses. Every message has auto-generated fields: id, type, source, timestamp, correlation_id, causation_id, trace_id.
Commands
Commands = intentions. Imperative naming. Handled by exactly one handler.
from dataclasses import dataclass

from orchestrix.core.messaging.message import Command

@dataclass(frozen=True, kw_only=True)
class CreateOrder(Command):
    order_id: str
    customer_name: str
    total_amount: float
Events
Events = facts. Past-tense naming. Handled by zero or more handlers.
from dataclasses import dataclass

from orchestrix.core.messaging.message import Event

@dataclass(frozen=True, kw_only=True)
class OrderCreated(Event):
    order_id: str
    customer_name: str
    total_amount: float
Aggregates
Aggregates enforce business rules and emit events. State is rebuilt from events.
from dataclasses import dataclass

from orchestrix.core.eventsourcing.aggregate import AggregateRoot

@dataclass
class Order(AggregateRoot):
    customer_name: str = ""
    total_amount: float = 0.0
    status: str = "pending"

    def create(self, cmd):
        # Enforce business rules here, then record the resulting event.
        self._apply_event(OrderCreated(...))

    def _when_order_created(self, event: OrderCreated):
        # State mutator for OrderCreated events.
        self.aggregate_id = event.order_id
        self.customer_name = event.customer_name
        self.total_amount = event.total_amount
Key methods on AggregateRoot:
| Method | Purpose |
|---|---|
| `_apply_event(event)` | Record event and apply state change |
| `_when_<snake_case>(event)` | Convention-based state mutator |
| `mark_events_committed()` | Clear pending events after save |
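The `_when_<snake_case>` convention in the table can be pictured as a name lookup: the event's class name is converted to snake case and the method with that name is called. The sketch below is one way to do this under that assumption. `SketchAggregate`, `handler_name`, and `pending_events` are hypothetical names, and the real `AggregateRoot` may resolve handlers differently.

```python
import re


def handler_name(event: object) -> str:
    """Map an event class name such as OrderCreated to '_when_order_created'."""
    # Insert an underscore before every capital letter except the first one.
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", type(event).__name__).lower()
    return f"_when_{snake}"


class SketchAggregate:
    """Hypothetical stand-in for an aggregate base class (not the Orchestrix API)."""

    def __init__(self) -> None:
        self.pending_events: list[object] = []

    def _apply_event(self, event: object) -> None:
        # Record the event, then let the matching _when_* method mutate state.
        self.pending_events.append(event)
        getattr(self, handler_name(event))(event)

    def mark_events_committed(self) -> None:
        # Called after a successful save: nothing is left to persist.
        self.pending_events.clear()


class OrderCreated:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id


class Order(SketchAggregate):
    def __init__(self) -> None:
        super().__init__()
        self.aggregate_id = ""

    def _when_order_created(self, event: OrderCreated) -> None:
        self.aggregate_id = event.order_id


order = Order()
order._apply_event(OrderCreated("order-123"))
```

This simple regular expression splits on every capital letter, so class names containing acronyms would need a more careful conversion.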
AggregateRepository
Loads and saves aggregates via an event store:
from orchestrix.core.eventsourcing.aggregate import AggregateRepository
from orchestrix.infrastructure.memory.store import InMemoryEventStore
store = InMemoryEventStore()
repo = AggregateRepository(event_store=store)
# Save
repo.save(order_aggregate)
# Load (replays events to rebuild state)
order = repo.load(Order, "order-123")
# Async variants
await repo.save_async(order)
order = await repo.load_async(Order, "order-123")
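"Replays events to rebuild state" means that loading starts from an empty aggregate and applies each stored event in order. The sketch below shows that idea with self-contained stand-ins. `SketchRepository`, the `apply`/`record` methods, and the `OrderPaid` event are hypothetical and are not part of the Orchestrix API.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class OrderCreated:
    order_id: str


@dataclass(frozen=True)
class OrderPaid:  # hypothetical event, used only to show a second state change
    order_id: str


@dataclass
class Order:
    aggregate_id: str = ""
    status: str = "pending"
    pending_events: list = field(default_factory=list)

    def apply(self, event: object) -> None:
        """Mutate state for one event; used for new and replayed events alike."""
        if isinstance(event, OrderCreated):
            self.aggregate_id = event.order_id
        elif isinstance(event, OrderPaid):
            self.status = "paid"

    def record(self, event: object) -> None:
        """Apply a new event and remember it until the next save."""
        self.apply(event)
        self.pending_events.append(event)


class SketchRepository:
    """Hypothetical stand-in for AggregateRepository (not the Orchestrix API)."""

    def __init__(self) -> None:
        self._streams: dict[str, list] = {}  # aggregate id -> stored events

    def save(self, aggregate: Order) -> None:
        stream = self._streams.setdefault(aggregate.aggregate_id, [])
        stream.extend(aggregate.pending_events)
        aggregate.pending_events.clear()  # everything is persisted now

    def load(self, aggregate_type: type, aggregate_id: str):
        aggregate = aggregate_type()  # start from the empty default state
        for event in self._streams.get(aggregate_id, []):
            aggregate.apply(event)  # replay in the order the events were stored
        return aggregate


repo = SketchRepository()
order = Order()
order.record(OrderCreated("order-123"))
order.record(OrderPaid("order-123"))
repo.save(order)

reloaded = repo.load(Order, "order-123")
```

The reloaded object is a new instance whose state comes entirely from the stored events. No snapshot of the original object is kept.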
Message Bus
Routes messages to handlers:
from orchestrix.infrastructure.memory.bus import InMemoryMessageBus
bus = InMemoryMessageBus()
bus.subscribe(CreateOrder, handle_create_order)
bus.subscribe(OrderCreated, send_confirmation_email)
bus.publish(CreateOrder(...))
Async variant: InMemoryAsyncMessageBus with await bus.publish(...).
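Routing by message type can be sketched as a dictionary from message class to a list of handlers. `SketchBus` below is a hypothetical, synchronous stand-in written for illustration. It does not reproduce `InMemoryMessageBus` and makes no claim about how Orchestrix handles errors or unrouted messages.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class CreateOrder:
    order_id: str


@dataclass(frozen=True)
class OrderCreated:
    order_id: str


class SketchBus:
    """Hypothetical stand-in for a message bus (not the Orchestrix API)."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, message_type: type, handler: Callable) -> None:
        self._handlers[message_type].append(handler)

    def publish(self, message: object) -> None:
        # Look up handlers by the exact class of the message; no match is a no-op.
        for handler in list(self._handlers.get(type(message), [])):
            handler(message)


bus = SketchBus()
emails: list[str] = []


def handle_create_order(cmd: CreateOrder) -> None:
    # In this sketch the command handler publishes the resulting event directly.
    bus.publish(OrderCreated(order_id=cmd.order_id))


def send_confirmation_email(event: OrderCreated) -> None:
    emails.append(f"confirmation for {event.order_id}")


bus.subscribe(CreateOrder, handle_create_order)
bus.subscribe(OrderCreated, send_confirmation_email)
bus.publish(CreateOrder(order_id="order-123"))
```

Publishing the command triggers the command handler, which publishes the event, which in turn reaches the email handler. The caller only ever sees the first `publish` call.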
Event Store
Persists event streams. Protocol with save() and load():
from orchestrix.infrastructure.memory.store import InMemoryEventStore
store = InMemoryEventStore()
store.save("order-123", [event1, event2], expected_version=0)
events = store.load("order-123")
Implementations:
| Store | Use Case |
|---|---|
| `InMemoryEventStore` | Development, testing |
| `InMemoryAsyncEventStore` | Async development |
| `PostgreSQLEventStore` | Production (asyncpg) |
| `GCPCloudSQLEventStore` | Google Cloud SQL |
| `GCPBigQueryEventStore` | Analytics workloads |
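The `expected_version` argument in the `save()` call suggests optimistic concurrency control: a write succeeds only if the stream is still at the version the writer last saw. The sketch below assumes that "version" means the number of events already in the stream. `SketchEventStore` and `ConcurrencyError` are hypothetical names, and the actual stores may define versions and errors differently.

```python
class ConcurrencyError(Exception):
    """Raised when a stream has moved on since the writer last read it."""


class SketchEventStore:
    """Hypothetical in-memory store with a version check (not the Orchestrix API)."""

    def __init__(self) -> None:
        self._streams: dict[str, list] = {}

    def save(self, stream_id: str, events: list, expected_version: int) -> None:
        stream = self._streams.setdefault(stream_id, [])
        # Assumption: the version of a stream is the number of stored events.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)

    def load(self, stream_id: str) -> list:
        # Return a copy so callers cannot modify the stored stream.
        return list(self._streams.get(stream_id, []))


store = SketchEventStore()
store.save("order-123", ["event1", "event2"], expected_version=0)

try:
    # A second writer that still believes the stream is empty is rejected.
    store.save("order-123", ["event3"], expected_version=0)
    conflict_detected = False
except ConcurrencyError:
    conflict_detected = True

# A writer that has seen both stored events can append.
store.save("order-123", ["event3"], expected_version=2)
```

The rejected writer would normally reload the stream, re-run its business logic against the newer state, and try again.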
Modules
Encapsulate domain logic and wire handlers:
from orchestrix.core.common.module import Module

class OrderModule:
    def register(self, bus, store):
        # Wire the command handler and an event handler onto the bus.
        bus.subscribe(CreateOrder, CreateOrderHandler(bus, store))
        bus.subscribe(OrderCreated, log_order)
Flow
Application → Command → MessageBus → CommandHandler → Aggregate → Events
                                                                    ↓
                                                                EventStore
                                                                    ↓
                                               Event Handlers (projections, side-effects)
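The ordering in the diagram (decide, persist, then notify) can be traced with a few plain functions. Everything in this sketch is a simplified stand-in: dictionaries replace the event store and read model, the bus is omitted, and the non-negative-amount rule is a hypothetical business rule added only so the aggregate step has something to enforce.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CreateOrder:
    order_id: str
    total_amount: float


@dataclass(frozen=True)
class OrderCreated:
    order_id: str
    total_amount: float


event_log: dict[str, list] = {}      # stands in for the event store
order_totals: dict[str, float] = {}  # read model maintained by a projection
trace: list[str] = []                # records the order in which steps ran


def project_order(event: OrderCreated) -> None:
    trace.append("event handler")
    order_totals[event.order_id] = event.total_amount


def handle_create_order(cmd: CreateOrder) -> None:
    trace.append("command handler")
    # Aggregate step: enforce a (hypothetical) business rule, then emit an event.
    if cmd.total_amount < 0:
        raise ValueError("total_amount must not be negative")
    event = OrderCreated(cmd.order_id, cmd.total_amount)
    # Persist the event before any event handler sees it.
    trace.append("event store")
    event_log.setdefault(cmd.order_id, []).append(event)
    project_order(event)


handle_create_order(CreateOrder("order-123", 42.0))
```

Persisting before notifying means a projection never reflects an event that failed to reach the store.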
Next Steps
- User Guide — Production patterns
- Demos — Working examples