WorkflowBuilder
Fluent API for constructing workflow definitions.
Overview
WorkflowBuilder provides a chainable API for defining workflows declaratively. It validates structure and produces a Workflow object ready for execution.
Constructor
WorkflowBuilder(
String code,
String name, {
String? description,
});
Parameters
| Parameter | Type | Description |
|---|---|---|
| code | String | Short code for display/reference (positional) |
| name | String | Human-readable name (positional) |
| description | String? | Optional description |
Use the .version(int) method to set the version number (default: 1).
Example
final builder = WorkflowBuilder(
'ONBOARD',
'Employee Onboarding',
description: 'New hire onboarding workflow',
)
.version(2)
.withMetadata({'author': 'HR Team'});
Control Flow Nodes
start
WorkflowBuilder start(String id, {String? name});
Adds a start node. Every workflow must have exactly one start node.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique node ID |
| name | String? | Display name |
Example:
builder.start('begin', name: 'Start Process')
end
WorkflowBuilder end(String id, {String? name});
Adds an end node. Workflows can have multiple end nodes for different outcomes.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique node ID |
| name | String? | Display name (e.g., 'Approved', 'Rejected') |
Example:
builder
.end('success', name: 'Completed Successfully')
.end('failure', name: 'Failed')
.end('cancelled', name: 'Cancelled by User')
Task Nodes
task
WorkflowBuilder task(
String id, {
String? name,
String? description,
TaskExecutor? executor,
TaskExecuteFunction? execute,
});
Adds a task node for automated work. You must provide either an executor instance or an execute function.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique node ID |
| name | String? | Display name (auto-generated from id if not provided) |
| description | String? | Task description |
| executor | TaskExecutor? | Reusable executor class instance |
| execute | TaskExecuteFunction? | Inline function: Future<Map<String, dynamic>> Function(ExecutionContext) |
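The "either an executor or an execute function" rule can be pictured as a small adapter: an inline function is wrapped so that both options end up behind one interface. The sketch below is only an illustration under assumptions. Executor, FunctionExecutor, ExecuteFn, and resolveExecutor are hypothetical stand-ins rather than library types, a plain Map stands in for ExecutionContext, and rejecting calls that pass both options is an assumed rule.

```dart
// Hypothetical stand-ins; the library's TaskExecutor and ExecutionContext
// types are not reproduced here.
typedef ExecuteFn = Future<Map<String, dynamic>> Function(
    Map<String, dynamic> context);

abstract class Executor {
  Future<Map<String, dynamic>> run(Map<String, dynamic> context);
}

/// Adapts an inline function to the executor interface.
class FunctionExecutor implements Executor {
  FunctionExecutor(this.fn);
  final ExecuteFn fn;

  @override
  Future<Map<String, dynamic>> run(Map<String, dynamic> context) => fn(context);
}

/// Returns the executor to use. Assumed rule: exactly one option is required,
/// so passing neither or both is rejected.
Executor resolveExecutor({Executor? executor, ExecuteFn? execute}) {
  if ((executor == null) == (execute == null)) {
    throw ArgumentError('Provide exactly one of executor or execute.');
  }
  return executor ?? FunctionExecutor(execute!);
}

Future<void> main() async {
  final resolved = resolveExecutor(
    execute: (ctx) async => {'total': (ctx['a'] as num) + (ctx['b'] as num)},
  );
  print(await resolved.run({'a': 2, 'b': 3})); // {total: 5}
}
```

Normalizing to a single interface early means later stages only ever deal with one kind of object, whichever form the caller chose.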
Inline Function Example:
builder.task('calculateTotal',
execute: (ctx) async {
// Use get<T> for previous node output
final items = ctx.get<List>('items')!;
final total = items.fold<double>(0, (sum, item) => sum + item['price']);
return {'total': total};
},
)
Executor Class Example:
// Define a reusable executor
class SendEmailTaskExecutor extends TaskExecutor {
SendEmailTaskExecutor({
required this.template,
required this.to,
this.subject,
}) : super(schemaType: 'task.send-email');
final String template;
final String to;
final String? subject;
@override
Future<TaskResult> execute(ExecutionContext context) async {
// Interpolate values from workflow output
final recipient = interpolate(to, context.output);
final emailSubject = interpolate(subject ?? '', context.output);
await emailService.send(template: template, to: recipient, subject: emailSubject);
return TaskSuccess(output: {'sent': true});
}
}
// Use in workflow
builder.task('sendNotification',
executor: SendEmailTaskExecutor(
template: 'welcome',
to: '{{submittedBy}}',
subject: 'Your request {{requestId}} was approved',
),
)
schemaType in Executors
Every TaskExecutor has a schemaType property that uniquely identifies the executor type. This is used for:
- Serialization/deserialization of workflow definitions
- Registry lookups when building workflows from stored definitions
- Matching executors to nodes at runtime
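To make the registry-lookup use concrete, here is a minimal sketch of how a schemaType string could map a stored node definition back to an executor. ExecutorRegistry, ExecutorFactory, EmailConfig, and the stored-node layout with 'schemaType' and 'config' keys are all assumptions made for illustration; the library's actual registry may look quite different.

```dart
// Hypothetical registry sketch; none of these names are library API.
typedef ExecutorFactory = Object Function(Map<String, dynamic> config);

class ExecutorRegistry {
  final Map<String, ExecutorFactory> _factories = {};

  /// Associates a schemaType with a factory that rebuilds the executor.
  void register(String schemaType, ExecutorFactory factory) {
    if (_factories.containsKey(schemaType)) {
      throw StateError('schemaType "$schemaType" is already registered.');
    }
    _factories[schemaType] = factory;
  }

  /// Rebuilds an executor from a stored node definition (assumed layout).
  Object create(Map<String, dynamic> storedNode) {
    final schemaType = storedNode['schemaType'] as String;
    final factory = _factories[schemaType];
    if (factory == null) {
      throw StateError('No executor registered for "$schemaType".');
    }
    return factory((storedNode['config'] as Map).cast<String, dynamic>());
  }
}

/// Stand-in for a deserialized executor such as SendEmailTaskExecutor.
class EmailConfig {
  EmailConfig(this.template, this.to);
  final String template;
  final String to;
}

void main() {
  final registry = ExecutorRegistry()
    ..register(
      'task.send-email',
      (config) =>
          EmailConfig(config['template'] as String, config['to'] as String),
    );

  final rebuilt = registry.create({
    'schemaType': 'task.send-email',
    'config': {'template': 'welcome', 'to': '{{submittedBy}}'},
  }) as EmailConfig;
  print(rebuilt.template); // welcome
}
```

Rejecting duplicate registrations in this sketch reflects the statement that a schemaType uniquely identifies an executor type.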
userTask
WorkflowBuilder userTask(
String id, {
String? name,
String? description,
String? signal,
UserTaskExecutor? executor,
String? title,
String? schemaType,
String? assignToRole,
String? assignToUser,
String? assignToGroup,
String? priority,
String? storeAs,
});
Adds a user task node that waits for human interaction. User tasks create inbox items that appear in a user's task list.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique node ID |
| name | String? | Display name in workflow |
| description | String? | Task description |
| signal | String? | Signal name to wait for (auto-generated if not provided) |
| executor | UserTaskExecutor? | Custom executor for complex user tasks |
| title | String? | Task title shown to user (supports {{variable}} interpolation) |
| schemaType | String? | Executor type identifier for deserialization |
| assignToRole | String? | Role-based assignment (supports {{variable}} interpolation) |
| assignToUser | String? | Specific user assignment |
| assignToGroup | String? | Group-based assignment |
| priority | String? | Task priority level |
| storeAs | String? | Output key for task response |
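The examples on this page pass values such as 'Approve {{entityType}}' and '{{approverRoleId}}' for title and assignToRole. The sketch below shows one plausible way such double-brace placeholders could be resolved against workflow output. interpolateTemplate is a hypothetical helper, not the library's interpolate function, and leaving unknown keys untouched is an assumption.

```dart
/// Replaces each {{key}} in [template] with the matching value from [values].
/// Unknown keys are left in place (an assumption, not documented behavior).
String interpolateTemplate(String template, Map<String, dynamic> values) {
  final placeholder = RegExp(r'\{\{\s*(\w+)\s*\}\}');
  return template.replaceAllMapped(placeholder, (match) {
    final key = match.group(1)!;
    return values.containsKey(key) ? '${values[key]}' : match.group(0)!;
  });
}

void main() {
  final output = {'entityType': 'Invoice', 'approverRoleId': 'finance-lead'};
  print(interpolateTemplate('Approve {{entityType}}', output));
  // Approve Invoice
  print(interpolateTemplate('{{approverRoleId}}', output));
  // finance-lead
  print(interpolateTemplate('Hello {{missing}}', output));
  // Hello {{missing}}
}
```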
Configuration-based Example:
// Uses DefaultUserTaskExecutor with configuration
builder.userTask('approval',
signal: 'level_decision',
title: 'Approve {{entityType}}',
description: 'Please review and approve',
schemaType: 'task.approval', // For deserialization
assignToRole: '{{approverRoleId}}',
storeAs: 'approvalDecision',
)
Executor-based Example:
// Custom executor for complex user task logic
class ApprovalTaskExecutor extends UserTaskExecutor {
ApprovalTaskExecutor() : super(schemaType: 'user-task.approval');
@override
Future<UserTaskConfiguration> createTask(ExecutionContext context) async {
final entityType = context.get<String>('entityType') ?? 'Item';
final approverRoleId = context.get<String>('approverRoleId');
return UserTaskConfiguration(
schemaType: schemaType,
title: 'Approve $entityType',
assignedToRoleId: approverRoleId,
input: {
'entityId': context.get<String>('entityId'),
'entityData': context.get<Map>('entityData'),
},
);
}
}
builder.userTask('approval',
signal: 'approval_decision',
executor: ApprovalTaskExecutor(),
storeAs: 'approvalResult',
)
schemaType for User Tasks
The schemaType is essential for deserializing user task configurations from stored workflow definitions. It tells the engine which UserTaskExecutor to use when rebuilding the workflow.
Signal Nodes
signalWait
WorkflowBuilder signalWait(
String id, {
required String signal,
required String storeAs,
String? name,
Duration? timeout,
});
Adds a node that waits for an external signal.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique node ID |
| signal | String | Signal name to wait for |
| storeAs | String | Key for signal payload |
| name | String? | Display name |
| timeout | Duration? | Optional timeout |
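The interplay of signal, storeAs, and timeout can be sketched with a Completer and Future.timeout from dart:async. SignalHub is a hypothetical in-memory stand-in and says nothing about how the engine actually persists or delivers signals. What happens in a real workflow when the timeout elapses is not documented here, so the sketch simply surfaces a TimeoutException.

```dart
import 'dart:async';

/// Hypothetical in-memory signal hub; not the library's implementation.
class SignalHub {
  final Map<String, Completer<Map<String, dynamic>>> _waiters = {};

  /// Returns a future that completes when [signal] is delivered, or fails
  /// with a TimeoutException once [timeout] elapses.
  Future<Map<String, dynamic>> waitFor(String signal, {Duration? timeout}) {
    final completer =
        _waiters.putIfAbsent(signal, () => Completer<Map<String, dynamic>>());
    final future = completer.future;
    return timeout == null ? future : future.timeout(timeout);
  }

  /// Delivers [payload] to whoever is waiting on [signal].
  void send(String signal, Map<String, dynamic> payload) {
    _waiters.remove(signal)?.complete(payload);
  }
}

Future<void> main() async {
  final hub = SignalHub();
  final output = <String, dynamic>{};

  final pending =
      hub.waitFor('payment_completed', timeout: const Duration(seconds: 1));
  hub.send('payment_completed', {'amount': 42});
  // Mirrors storeAs: the payload is stored under the configured key.
  output['paymentResult'] = await pending;
  print(output); // {paymentResult: {amount: 42}}

  try {
    await hub.waitFor('never_sent', timeout: const Duration(milliseconds: 10));
  } on TimeoutException {
    print('timed out');
  }
}
```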
Example:
builder.signalWait('waitForPayment',
signal: 'payment_completed',
storeAs: 'paymentResult',
timeout: Duration(hours: 1),
)
Gateway Nodes
oneOf (Exclusive Gateway)
WorkflowBuilder oneOf(
String id,
List<Branch> branches, {
String? name,
});
Adds an exclusive (XOR) gateway. Only one branch executes.
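One way to read "only one branch executes" is first-match-wins selection with a fallback. The sketch below assumes branches are checked in declaration order, which the page does not state explicitly (Branch.whenFn also accepts a priority that is ignored here). Route and selectTarget are hypothetical names, not part of the library.

```dart
typedef Condition = bool Function(Map<String, dynamic> output);

/// Hypothetical stand-in for a branch: a null [condition] marks the fallback.
class Route {
  const Route(this.then, [this.condition]);
  final String then;
  final Condition? condition;
}

/// Returns the target of the first route whose condition holds, falling back
/// to the unconditional route. Assumes declaration order decides ties.
String selectTarget(List<Route> routes, Map<String, dynamic> output) {
  for (final route in routes) {
    final condition = route.condition;
    if (condition != null && condition(output)) return route.then;
  }
  for (final route in routes) {
    if (route.condition == null) return route.then;
  }
  throw StateError('No branch matched and no fallback was defined.');
}

void main() {
  final routes = [
    Route('highValuePath', (o) => (o['amount'] as num) > 10000),
    Route('mediumValuePath', (o) => (o['amount'] as num) > 1000),
    const Route('lowValuePath'),
  ];
  print(selectTarget(routes, {'amount': 25000})); // highValuePath
  print(selectTarget(routes, {'amount': 5000})); // mediumValuePath
  print(selectTarget(routes, {'amount': 50})); // lowValuePath
}
```

Under this reading, overlapping conditions must be listed from most to least specific, since an amount of 25000 satisfies both thresholds.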
Example:
builder.oneOf('routeByAmount', [
Branch.whenFn(
(o) => (o['amount'] as num) > 10000,
then: 'highValuePath',
),
Branch.whenFn(
(o) => (o['amount'] as num) > 1000,
then: 'mediumValuePath',
),
Branch.otherwise(then: 'lowValuePath'),
])
anyOf (Race Gateway)
WorkflowBuilder anyOf(
String id,
List<String> targets, {
String? name,
});
Adds a race gateway. Multiple branches execute, and the first to complete wins.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| targets | List<String> | Target node IDs (positional) |
| name | String? | Display name |
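Race semantics resemble Dart's Future.any, which completes with whichever future finishes first. The sketch below simulates three branches with timers. runBranch and the delays are made up for illustration, and whether the engine cancels the losing branches is not covered on this page; with Future.any they simply keep running and their results are ignored.

```dart
/// Simulates a branch that finishes after [delay] and reports its node ID.
Future<String> runBranch(String nodeId, Duration delay) =>
    Future.delayed(delay, () => nodeId);

Future<void> main() async {
  // Future.any completes with the first future to finish.
  final winner = await Future.any([
    runBranch('approval-signal', const Duration(milliseconds: 30)),
    runBranch('timeout-timer', const Duration(milliseconds: 10)),
    runBranch('cancel-signal', const Duration(milliseconds: 50)),
  ]);
  print(winner); // timeout-timer
}
```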
Example:
builder.anyOf('race', ['approval-signal', 'timeout-timer', 'cancel-signal'])
allOf (Parallel Gateway)
WorkflowBuilder allOf(
String id,
List<String> targets, {
String? name,
});
Adds a parallel (AND) gateway. All branches execute concurrently, and the gateway waits for all of them to complete.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| targets | List<String> | Target node IDs (positional) |
| name | String? | Display name |
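The wait-for-all behavior is analogous to Future.wait, which completes only after every future has finished. The sketch below simulates three reviews and merges their results. The review helper, the delays, and the merge into a single map are assumptions for illustration, not a description of how the engine combines branch output.

```dart
/// Simulates a review branch that completes after [delayMs] milliseconds.
Future<Map<String, dynamic>> review(String nodeId, int delayMs) =>
    Future.delayed(Duration(milliseconds: delayMs), () => {nodeId: 'done'});

Future<void> main() async {
  // Future.wait resolves once all branches finish; results keep input order
  // regardless of which branch completed first.
  final results = await Future.wait([
    review('technicalReview', 30),
    review('legalReview', 10),
    review('financeReview', 20),
  ]);
  final merged = <String, dynamic>{for (final r in results) ...r};
  print(merged);
  // {technicalReview: done, legalReview: done, financeReview: done}
}
```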
Example:
builder.allOf('parallelReviews', ['technicalReview', 'legalReview', 'financeReview'])
Branch Class
Constructors
// Unconditional branch
Branch({required String then});
// Conditional branch with function
Branch.whenFn(
bool Function(Map<String, dynamic> output) condition,
{required String then, String? label, int? priority},
);
// Expression-based condition
Branch.when(
String expression,
{required String then, String? label},
);
// Equality check
Branch.whenEquals(
String variable,
Object value,
{required String then},
);
// Boolean check
Branch.whenTrue(String variable, {required String then});
// Null check
Branch.whenNotNull(String variable, {required String then});
// Default/fallback branch
Branch.otherwise({required String then});
Examples:
// Simple conditional
Branch.whenFn((o) => o['status'] == 'approved', then: 'processApproval')
// Complex condition
Branch.whenFn((o) {
final amount = o['amount'] as num;
final priority = o['priority'] as String;
return amount > 5000 && priority == 'high';
}, then: 'fastTrack')
// Expression-based
Branch.when("amount > 10000", then: 'highValue')
// Equality check
Branch.whenEquals('status', 'approved', then: 'approved')
// Default fallback
Branch.otherwise(then: 'handleUnexpected')
Edge Connections
connect
WorkflowBuilder connect(
String from,
String to, {
int? branch,
String? condition,
});
Connects two nodes.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| from | String | Source node ID |
| to | String | Target node ID |
| branch | int? | Branch index for gateway edges |
| condition | String? | Edge label/condition |
Example:
builder
.connect('begin', 'validateInput')
.connect('validateInput', 'processData')
.connect('processData', 'routeResult')
// Gateway branches handled automatically by branch definitions
.connect('processApproval', 'success')
.connect('processRejection', 'failure')
Build
build
Workflow build();
Validates and builds the final workflow definition.
Validation:
- Exactly one start node
- At least one end node
- All edges reference valid nodes
- No orphaned nodes (unreachable from start)
- Gateway branches properly configured
Throws: WorkflowValidationException if validation fails.
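As a rough illustration of the structural checks listed above, the sketch below validates a simplified graph: start and end counts, edge endpoints, and reachability via a breadth-first walk. Graph, Edge, and validate are hypothetical stand-ins, gateway checks are omitted, and problems are returned as a list rather than thrown as WorkflowValidationException.

```dart
/// Hypothetical directed edge between two node IDs.
class Edge {
  const Edge(this.from, this.to);
  final String from;
  final String to;
}

/// Hypothetical, simplified graph model; not the library's Workflow type.
class Graph {
  Graph({required this.kinds, required this.edges});

  /// Node ID to kind ('start', 'end', 'task', ...).
  final Map<String, String> kinds;
  final List<Edge> edges;
}

/// Returns a list of problems; an empty list means the graph passed.
List<String> validate(Graph g) {
  final errors = <String>[];
  final starts = g.kinds.keys.where((id) => g.kinds[id] == 'start').toList();
  if (starts.length != 1) {
    errors.add('Expected exactly one start node, found ${starts.length}.');
  }
  if (!g.kinds.containsValue('end')) errors.add('No end node defined.');

  for (final edge in g.edges) {
    for (final id in [edge.from, edge.to]) {
      if (!g.kinds.containsKey(id)) {
        errors.add('Edge references unknown node "$id".');
      }
    }
  }

  // Breadth-first walk from the start node to find unreachable nodes.
  if (starts.length == 1) {
    final seen = <String>{starts.single};
    final queue = [starts.single];
    while (queue.isNotEmpty) {
      final current = queue.removeAt(0);
      for (final edge in g.edges) {
        if (edge.from == current && seen.add(edge.to)) queue.add(edge.to);
      }
    }
    for (final id in g.kinds.keys) {
      if (!seen.contains(id)) {
        errors.add('Node "$id" is unreachable from start.');
      }
    }
  }
  return errors;
}

void main() {
  final graph = Graph(
    kinds: {'begin': 'start', 'work': 'task', 'done': 'end', 'stray': 'task'},
    edges: const [Edge('begin', 'work'), Edge('work', 'done')],
  );
  print(validate(graph)); // [Node "stray" is unreachable from start.]
}
```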
Example:
final workflow = builder.build();
print('Built: ${workflow.name} v${workflow.version}');
print('Nodes: ${workflow.nodes.length}');
print('Edges: ${workflow.edges.length}');
Complete Example
final approvalWorkflow = WorkflowBuilder(
'DOC-APPROVAL',
'Document Approval',
description: 'Review and approve documents',
)
// Start
.start('begin')
// Validate input
.task('validateDocument',
execute: (ctx) async {
// Use getInitial<T> for original workflow input
final docId = ctx.getInitial<String>('documentId');
if (docId == null) {
throw Exception('documentId is required');
}
final doc = await documentService.get(docId);
return {
'documentId': docId,
'documentTitle': doc.title,
'documentType': doc.type,
};
},
)
// Wait for approval
.userTask('approval',
signal: 'approval_decision',
schemaType: 'user-task.review',
title: 'Review: {{documentTitle}}',
assignToRole: 'reviewers',
storeAs: 'decision',
)
// Route decision
.oneOf('routeDecision', [
Branch.whenFn((o) => o['decision']?['approved'] == true, then: 'finalize'),
Branch.otherwise(then: 'reject'),
])
// Finalize approval
.task('finalize',
execute: (ctx) async {
// Use getAny<T> for accumulated output from earlier nodes
final documentId = ctx.getAny<String>('documentId')!;
await documentService.approve(documentId);
return {'finalizedAt': DateTime.now().toIso8601String()};
},
)
// Handle rejection
.task('reject',
execute: (ctx) async {
// Use getAny<T> for accumulated output from earlier nodes
final documentId = ctx.getAny<String>('documentId')!;
await documentService.reject(documentId);
return {'rejectedAt': DateTime.now().toIso8601String()};
},
)
// End nodes
.end('approved', name: 'Approved')
.end('rejected', name: 'Rejected')
// Define flow
.connect('begin', 'validateDocument')
.connect('validateDocument', 'approval')
.connect('approval', 'routeDecision')
.connect('finalize', 'approved')
.connect('reject', 'rejected')
.build();
See Also
- Workflow - Concept overview
- Node Types - Node reference
- WorkflowEngine - Execution API