Testing
Testing strategies for Orchestrix — from unit to integration tests.
Test Setup
Install dependencies
Run tests
Test Structure
test/
├── conftest.py # Shared fixtures (bus, store)
├── components/orchestrix/
│ ├── core/
│ │ ├── test_message.py # Message creation and metadata
│ │ ├── test_aggregate_repository.py # AggregateRoot + Repository
│ │ ├── test_saga.py # Saga execution and compensation
│ │ ├── test_projection_engine.py # Projection processing
│ │ ├── test_versioning.py # Event upcasting
│ │ ├── test_validation.py # Built-in validators
│ │ ├── test_retry.py # Retry policies
│ │ ├── test_observability.py # ObservabilityHooks
│ │ ├── test_logging.py # StructuredLogger
│ │ ├── test_snapshot.py # Snapshot save/load
│ │ ├── test_dead_letter_queue.py # DLQ handling
│ │ ├── test_error_handling.py # Exception paths
│ │ ├── test_edge_cases.py # Boundary conditions
│ │ └── test_message_metadata.py # CloudEvents fields
│ └── infrastructure/
│ ├── test_inmemory_bus.py # InMemoryMessageBus
│ ├── test_inmemory_store.py # InMemoryEventStore
│ ├── test_async_bus.py # InMemoryAsyncMessageBus
│ ├── test_async_store.py # InMemoryAsyncEventStore
│ ├── test_postgres_store.py # PostgreSQLEventStore (testcontainers)
│ ├── test_prometheus_metrics.py # PrometheusMetrics
│ ├── test_tracing.py # JaegerTracer
│ └── …
├── benchmarks/ # Performance benchmarks
└── projects/ # Project-level integration tests
Shared Fixtures
The root conftest.py provides pre-built fixtures:
# test/conftest.py
from orchestrix.infrastructure.memory import InMemoryEventStore, InMemoryMessageBus
@pytest.fixture
def bus():
"""Fresh InMemoryMessageBus for each test."""
return InMemoryMessageBus()
@pytest.fixture
def store():
"""Fresh InMemoryEventStore for each test."""
return InMemoryEventStore()
Use them in any test:
def test_handler_stores_events(bus, store):
handler = CreateOrderHandler(bus, store)
bus.subscribe(CreateOrder, handler.handle)
bus.publish(CreateOrder(order_id="ORD-001", customer_id="CUST-123"))
events = store.load("ORD-001")
assert len(events) == 1
assert isinstance(events[0], OrderCreated)
Unit Tests
Testing Messages
from orchestrix.core.messaging.message import Command
def test_command_creation():
"""Test command with auto-generated metadata."""
command = CreateOrder(order_id="ORD-001", customer_id="CUST-123")
assert command.order_id == "ORD-001"
assert command.type == "CreateOrder" # Auto-derived from class name
assert command.id # UUID auto-generated
assert command.specversion == "1.0"
Testing Aggregates
from orchestrix.core.eventsourcing.aggregate import AggregateRoot
def test_aggregate_applies_events():
"""Test that _apply_event tracks uncommitted events."""
account = BankAccount(aggregate_id="ACC-001")
account.open("Alice", 1000.0)
assert account.owner == "Alice"
assert account.balance == 1000.0
assert len(account.uncommitted_events) == 1
assert isinstance(account.uncommitted_events[0], AccountOpened)
def test_aggregate_mark_committed():
"""Test that mark_events_committed clears the list."""
account = BankAccount(aggregate_id="ACC-001")
account.open("Alice", 1000.0)
account.mark_events_committed()
assert len(account.uncommitted_events) == 0
Testing Validation
from orchestrix.core.common.validation import validate_not_empty, ValidationError
def test_validation_rejects_empty():
with pytest.raises(ValidationError):
validate_not_empty("", "name")
def test_validation_accepts_value():
validate_not_empty("Alice", "name") # No error
Integration Tests
Testing with AggregateRepository
from orchestrix.core.eventsourcing.aggregate import AggregateRepository
from orchestrix.infrastructure.memory.store import InMemoryEventStore
def test_repository_round_trip():
"""Test save and load through repository."""
store = InMemoryEventStore()
repo = AggregateRepository(event_store=store)
# Create and save
account = BankAccount(aggregate_id="ACC-001")
account.open("Alice", 500.0)
repo.save(account)
# Load and verify
loaded = repo.load(BankAccount, "ACC-001")
assert loaded.owner == "Alice"
assert loaded.balance == 500.0
assert loaded.version == 1
Testing with MessageBus
def test_event_handler_called(bus):
"""Test that event handlers receive published events."""
received = []
bus.subscribe(OrderCreated, lambda e: received.append(e))
bus.publish(OrderCreated(order_id="ORD-001"))
assert len(received) == 1
assert received[0].order_id == "ORD-001"
Testing Async Code
import pytest
from orchestrix.infrastructure.memory.async_bus import InMemoryAsyncMessageBus
@pytest.mark.asyncio
async def test_async_bus_publish():
"""Test async message bus publishes to handlers."""
bus = InMemoryAsyncMessageBus()
received = []
async def handler(event: OrderCreated) -> None:
received.append(event)
bus.subscribe(OrderCreated, handler)
await bus.publish(OrderCreated(order_id="ORD-001"))
assert len(received) == 1
Test Patterns
Message Spy
Collect all published messages for assertions:
class MessageSpy:
def __init__(self):
self.messages: list[Message] = []
def record(self, message: Message) -> None:
self.messages.append(message)
def get_by_type(self, message_type: type) -> list[Message]:
return [m for m in self.messages if isinstance(m, message_type)]
@pytest.fixture
def spy(bus):
s = MessageSpy()
bus.subscribe(OrderCreated, s.record)
bus.subscribe(OrderCancelled, s.record)
return s
def test_with_spy(bus, spy):
bus.publish(OrderCreated(order_id="ORD-001"))
assert len(spy.get_by_type(OrderCreated)) == 1
AAA Pattern
def test_order_cancellation():
# Arrange
store = InMemoryEventStore()
repo = AggregateRepository(event_store=store)
# Act
order = Order(aggregate_id="ORD-001")
order.create("CUST-123")
order.cancel()
repo.save(order)
# Assert
loaded = repo.load(Order, "ORD-001")
assert loaded.status == "cancelled"
Parameterized Tests
@pytest.mark.parametrize("value,field,should_raise", [
("", "name", True),
("Alice", "name", False),
(" ", "name", True),
])
def test_validate_not_empty(value, field, should_raise):
if should_raise:
with pytest.raises(ValidationError):
validate_not_empty(value, field)
else:
validate_not_empty(value, field)
PostgreSQL Integration Tests
PostgreSQL tests use testcontainers for isolated containers:
@pytest.mark.asyncio
async def test_postgres_save_and_load():
"""Requires Docker running."""
store = PostgreSQLEventStore(connection_string="postgresql://...")
await store.initialize()
try:
await store.save("ACC-001", [AccountOpened(...)])
events = await store.load("ACC-001")
assert len(events) == 1
finally:
await store.close()
Container versions are managed in .container-versions.json at the repo root.
Coverage
Coverage output goes to output/coverage.xml and output/htmlcov/.
Continuous Testing
Watch Mode
Re-runs tests on every file change.
CI Pipeline
GitHub Actions runs the full test suite on every push against Python 3.12 and 3.13 on Linux, macOS, and Windows.
Next Steps
- Architecture — System design
- Contributing — Development workflow
- Best Practices — Production patterns