Data Flow

Understanding how data moves through workflows is essential for building effective business processes.

Data Model

The workflow engine provides three levels of data access through ExecutionContext:

| Data Source | Property | Description |
| --- | --- | --- |
| input | context.input | Output from the previous node (your primary data source) |
| workflowInput | context.workflowInput | Original input when the workflow started (immutable) |
| accumulated | context.accumulated | All output accumulated during workflow execution |

Input and Output

Workflow Input

  • Provided when starting a workflow
  • Immutable after start
  • Accessible via context.workflowInput or context.getInitial<T>()

Node Input

  • Output from the previous node in the flow
  • Your primary data source in executors
  • Accessible via context.input or context.get<T>()

Accumulated Output

  • All output accumulated during workflow execution
  • Grows as nodes execute
  • Used for routing decisions
  • Accessible via context.accumulated or context.getAny<T>()

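Accumulated output after each step of an example workflow with two tasks:

| Step | Output |
| --- | --- |
| Input | { entityId: '123' } |
| After Start | { entityId: '123' } |
| After Task 1 | { entityId: '123', validated: true } |
| After Task 2 | { entityId: '123', validated: true, approvalLevel: 1 } |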

Output Merging

When a node produces output, it's merged into the workflow output:

```dart
// Task executor returns
return TaskSuccess(output: {
  'validated': true,
  'validationErrors': [],
});

// Merged into workflow output:
// { entityId: "123", submittedBy: "...", validated: true, validationErrors: [] }
```
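
The merge behaviour can be pictured with plain Dart maps. This is only a sketch: `mergeOutput` is a hypothetical helper, not part of the workflow engine's API, and it assumes the merge is shallow, with later keys replacing earlier ones at the top level.

```dart
/// Hypothetical helper (not an engine API): shallow-merges a node's output
/// into the workflow output and returns the result as a new map.
Map<String, dynamic> mergeOutput(
  Map<String, dynamic> workflowOutput,
  Map<String, dynamic> nodeOutput,
) {
  // Spread order matters: keys from nodeOutput replace same-named keys.
  return {...workflowOutput, ...nodeOutput};
}

void main() {
  var output = <String, dynamic>{'entityId': '123'};

  // Task 1 adds a key.
  output = mergeOutput(output, {'validated': true});

  // Task 2 adds another key; earlier keys are preserved.
  output = mergeOutput(output, {'approvalLevel': 1});

  print(output); // {entityId: 123, validated: true, approvalLevel: 1}
}
```

The printed map matches the "After Task 2" state of the example workflow: earlier keys survive unless a later node reuses the same key.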

storeAs (Namespacing Output)

To avoid key collisions, use storeAs to namespace output:

Without Namespacing (Flat Merge)

```dart
// Task 1 returns { result: 'value1' }
// Task 2 returns { result: 'value2' }
// Final output: { result: 'value2' }  // Collision! Task 2 overwrote Task 1
```

With Namespacing

```dart
// Define tasks with storeAs
builder.task('task1', storeAs: 'task1Result', ...);
builder.task('task2', storeAs: 'task2Result', ...);

// Task 1 returns { result: 'value1' }
// Task 2 returns { result: 'value2' }
// Final output: {
//   task1Result: { result: 'value1' },
//   task2Result: { result: 'value2' }
// }
```
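
To make the difference concrete, here is a minimal sketch of both behaviours using plain maps. `mergeWithStoreAs` is a hypothetical helper written for illustration; how the engine implements storeAs internally is not shown in this guide.

```dart
/// Hypothetical helper (not an engine API): merges [nodeOutput] into
/// [workflowOutput], nesting it under [storeAs] when a namespace is given.
Map<String, dynamic> mergeWithStoreAs(
  Map<String, dynamic> workflowOutput,
  Map<String, dynamic> nodeOutput, {
  String? storeAs,
}) {
  if (storeAs == null) {
    // Flat merge: same-named keys are overwritten.
    return {...workflowOutput, ...nodeOutput};
  }
  // Namespaced: the whole node output lives under a single key.
  return {...workflowOutput, storeAs: nodeOutput};
}

void main() {
  final task1 = {'result': 'value1'};
  final task2 = {'result': 'value2'};

  var flat = <String, dynamic>{};
  flat = mergeWithStoreAs(flat, task1);
  flat = mergeWithStoreAs(flat, task2);
  print(flat); // {result: value2}

  var namespaced = <String, dynamic>{};
  namespaced = mergeWithStoreAs(namespaced, task1, storeAs: 'task1Result');
  namespaced = mergeWithStoreAs(namespaced, task2, storeAs: 'task2Result');
  print(namespaced);
  // {task1Result: {result: value1}, task2Result: {result: value2}}
}
```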

Signal/UserTask with storeAs

```dart
builder.signalWait('awaitApproval',
  signal: 'approval_decision',
  storeAs: 'level1Approval',  // Namespace the signal payload
);

// Signal payload: { decision: 'approved', comments: 'LGTM' }
// Stored at output.level1Approval: { decision: 'approved', comments: 'LGTM' }
```

Reading Output in Conditions

Gateway conditions read from output to make routing decisions:

```dart
builder.oneOf('routeDecision', [
  Branch.whenFn(
    (output) => output['level1Approval']?['decision'] == 'approved',
    then: 'handleApproved',
  ),
  Branch.whenFn(
    (output) => output['level1Approval']?['decision'] == 'rejected',
    then: 'handleRejected',
  ),
  Branch.otherwise(then: 'handleOther'),
]);
```
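
The sketch below shows how predicates like these could be evaluated against the output map. `SketchBranch` and `route` are hypothetical stand-ins, and first-match-wins ordering is an assumption made for illustration rather than documented engine behaviour. The null-aware index (`?[...]`) is what lets a condition evaluate safely before `level1Approval` has been written.

```dart
typedef Output = Map<String, dynamic>;

/// Hypothetical stand-in for a gateway branch: a target node plus an
/// optional predicate. A null predicate plays the role of "otherwise".
class SketchBranch {
  const SketchBranch(this.target, [this.condition]);

  final String target;
  final bool Function(Output output)? condition;
}

/// Returns the target of the first branch whose condition holds, or null
/// if no branch matches (first-match-wins is an assumption of this sketch).
String? route(Output output, List<SketchBranch> branches) {
  for (final branch in branches) {
    final condition = branch.condition;
    if (condition == null || condition(output)) return branch.target;
  }
  return null;
}

void main() {
  final branches = [
    SketchBranch(
      'handleApproved',
      (o) => o['level1Approval']?['decision'] == 'approved',
    ),
    SketchBranch(
      'handleRejected',
      (o) => o['level1Approval']?['decision'] == 'rejected',
    ),
    SketchBranch('handleOther'),
  ];

  print(route({'level1Approval': {'decision': 'approved'}}, branches));
  // handleApproved

  // No approval recorded yet: ?[] yields null, so the fallback is taken.
  print(route({}, branches)); // handleOther
}
```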

Accessing Data in Executors

Task executors access data through ExecutionContext with typed accessors:

```dart
class MyTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.myTask';

  @override
  String get name => 'My Task';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // PRIMARY: Get data from previous node output
    final entityId = context.get<String>('entityId');
    final previousResult = context.get<Map>('taskResult');

    // ORIGINAL: Get data from workflow input (immutable)
    final tenantId = context.getInitial<String>('tenantId');

    // ANYWHERE: Get data from accumulated output
    final approval = context.getAny<Map>('level1Approval');

    // REQUIRED: Throws if missing
    final requiredField = context.getRequired<String>('requiredField');

    // CONFIG: Get node configuration
    final template = context.getConfig<String>('template');

    // Process...

    return TaskSuccess(output: {
      'myResult': 'value',
    });
  }
}
```

Data Access Methods

| Method | Data Source | Use Case |
| --- | --- | --- |
| get<T>(path) | Previous node output | Primary input for current task |
| getRequired<T>(path) | Previous node output | Required values (throws if missing) |
| getInitial<T>(path) | Original workflow input | Configuration/context that persists |
| getInitialRequired<T>(path) | Original workflow input | Required original values |
| getAny<T>(path) | Accumulated output | Data from any previous node |
| getConfig<T>(key) | Node configuration | Design-time parameters |
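
As a mental model, the accessors can be thought of as typed lookups over three separate maps. The class below is a simplified, hypothetical stand-in for ExecutionContext, not the real implementation: it handles top-level keys only, and the nullable return types and the StateError thrown by `getRequired` are assumptions of this sketch.

```dart
/// Hypothetical, simplified stand-in for ExecutionContext.
class SketchContext {
  SketchContext({
    required this.input,
    required this.workflowInput,
    required this.accumulated,
  });

  final Map<String, dynamic> input; // output of the previous node
  final Map<String, dynamic> workflowInput; // original workflow input
  final Map<String, dynamic> accumulated; // everything produced so far

  T? get<T>(String key) => _as<T>(input[key]);
  T? getInitial<T>(String key) => _as<T>(workflowInput[key]);
  T? getAny<T>(String key) => _as<T>(accumulated[key]);

  /// Like [get], but fails loudly when the value is absent or mistyped.
  T getRequired<T>(String key) {
    final value = get<T>(key);
    if (value == null) throw StateError('Missing required input "$key"');
    return value;
  }

  // Returns the value only if it has the requested type.
  static T? _as<T>(Object? value) => value is T ? value : null;
}

void main() {
  final context = SketchContext(
    input: {'entityId': '123'},
    workflowInput: {'tenantId': 'acme'}, // 'acme' is a made-up value
    accumulated: {
      'entityId': '123',
      'level1Approval': {'decision': 'approved'},
    },
  );

  print(context.get<String>('entityId')); // 123
  print(context.getInitial<String>('tenantId')); // acme
  print(context.getAny<Map>('level1Approval')); // {decision: approved}
  print(context.get<String>('tenantId')); // null (not in previous output)
}
```

The last call illustrates why the data source matters: a key that exists only in the workflow input is not visible through `get`.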

Nested Path Access

All accessor methods support dot notation for nested values:

```dart
// Simple key
final entityId = context.get<String>('entityId');

// Nested path
final decision = context.get<String>('approval.decision');
final level = context.get<int>('approvals.level1.level');
```
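
Dot-notation lookup amounts to walking nested maps one segment at a time. The function below is a minimal sketch of that idea; `readPath` is a hypothetical helper, and returning null for a missing segment or a type mismatch is an assumption rather than documented engine behaviour.

```dart
/// Hypothetical helper (not an engine API): walks [data] along a
/// dot-separated [path]. Returns null when a segment is missing or the
/// final value is not a [T].
T? readPath<T>(Map<String, dynamic> data, String path) {
  Object? current = data;
  for (final segment in path.split('.')) {
    // Stop as soon as the path tries to descend into a non-map value.
    if (current is! Map) return null;
    current = current[segment];
  }
  return current is T ? current : null;
}

void main() {
  final input = <String, dynamic>{
    'entityId': '123',
    'approvals': {
      'level1': {'level': 1},
    },
  };

  print(readPath<String>(input, 'entityId')); // 123
  print(readPath<int>(input, 'approvals.level1.level')); // 1
  print(readPath<String>(input, 'approval.decision')); // null
}
```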

Best Practices

1. Use storeAs for all signal/user task outputs

```dart
builder.signalWait('awaitLevel1', storeAs: 'level1Decision', ...);
builder.userTask('level1Approval', storeAs: 'level1Decision', ...);
```

2. Design output schema upfront

Plan your output structure before building:

```dart
// Expected final output structure
{
  // Original input
  'entityId': '123',
  'entityType': 'Equipment',
  'submittedBy': '...',

  // From validation task
  'validation': { 'valid': true, 'errors': [] },

  // From approval chain task
  'approvalChain': { 'totalLevels': 2, 'levels': [...] },

  // From level 1 approval
  'level1Decision': { 'decision': 'approved', 'approvedBy': '...' },

  // From level 2 approval
  'level2Decision': { 'decision': 'approved', 'approvedBy': '...' },

  // From final task
  'effectiveAt': '2024-01-15T10:00:00Z',
}
```

3. Keep task outputs focused

Return only what's needed for downstream tasks:

```dart
// GOOD: Focused output
return TaskSuccess(output: {
  'approved': true,
  'approvedAt': DateTime.now().toIso8601String(),
});

// AVOID: Dumping entire internal state
return TaskSuccess(output: {
  'internalState': internalState.toJson(),
  'debugInfo': debugInfo,
  'allRecords': allRecords,  // Too much!
});
```

Producing Output with Effects

Executors produce output declaratively by returning effects:

```dart
class ProcessDataExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.processData';

  @override
  String get name => 'Process Data';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final data = context.getRequired<Map>('data');

    // Option 1: Return output directly (most common).
    // Shown commented out so that Option 2 below stays reachable;
    // use one or the other.
    // return TaskSuccess(output: {
    //   'processed': true,
    //   'resultCount': data.length,
    // });

    // Option 2: Use effects for advanced scenarios
    return TaskSuccess(
      output: {'processed': true},
      effects: [
        // Namespace output under a path
        SetOutputEffect(
          output: {'count': data.length},
          path: 'processingStats',
        ),
        // Record an event
        RecordEventEffect(event: WorkflowEvent.custom(...)),
      ],
    );
  }
}
```

Effect-Based Output

Effects provide a declarative way to produce side effects:

| Effect | Description |
| --- | --- |
| SetOutputEffect | Set or merge output at a specific path |
| CancelUserTasksEffect | Cancel pending user tasks |
| RecordEventEffect | Record a workflow event |
| UpdateStatusEffect | Update workflow status |

See Workflow Effects for the complete reference.
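
To illustrate what "set or merge output at a specific path" could mean in practice, here is a minimal sketch over plain maps. `SketchSetOutput` and `applySetOutput` are hypothetical names; the sketch assumes a single-segment path and a shallow merge into any map already stored there, which may differ from how the engine applies SetOutputEffect.

```dart
/// Hypothetical stand-in for an effect that writes output under a path.
class SketchSetOutput {
  SketchSetOutput(this.output, {this.path});

  final Map<String, dynamic> output;
  final String? path; // null means "merge at the top level"
}

/// Applies [effect] to [workflowOutput] and returns an updated copy.
Map<String, dynamic> applySetOutput(
  Map<String, dynamic> workflowOutput,
  SketchSetOutput effect,
) {
  final path = effect.path;
  if (path == null) return {...workflowOutput, ...effect.output};

  // Merge into whatever map is already stored at the path, if any.
  final existing = workflowOutput[path];
  return {
    ...workflowOutput,
    path: <String, dynamic>{
      if (existing is Map<String, dynamic>) ...existing,
      ...effect.output,
    },
  };
}

void main() {
  var output = <String, dynamic>{'processed': true};

  output = applySetOutput(
    output,
    SketchSetOutput({'count': 3}, path: 'processingStats'),
  );
  print(output); // {processed: true, processingStats: {count: 3}}
}
```

Because the executor only describes the change and a separate step applies it, the same effect value can be inspected or tested without running a workflow.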
