Temporal Workflow Orchestration (Deep Dive)
This document elaborates on Intent's use of Temporal for orchestrating domain workflows, building on the Saga concept introduced in the core concepts documentation.
Why Temporal in Intent?
Temporal is a workflow orchestration platform that Intent uses to manage complex, long-running business processes. It was chosen for several key reasons:
- Durability: Workflows continue executing even if the process or machine fails
- Reliability: Automatic retries for failed activities
- Visibility: Provides visibility into workflow execution status and history
- Scalability: Workers can be scaled independently to handle load
- Long-running processes: Supports processes that can run for days, weeks, or even longer
These characteristics make Temporal an ideal fit for implementing event-sourced systems where reliability and consistency are paramount.
Key Temporal Concepts in Intent
Workflows
Workflows in Temporal are durable, fault-tolerant functions that orchestrate activities. They are defined as code but executed as persistent, resumable programs that can span long periods of time.
In Intent, workflows are implemented in src/infra/temporal/workflows/
and are responsible for orchestrating the command-event-projection cycle.
Activities
Activities are the building blocks of workflows. They are individual tasks that workflows orchestrate. Activities can be retried independently if they fail, without having to restart the entire workflow.
In Intent, activities are defined in src/infra/temporal/activities/
and serve as the bridge between Temporal workflows and the core domain logic.
Workers
Workers are processes that execute workflow and activity code. They poll task queues for work and execute the corresponding code.
Intent includes a worker implementation in src/infra/worker.ts
that registers workflows and activities with Temporal. The worker can be started with:
npm run dev:worker aggregates # starts the aggregates worker npm run dev:worker sagas # starts the sagas worker
The processCommand Workflow
The most important workflow in Intent is the processCommand
workflow, which encapsulates the full command→event→projection cycle:
export async function processCommand(command: Command): Promise<CommandResult> { // 1. Load the aggregate const aggregate = await activities.loadAggregate( command.tenant_id, command.aggregateType, command.aggregateId ); // 2. Generate events for the command const result = await activities.routeCommand(command); if (result.status !== 'success') { return result; } // 3. Apply the events to the aggregate await activities.applyEvents( command.tenant_id, command.aggregateType, command.aggregateId, result.events ); // 4. Project the events to read models await activities.projectEvents(result.events); // 5. Route the events to interested handlers (e.g., sagas) for (const event of result.events) { await activities.routeEvent(event); } return result; }
This workflow ensures that all steps in command processing are completed reliably, with automatic retries for any failures. It maintains atomicity across the entire process, ensuring that either all steps complete successfully or none do.
Core Activities
The activities used by workflows are the bridge between Temporal and the core domain logic. The key activities include:
1. loadAggregate
Loads an aggregate from the event store, optionally using a snapshot for optimization:
export async function loadAggregate( tenantId: UUID, aggregateType: string, aggregateId: UUID ): Promise<BaseAggregate<any>> { // 1. Try to load the latest snapshot const snapshot = await eventStore.loadLatestSnapshot(tenantId, aggregateType, aggregateId); // 2. Determine the starting version const startingVersion = snapshot ? snapshot.version : 0; // 3. Load events after the snapshot version const events = await eventStore.loadEvents( tenantId, aggregateType, aggregateId, startingVersion ); // 4. Create or rehydrate the aggregate let aggregate; if (snapshot) { // Create from snapshot const AggregateClass = AggregateRegistry[aggregateType]; aggregate = AggregateClass.fromSnapshot(snapshot); } else if (events.length > 0) { // Rehydrate from events const AggregateClass = AggregateRegistry[aggregateType]; aggregate = AggregateClass.rehydrate(events); } else { // Aggregate doesn't exist yet throw new Error(`Aggregate ${aggregateType}:${aggregateId} not found`); } // 5. Apply any events after the snapshot if (snapshot) { for (const event of events) { aggregate.apply(event); } } return aggregate; }
2. routeCommand
Routes commands to appropriate handlers in the domain:
export async function routeCommand(command: Command): Promise<CommandResult> { if (!router) { router = await WorkflowRouter.create(); } return router.handle(command); }
3. applyEvents
Applies events to an aggregate and saves them to the event store:
export async function applyEvents( tenantId: UUID, aggregateType: string, aggregateId: UUID, events: Event[], ): Promise<void> { // 1. Validate events // 2. Append events to the event store await eventStore.appendEvents(tenantId, events); // 3. Optionally create a snapshot if (shouldCreateSnapshot(events)) { await snapshotAggregate(tenantId, aggregateType, aggregateId); } }
4. projectEvents
Projects events to read models:
export async function projectEvents(events: Event[]): Promise<void> { await projectEventsInfra(events, projectionPool); }
5. routeEvent
Routes events to interested handlers (e.g., Saga/PMs):
export async function routeEvent(event: Event): Promise<void> { if (!router) { router = await WorkflowRouter.create(); } return router.on(event); }
Workflow Router
The WorkflowRouter
class is a key component that ensures each aggregate's commands go to the correct workflow. It uses the aggregate ID as part of the Temporal workflow ID, ensuring single-threaded command processing per aggregate:
export class WorkflowRouter { private commandHandlers: CommandHandler[] = []; private eventHandlers: EventHandler[] = []; static async create(): Promise<WorkflowRouter> { // Load all command and event handlers const router = new WorkflowRouter(); await router.initialize(); return router; } async handle(command: Command): Promise<CommandResult> { // Find the appropriate command handler const handler = this.findCommandHandler(command); if (!handler) { return { status: 'fail', error: `No handler for command type: ${command.type}` }; } // Load the aggregate const aggregate = await loadAggregate( command.tenant_id, command.aggregateType, command.aggregateId ); // Handle the command try { const events = await handler.handleWithAggregate(command, aggregate); return { status: 'success', events }; } catch (error: any) { return { status: 'fail', error: error.message }; } } async on(event: Event): Promise<void> { // Find all event handlers that support this event const handlers = this.findEventHandlers(event); // Process the event with each handler for (const handler of handlers) { await handler.on(event); } } // Helper methods... }
This router ensures that:
- Commands are routed to the appropriate handler
- Events are routed to all interested handlers
- Aggregate consistency is maintained (commands for the same aggregate are processed sequentially)
Workers
Workers are the processes that execute workflow and activity code. Intent's worker implementation is in src/infra/worker.ts
:
async function startWorker() { const connection = await NativeConnection.connect({ address: process.env.TEMPORAL_ADDRESS || 'localhost:7233', }); const worker = await Worker.create({ connection, namespace: 'default', taskQueue: 'intent-tasks', workflowsPath: require.resolve('./infra/temporal/workflows'), activities: { loadAggregate, routeCommand, applyEvents, projectEvents, routeEvent, // Other activities... }, }); await worker.run(); } startWorker().catch((err) => { console.error(err); process.exit(1); });
This worker registers all workflows and activities with the Temporal server, making them available for execution.
Benefits and Trade-offs
Benefits
- Reliability: Temporal will retry failed activities, and preserve workflow state across failures
- Observability: Temporal Web UI provides visibility into workflow execution, making debugging easier
- Saga Pattern: Temporal is a natural fit for implementing the Saga pattern for complex business processes
- Scalability: Workers can be scaled independently to handle increased load
- Versioning: Temporal supports versioning of workflow and activity code, making it easier to evolve the system
Trade-offs
- Determinism: Workflow code must be deterministic to ensure consistent replay, which can be challenging
- Learning Curve: Temporal has a unique programming model that requires understanding
- Complexity: Adding Temporal introduces another system to manage and monitor
- State Size: Large workflow states can impact performance
Integration with Other Patterns
Temporal workflow orchestration in Intent integrates seamlessly with other architectural patterns:
- Event Sourcing: Workflows orchestrate the loading and applying of events
- CQRS: Workflows ensure that commands are processed and events are projected to read models
- Domain-Driven Design: Workflows respect domain boundaries and aggregate consistency rules
- Multi-tenancy: Workflow IDs include tenant information to maintain isolation
Observability
Intent includes comprehensive observability features for Temporal workflows:
- Tracing: Activities use
traceSpan
for distributed tracing - Logging: Extensive logging throughout activities and workflows
- Error Handling: Structured error handling and reporting
- Temporal Web UI: Provides visibility into workflow execution status and history
Adding New Workflows and Activities
To extend Intent with new workflows or activities:
- Define a new activity: Create a new function in
src/infra/temporal/activities/
and register it in the worker - Define a new workflow: Create a new function in
src/infra/temporal/workflows/
that orchestrates activities - Register with the worker: Update
src/infra/worker.ts
to include the new workflow or activity - Invoke the workflow: Use the Temporal client to start the workflow
For example, to add a new workflow for a custom long-running saga:
// Define the workflow export async function customSagaWorkflow(input: CustomSagaInput): Promise<void> { // Orchestrate activities to implement the saga const result1 = await activities.firstStep(input); const result2 = await activities.secondStep(result1); await activities.finalStep(result2); } // Register with the worker const worker = await Worker.create({ // ... workflowsPath: require.resolve('./infra/temporal/workflows'), // ... }); // Invoke the workflow const client = new Client({ connection, namespace: 'default', }); const handle = await client.workflow.start(customSagaWorkflow, { taskQueue: 'intent-tasks', workflowId: `custom-saga-${input.id}`, args: [input], });
This extensibility allows Intent to support a wide range of complex business processes while maintaining reliability and observability.