CQRS and Projections in Intent
Overview
Command Query Responsibility Segregation (CQRS) is a key architectural pattern in Intent. CQRS separates the write model (commands) from the read model (queries), allowing each to be optimized for its specific purpose. This pattern works hand-in-hand with Event Sourcing, where events generated from commands are projected into read-optimized models.
Command Side (Write Model)
Command Structure
Commands in the system are defined by the Command
interface in src/core/contracts.ts
:
export interface Command<T = any> { id: UUID; tenant_id: UUID; type: string; payload: T; status?: 'pending' | 'consumed' | 'processed' | 'failed'; metadata?: Metadata; }
Key components:
- id: Unique identifier for the command
- tenant_id: Supports multi-tenancy
- type: Describes the intent
- payload: Contains the command data
- status: Tracks the command's lifecycle
- metadata: Additional information like timestamps, correlation IDs, etc.
Command Handling
The command handling process involves several components:
- Command Bus: Routes commands to appropriate handlers (
src/core/command-bus.ts
) - Command Handlers: Domain services that implement the
CommandHandler
interface - Aggregates: Domain entities that encapsulate business rules and state changes
- Event Generation: Commands result in events that represent state changes
The dispatchCommand
function in src/infra/temporal/activities/coreActivities.ts
shows the command handling flow:
export async function dispatchCommand(cmd: Command): Promise<void> { try { // Store the command await pgCommandStore.upsert(cmd); // Route the command to a handler const result: CommandResult = await routeCommand(cmd); // Update the command status const infraStatus = result.status === 'success' ? 'consumed' : 'failed'; await pgCommandStore.markStatus(cmd.id, infraStatus, result); } catch (error: any) { await pgCommandStore.markStatus(cmd.id, 'failed', {status: 'fail', error: error.message}); throw error; } }
Query Side (Read Model)
Projections
Projections transform events into read-optimized models. They are implemented as event handlers that update database tables or other storage mechanisms.
The EventHandler
interface in src/core/contracts.ts
defines the contract for projections:
export interface EventHandler<E extends Event = Event> { supportsEvent(event: Event): event is E; on(event: E): Promise<void>; }
Projection Implementation
Projections are typically implemented in domain-specific files under src/core/slices/*/read-models/
and src/core/example-slices/*/read-models/
. For example, the system status projection in src/core/example-slices/system/read-models/system-status.projection.ts
:
export function createSystemStatusProjection( updater: ReadModelUpdaterPort<any> ): EventHandler { return { supportsEvent(event): event is Event<TestExecutedPayload> { return projectionMeta.eventTypes.includes(event.type); }, async on(event) { // Extract data from the event const { tenant_id, aggregateId, payload, metadata } = event; // Prepare data for the read model const upsertData = { id: aggregateId, tenant_id, // ... other fields mapped from the event }; // Update the read model await updater.upsert(tenant_id, aggregateId, upsertData); }, }; }
Projection Registration
Projections are registered and loaded dynamically in src/infra/projections/loadProjections.ts
:
export async function loadAllProjections(pool: DatabasePool): Promise<EventHandler[]> { const slices = await Promise.all([ import('../../core/example-slices/system/read-models/register').then(r => r.registerSystemProjections(pool)), // Add more slices here as they are implemented ]); return slices.flat(); }
Projection Processing
Events are processed by projections in the projectEvents
function in src/infra/projections/projectEvents.ts
:
export async function projectEvents( events: Event[], pool: DatabasePool, ): Promise<void> { const handlers = await loadAllProjections(pool); for (const event of events) { for (const h of handlers) { if (!h.supportsEvent(event)) continue; try { await traceSpan(`projection.handle.${event.type}`, { event }, () => h.on(event), ); } catch (err) { console.warn('Projection failed', { eventType: event.type, error: err }); } } } }
Schema Management
The system includes tools for managing projection schemas:
- Projection Metadata: Each projection defines its table schema in a
projectionMeta
object - Schema Drift Detection: The
check-projection-drift.ts
tool compares the expected schema with the actual database schema - Schema Repair: The
repair-projection-drift.ts
tool can fix schema discrepancies
Benefits and Challenges
Benefits
- Optimized Models: Each side (read and write) can be optimized for its specific purpose
- Scalability: Read and write sides can be scaled independently
- Performance: Read models can be denormalized for query performance
- Flexibility: New read models can be added without changing the write model
Challenges
- Eventual Consistency: Read models may lag behind the write model
- Complexity: Managing multiple models adds complexity
- Schema Evolution: Handling changes to event schemas and their impact on projections
- Synchronization: Ensuring read models are properly updated when events occur
Integration with Other Patterns
CQRS and projections in Intent integrate with several other patterns:
- Event Sourcing: Events from the event store are the input to projections
- Domain-Driven Design: Commands and events represent domain concepts
- Temporal Workflows: Complex processes may involve multiple commands and events
- Multi-tenancy: Projections maintain tenant isolation in read models