Workflow Effects

Effects capture side effects declaratively, enabling pure executors that are easy to test and reason about.

Overview

Instead of calling mutation methods directly, executors return WorkflowEffect objects that describe what should happen. The engine's EffectProcessor interprets these effects and applies them to storage.
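
To make the interpretation step concrete, here is a minimal sketch of how a processor might walk a list of effects and apply each one. The classes are simplified stand-ins that reuse the documented names, and `InMemoryState` and `processEffects` are hypothetical; the real EffectProcessor and its storage layer are not shown in this document.

```dart
// Simplified stand-ins for illustration; not the library's real classes.
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class SetOutputEffect extends WorkflowEffect {
  const SetOutputEffect({required this.output});
  final Map<String, dynamic> output;
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect();
}

/// Hypothetical in-memory state that a processor might mutate.
class InMemoryState {
  final Map<String, dynamic> output = {};
  final List<String> pendingTasks = ['task-1', 'task-2'];
}

/// Interprets each effect in list order and applies it to [state].
void processEffects(List<WorkflowEffect> effects, InMemoryState state) {
  for (final effect in effects) {
    // The switch is exhaustive because WorkflowEffect is sealed.
    switch (effect) {
      case SetOutputEffect(:final output):
        state.output.addAll(output);
      case CancelUserTasksEffect():
        state.pendingTasks.clear();
    }
  }
}
```

Because the base class is sealed, adding a new effect subtype in this sketch would produce a compile-time error in the switch until the processor handles it.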

Benefits

  • Pure executors: Node executors return effects instead of mutating state
  • Testability: Effects can be asserted without mocking storage
  • Batching: Multiple effects can be processed together
  • Replay: Effects form an audit trail that can be replayed
dart
// Instead of calling mutation methods...
await storage.userTaskInstances.cancel(taskId);
await storage.events.record(event);

// Return effects declaratively
return TaskSuccess(
  output: {'processed': true},
  effects: [
    const CancelUserTasksEffect(),
    RecordEventEffect(event: event),
  ],
);
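
The testability benefit can be illustrated with a short sketch: since the executor only returns a value, a test can inspect the returned effects directly. The classes below are simplified stand-ins, and `approve` and `hasExpectedEffects` are hypothetical helpers written for this example.

```dart
// Simplified stand-ins; the real TaskSuccess and effect classes may differ.
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect({this.nodeId});
  final String? nodeId;
}

class SetOutputEffect extends WorkflowEffect {
  const SetOutputEffect({required this.output, this.path});
  final Map<String, dynamic> output;
  final String? path;
}

class TaskSuccess {
  const TaskSuccess({required this.output, this.effects = const []});
  final Map<String, dynamic> output;
  final List<WorkflowEffect> effects;
}

/// Hypothetical pure executor body: no storage access, only a returned value.
TaskSuccess approve(String approver) {
  return TaskSuccess(
    output: {'approved': true},
    effects: [
      const CancelUserTasksEffect(),
      SetOutputEffect(output: {'approvedBy': approver}, path: 'approval'),
    ],
  );
}

/// Returns true when [result] carries the two effects this example expects.
bool hasExpectedEffects(TaskSuccess result) {
  final effects = result.effects;
  return effects.length == 2 &&
      effects[0] is CancelUserTasksEffect &&
      effects[1] is SetOutputEffect &&
      (effects[1] as SetOutputEffect).path == 'approval';
}
```

No storage mock is involved: the check runs against plain objects, which is the property the Benefits list describes.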

Effect Hierarchy

WorkflowEffect (sealed)
├── Output Effects
│   └── SetOutputEffect
├── Status Effects
│   ├── UpdateStatusEffect
│   ├── CompleteWorkflowEffect
│   └── FailWorkflowEffect
├── Token Effects
│   ├── CreateTokensEffect
│   ├── ConsumeTokenEffect
│   ├── ConsumeJoinTokensEffect
│   ├── DropTokenEffect
│   ├── WaitTokenEffect
│   └── ActivateTokenEffect
├── User Task Effects
│   ├── CreateUserTaskEffect
│   └── CancelUserTasksEffect
├── Event Effects
│   ├── RecordEventEffect
│   └── EmitErrorEffect
├── Scheduling Effects
│   ├── ScheduleExecutionEffect
│   ├── ScheduleRetryEffect
│   └── StoreAttemptNumberEffect
└── Cleanup Effects
    └── CleanupResourcesEffect

Output Effects

SetOutputEffect

Set or merge output data into the workflow instance.

dart
class SetOutputEffect extends WorkflowEffect {
  const SetOutputEffect({
    required this.output,
    this.path,
  });

  /// The output data to merge into workflow output
  final Map<String, dynamic> output;

  /// Optional path to namespace the output
  final String? path;
}

Usage:

dart
// Merge at root level
SetOutputEffect(output: {'validated': true})

// Namespace under a path to prevent collisions
SetOutputEffect(
  output: {'count': 10, 'total': 500},
  path: 'processingStats',
)
// Result: { processingStats: { count: 10, total: 500 } }
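
The merge behavior described above could be sketched roughly as follows. This assumes `path` is a single top-level key (not a dotted path) and that later values overwrite earlier ones; neither detail is confirmed by this document, and `mergeOutput` is a hypothetical helper.

```dart
/// Returns a new output map with [output] merged into [current].
/// Assumption: [path] is a single top-level key, not a dotted path.
Map<String, dynamic> mergeOutput(
  Map<String, dynamic> current,
  Map<String, dynamic> output, {
  String? path,
}) {
  if (path == null) {
    // Root-level merge: new keys overwrite existing ones.
    return {...current, ...output};
  }
  // Reuse an existing nested map under [path] if there is one.
  final existing = current[path];
  final nested = existing is Map<String, dynamic>
      ? existing
      : const <String, dynamic>{};
  return {
    ...current,
    path: {...nested, ...output},
  };
}
```

Namespacing keeps two executors that both emit a `count` key from overwriting each other, as long as they use different paths.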

Status Effects

UpdateStatusEffect

Update the workflow instance status.

dart
class UpdateStatusEffect extends WorkflowEffect {
  const UpdateStatusEffect({required this.status});

  final WorkflowStatus status;
}

CompleteWorkflowEffect

Complete the workflow with final output.

dart
class CompleteWorkflowEffect extends WorkflowEffect {
  const CompleteWorkflowEffect({this.output = const {}});

  /// Final output to merge before completing
  final Map<String, dynamic> output;
}

FailWorkflowEffect

Fail the workflow with an error.

dart
class FailWorkflowEffect extends WorkflowEffect {
  const FailWorkflowEffect({
    required this.error,
    this.nodeId,
  });

  /// The error that caused the failure
  final WorkflowError error;

  /// The node where the failure occurred
  final String? nodeId;
}

Token Effects

Tokens track execution position in the workflow. These effects manage token lifecycle.

CreateTokensEffect

Create new tokens for target nodes. Used when continuing to the next nodes or splitting into parallel branches.

dart
class CreateTokensEffect extends WorkflowEffect {
  const CreateTokensEffect({
    required this.targetNodeIds,
    required this.fromNodeId,
    this.fromTokenId,
    this.isParallelSplit = false,
    this.output = const {},
    this.branchId,
    this.parentTokenId,
  });

  /// Target node IDs to create tokens for
  final List<String> targetNodeIds;

  /// The node ID that is the source of these tokens
  final String fromNodeId;

  /// Whether this is a parallel split (creates unique branchIds per token)
  final bool isParallelSplit;

  /// Output to carry on the new tokens
  final Map<String, dynamic> output;

  // Declarations for fromTokenId, branchId, and parentTokenId are omitted here.
}
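
As a rough illustration of what `isParallelSplit` implies, the sketch below expands a list of target nodes into tokens and assigns a distinct branch ID per token only for a parallel split. The `Token` class, the ID scheme, and `createTokens` are assumptions made for this example, not the engine's implementation.

```dart
// Simplified stand-in for a token record; field names are assumptions.
class Token {
  const Token({
    required this.id,
    required this.nodeId,
    required this.fromNodeId,
    this.branchId,
  });

  final String id;
  final String nodeId;
  final String fromNodeId;
  final String? branchId;
}

/// Creates one token per target node.
/// For a parallel split each token gets its own branchId; otherwise the
/// tokens carry the given [branchId] unchanged.
List<Token> createTokens({
  required List<String> targetNodeIds,
  required String fromNodeId,
  bool isParallelSplit = false,
  String? branchId,
}) {
  return [
    for (var i = 0; i < targetNodeIds.length; i++)
      Token(
        // Hypothetical ID scheme; a real engine would likely use UUIDs.
        id: '$fromNodeId->${targetNodeIds[i]}#$i',
        nodeId: targetNodeIds[i],
        fromNodeId: fromNodeId,
        branchId: isParallelSplit ? '$fromNodeId/branch-$i' : branchId,
      ),
  ];
}
```

Distinct branch IDs are what would let a later join tell which branches have arrived.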

ConsumeTokenEffect

Consume a token (mark as terminal). Used when a token reaches an end node or is merged at a join.

dart
class ConsumeTokenEffect extends WorkflowEffect {
  const ConsumeTokenEffect({required this.tokenId});

  final String tokenId;
}

ConsumeJoinTokensEffect

Consume all tokens at a join node when all branches have arrived.

dart
class ConsumeJoinTokensEffect extends WorkflowEffect {
  const ConsumeJoinTokensEffect({
    required this.joinNodeId,
    required this.tokenIds,
  });

  final String joinNodeId;
  final List<String> tokenIds;
}
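
The "all branches have arrived" condition can be sketched as a simple readiness check. The data shapes here (a set of expected branch IDs and a map from branch ID to token ID) and the `tokensToConsume` helper are assumptions for illustration; the engine's actual join bookkeeping is not described in this document.

```dart
/// Returns the token IDs to consume if every expected branch has a token
/// waiting at the join, or null if the join must keep waiting.
/// [arrived] maps branchId -> tokenId; both shapes are assumptions.
List<String>? tokensToConsume(
  Set<String> expectedBranchIds,
  Map<String, String> arrived,
) {
  final allArrived = expectedBranchIds.every(arrived.containsKey);
  if (!allArrived) return null;
  return [for (final branch in expectedBranchIds) arrived[branch]!];
}
```

A non-null result would supply the `tokenIds` for a ConsumeJoinTokensEffect; a null result means no effect should be emitted yet.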

DropTokenEffect

Drop a token, for example when it arrives late at a join or is dropped explicitly.

dart
class DropTokenEffect extends WorkflowEffect {
  const DropTokenEffect({
    required this.tokenId,
    this.reason,
  });

  final String tokenId;
  final String? reason;
}

WaitTokenEffect

Mark a token as waiting for an external event.

dart
class WaitTokenEffect extends WorkflowEffect {
  const WaitTokenEffect({required this.tokenId});

  final String tokenId;
}

ActivateTokenEffect

Activate a waiting token (resume from wait state).

dart
class ActivateTokenEffect extends WorkflowEffect {
  const ActivateTokenEffect({required this.tokenId});

  final String tokenId;
}
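
Taken together, the token effects suggest a small lifecycle state machine. The sketch below is one plausible model, not the engine's actual rules: the `TokenStatus` values, the string effect names, and the set of allowed transitions are all assumptions.

```dart
/// Hypothetical token states; the library's actual status enum may differ.
enum TokenStatus { active, waiting, consumed, dropped }

/// Returns the status after applying the effect named [effect], or throws if
/// the transition is not allowed in this simplified model.
TokenStatus transition(TokenStatus current, String effect) {
  return switch ((current, effect)) {
    (TokenStatus.active, 'wait') => TokenStatus.waiting,
    (TokenStatus.waiting, 'activate') => TokenStatus.active,
    (TokenStatus.active, 'consume') => TokenStatus.consumed,
    (TokenStatus.active || TokenStatus.waiting, 'drop') => TokenStatus.dropped,
    _ => throw StateError('Cannot apply "$effect" to a $current token'),
  };
}
```

In this model `consumed` and `dropped` are terminal, which matches the description of ConsumeTokenEffect as marking a token terminal.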

User Task Effects

CreateUserTaskEffect

Create a user task instance.

dart
class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({
    required this.taskId,
    required this.nodeId,
    required this.signalName,
    required this.title,
    required this.schemaType,
    this.description,
    this.assignedToRoleId,
    this.assignedToUserId,
    this.assignedToGroupId,
    this.priority = UserTaskPriority.normal,
    this.dueAt,
    this.input = const {},
    this.storeAs,
  });

  // Field declarations are omitted here; each constructor parameter maps to a field.
}

CancelUserTasksEffect

Cancel pending user tasks for a workflow instance.

dart
class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect({this.nodeId});

  /// Optional: Cancel only tasks for this node. If null, cancel all.
  final String? nodeId;
}

Usage:

dart
// Cancel all pending user tasks
const CancelUserTasksEffect()

// Cancel only tasks for a specific node
CancelUserTasksEffect(nodeId: 'approvalNode')
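
The difference between the two forms can be sketched as a filter over pending tasks. `UserTask` and `cancelUserTasks` are hypothetical stand-ins for whatever the engine's storage layer actually does.

```dart
// Simplified stand-in for a stored user task; field names are assumptions.
class UserTask {
  UserTask(this.id, this.nodeId);

  final String id;
  final String nodeId;
  bool cancelled = false;
}

/// Cancels pending tasks, optionally restricted to [nodeId].
/// Returns the IDs of the tasks that were cancelled by this call.
List<String> cancelUserTasks(List<UserTask> tasks, {String? nodeId}) {
  final cancelledIds = <String>[];
  for (final task in tasks) {
    // A null nodeId matches every task, mirroring "if null, cancel all".
    final matches = nodeId == null || task.nodeId == nodeId;
    if (matches && !task.cancelled) {
      task.cancelled = true;
      cancelledIds.add(task.id);
    }
  }
  return cancelledIds;
}
```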

Common Use Case: Cancel Before Revision

dart
class RevisionTaskExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    return WaitForUserTaskResult(
      signalName: 'revision_submitted',
      config: UserTaskConfiguration(
        title: 'Revise Document',
        schemaType: 'revision',
        assignedToUserId: context.getRequired<String>('submitterId'),
      ),
      // Cancel any pending approval tasks before creating revision task
      effects: [const CancelUserTasksEffect()],
    );
  }
}

Event Effects

RecordEventEffect

Record a workflow event to the event log.

dart
class RecordEventEffect extends WorkflowEffect {
  const RecordEventEffect({required this.event});

  final WorkflowEvent event;
}

Usage:

dart
RecordEventEffect(
  event: WorkflowEvent.custom(
    instanceId: context.workflowInstanceId,
    nodeId: context.nodeId,
    eventType: 'approval_completed',
    data: {'decision': 'approved', 'approvedBy': userId},
  ),
)

EmitErrorEffect

Emit an error to the error stream.

dart
class EmitErrorEffect extends WorkflowEffect {
  const EmitErrorEffect({
    required this.instanceId,
    required this.error,
  });

  final String instanceId;
  final WorkflowError error;
}

Scheduling Effects

ScheduleExecutionEffect

Schedule execution of target nodes. Used internally by the orchestrator.

dart
class ScheduleExecutionEffect extends WorkflowEffect {
  const ScheduleExecutionEffect({
    required this.nodeIds,
    required this.tokenIds,
  });

  final List<String> nodeIds;
  final List<String> tokenIds;
}

ScheduleRetryEffect

Schedule a retry with delay.

dart
class ScheduleRetryEffect extends WorkflowEffect {
  const ScheduleRetryEffect({
    required this.nodeId,
    required this.tokenId,
    required this.delay,
    required this.attemptNumber,
  });

  final String nodeId;
  final String tokenId;
  final Duration delay;
  final int attemptNumber;
}

StoreAttemptNumberEffect

Store the attempt number for retry tracking.

dart
class StoreAttemptNumberEffect extends WorkflowEffect {
  const StoreAttemptNumberEffect({
    required this.nodeId,
    required this.attemptNumber,
  });

  final String nodeId;
  final int attemptNumber;
}
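
One way an executor or orchestrator might compute the `delay` and `attemptNumber` for a retry is exponential backoff with a cap. This is a sketch under assumptions: the backoff policy, the `nextRetry` helper and its defaults are hypothetical, and it treats `attemptNumber` as the number of the upcoming attempt, which this document does not confirm.

```dart
// Simplified stand-in that mirrors the documented constructor.
class ScheduleRetryEffect {
  const ScheduleRetryEffect({
    required this.nodeId,
    required this.tokenId,
    required this.delay,
    required this.attemptNumber,
  });

  final String nodeId;
  final String tokenId;
  final Duration delay;
  final int attemptNumber;
}

/// Returns a retry effect for the next attempt, or null once [maxAttempts]
/// has been reached. The delay doubles per failed attempt, capped at
/// [maxDelay]. [failedAttempt] is 1-based.
ScheduleRetryEffect? nextRetry({
  required String nodeId,
  required String tokenId,
  required int failedAttempt,
  int maxAttempts = 5,
  Duration baseDelay = const Duration(seconds: 1),
  Duration maxDelay = const Duration(minutes: 1),
}) {
  if (failedAttempt >= maxAttempts) return null;
  var delay = baseDelay;
  for (var i = 1; i < failedAttempt; i++) {
    delay *= 2;
    if (delay >= maxDelay) {
      delay = maxDelay;
      break;
    }
  }
  return ScheduleRetryEffect(
    nodeId: nodeId,
    tokenId: tokenId,
    delay: delay,
    attemptNumber: failedAttempt + 1,
  );
}
```

A null result signals that retries are exhausted, at which point a caller might return a FailWorkflowEffect instead.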

Cleanup Effects

CleanupResourcesEffect

Clean up workflow resources on termination. Deactivates all tokens and cancels pending user tasks.

dart
class CleanupResourcesEffect extends WorkflowEffect {
  const CleanupResourcesEffect();
}

Using Effects in Executors

In TaskExecutor

dart
class MyTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.myTask';

  @override
  String get name => 'My Task';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // processData is an application-defined helper, not part of the library
    final result = await processData(context.input);

    return TaskSuccess(
      output: {'processed': true},
      effects: [
        // Set namespaced output
        SetOutputEffect(
          output: {'stats': result.stats},
          path: 'taskResult',
        ),
        // Record audit event
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'task_completed',
            data: {'itemCount': result.count},
          ),
        ),
      ],
    );
  }
}

In NodeExecutor

dart
class MyNodeExecutor extends NodeExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    // Process node...

    return ContinueResult.single(
      'nextNode',
      output: {'processed': true},
      effects: [
        const CancelUserTasksEffect(),
        RecordEventEffect(event: ...),
      ],
    );
  }
}

In UserTaskExecutor

dart
class ApprovalExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    return WaitForUserTaskResult(
      signalName: 'approval_decision',
      config: UserTaskConfiguration(
        title: 'Approve Request',
        schemaType: 'approval',
        assignedToRoleId: 'managers',
      ),
      // Effects run before the user task is created
      effects: [
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_requested',
            data: {},
          ),
        ),
      ],
    );
  }
}

Best Practices

  1. Keep executors pure: Return effects instead of calling mutation methods
  2. Use SetOutputEffect with paths: Namespace output to prevent collisions
  3. Cancel tasks before creating new ones: Use CancelUserTasksEffect when transitioning between user tasks
  4. Record events for audit: Use RecordEventEffect for important state changes
  5. Order effects appropriately: Effects are processed in list order, so place an effect before any effect that depends on it
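
To show why ordering matters, the sketch below applies the same two effects in both orders against a list of pending task IDs. The classes are simplified stand-ins and `applyInOrder` is a hypothetical helper; it only models the "processed in order" rule stated above.

```dart
// Simplified stand-ins for illustration only.
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({required this.taskId});
  final String taskId;
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect();
}

/// Applies [effects] in list order to a copy of [pending] and returns the
/// pending task IDs that remain.
List<String> applyInOrder(List<String> pending, List<WorkflowEffect> effects) {
  final tasks = [...pending];
  for (final effect in effects) {
    switch (effect) {
      case CreateUserTaskEffect(:final taskId):
        tasks.add(taskId);
      case CancelUserTasksEffect():
        tasks.clear();
    }
  }
  return tasks;
}
```

With `['approval']` pending, cancel-then-create leaves only the new task, while create-then-cancel leaves nothing, because the cancel also removes the task that was just created.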

See Also