Best Practices
Guidelines for building robust Orchestrix applications.
Aggregate Design
- Keep aggregates small: one consistency boundary per aggregate
- No external I/O in aggregates: aggregates are pure domain logic
- Use _when_<event> methods: convention-based state mutation
- Validate in commands: reject bad data before it reaches the aggregate
- Emit events, don't return values: the event is the result
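    from dataclasses import dataclass

    # AggregateRoot, CreateOrder, and OrderCreated are assumed to be imported
    # from Orchestrix and your own domain modules (imports not shown here).

    @dataclass
    class Order(AggregateRoot):
        def create(self, cmd: CreateOrder):
            # Validate before emitting anything
            if cmd.total_amount <= 0:
                raise ValueError("Amount must be positive")
            # Emit the event; state changes happen in the _when_ handler
            self._apply_event(OrderCreated(
                order_id=cmd.order_id,
                total_amount=cmd.total_amount,
            ))

        def _when_order_created(self, event: OrderCreated):
            self.aggregate_id = event.order_id
            self.total_amount = event.total_amount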
Event Design
- Events are immutable facts: never modify stored events
- Use past-tense naming: OrderCreated, not CreateOrder
- Include all relevant data: events should be self-describing
- Version events: add new fields with defaults, never remove fields
- Keep events small: only what changed, not full state
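The immutability and versioning guidelines can be illustrated with a plain frozen dataclass. This is only a sketch: Orchestrix may supply its own event base class, and the `currency` field and the `order_created_from_dict` helper are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class OrderCreated:
    """Past-tense name: records a fact that has already happened."""

    order_id: str
    total_amount: float
    # Hypothetical field added in a later version. The default lets events
    # stored before the field existed still load.
    currency: str = "EUR"


def order_created_from_dict(payload: dict) -> OrderCreated:
    """Rebuild an event from stored data, tolerating missing newer fields."""
    known = {"order_id", "total_amount", "currency"}
    return OrderCreated(**{k: v for k, v in payload.items() if k in known})


# An event stored before `currency` was introduced still loads.
old_event = order_created_from_dict({"order_id": "1", "total_amount": 99.0})
```

Because the dataclass is frozen, assigning to a field after construction raises `FrozenInstanceError`, which makes accidental mutation of a loaded event fail loudly.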
Command Design
- Use imperative naming: CreateOrder, SuspendAccount
- Validate early: use __post_init__ with built-in validators
- One handler per command: commands have exactly one receiver
- Keep commands thin: just the data needed to execute
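As a rough illustration of early validation, the sketch below checks fields in `__post_init__` using hand-written checks. It does not use Orchestrix's built-in validators, whose API is not shown here, and it raises `ValueError` as a stand-in for whatever exception type the library expects.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CreateOrder:
    """Imperative name; carries only the data needed to execute."""

    order_id: str
    total_amount: float

    def __post_init__(self) -> None:
        # Reject malformed commands at construction time, before any
        # handler or aggregate sees them.
        if not self.order_id:
            raise ValueError("order_id must not be empty")
        if self.total_amount <= 0:
            raise ValueError("total_amount must be positive")
```

With this shape, an invalid command can never exist as an object, so handlers do not need to repeat the same checks.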
Saga Patterns
- Define compensation for every step: what happens on failure?
- Use SagaStep with explicit compensation:
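    from orchestrix.core.execution.saga import Saga, SagaStep, InMemorySagaStateStore

    # process_payment, refund_payment, reserve_stock, release_stock, and
    # confirm_order are your own step functions (not shown).
    saga = Saga(
        saga_type="OrderSaga",
        steps=[
            SagaStep(name="payment", action=process_payment, compensation=refund_payment),
            SagaStep(name="inventory", action=reserve_stock, compensation=release_stock),
            SagaStep(name="confirm", action=confirm_order),
        ],
        state_store=InMemorySagaStateStore(),
    )

    # Call this from inside a coroutine; a bare top-level await is not valid in a script
    await saga.execute(order_id="123", amount=99.0)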
- Check saga state: saga.is_completed(), saga.is_successful()
- Handle timeouts: sagas should not hang indefinitely
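One way to keep a saga from hanging is to bound the awaited call with `asyncio.wait_for` from the standard library. The sketch below uses a hypothetical `slow_saga` coroutine as a stand-in for `saga.execute(...)`. Whether Orchestrix runs compensation when the call is cancelled is not stated here, so treat the timeout branch as the place to handle that explicitly.

```python
import asyncio


async def run_with_timeout(coro, seconds: float):
    """Await `coro`, raising asyncio.TimeoutError if it exceeds `seconds`."""
    return await asyncio.wait_for(coro, timeout=seconds)


async def slow_saga() -> str:
    # Hypothetical stand-in for `saga.execute(...)` that does not finish in time.
    await asyncio.sleep(10)
    return "done"


async def main() -> str:
    try:
        return await run_with_timeout(slow_saga(), seconds=0.05)
    except asyncio.TimeoutError:
        # The awaited call has been cancelled at this point; trigger
        # compensation or mark the saga as failed here.
        return "timed out"
```

Running `asyncio.run(main())` returns after roughly the timeout rather than after the full ten-second sleep.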
Error Handling
- Use specific exceptions:
  - ValidationError for invalid input
  - ConcurrencyError for version conflicts
  - HandlerError for handler failures
- Retry transient failures with ExponentialBackoff or FixedDelay
- Route permanent failures to a dead letter queue
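The retry and dead-letter guidelines can be sketched by hand as follows. This is not Orchestrix's ExponentialBackoff or its dead letter queue, whose APIs are not shown here. `TransientError`, `handle_with_retry`, and the plain list used as a dead-letter store are all hypothetical names for illustration.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying."""


def handle_with_retry(handler, message, dead_letters, max_attempts=3,
                      base_delay=0.01, sleep=time.sleep):
    """Call handler(message); retry transient failures, park permanent ones."""
    for attempt in range(max_attempts):
        try:
            return handler(message)
        except TransientError as exc:
            if attempt == max_attempts - 1:
                # Retries exhausted: treat the failure as permanent.
                dead_letters.append((message, exc))
                return None
            # Exponential backoff: base, 2x base, 4x base, ...
            sleep(base_delay * 2 ** attempt)
        except Exception as exc:
            # Non-transient errors are not retried.
            dead_letters.append((message, exc))
            return None


calls = []


def flaky(message):
    # Fails twice with a transient error, then succeeds.
    calls.append(message)
    if len(calls) < 3:
        raise TransientError("temporary outage")
    return f"handled {message}"


dead = []
result = handle_with_retry(flaky, "order-1", dead, sleep=lambda _: None)
```

Passing `sleep` as a parameter keeps the backoff testable without real delays. A production version would likely add jitter and a cap on the delay.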
Testing
- Test aggregates in isolation: no infrastructure needed
- Use InMemoryEventStore for integration tests
- Test saga compensation: force failures and verify rollback
- Test event replay: rebuild the aggregate and assert its state
- Test validation: ensure bad commands are rejected
    import pytest

    def test_order_creation():
        order = Order()
        order.create(CreateOrder(order_id="1", total_amount=99.0))
        assert order.aggregate_id == "1"
        assert len(order.uncommitted_events) == 1

    def test_invalid_amount():
        order = Order()
        with pytest.raises(ValueError):
            order.create(CreateOrder(order_id="1", total_amount=-5.0))
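The replay guideline from the list above could be tested along these lines. The sketch uses stand-in classes rather than the Orchestrix AggregateRoot: the `apply` and `from_events` methods and the `OrderPaid` event are hypothetical and only illustrate the shape of such a test.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderCreated:
    order_id: str
    total_amount: float


@dataclass(frozen=True)
class OrderPaid:
    order_id: str


@dataclass
class Order:
    """Stand-in aggregate; not the Orchestrix AggregateRoot."""

    aggregate_id: str = ""
    total_amount: float = 0.0
    paid: bool = False

    def apply(self, event) -> None:
        # State changes come only from events, so replay is deterministic.
        if isinstance(event, OrderCreated):
            self.aggregate_id = event.order_id
            self.total_amount = event.total_amount
        elif isinstance(event, OrderPaid):
            self.paid = True

    @classmethod
    def from_events(cls, events) -> "Order":
        order = cls()
        for event in events:
            order.apply(event)
        return order


def test_order_replay():
    history = [OrderCreated("1", 99.0), OrderPaid("1")]
    order = Order.from_events(history)
    assert order.aggregate_id == "1"
    assert order.total_amount == 99.0
    assert order.paid
```

Replaying the same history twice should always produce equal aggregates. If it does not, some state change is happening outside an event handler.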
Performance
- Use snapshots for aggregates with > 100 events
- Use async stores and buses for I/O-heavy workloads
- Size connection pools based on expected concurrency
- Index by aggregate_id (handled by PostgreSQLEventStore)
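To show why snapshots help, the sketch below rebuilds state from a snapshot plus only the events recorded after it. It is a simplified model and not the Orchestrix snapshot API: events are plain integers, and `Counter`, `load`, and `maybe_snapshot` are hypothetical names. The threshold of 100 comes from the guideline above.

```python
from dataclasses import dataclass

SNAPSHOT_EVERY = 100  # threshold taken from the guideline above


@dataclass
class Counter:
    """Stand-in aggregate whose state is a running total."""

    total: int = 0
    version: int = 0  # number of events applied so far

    def apply(self, amount: int) -> None:
        self.total += amount
        self.version += 1


def load(events, snapshot=None):
    """Rebuild state from the snapshot (if any) plus the events after it."""
    state = Counter(snapshot.total, snapshot.version) if snapshot else Counter()
    for amount in events[state.version:]:
        state.apply(amount)
    return state


def maybe_snapshot(state, last_snapshot=None):
    """Return a new snapshot once more than SNAPSHOT_EVERY events have accumulated."""
    since = state.version - (last_snapshot.version if last_snapshot else 0)
    if since > SNAPSHOT_EVERY:
        return Counter(state.total, state.version)
    return last_snapshot


events = [1] * 250
full = load(events)                        # replays all 250 events
snap = maybe_snapshot(load(events[:150]))  # snapshot taken at version 150
fast = load(events, snapshot=snap)         # replays only the last 100 events
```

Both paths end in the same state. The snapshot path simply skips the events already folded into the snapshot, which is where the savings come from on long-lived aggregates.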