Data Flow
Understanding how data moves through workflows is essential for building effective business processes.
Data Model
The workflow engine provides three levels of data access through ExecutionContext:
| Data Source | Property | Description |
|---|---|---|
| input | context.input | Output from the previous node (your primary data source) |
| workflowInput | context.workflowInput | Original input when workflow started (immutable) |
| accumulated | context.accumulated | All output accumulated during workflow execution |
Input and Output
Workflow Input
- Provided when starting a workflow
- Immutable after start
- Accessible via context.workflowInput or context.getInitial<T>()
Node Input
- Output from the previous node in the flow
- Your primary data source in executors
- Accessible via context.input or context.get<T>()
Accumulated Output
- All output accumulated during workflow execution
- Grows as nodes execute
- Used for routing decisions
- Accessible via context.accumulated or context.getAny<T>()
Output at each step:
| Step | Output |
|---|---|
| Input | { entityId: '123' } |
| After Start | { entityId: '123' } |
| After Task 1 | { entityId: '123', validated: true } |
| After Task 2 | { entityId: '123', validated: true, approvalLevel: 1 } |
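The accumulation above could be expressed with the builder API used later on this page; a minimal sketch, assuming hypothetical task names:

```dart
// Hypothetical flow matching the table above.
// Each task's output is flat-merged into the accumulated output.
builder.task('validateEntity', ...);   // Task 1 returns { validated: true }
builder.task('assignApproval', ...);   // Task 2 returns { approvalLevel: 1 }

// Start with { entityId: '123' }; after Task 2 the accumulated
// output is { entityId: '123', validated: true, approvalLevel: 1 }
```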
Output Merging
When a node produces output, it's merged into the workflow output:
// Task executor returns
return TaskSuccess(output: {
'validated': true,
'validationErrors': [],
});
// Merged into workflow output:
// { entityId: "123", submittedBy: "...", validated: true, validationErrors: [] }
storeAs (Namespacing Output)
To avoid key collisions, use storeAs to namespace output:
Without Namespacing (Flat Merge)
// Task 1 returns { result: 'value1' }
// Task 2 returns { result: 'value2' }
// Final output: { result: 'value2' } // Collision! Task 2 overwrote Task 1
With Namespacing
// Define tasks with storeAs
builder.task('task1', storeAs: 'task1Result', ...);
builder.task('task2', storeAs: 'task2Result', ...);
// Task 1 returns { result: 'value1' }
// Task 2 returns { result: 'value2' }
// Final output: {
// task1Result: { result: 'value1' },
// task2Result: { result: 'value2' }
// }
Signal/UserTask with storeAs
builder.signalWait('awaitApproval',
signal: 'approval_decision',
storeAs: 'level1Approval', // Namespace the signal payload
);
// Signal payload: { decision: 'approved', comments: 'LGTM' }
// Stored at output.level1Approval: { decision: 'approved', comments: 'LGTM' }
Reading Output in Conditions
Gateway conditions read from output to make routing decisions:
builder.oneOf('routeDecision', [
Branch.whenFn(
(output) => output['level1Approval']?['decision'] == 'approved',
then: 'handleApproved',
),
Branch.whenFn(
(output) => output['level1Approval']?['decision'] == 'rejected',
then: 'handleRejected',
),
Branch.otherwise(then: 'handleOther'),
]);
Accessing Data in Executors
Task executors access data through ExecutionContext with typed accessors:
class MyTaskExecutor extends TaskExecutor {
@override
String get schemaType => 'task.myTask';
@override
String get name => 'My Task';
@override
Future<TaskResult> execute(ExecutionContext context) async {
// PRIMARY: Get data from previous node output
final entityId = context.get<String>('entityId');
final previousResult = context.get<Map>('taskResult');
// ORIGINAL: Get data from workflow input (immutable)
final tenantId = context.getInitial<String>('tenantId');
// ANYWHERE: Get data from accumulated output
final approval = context.getAny<Map>('level1Approval');
// REQUIRED: Throws if missing
final requiredField = context.getRequired<String>('requiredField');
// CONFIG: Get node configuration
final template = context.getConfig<String>('template');
// Process...
return TaskSuccess(output: {
'myResult': 'value',
});
}
}
Data Access Methods
| Method | Data Source | Use Case |
|---|---|---|
| get<T>(path) | Previous node output | Primary input for current task |
| getRequired<T>(path) | Previous node output | Required values (throws if missing) |
| getInitial<T>(path) | Original workflow input | Configuration/context that persists |
| getInitialRequired<T>(path) | Original workflow input | Required original values |
| getAny<T>(path) | Accumulated output | Data from any previous node |
| getConfig<T>(key) | Node configuration | Design-time parameters |
Nested Path Access
All accessor methods support dot notation for nested values:
// Simple key
final entityId = context.get<String>('entityId');
// Nested path
final decision = context.get<String>('approval.decision');
final level = context.get<int>('approvals.level1.level');
User Task Data Flow
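A user task's completion payload flows into workflow output the same way a signal payload does. A minimal sketch using the builder and accessor APIs shown on this page (node names and payload fields are illustrative):

```dart
// Namespace the completion payload under output.level1Decision
builder.userTask('level1Approval',
  storeAs: 'level1Decision',
  ...);

// If the task is completed with { decision: 'approved', approvedBy: 'alice' },
// a downstream executor can read it via a nested path:
final decision = context.getAny<String>('level1Decision.decision');
```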
Best Practices
1. Use storeAs for all signal/user task outputs
builder.signalWait('awaitLevel1', storeAs: 'level1Decision', ...);
builder.userTask('level1Approval', storeAs: 'level1Decision', ...);
2. Design output schema upfront
Plan your output structure before building:
// Expected final output structure
{
// Original input
'entityId': '123',
'entityType': 'Equipment',
'submittedBy': '[email protected]',
// From validation task
'validation': { 'valid': true, 'errors': [] },
// From approval chain task
'approvalChain': { 'totalLevels': 2, 'levels': [...] },
// From level 1 approval
'level1Decision': { 'decision': 'approved', 'approvedBy': '...' },
// From level 2 approval
'level2Decision': { 'decision': 'approved', 'approvedBy': '...' },
// From final task
'effectiveAt': '2024-01-15T10:00:00Z',
}
3. Keep task outputs focused
Return only what's needed for downstream tasks:
// GOOD: Focused output
return TaskSuccess(output: {
'approved': true,
'approvedAt': DateTime.now().toIso8601String(),
});
// AVOID: Dumping entire internal state
return TaskSuccess(output: {
'internalState': internalState.toJson(),
'debugInfo': debugInfo,
'allRecords': allRecords, // Too much!
});
Producing Output with Effects
Executors can return output directly, or declare additional side effects by returning effects:
class ProcessDataExecutor extends TaskExecutor {
@override
String get schemaType => 'task.processData';
@override
String get name => 'Process Data';
@override
Future<TaskResult> execute(ExecutionContext context) async {
final data = context.getRequired<Map>('data');
// Option 1: Return output directly (most common):
// return TaskSuccess(output: {
//   'processed': true,
//   'resultCount': data.length,
// });
// Option 2: Use effects for advanced scenarios
return TaskSuccess(
output: {'processed': true},
effects: [
// Namespace output under a path
SetOutputEffect(
output: {'count': data.length},
path: 'processingStats',
),
// Record an event
RecordEventEffect(event: WorkflowEvent.custom(...)),
],
);
}
}
Effect-Based Output
Effects provide a declarative way to produce side effects:
| Effect | Description |
|---|---|
| SetOutputEffect | Set or merge output at a specific path |
| CancelUserTasksEffect | Cancel pending user tasks |
| RecordEventEffect | Record a workflow event |
| UpdateStatusEffect | Update workflow status |
See Workflow Effects for the complete reference.
Next Steps
- Workflow Effects - Effect types reference
- Type Registries - Handler registration
- Task Executors - Implementing task logic
- Patterns - Workflow design patterns