Async API Design
Status: Implemented
Authors: Stefan Poss
Summary
Orchestrix provides full async/await support alongside its synchronous API. The async implementation is non-blocking and supports modern Python async frameworks like FastAPI, Starlette, and Quart.
Goals
- First-class async support for all core abstractions
- Zero breaking changes — sync and async APIs coexist
- Concurrent handler execution via
asyncio.gather() - Full type safety with async/await
- No new external dependencies (uses
asynciostdlib)
Async Message Bus
Protocol
from orchestrix.core.messaging.message_bus import AsyncMessageBus
class AsyncMessageBus(Protocol):
async def publish(self, message: Message) -> None: ...
def subscribe(
self,
message_type: type[T],
handler: Callable[[T], Coroutine[Any, Any, None]],
) -> None: ...
Implementation
Key behavior: when publish() is called, all handlers for that message type run concurrently via asyncio.gather(). If any handler fails, a HandlerError is raised.
bus = InMemoryAsyncMessageBus()
async def handle_order(event: OrderCreated) -> None:
print(f"Processing {event.order_id}")
async def send_email(event: OrderCreated) -> None:
print(f"Emailing for {event.order_id}")
bus.subscribe(OrderCreated, handle_order)
bus.subscribe(OrderCreated, send_email)
# Both handlers execute concurrently
await bus.publish(OrderCreated(order_id="ORD-001"))
Async Event Store
Protocol
from orchestrix.core.eventsourcing.event_store import AsyncEventStore
class AsyncEventStore(Protocol):
async def save(
self,
aggregate_id: str,
events: Sequence[Event],
expected_version: int | None = None,
) -> None: ...
async def load(
self,
aggregate_id: str,
from_version: int = 0,
) -> list[Event]: ...
In-Memory Implementation
from orchestrix.infrastructure.memory.async_store import InMemoryAsyncEventStore
store = InMemoryAsyncEventStore()
await store.save("ACC-001", [AccountOpened(...)], expected_version=0)
events = await store.load("ACC-001")
Supports snapshots via save_snapshot() / load_snapshot().
PostgreSQL Implementation
from orchestrix.infrastructure.postgres.store import PostgreSQLEventStore
store = PostgreSQLEventStore(
connection_string="postgresql://user:pass@localhost:5432/db",
pool_min_size=5,
pool_max_size=20,
)
await store.initialize()
PostgreSQLEventStore is inherently async (asyncpg connection pooling).
Async Aggregate Repository
AggregateRepository provides both sync and async methods:
from orchestrix.core.eventsourcing.aggregate import AggregateRepository
repo = AggregateRepository(event_store=async_store)
# Async operations
account = await repo.load_async(BankAccount, "ACC-001")
account.deposit(500.0)
await repo.save_async(account)
# Sync operations (for sync event stores)
repo_sync = AggregateRepository(event_store=sync_store)
account = repo_sync.load(BankAccount, "ACC-001")
repo_sync.save(account)
Async Command Handlers
Command handlers become async coroutines:
class CreateOrderHandler:
def __init__(self, bus: AsyncMessageBus, store: AsyncEventStore) -> None:
self.bus = bus
self.store = store
async def handle(self, command: CreateOrder) -> None:
order = Order(aggregate_id=command.order_id)
order.create(command.customer_name)
events = order.uncommitted_events
await self.store.save(command.order_id, events)
for event in events:
await self.bus.publish(event)
order.mark_events_committed()
Async Sagas
Saga is async-native. Steps can be sync or async callables:
from orchestrix.core.execution.saga import Saga, SagaStep, InMemorySagaStateStore
async def reserve_inventory(order_id: str) -> None: ...
async def release_inventory(order_id: str) -> None: ...
saga = Saga(
saga_type="order-fulfillment",
steps=[
SagaStep("reserve", reserve_inventory, compensation=release_inventory),
SagaStep("charge", charge_payment, compensation=refund_payment),
],
state_store=InMemorySagaStateStore(),
)
await saga.initialize()
await saga.execute(order_id="ORD-001")
Async Projections
ProjectionEngine is fully async:
from orchestrix.core.eventsourcing.projection import (
ProjectionEngine, InMemoryProjectionStateStore,
)
engine = ProjectionEngine("order-summary", InMemoryProjectionStateStore())
@engine.on(OrderCreated)
async def on_order(event: OrderCreated) -> None:
totals[event.order_id] = event.total
await engine.initialize()
await engine.process_events(events)
Migration Guide
Sync to Async
| Sync | Async |
|---|---|
InMemoryMessageBus |
InMemoryAsyncMessageBus |
InMemoryEventStore |
InMemoryAsyncEventStore |
bus.publish(msg) |
await bus.publish(msg) |
store.save(id, events) |
await store.save(id, events) |
repo.load(Type, id) |
await repo.load_async(Type, id) |
repo.save(agg) |
await repo.save_async(agg) |
Both APIs coexist — no breaking changes. Choose the variant that matches your context.
FastAPI Integration Example
from fastapi import FastAPI
from orchestrix.infrastructure.memory.async_bus import InMemoryAsyncMessageBus
from orchestrix.infrastructure.memory.async_store import InMemoryAsyncEventStore
app = FastAPI()
bus = InMemoryAsyncMessageBus()
store = InMemoryAsyncEventStore()
@app.post("/orders")
async def create_order(order_id: str, customer: str):
await bus.publish(CreateOrder(order_id=order_id, customer_name=customer))
return {"status": "accepted"}
Performance
Async handlers for I/O-bound workloads see significant improvements:
| Approach | 100 messages × 5 handlers | Notes |
|---|---|---|
| Sync (sequential) | ~100ms | One at a time |
| Async (concurrent) | ~1ms | asyncio.gather() |
The advantage scales with handler I/O latency (network, database, file I/O).
Next Steps
- Architecture — System design overview
- Infrastructure API — Async implementation details
- Lakehouse Demo — Full async FastAPI example