NodeExecutor API
Base class for node executors. Node executors handle specific node types in the workflow graph.
NodeExecutor
dart
abstract class NodeExecutor {
/// The node type this executor supports
NodeType get type;
/// Execute the node and return the result
Future<NodeResult> execute(WorkflowContext context);
/// Handle signal received (for wait nodes)
Future<NodeResult> onSignalReceived(
WorkflowContext context,
String signalName,
Map<String, dynamic>? payload,
);
/// Validate node configuration
List<String> validateNode(WorkflowNode node) => [];
}TypedNodeExecutor
Type-safe base class with compile-time safety for node configuration:
dart
abstract class TypedNodeExecutor<TConfig extends NodeConfiguration>
extends NodeExecutor {
/// Deserialize configuration from raw JSON map
TConfig fromConfig(Map<String, dynamic> config);
/// Type-safe execute method with typed configuration
Future<NodeResult> executeTyped(TConfig config, WorkflowContext context);
}WorkflowContext
Context provided to node executors:
dart
class WorkflowContext {
/// The complete workflow definition
final Workflow workflow;
/// Current workflow instance state
final WorkflowInstance instance;
/// The node being executed
final WorkflowNode currentNode;
/// Current execution token
final WorkflowToken currentToken;
/// Edges leading into this node
final List<Edge> incomingEdges;
/// Edges leading out of this node
final List<Edge> outgoingEdges;
/// Current workflow output (accumulated state)
Map<String, dynamic> get output => instance.output;
}NodeResult Types
| Result Type | Engine Action |
|---|---|
ContinueResult | Move token to target node(s) |
WaitForSignalResult | Pause, set status to waitingForSignal |
WaitForUserTaskResult | Create user task, then wait |
WaitForJoinResult | Wait for parallel branches |
CompleteWorkflowResult | Mark workflow as completed |
FailWorkflowResult | Mark workflow as failed |
ContinueResult
dart
class ContinueResult extends NodeResult {
const ContinueResult({
required this.targetNodeIds,
this.output = const {},
});
factory ContinueResult.single(String targetNodeId, {Map<String, dynamic>? output});
final List<String> targetNodeIds;
final Map<String, dynamic> output;
}WaitForSignalResult
dart
class WaitForSignalResult extends NodeResult {
const WaitForSignalResult({
required this.signalName,
this.storeAs,
this.timeout,
});
final String signalName;
final String? storeAs;
final Duration? timeout;
}FailWorkflowResult
dart
class FailWorkflowResult extends NodeResult {
const FailWorkflowResult({
required this.errorType,
required this.message,
this.isRetryable = true,
this.details,
});
// Factory constructors
factory FailWorkflowResult.validation(String message, {...});
factory FailWorkflowResult.internal(String message, {...});
factory FailWorkflowResult.timeout(String message, {...});
final ErrorType errorType;
final String message;
final bool isRetryable;
final Map<String, dynamic>? details;
}Built-in Node Executors
| Executor | Node Type | Description |
|---|---|---|
StartEventNodeExecutor | start | Entry point, continues to first node |
EndEventNodeExecutor | end | Terminal point, completes workflow |
TaskNodeExecutor | task | Executes task via TaskExecutor |
UserTaskNodeExecutor | userTask | Creates user task, waits for signal |
SignalEventNodeExecutor | signalWait | Waits for external signal |
OneOfGatewayNodeExecutor | oneOf | Exclusive routing (XOR) |
AnyOfGatewayNodeExecutor | anyOf | Race routing (first wins) |
AllOfGatewayNodeExecutor | allOf | Parallel routing (AND) |
SubflowNodeExecutor | subflow | Invokes child workflow |
TimerEventNodeExecutor | timerWait | Waits for timer |
StartEventNodeExecutor
dart
class StartEventNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.start;
@override
Future<NodeResult> execute(WorkflowContext context) async {
final outgoing = context.outgoingEdges;
if (outgoing.isEmpty) {
return FailWorkflowResult.validation('Start node has no outgoing edges');
}
return ContinueResult.single(outgoing.first.targetNodeId);
}
}EndEventNodeExecutor
dart
class EndEventNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.end;
@override
Future<NodeResult> execute(WorkflowContext context) async {
return CompleteWorkflowResult(output: context.output);
}
}Examples
Custom NodeExecutor
dart
class TimerNodeExecutor extends NodeExecutor {
@override
NodeType get type => NodeType.timerWait;
@override
Future<NodeResult> execute(WorkflowContext context) async {
// Get typed configuration
final config = context.currentNode.config as TimerWaitNodeConfiguration;
// Calculate fire time based on config
DateTime fireAt;
if (config.timerType == TimerType.duration) {
fireAt = DateTime.now().add(config.duration!);
} else {
fireAt = config.dateTime!;
}
await timerService.schedule(
workflowInstanceId: context.instance.id,
signalName: 'timer_fired',
fireAt: fireAt,
);
return WaitForSignalResult(signalName: 'timer_fired');
}
@override
Future<NodeResult> onSignalReceived(
WorkflowContext context,
String signalName,
Map<String, dynamic>? payload,
) async {
return ContinueResult.single(
context.outgoingEdges.first.targetNodeId,
output: {'timerFired': true, 'firedAt': DateTime.now().toIso8601String()},
);
}
}TypedNodeExecutor
dart
@JsonSerializable()
class RetryTaskConfig extends NodeConfiguration {
static const schemaTypeName = 'config.task.retry';
const RetryTaskConfig({
required this.schemaType,
this.maxRetries = 3,
this.retryDelayMs = 1000,
this.storeAs,
});
factory RetryTaskConfig.fromJson(Map<String, dynamic> json) =>
_$RetryTaskConfigFromJson(json);
@override
final String schemaType;
final int maxRetries;
final int retryDelayMs;
@override
final String? storeAs;
@override
Map<String, dynamic> toJson() => _$RetryTaskConfigToJson(this);
}
class RetryTaskNodeExecutor extends TypedNodeExecutor<RetryTaskConfig> {
@override
NodeType get type => NodeType.task;
@override
RetryTaskConfig fromConfig(Map<String, dynamic> config) =>
RetryTaskConfig.fromJson(config);
@override
Future<NodeResult> executeTyped(
RetryTaskConfig config,
WorkflowContext context,
) async {
// Fully typed access to configuration!
for (var attempt = 1; attempt <= config.maxRetries; attempt++) {
try {
final result = await executeTask(context);
return result;
} catch (e) {
if (attempt == config.maxRetries) {
return FailWorkflowResult.internal(
'Task failed after ${config.maxRetries} retries: $e',
);
}
await Future.delayed(Duration(milliseconds: config.retryDelayMs));
}
}
throw StateError('Unreachable');
}
Future<NodeResult> executeTask(WorkflowContext context) async {
// Actual task implementation
return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
}
}Batch Processing Node
dart
@JsonSerializable()
class BatchConfig extends NodeConfiguration {
const BatchConfig({
required this.schemaType,
this.batchSize = 100,
this.parallelism = 4,
this.storeAs,
});
factory BatchConfig.fromJson(Map<String, dynamic> json) =>
_$BatchConfigFromJson(json);
@override
final String schemaType;
final int batchSize;
final int parallelism;
@override
final String? storeAs;
@override
Map<String, dynamic> toJson() => _$BatchConfigToJson(this);
}
class BatchProcessingNodeExecutor extends TypedNodeExecutor<BatchConfig> {
@override
NodeType get type => NodeType.task;
@override
BatchConfig fromConfig(Map<String, dynamic> config) =>
BatchConfig.fromJson(config);
@override
Future<NodeResult> executeTyped(
BatchConfig config,
WorkflowContext context,
) async {
final items = context.output['items'] as List? ?? [];
// Process in batches with typed config
for (var i = 0; i < items.length; i += config.batchSize) {
final batch = items.skip(i).take(config.batchSize).toList();
await processInParallel(batch, config.parallelism);
}
return ContinueResult.single(
context.outgoingEdges.first.targetNodeId,
output: {'processed': items.length},
);
}
}Registration
dart
final descriptor = WorkflowDescriptor(
title: 'Custom Node Executors',
nodeExecutors: [
RetryTaskNodeExecutor(),
BatchProcessingNodeExecutor(),
],
);
// Create deserialization context with all descriptors
final context = RegistryDeserializationContext(
descriptors: [DefaultWorkflowDescriptor(), descriptor],
);
// Create engine with context and storage
final engine = WorkflowEngine(
context: context,
storage: InMemoryStorage(context: context),
);
await engine.initialize();Comparison
| Feature | NodeExecutor | TypedNodeExecutor |
|---|---|---|
| Config access | context.currentNode.config['key'] | config.key |
| Type safety | Runtime | Compile-time |
| IDE support | Limited | Full autocomplete |
| Error handling | Manual | Auto-wrapped |
When to Use
Use NodeExecutor
- Simple nodes with minimal configuration
- Built-in node type overrides
- Prototyping and testing
- One-off custom node types
Use TypedNodeExecutor
- Complex configurations with many fields
- Custom node types with specific settings
- Production code requiring type safety
- Reusable node executor libraries
Best Practices
- Override buildNode for custom node building logic
- Implement onSignalReceived for wait nodes
- Use TypedNodeExecutor for complex configurations
- Return detailed FailWorkflowResult on errors
- Register via WorkflowDescriptor for engine integration
See Also
- Node Executors - Detailed guide
- Type Registries - Registration patterns
- NodeResult - Result types
- WorkflowDescriptor - Registration API