Node Executors
Node executors execute specific node types. The engine maintains a registry mapping NodeType to its executor.
Executor Registry
| NodeType | Executor |
|---|---|
| start | StartEventNodeExecutor |
| end | EndEventNodeExecutor |
| task | TaskNodeExecutor |
| userTask | UserTaskNodeExecutor |
| signalWait | SignalEventNodeExecutor |
| timerWait | TimerEventNodeExecutor |
| oneOf | OneOfGatewayNodeExecutor |
| anyOf | AnyOfGatewayNodeExecutor |
| allOf | AllOfGatewayNodeExecutor |
| subflow | SubflowNodeExecutor |
NodeExecutor Interface
dart
abstract class NodeExecutor {
/// The [NodeType] this executor supports.
NodeType get type;
/// Executes the node logic for the node described by [context].
///
/// Returns the [NodeResult] produced for this node.
Future<NodeResult> execute(WorkflowContext context);
/// Handles a signal received while the node is waiting (for wait nodes).
///
/// [signalName] is the name of the received signal and [payload] is the
/// optional data delivered with it.
///
/// NOTE(review): the built-in start, end and task executors in this document
/// do not override this member, so the real class presumably supplies a
/// default implementation — confirm.
Future<NodeResult> onSignalReceived(
WorkflowContext context,
String signalName,
Map<String, dynamic>? payload,
);
/// Validates the configuration of [node].
///
/// NOTE(review): the returned strings are presumably validation error
/// messages, with an empty list meaning the node is valid — confirm.
List<String> validateNode(WorkflowNode node);
}

Executor Resolution
Node executors are resolved automatically when loading workflows:
dart
// When you load a workflow, executors are resolved
final workflow = engine.loadWorkflow(jsonData);
// Each node now has its executor attached
for (final node in workflow.nodes) {
print('${node.name}: ${node.executor?.runtimeType}');
// Start: StartEventNodeExecutor
// Validate: TaskNodeExecutor
// etc.
}

The engine uses the NodeExecutorRegistry to match each node's type field to the appropriate executor.
WorkflowContext
Context provided to executors:
dart
class WorkflowContext {
final Workflow workflow;
final WorkflowInstance instance;
final WorkflowNode currentNode;
final WorkflowToken currentToken;
final List<WorkflowEdge> incomingEdges;
final List<WorkflowEdge> outgoingEdges;
/// The workflow output accumulated so far.
///
/// Delegates to the `output` of [instance].
Map<String, dynamic> get output => instance.output;
}

Built-in Executors
StartEventNodeExecutor
dart
class StartEventNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.start;
/// Moves the token from the start node along its first outgoing edge.
///
/// Returns a validation [FailWorkflowResult] when the start node has no
/// outgoing edges.
@override
Future<NodeResult> execute(WorkflowContext context) async {
  // A list pattern with a rest element matches any non-empty list and binds
  // its first element, so the empty case falls through to the failure below.
  if (context.outgoingEdges case [final firstEdge, ...]) {
    return ContinueResult.single(firstEdge.targetNodeId);
  }
  return FailWorkflowResult.validation('Start node has no outgoing edges');
}
}

EndEventNodeExecutor
dart
class EndEventNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.end;
/// Completes the workflow, passing along the output accumulated in
/// [context].
@override
Future<NodeResult> execute(WorkflowContext context) async =>
    CompleteWorkflowResult(output: context.output);
}

TaskNodeExecutor
dart
class TaskNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.task;
/// Runs the task configured on the current node and maps its outcome to a
/// [NodeResult].
///
/// The task executor is looked up in the registry by the configuration's
/// `schemaType` and receives the accumulated workflow output as its input.
///
/// Returns a [ContinueResult] to the first outgoing edge (carrying the task
/// output) on [TaskSuccess], and a [FailWorkflowResult] on [TaskFailure] or
/// when a successful task has no outgoing edge to follow.
@override
Future<NodeResult> execute(WorkflowContext context) async {
  // Typed configuration for this task node.
  final config = context.currentNode.config as TaskNodeConfiguration;

  // Resolve the task implementation by schemaType.
  final executor = registry.tasks.createRequired(config.schemaType);

  final executionContext = ExecutionContext(
    workflowContext: context,
    input: context.output,
    currentNode: context.currentNode,
  );

  final result = await executor.execute(executionContext);

  switch (result) {
    case TaskSuccess(:final output):
      // Checked only after a successful run so that a TaskFailure is still
      // reported as such. Without this guard `.first` throws a StateError
      // on a task node that has no outgoing edges.
      final edges = context.outgoingEdges;
      if (edges.isEmpty) {
        return FailWorkflowResult.validation(
          'Task node has no outgoing edges',
        );
      }
      return ContinueResult.single(
        edges.first.targetNodeId,
        output: output,
      );
    case TaskFailure(:final errorType, :final message):
      return FailWorkflowResult(
        errorType: errorType,
        message: message,
      );
  }
}
}

SignalEventNodeExecutor
dart
class SignalEventNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.signalWait;
/// Asks the engine to wait for the signal configured on the current node.
///
/// The signal name and the optional `storeAs` key are read from the node's
/// [SignalWaitNodeConfiguration] and forwarded in a [WaitForSignalResult].
@override
Future<NodeResult> execute(WorkflowContext context) async {
  // Destructure the two fields the result needs straight from the config.
  final SignalWaitNodeConfiguration(:signalName, :storeAs) =
      context.currentNode.config as SignalWaitNodeConfiguration;
  return WaitForSignalResult(signalName: signalName, storeAs: storeAs);
}
/// Resumes the workflow after the awaited signal has been received.
///
/// When the node configuration defines `storeAs`, [payload] is nested under
/// that key in the output; otherwise [payload] itself (or an empty map when
/// it is null) becomes the output.
///
/// Returns a [ContinueResult] to the first outgoing edge, or a validation
/// [FailWorkflowResult] when the node has no outgoing edges.
@override
Future<NodeResult> onSignalReceived(
  WorkflowContext context,
  String signalName,
  Map<String, dynamic>? payload,
) async {
  // Without this guard `.first` throws a StateError on a dangling wait node.
  final edges = context.outgoingEdges;
  if (edges.isEmpty) {
    return FailWorkflowResult.validation(
      'Signal wait node has no outgoing edges',
    );
  }

  final config = context.currentNode.config as SignalWaitNodeConfiguration;

  // Copy to a local so the null check promotes and no `!` is needed.
  final storeKey = config.storeAs;
  final Map<String, dynamic> output =
      storeKey != null ? {storeKey: payload} : payload ?? {};

  return ContinueResult.single(
    edges.first.targetNodeId,
    output: output,
  );
}
}

NodeResult Types
Executors return results that determine the engine's next action:
| Result Type | Engine Action |
|---|---|
| ContinueResult | Move token to target node(s) |
| WaitForSignalResult | Pause, set status to waitingForSignal |
| WaitForUserTaskResult | Create user task, then wait |
| WaitForJoinResult | Wait for parallel branches |
| CompleteWorkflowResult | Mark workflow as completed |
| FailWorkflowResult | Mark workflow as failed |
ContinueResult
dart
// Single target
return ContinueResult.single('nextNode', output: {'data': 'value'});
// Multiple targets (for parallel fork)
return ContinueResult(
targetNodeIds: ['branchA', 'branchB', 'branchC'],
output: {'forkedAt': DateTime.now().toIso8601String()},
);

WaitForSignalResult
dart
return WaitForSignalResult(
signalName: 'external_callback',
storeAs: 'callbackData',
);

WaitForJoinResult
dart
// Parallel gateway join waiting for branches
return WaitForJoinResult(
waitingForBranches: ['branchA', 'branchB'],
);

Custom Node Executors
Create custom executors for specialized node types:
dart
/// Example executor for [NodeType.timerWait] nodes.
///
/// Schedules a timer through `timerService` and then waits for the signal
/// that the timer is scheduled with.
class CustomTimerNodeExecutor extends NodeExecutor {
  /// Signal name shared by the scheduled timer and the wait result, kept in
  /// one place so the two cannot drift apart.
  static const _timerSignal = 'timer_fired';

  @override
  NodeType get type => NodeType.timerWait;

  /// Schedules the timer described by the node configuration and returns a
  /// [WaitForSignalResult] for it.
  ///
  /// For [TimerType.duration] the timer fires at now plus the configured
  /// duration; for every other timer type it fires at the configured
  /// `dateTime`. Returns a validation [FailWorkflowResult] when the value
  /// required by the timer type is missing.
  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final config = context.currentNode.config as TimerWaitNodeConfiguration;

    // Resolve the fire time once instead of duplicating the schedule call.
    final DateTime? fireAt;
    if (config.timerType == TimerType.duration) {
      final delay = config.duration;
      fireAt = delay == null ? null : DateTime.now().add(delay);
    } else {
      fireAt = config.dateTime;
    }

    // A misconfigured node fails validation instead of crashing on `!`.
    if (fireAt == null) {
      return FailWorkflowResult.validation(
        'Timer node is missing the value required by its timer type',
      );
    }

    await timerService.schedule(
      workflowInstanceId: context.instance.id,
      signalName: _timerSignal,
      fireAt: fireAt,
    );

    return WaitForSignalResult(signalName: _timerSignal);
  }
}
// Register custom executor via WorkflowDescriptor
final descriptor = WorkflowDescriptor(
title: 'Custom Executors',
nodeExecutors: [CustomTimerNodeExecutor()],
);
final engine = WorkflowEngine(
descriptors: [DefaultWorkflowDescriptor(), descriptor],
storage: storage,
);

TypedNodeExecutor
For type-safe access to node configurations, use TypedNodeExecutor<TConfig>:
dart
abstract class TypedNodeExecutor<TConfig extends NodeConfiguration>
extends NodeExecutor {
/// Deserializes the raw JSON [config] map into a typed [TConfig].
TConfig fromConfig(Map<String, dynamic> config);
/// Runs the node logic with an already-typed [config] and the workflow
/// [context].
///
/// NOTE(review): how the inherited `execute` obtains [config] and delegates
/// here is not shown in this snippet — presumably via [fromConfig]; confirm.
Future<NodeResult> executeTyped(TConfig config, WorkflowContext context);
}

Example: Custom Gateway Executor
dart
/// Configuration for a gateway that routes on a priority value.
///
/// JSON conversion is delegated to `_$PriorityGatewayConfigFromJson` and
/// `_$PriorityGatewayConfigToJson`.
@JsonSerializable()
class PriorityGatewayConfig extends NodeConfiguration {
/// Schema type identifier returned by [schemaType].
static const schemaTypeName = 'config.gateway.priority';
/// Creates a configuration; [priorityField] defaults to `'priority'`.
const PriorityGatewayConfig({
this.priorityField = 'priority',
this.defaultRoute,
this.storeAs,
});
/// Builds a configuration from its JSON map representation.
factory PriorityGatewayConfig.fromJson(Map<String, dynamic> json) =>
_$PriorityGatewayConfigFromJson(json);
/// Key that `PriorityGatewayExecutor` looks up in the workflow output to
/// find the priority value.
final String priorityField;
/// Target node id that `PriorityGatewayExecutor` falls back to when no
/// outgoing edge matches; may be null.
final String? defaultRoute;
// NOTE(review): presumably the output key under which this node's result is
// stored — confirm against NodeConfiguration.
@override
final String? storeAs;
/// Always [schemaTypeName] for this configuration.
@override
String? get schemaType => schemaTypeName;
/// Serializes this configuration to a JSON map.
@override
Map<String, dynamic> toJson() => _$PriorityGatewayConfigToJson(this);
}
/// Example gateway executor that picks the outgoing edge whose label matches
/// a priority value found in the workflow output.
class PriorityGatewayExecutor extends TypedNodeExecutor<PriorityGatewayConfig> {
  @override
  NodeType get type => NodeType.oneOf;

  /// Deserializes the raw [config] map into a [PriorityGatewayConfig].
  @override
  PriorityGatewayConfig fromConfig(Map<String, dynamic> config) =>
      PriorityGatewayConfig.fromJson(config);

  /// Routes the token according to the priority stored in the output.
  ///
  /// The value under `config.priorityField` is compared case-insensitively
  /// with each outgoing edge label. When no edge matches, or the value is
  /// missing or not a string, the token goes to `config.defaultRoute` if
  /// set, otherwise to the first outgoing edge.
  ///
  /// Returns a validation [FailWorkflowResult] when there is neither a
  /// matching edge, a default route, nor any outgoing edge.
  @override
  Future<NodeResult> executeTyped(
    PriorityGatewayConfig config,
    WorkflowContext context,
  ) async {
    final edges = context.outgoingEdges;

    // Normalize once, outside the loop. A non-string value counts as "no
    // priority" rather than raising a cast error.
    final rawPriority = context.output[config.priorityField];
    final wanted = rawPriority is String ? rawPriority.toLowerCase() : null;

    // Only compare when a priority exists. Otherwise `null == null` would
    // match the first unlabeled edge and bypass the default route.
    if (wanted != null) {
      for (final edge in edges) {
        if (edge.label?.toLowerCase() == wanted) {
          return ContinueResult.single(edge.targetNodeId);
        }
      }
    }

    final fallback = config.defaultRoute;
    if (fallback != null) {
      return ContinueResult.single(fallback);
    }
    if (edges.isEmpty) {
      return FailWorkflowResult.validation(
        'Priority gateway has no outgoing edges',
      );
    }
    return ContinueResult.single(edges.first.targetNodeId);
  }
}
// Register the custom configuration type
final descriptor = WorkflowDescriptor(
title: 'Custom Gateway',
nodeExecutors: [PriorityGatewayExecutor()],
nodeConfigurations: [
TypeDescriptor<NodeConfiguration>(
schemaType: PriorityGatewayConfig.schemaTypeName,
fromJson: PriorityGatewayConfig.fromJson,
),
],
);

Benefits of TypedNodeExecutor
| Feature | NodeExecutor | TypedNodeExecutor |
|---|---|---|
| Config access | config['key'] cast | config.key |
| Type safety | Runtime casting | Compile-time |
| IDE support | Limited | Full autocomplete |
| Refactoring | Error-prone | Safe |
When to Use
Use TypedNodeExecutor for:
- Complex configurations with many fields
- Custom node types with specific settings
- Production code requiring type safety
Use NodeExecutor for:
- Simple nodes with minimal config
- Prototyping and testing
- One-off custom executors
Type-Safe Configuration Access
With the unified model, node configurations are already type-safe. Access them via pattern matching:
dart
// In your custom executor
@override
Future<NodeResult> execute(WorkflowContext context) async {
final node = context.currentNode;
switch (node.config) {
case TaskNodeConfiguration config:
print('Task schemaType: ${config.schemaType}');
print('Store as: ${config.storeAs}');
case UserTaskNodeConfiguration config:
print('User task title: ${config.title}');
print('Assigned to: ${config.assignToRole}');
case SignalWaitNodeConfiguration config:
print('Waiting for signal: ${config.signalName}');
case GatewayNodeConfiguration config:
print('Gateway with ${config.outputPorts?.length ?? 0} ports');
case TimerWaitNodeConfiguration config:
print('Timer type: ${config.timerType}');
case SubflowNodeConfiguration config:
print('Subflow: ${config.workflowCode}');
case EmptyNodeConfiguration _:
print('Start/End node');
}
// Continue execution...
return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
}

Next Steps
- Task Executors - Task implementation
- User Task Executors - Human tasks
- Condition Executors - Gateway conditions
- Type Registries - Typed executor patterns