Quick Start
Build your first event-sourced application in 5 minutes.
1. Define Messages
from dataclasses import dataclass
from orchestrix.core.messaging.message import Command, Event
@dataclass(frozen=True, kw_only=True)
class CreateTask(Command):
task_id: str
title: str
@dataclass(frozen=True, kw_only=True)
class TaskCreated(Event):
task_id: str
title: str
@dataclass(frozen=True, kw_only=True)
class CompleteTask(Command):
task_id: str
@dataclass(frozen=True, kw_only=True)
class TaskCompleted(Event):
task_id: str
2. Create an Aggregate
from dataclasses import dataclass, field
from orchestrix.core.eventsourcing.aggregate import AggregateRoot
@dataclass
class Task(AggregateRoot):
title: str = ""
completed: bool = False
def create(self, cmd: CreateTask):
event = TaskCreated(task_id=cmd.task_id, title=cmd.title)
self._apply_event(event)
def complete(self, cmd: CompleteTask):
if self.completed:
raise ValueError("Already completed")
event = TaskCompleted(task_id=cmd.task_id)
self._apply_event(event)
# _when methods — called automatically by _apply_event
def _when_task_created(self, event: TaskCreated):
self.aggregate_id = event.task_id
self.title = event.title
def _when_task_completed(self, event: TaskCompleted):
self.completed = True
3. Wire Infrastructure & Run
from orchestrix.core.eventsourcing.aggregate import AggregateRepository
from orchestrix.infrastructure.memory.store import InMemoryEventStore
store = InMemoryEventStore()
repo = AggregateRepository(event_store=store)
# Create a task
task = Task()
task.create(CreateTask(task_id="TASK-001", title="Learn Orchestrix"))
repo.save(task)
# Load and complete
task = repo.load(Task, "TASK-001")
task.complete(CompleteTask(task_id="TASK-001"))
repo.save(task)
print(f"Task: {task.title}, completed: {task.completed}")
# → Task: Learn Orchestrix, completed: True
4. Add Event Handlers (Optional)
from orchestrix.infrastructure.memory.bus import InMemoryMessageBus
bus = InMemoryMessageBus()
bus.subscribe(TaskCreated, lambda e: print(f"📝 Created: {e.title}"))
bus.subscribe(TaskCompleted, lambda e: print(f"✅ Completed: {e.task_id}"))
bus.publish(TaskCreated(task_id="TASK-002", title="Read the docs"))
# → 📝 Created: Read the docs
How It Works
Command (CreateTask) → Aggregate (Task) → Event (TaskCreated)
↓
EventStore.save()
↓
MessageBus → Event Handlers
- A Command expresses intent
- The Aggregate validates business rules and emits Events
- Events are saved to the EventStore (immutable audit trail)
- Events can be published to a MessageBus for side-effects
Next Steps
- Core Concepts — Aggregates, Messages, CQRS in depth
- Demos — Banking, E-Commerce, Lakehouse, and more
- User Guide — Production patterns