CQRS Read Model Design & Projection Management
This document provides an architectural deep-dive into how Intent implements Command Query Responsibility Segregation (CQRS) and manages projections for read models.
CQRS Pattern Overview
Command Query Responsibility Segregation (CQRS) is a key architectural pattern in Intent that separates the write model (commands) from the read model (queries). This separation allows each side to be optimized for its specific purpose:
- Command Side (Write Model): Optimized for data consistency, business rule validation, and capturing state changes
- Query Side (Read Model): Optimized for query performance, often using denormalized data structures
In Intent, CQRS works hand-in-hand with Event Sourcing: events generated from commands are projected into read-optimized models that serve queries efficiently.
Command Side (Write Model)
While the command side is covered in more detail in other sections (see Domain Modeling and Event Sourcing), it's important to understand how it relates to the read model.
Command Structure
Commands in Intent are defined by the Command interface in src/core/contracts.ts:
    export interface Command<T = any> {
      id: UUID;
      tenant_id: UUID;
      type: string;
      payload: T;
      status?: 'pending' | 'consumed' | 'processed' | 'failed';
      metadata?: Metadata;
    }
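A minimal sketch of building a command that satisfies this interface. UUID and Metadata are aliased locally as assumptions, since their definitions are not shown here, and the executeTest command type and its payload are hypothetical.

```typescript
// Assumed aliases: the real definitions of UUID and Metadata are not shown in this document.
type UUID = string;
type Metadata = Record<string, unknown>;

// Mirrors the Command interface above.
interface Command<T = any> {
  id: UUID;
  tenant_id: UUID;
  type: string;
  payload: T;
  status?: 'pending' | 'consumed' | 'processed' | 'failed';
  metadata?: Metadata;
}

// Hypothetical payload for a hypothetical 'executeTest' command.
interface ExecuteTestPayload {
  testName: string;
}

// Builds a new command in the 'pending' state; the caller supplies the ids.
function buildExecuteTestCommand(
  id: UUID,
  tenantId: UUID,
  testName: string,
): Command<ExecuteTestPayload> {
  return {
    id,
    tenant_id: tenantId,
    type: 'executeTest',
    payload: { testName },
    status: 'pending',
  };
}

const cmd = buildExecuteTestCommand(
  '11111111-1111-4111-8111-111111111111',
  '22222222-2222-4222-8222-222222222222',
  'smoke',
);
```

Typing the payload through the generic parameter lets a handler read cmd.payload.testName without a cast.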
Command Flow
The command handling process follows these steps:
- A command is dispatched through the Command Bus
- The appropriate Command Handler processes the command
- The Command Handler loads or creates the target Aggregate
- The Aggregate's handle method processes the command and produces events
- The events are persisted to the Event Store
- The events are then projected to update read models
This flow is implemented in the dispatchCommand function:
    export async function dispatchCommand(cmd: Command): Promise<void> {
      try {
        // Store the command
        await pgCommandStore.upsert(cmd);

        // Route the command to a handler
        const result: CommandResult = await routeCommand(cmd);

        // Update the command status
        const infraStatus = result.status === 'success' ? 'consumed' : 'failed';
        await pgCommandStore.markStatus(cmd.id, infraStatus, result);
      } catch (error: any) {
        await pgCommandStore.markStatus(cmd.id, 'failed', { status: 'fail', error: error.message });
        throw error;
      }
    }
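To make the six steps concrete, here is a small in-memory sketch of the same flow. It is not Intent's implementation: CounterAggregate, the array-backed event store, the object-backed read model, and the limit of 10 are all hypothetical stand-ins, and DomainEvent is an assumed event shape based on the fields the projection example reads.

```typescript
type UUID = string;

interface Command<T = any> {
  id: UUID;
  tenant_id: UUID;
  type: string;
  payload: T;
}

// Assumed event shape (hypothetical name, chosen to avoid clashing with the DOM Event type).
interface DomainEvent<T = any> {
  tenant_id: UUID;
  aggregateId: UUID;
  type: string;
  payload: T;
}

interface IncrementPayload { counterId: UUID; by: number; }
interface IncrementedPayload { by: number; }

const MAX_TOTAL = 10; // hypothetical business rule

class CounterAggregate {
  private total = 0;
  constructor(readonly id: UUID) {}

  // Rebuilds state from a previously stored event.
  apply(evt: DomainEvent<IncrementedPayload>): void {
    this.total += evt.payload.by;
  }

  // Step 4: validates the command and returns the resulting events.
  handle(cmd: Command<IncrementPayload>): DomainEvent<IncrementedPayload>[] {
    if (this.total + cmd.payload.by > MAX_TOTAL) {
      throw new Error('counter limit exceeded');
    }
    return [{
      tenant_id: cmd.tenant_id,
      aggregateId: this.id,
      type: 'counterIncremented',
      payload: { by: cmd.payload.by },
    }];
  }
}

const eventStore: DomainEvent<IncrementedPayload>[] = [];          // stand-in for the Event Store
const readModel: Record<UUID, { total: number } | undefined> = {}; // stand-in for a read-model table

// Step 6: projects events into the read model.
function project(events: DomainEvent<IncrementedPayload>[]): void {
  for (const evt of events) {
    const current = readModel[evt.aggregateId];
    readModel[evt.aggregateId] = { total: (current ? current.total : 0) + evt.payload.by };
  }
}

// Steps 1-3 and 5: dispatch, load the aggregate by replaying its events, persist new events.
function dispatchInMemory(cmd: Command<IncrementPayload>): void {
  const aggregate = new CounterAggregate(cmd.payload.counterId);
  for (const past of eventStore) {
    if (past.aggregateId === aggregate.id) aggregate.apply(past);
  }
  const events = aggregate.handle(cmd);
  eventStore.push(...events);
  project(events);
}

dispatchInMemory({ id: 'c1', tenant_id: 't1', type: 'incrementCounter', payload: { counterId: 'a1', by: 4 } });
dispatchInMemory({ id: 'c2', tenant_id: 't1', type: 'incrementCounter', payload: { counterId: 'a1', by: 5 } });
```

After both commands, the read model holds a total of 9 for a1. A third increment of 5 would be rejected by the aggregate before any event is stored, so neither the event store nor the read model changes.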
Query Side (Read Model)
The query side is where Intent's projection system comes into play. Projections transform events into read-optimized models that can be queried efficiently.
Projection Definition
Projections in Intent are implemented as event handlers that update database tables or other storage mechanisms. They implement the EventHandler interface:
    export interface EventHandler<E extends Event = Event> {
      supportsEvent(event: Event): event is E;
      on(event: E): Promise<void>;
    }
Each projection typically includes:
- A list of event types it handles
- A table schema definition
- Logic for transforming events into read model updates
- Access control metadata (which roles can read the data)
Projection Implementation Example
Here's an example of a projection implementation:
    // Projection metadata defines the table schema and access control
    export const projectionMeta = {
      name: 'system_status',
      eventTypes: [SystemEventType.TEST_EXECUTED],
      tables: {
        system_status: {
          columns: {
            id: 'uuid PRIMARY KEY',
            tenant_id: 'uuid NOT NULL',
            last_test_time: 'timestamp',
            test_count: 'integer',
            // ... other columns
          },
        },
      },
      // Access control metadata
      readAccess: {
        roles: ['admin', 'user'],
      },
    };

    // The projection implementation
    export function createSystemStatusProjection(
      updater: ReadModelUpdaterPort<any>
    ): EventHandler {
      return {
        supportsEvent(event): event is Event<TestExecutedPayload> {
          return projectionMeta.eventTypes.includes(event.type);
        },
        async on(event) {
          // Extract data from the event
          const { tenant_id, aggregateId, payload, metadata } = event;

          // Prepare data for the read model
          const upsertData = {
            id: aggregateId,
            tenant_id,
            last_test_time: new Date(),
            test_count: 1, // This would be incremented in a real implementation
            // ... other fields mapped from the event
          };

          // Update the read model
          await updater.upsert(tenant_id, aggregateId, upsertData);
        },
      };
    }
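The example above hardcodes test_count to 1 and notes that a real implementation would increment it. One way this could look is sketched below against a hypothetical in-memory store. The get method, the InMemoryStatusStore class, and the 'testExecuted' type string are assumptions for illustration; the source only shows an upsert call on ReadModelUpdaterPort.

```typescript
type UUID = string;

// Assumed event shape, limited to the fields this projection reads.
interface DomainEvent {
  tenant_id: UUID;
  aggregateId: UUID;
  type: string;
  payload: unknown;
}

interface SystemStatusRow {
  id: UUID;
  tenant_id: UUID;
  last_test_time: Date;
  test_count: number;
}

// Assumed string value; the actual value of SystemEventType.TEST_EXECUTED is not shown.
const TEST_EXECUTED = 'testExecuted';

// Hypothetical in-memory store keyed by tenant and row id.
class InMemoryStatusStore {
  private rows: Record<string, SystemStatusRow | undefined> = {};

  async get(tenantId: UUID, id: UUID): Promise<SystemStatusRow | undefined> {
    return this.rows[`${tenantId}:${id}`];
  }

  async upsert(tenantId: UUID, id: UUID, row: SystemStatusRow): Promise<void> {
    this.rows[`${tenantId}:${id}`] = row;
  }
}

function createCountingProjection(store: InMemoryStatusStore) {
  return {
    supportsEvent(evt: DomainEvent): boolean {
      return evt.type === TEST_EXECUTED;
    },
    async on(evt: DomainEvent): Promise<void> {
      // Read the current row (if any) and write back an incremented count.
      const existing = await store.get(evt.tenant_id, evt.aggregateId);
      await store.upsert(evt.tenant_id, evt.aggregateId, {
        id: evt.aggregateId,
        tenant_id: evt.tenant_id,
        last_test_time: new Date(),
        test_count: (existing ? existing.test_count : 0) + 1,
      });
    },
  };
}

// Applies the same kind of event twice and returns the resulting count.
async function demoCount(): Promise<number> {
  const store = new InMemoryStatusStore();
  const projection = createCountingProjection(store);
  const evt: DomainEvent = { tenant_id: 't1', aggregateId: 'a1', type: TEST_EXECUTED, payload: {} };
  await projection.on(evt);
  await projection.on(evt);
  const row = await store.get('t1', 'a1');
  return row ? row.test_count : 0;
}
```

The read-then-write step here is not atomic. Against PostgreSQL, the increment would more likely be pushed into a single statement such as INSERT ... ON CONFLICT (id) DO UPDATE SET test_count = system_status.test_count + 1. Either way, a counter like this is not idempotent: replaying the same event twice counts it twice.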
Projection Registration
Projections are registered and loaded dynamically, allowing the system to discover all projections at runtime:
    export async function loadAllProjections(pool: DatabasePool): Promise<EventHandler[]> {
      const slices = await Promise.all([
        import('../../core/system/read-models/register').then(r => r.registerSystemProjections(pool)),
        // Add more slices here as they are implemented
      ]);
      return slices.flat();
    }
Each domain module typically has a register.ts file that exports all projections for that domain:
    export async function registerSystemProjections(pool: DatabasePool): Promise<EventHandler[]> {
      const updater = createPgReadModelUpdater(pool);
      return [
        createSystemStatusProjection(updater),
        // Other system projections...
      ];
    }
Projection Processing
Events are processed by projections in the projectEvents function:
    export async function projectEvents(
      events: Event[],
      pool: DatabasePool,
    ): Promise<void> {
      const handlers = await loadAllProjections(pool);

      for (const event of events) {
        for (const h of handlers) {
          if (!h.supportsEvent(event)) continue;

          try {
            await traceSpan(`projection.handle.${event.type}`, { event }, () =>
              h.on(event),
            );
          } catch (err) {
            console.warn('Projection failed', { eventType: event.type, error: err });
          }
        }
      }
    }
This function:
- Loads all registered projections
- For each event, finds projections that support the event type
- Calls the projection's on method to update the read model
- Wraps each projection call in a trace span for observability
- Catches and logs errors to prevent one projection failure from affecting others
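The error-isolation behavior in the last point can be seen in a stripped-down version of the same loop. This sketch drops tracing and dynamic loading, and instead of logging it returns the failures so a caller could inspect or retry them. That return value is an assumption for illustration, not something projectEvents does. ProjectionHandler is a hypothetical stand-in that mirrors EventHandler.

```typescript
interface DomainEvent {
  type: string;
}

// Stand-in for EventHandler, without the type-guard signature.
interface ProjectionHandler {
  supportsEvent(evt: DomainEvent): boolean;
  on(evt: DomainEvent): Promise<void>;
}

interface ProjectionFailure {
  eventType: string;
  message: string;
}

// Runs every supporting handler for every event; a throwing handler is recorded, not rethrown.
async function projectCollectingFailures(
  events: DomainEvent[],
  handlers: ProjectionHandler[],
): Promise<ProjectionFailure[]> {
  const failures: ProjectionFailure[] = [];
  for (const evt of events) {
    for (const h of handlers) {
      if (!h.supportsEvent(evt)) continue;
      try {
        await h.on(evt);
      } catch (err) {
        failures.push({
          eventType: evt.type,
          message: err instanceof Error ? err.message : String(err),
        });
      }
    }
  }
  return failures;
}

const handled: string[] = [];

// Supports every event and always throws.
const alwaysFails: ProjectionHandler = {
  supportsEvent: () => true,
  on: async () => {
    throw new Error('boom');
  },
};

// Supports one event type and records what it saw.
const recorder: ProjectionHandler = {
  supportsEvent: (evt) => evt.type === 'testExecuted',
  on: async (evt) => {
    handled.push(evt.type);
  },
};

const run = projectCollectingFailures(
  [{ type: 'testExecuted' }, { type: 'somethingElse' }],
  [alwaysFails, recorder],
);
```

Here the failing handler throws for both events, yet the recorder still processes testExecuted. The trade-off is that a swallowed failure leaves that projection's read model stale until the event is reprocessed, so logging alone may not be enough in practice.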
Schema Management
One of the key challenges in maintaining projections is managing database schemas as projections evolve. Intent addresses this by having each projection declare its expected schema in code and by providing tools that detect and repair differences from the live database.
Projection Schema Definition
Each projection defines its expected database schema in its metadata:
    export const projectionMeta = {
      name: 'system_status',
      tables: {
        system_status: {
          columns: {
            id: 'uuid PRIMARY KEY',
            tenant_id: 'uuid NOT NULL',
            // ... other columns
          },
        },
      },
      // ...
    };
Schema Drift Detection
Intent includes a tool to detect "schema drift" - discrepancies between the expected schema (defined in code) and the actual database schema:
    // Simplified example from src/tools/projection-drift/check-projection-drift.ts
    export async function checkProjectionDrift(pool: DatabasePool): Promise<DriftResult[]> {
      const projections = await loadAllProjections(pool);
      const results: DriftResult[] = [];

      for (const projection of projections) {
        const meta = getProjectionMeta(projection);
        const expectedSchema = meta.tables;

        for (const [tableName, tableSchema] of Object.entries(expectedSchema)) {
          const actualSchema = await getTableSchema(pool, tableName);
          const drifts = compareSchemas(tableSchema, actualSchema);

          if (drifts.length > 0) {
            results.push({
              projection: meta.name,
              table: tableName,
              drifts,
            });
          }
        }
      }

      return results;
    }
This tool is run in CI to catch schema drift early and can also be run locally during development.
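The comparison step is not shown in the simplified example, so the following is only a sketch of what it might do. It assumes both the expected and the actual schema are available as maps from column name to a textual type definition. The Drift shape, the compareColumns name, and the normalization rule are all hypothetical.

```typescript
// Column name -> type definition, e.g. { id: 'uuid PRIMARY KEY' }.
type ColumnMap = Record<string, string>;

// Hypothetical drift shape; the real DriftResult contents are not shown in this document.
type Drift =
  | { kind: 'missing_column'; column: string; expected: string }
  | { kind: 'extra_column'; column: string; actual: string }
  | { kind: 'type_mismatch'; column: string; expected: string; actual: string };

// Compares definitions case-insensitively and ignores repeated whitespace.
function normalizeDefinition(def: string): string {
  return def.trim().replace(/\s+/g, ' ').toLowerCase();
}

function hasColumn(columns: ColumnMap, column: string): boolean {
  return Object.prototype.hasOwnProperty.call(columns, column);
}

function compareColumns(expected: ColumnMap, actual: ColumnMap): Drift[] {
  const drifts: Drift[] = [];

  // Columns declared in code: must exist in the database with the same definition.
  for (const column of Object.keys(expected)) {
    if (!hasColumn(actual, column)) {
      drifts.push({ kind: 'missing_column', column, expected: expected[column] });
    } else if (normalizeDefinition(expected[column]) !== normalizeDefinition(actual[column])) {
      drifts.push({
        kind: 'type_mismatch',
        column,
        expected: expected[column],
        actual: actual[column],
      });
    }
  }

  // Columns present in the database but no longer declared in code.
  for (const column of Object.keys(actual)) {
    if (!hasColumn(expected, column)) {
      drifts.push({ kind: 'extra_column', column, actual: actual[column] });
    }
  }

  return drifts;
}

const drifts = compareColumns(
  { id: 'uuid PRIMARY KEY', tenant_id: 'uuid NOT NULL', test_count: 'integer' },
  { id: 'UUID primary key', tenant_id: 'uuid NOT NULL', legacy_flag: 'boolean' },
);
```

In this example, test_count is reported as missing and legacy_flag as extra, while id matches after normalization. A real comparison against the database catalog would also need to map catalog type names and constraints back to the definition strings used in projectionMeta, which this sketch does not attempt.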
Schema Repair
When schema drift is detected, Intent provides a tool to automatically generate SQL migrations to fix the discrepancies:
    // Simplified example from src/tools/projection-drift/repair-projection-drift.ts
    export async function repairProjectionDrift(
      pool: DatabasePool,
      autoApply = false, // assumed to be a caller-supplied flag; where it really comes from is not shown
    ): Promise<string[]> {
      const drifts = await checkProjectionDrift(pool);
      const migrations: string[] = [];

      for (const drift of drifts) {
        const sql = generateMigrationSql(drift);
        migrations.push(sql);

        // Apply immediately only when explicitly requested
        if (autoApply) {
          await pool.query(sql);
        }
      }

      return migrations;
    }
This approach allows for:
- Automatic detection of schema changes
- Generation of migration scripts
- Optional automatic application of migrations
- CI integration to prevent schema drift in production
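As an illustration of migration generation, the sketch below turns a drift report into SQL statements. It is an assumption-laden sketch: the Drift shape and the generateMigrationStatements name are hypothetical, and the policy of only emitting additive changes automatically (leaving destructive ones as review comments) is a design choice made for this example, not something the source specifies.

```typescript
// Hypothetical drift shape for illustration.
type Drift =
  | { kind: 'missing_column'; column: string; expected: string }
  | { kind: 'extra_column'; column: string; actual: string }
  | { kind: 'type_mismatch'; column: string; expected: string; actual: string };

// Matches the fields pushed into results by the drift check above.
interface DriftResult {
  projection: string;
  table: string;
  drifts: Drift[];
}

// Quotes a PostgreSQL identifier, doubling any embedded double quotes.
function quoteIdent(identifier: string): string {
  return '"' + identifier.replace(/"/g, '""') + '"';
}

function generateMigrationStatements(result: DriftResult): string[] {
  const table = quoteIdent(result.table);
  const statements: string[] = [];

  for (const drift of result.drifts) {
    switch (drift.kind) {
      case 'missing_column':
        // Additive change: generated as executable SQL.
        statements.push(
          `ALTER TABLE ${table} ADD COLUMN ${quoteIdent(drift.column)} ${drift.expected};`,
        );
        break;
      case 'extra_column':
      case 'type_mismatch':
        // Potentially destructive: emitted as a SQL comment for manual review.
        statements.push(`-- REVIEW: ${drift.kind} on ${table}.${quoteIdent(drift.column)}`);
        break;
    }
  }

  return statements;
}

const statements = generateMigrationStatements({
  projection: 'system_status',
  table: 'system_status',
  drifts: [
    { kind: 'missing_column', column: 'test_count', expected: 'integer' },
    { kind: 'extra_column', column: 'legacy_flag', actual: 'boolean' },
  ],
});
```

Keeping drops and type changes out of the auto-generated SQL is one conservative way to make automatic application safer, since those changes can lose data.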
Access Policies
A key feature of Intent's projection system is the integration with access control. Each projection declares which roles can read its data:
    export const projectionMeta = {
      // ...
      readAccess: {
        roles: ['admin', 'user'],
      },
    };
These access policies are translated into PostgreSQL Row-Level Security (RLS) policies, ensuring that data access is controlled at the database level:
    -- Example generated RLS policy
    -- current_setting() returns text, so it is cast to json before using ->>
    CREATE POLICY "system_status_admin_user_read_policy" ON "system_status"
      FOR SELECT USING (
        current_setting('request.jwt.claims')::json->>'role' IN ('admin', 'user')
        AND current_setting('request.jwt.claims')::json->>'tenant_id' = tenant_id::text
      );
This ensures that:
- Only users with the appropriate roles can read the data
- Users can only see data for their own tenant
- These rules are enforced at the database level, not just in application code
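The translation from readAccess metadata to a policy could be sketched as a pure string-building function. The buildReadPolicySql name and the quoting helpers are hypothetical, and the policy naming scheme is inferred from the single example above. The sketch casts the setting to json because current_setting returns text, which the ->> operator cannot be applied to directly.

```typescript
interface ReadAccess {
  roles: string[];
}

// Quotes a PostgreSQL identifier, doubling any embedded double quotes.
function quoteIdent(identifier: string): string {
  return '"' + identifier.replace(/"/g, '""') + '"';
}

// Quotes a string literal, doubling any embedded single quotes
// (assumes standard_conforming_strings is on, the PostgreSQL default).
function quoteLiteral(value: string): string {
  return "'" + value.replace(/'/g, "''") + "'";
}

function buildReadPolicySql(table: string, access: ReadAccess): string {
  if (access.roles.length === 0) {
    throw new Error('readAccess must list at least one role');
  }
  const policyName = `${table}_${access.roles.join('_')}_read_policy`;
  const roleList = access.roles.map(quoteLiteral).join(', ');

  return [
    `CREATE POLICY ${quoteIdent(policyName)} ON ${quoteIdent(table)}`,
    `  FOR SELECT USING (`,
    `    current_setting('request.jwt.claims')::json->>'role' IN (${roleList})`,
    `    AND current_setting('request.jwt.claims')::json->>'tenant_id' = tenant_id::text`,
    `  );`,
  ].join('\n');
}

const policySql = buildReadPolicySql('system_status', { roles: ['admin', 'user'] });
```

A policy only takes effect once row-level security is enabled on the table (ALTER TABLE ... ENABLE ROW LEVEL SECURITY), so a generator like this would normally emit that statement as well.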
Benefits of CQRS in Intent
The CQRS approach in Intent provides several key benefits:
- Optimized Models: Each side (read and write) is optimized for its specific purpose
- Scalability: Read and write sides can be scaled independently
- Performance: Read models can be denormalized for query performance
- Flexibility: New read models can be added without changing the write model
- Security: Access control is built into the read model at the database level
Challenges and Mitigations
CQRS also presents some challenges, which Intent addresses:
- Eventual Consistency: Read models may lag behind the write model
  - Mitigation: Workflow engines ensure reliable processing of events to projections
- Complexity: Managing multiple models adds complexity
  - Mitigation: Clear patterns and tooling simplify projection management
- Schema Evolution: Handling changes to event schemas and their impact on projections
  - Mitigation: Schema drift detection and repair tools
- Synchronization: Ensuring read models are properly updated when events occur
  - Mitigation: The projectEvents function is called as part of the command processing workflow
Extending the System
To create a new projection in Intent:
- Define a new projection in a domain module's read-models directory
- Define the table schema and access control in the projection metadata
- Implement the EventHandler interface to process relevant events
- Register the projection in the domain's register.ts file
- Run the schema drift check and repair tool to create the necessary database tables
This process ensures that new projections are consistent with the system's patterns and that the database schema stays in sync with the code.