Engine Lifecycle

Understanding the WorkflowEngine lifecycle is essential for proper integration.

Initialization

Creating the Engine

dart
// 1. Create deserialization context with descriptors
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor()],
);

// 2. Create storage (in-memory or custom implementation)
final storage = InMemoryStorage(context: context);
// Or for production: PostgresWorkflowStorage(connectionString)

// 3. Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: storage,
);

// 4. Initialize engine (connects to storage, recovers stale workflows)
await engine.initialize();

Configuration Options

dart
final engine = WorkflowEngine(
  context: context,
  storage: storage,
  config: WorkflowEngineConfig(
    enableDebugLogging: true,
    maxConcurrentWorkflows: 100,
    staleWorkflowThreshold: Duration(minutes: 5),
    enableAutomaticRecovery: true,
  ),
);

Registration Phase

Register executors (through descriptors) and workflows before starting any instances:

dart
// 1. Create descriptors with executors
final descriptor = WorkflowDescriptor(
  title: 'My Executors',
  tasks: [
    SendEmailTaskExecutor.typeDescriptor,
    ValidateDataTaskExecutor.typeDescriptor,
  ],
  userTasks: [ApprovalTaskExecutor.typeDescriptor],
  conditions: [ApprovedCondition.typeDescriptor],
);

// 2. Create context with all descriptors
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// 3. Create engine with context
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

// 4. Register workflows
engine.registerWorkflow(approvalWorkflow);
engine.registerWorkflow(orderProcessingWorkflow);

Execution Phase

Starting Workflows

dart
final instance = await engine.startWorkflow(
  workflowCode: 'approval-workflow',  // Use workflowCode for semantic lookup
  input: {
    'entityId': 'REQ-001',
    'submittedBy': 'user@example.com',
  },
);

Processing Signals

dart
await engine.sendSignal(
  workflowInstanceId: instance.id,
  node: 'awaitApproval',  // The signal wait or user task node ID
  payload: {'decision': 'approved'},
);

Resuming Workflows

After a server restart, the engine can automatically recover stale workflows:

dart
// Enable automatic recovery (default is true)
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor()],
);

final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
  config: WorkflowEngineConfig(
    enableAutomaticRecovery: true,
    staleWorkflowThreshold: Duration(minutes: 5),
  ),
);
await engine.initialize();  // Automatically recovers stale workflows

// Or manually resume a specific workflow
await engine.resumeWorkflow(instanceId);

// Or manually recover all stale workflows
final recoveredCount = await engine.recoverStaleWorkflows();
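
The text above does not say how the engine decides that a workflow is stale. One plausible reading, sketched here as an assumption and not as the engine's actual logic, is that an instance counts as stale once its last update is older than staleWorkflowThreshold. The helper name `isStale` and its parameters are hypothetical.

```dart
/// Hypothetical helper, not part of the engine's API.
/// Assumes "stale" means the instance has not been updated within [threshold].
bool isStale(DateTime lastUpdatedAt, DateTime now, Duration threshold) {
  // Duration is Comparable, so the elapsed time can be compared directly.
  return now.difference(lastUpdatedAt) > threshold;
}
```

Under this reading, a shorter threshold recovers interrupted workflows sooner, but it also raises the chance of picking up an instance that another process is still working on.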

Execution Flow

Shutdown

Shut the engine down gracefully before the process exits to avoid data loss:

dart
// Dispose engine and storage connections
await engine.dispose();
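
In a server process, dispose is usually triggered by a termination signal. The following is a minimal sketch of that wiring using only dart:io and dart:async. `ShutdownGuard` and `disposeOnSigint` are hypothetical names, and the sketch assumes `engine.dispose` returns a `Future<void>`, as the `await` above suggests.

```dart
import 'dart:async';
import 'dart:io';

/// Hypothetical wrapper that runs [_dispose] at most once, even if
/// shutdown is requested several times (for example, repeated Ctrl+C).
class ShutdownGuard {
  ShutdownGuard(this._dispose);

  final Future<void> Function() _dispose;
  Future<void>? _pending;

  /// Starts disposal on the first call; later calls return the same future.
  Future<void> call() => _pending ??= _dispose();
}

/// Listens for SIGINT, disposes through [guard], then exits the process.
StreamSubscription<ProcessSignal> disposeOnSigint(ShutdownGuard guard) {
  return ProcessSignal.sigint.watch().listen((_) async {
    await guard();
    exit(0);
  });
}
```

Usage would look like `disposeOnSigint(ShutdownGuard(engine.dispose));` once the engine has been initialized. The guard keeps a second signal from starting a second, overlapping dispose.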

Error Handling

Engine-Level Errors

dart
try {
  await engine.startWorkflow(workflowCode: 'my-workflow');
} on WorkflowNotFoundException catch (e) {
  print('Workflow not found: $e');
} on WorkflowNotActiveException catch (e) {
  print('Workflow not active: $e');
} on WorkflowValidationException catch (e) {
  print('Validation errors: ${e.errors}');
} on WorkflowAlreadyTerminatedException catch (e) {
  print('Workflow already terminated: $e');
}

Task-Level Errors

Task failures are handled by returning a FailWorkflowResult:

dart
// In task executor
return FailWorkflowResult.validation('Invalid input');

// Or with more details
return FailWorkflowResult(
  errorType: WorkflowErrorType.taskError,
  message: 'Task failed',
  details: {'field': 'value'},
);

// Engine marks workflow as failed and emits error event

Event Streams

The engine provides real-time event streams for monitoring and integration:

All Events Stream

dart
// Subscribe to all workflow events
engine.onEvent.listen((event) {
  print('${event.eventType}: ${event.workflowInstanceId}');
});

// Filter for specific event types
engine.onEvent
    .where((e) => e.eventType == WorkflowEventType.userTaskCreated)
    .listen((event) {
  // Handle user task creation - e.g., send notification
  print('New task: ${event.data}');
});

// Filter by workflow instance
engine.onEvent
    .where((e) => e.workflowInstanceId == instanceId)
    .listen((event) {
  // Track specific workflow progress
});
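
When tracking a single instance, it helps to stop listening once the workflow reaches a terminal state so the subscription does not outlive it. The sketch below shows one way to do that with plain dart:async. `FakeEvent` is a stand-in that models only the two fields used in the snippets above, and event types are written as strings for self-containment; with the real engine you would pass `engine.onEvent` and compare against `WorkflowEventType` values.

```dart
import 'dart:async';

/// Stand-in for the engine's event class (hypothetical, for illustration).
class FakeEvent {
  FakeEvent(this.eventType, this.workflowInstanceId);

  final String eventType;
  final String workflowInstanceId;
}

/// Collects events for [instanceId] and stops after the first terminal event.
Future<List<FakeEvent>> trackInstance(
  Stream<FakeEvent> events,
  String instanceId,
) async {
  // Terminal lifecycle events, named after the entries under Event Types.
  const terminal = {'workflowCompleted', 'workflowFailed', 'workflowCancelled'};
  final seen = <FakeEvent>[];

  await for (final event
      in events.where((e) => e.workflowInstanceId == instanceId)) {
    seen.add(event);
    // Breaking out of `await for` cancels the underlying subscription.
    if (terminal.contains(event.eventType)) break;
  }
  return seen;
}
```

Compared with a bare `listen`, the `await for` form ties the subscription's lifetime to the loop, so no explicit `cancel()` call is needed.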

Error Stream

dart
// Subscribe to error events specifically
engine.onError.listen((event) {
  print('Workflow ${event.workflowInstanceId} failed: ${event.error.message}');
  // Send alert, log to monitoring system, etc.
});

Event Types

Events are categorized by WorkflowEventType:

| Category | Events |
|---|---|
| Lifecycle | workflowStarted, workflowCompleted, workflowFailed, workflowCancelled, workflowSuspended, workflowResumed |
| Tasks | taskStarted, taskCompleted, taskFailed, taskRetried |
| User Tasks | userTaskCreated, userTaskClaimed, userTaskCompleted, userTaskCancelled, userTaskExpired |
| Signals | signalWaitStarted, signalReceived, signalTimeout |
| Gateways | gatewayEntered, gatewayEvaluated |
| Tokens | tokenCreated, tokenMoved, tokenMerged, tokenCompleted |
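
Because every event name in the list above starts with its category's prefix, a listener can route events by category without enumerating each type. The sketch below illustrates the idea. `EventCategory` and `categoryOf` are hypothetical helpers, not part of the engine, and the function takes the event type's name as a string (for example `event.eventType.name`, assuming `WorkflowEventType` is a Dart enum).

```dart
/// Hypothetical mirror of the categories listed above.
enum EventCategory { lifecycle, task, userTask, signal, gateway, token }

/// Name prefixes taken from the event list; each event name matches one entry.
const Map<String, EventCategory> _prefixes = {
  'workflow': EventCategory.lifecycle,
  'userTask': EventCategory.userTask,
  'task': EventCategory.task,
  'signal': EventCategory.signal,
  'gateway': EventCategory.gateway,
  'token': EventCategory.token,
};

/// Returns the category for [eventTypeName], or null if no prefix matches.
EventCategory? categoryOf(String eventTypeName) {
  for (final entry in _prefixes.entries) {
    if (eventTypeName.startsWith(entry.key)) return entry.value;
  }
  return null;
}
```

For example, `categoryOf('userTaskExpired')` yields `EventCategory.userTask`, which a notification service could use to subscribe to all user-task events at once.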

Querying Historical Events

Events are also persisted for audit trails:

dart
// Query events for an instance
final events = await storage.events.getByWorkflowInstanceId(instanceId);

// Filter by event type
final taskEvents = await storage.events.getByWorkflowInstanceId(
  instanceId,
  eventTypes: [WorkflowEventType.taskCompleted],
);

// Get high-level summary
final summary = await storage.events.getSummary(instanceId);

Next Steps