
Node Executors

Each node executor implements the runtime behavior of one node type. The engine maintains a registry that maps each NodeType to its executor.

Executor Registry

| NodeType   | Executor                |
|------------|-------------------------|
| start      | StartEventNodeExecutor  |
| end        | EndEventNodeExecutor    |
| task       | TaskNodeExecutor        |
| userTask   | UserTaskNodeExecutor    |
| signalWait | SignalEventNodeExecutor |
| timerWait  | TimerEventNodeExecutor  |
| oneOf      | OneOfGatewayNodeExecutor |
| anyOf      | AnyOfGatewayNodeExecutor |
| allOf      | AllOfGatewayNodeExecutor |
| subflow    | SubflowNodeExecutor     |
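
The mapping above can be pictured as a map keyed by node type. The following is a minimal sketch, not the engine's actual registry: `SimpleExecutorRegistry`, `register`, and `lookup` are hypothetical names, the types are trimmed-down stand-ins, and "last registration wins" is an assumption.

```dart
// Hypothetical stand-ins; the real engine's types carry more members.
enum NodeType { start, end, task }

abstract class NodeExecutor {
  NodeType get type;
}

class StartEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.start;
}

class EndEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.end;
}

/// Illustrative registry: one executor per node type.
/// Assumption: registering a second executor for a type replaces the first.
class SimpleExecutorRegistry {
  final Map<NodeType, NodeExecutor> _byType = {};

  void register(NodeExecutor executor) => _byType[executor.type] = executor;

  /// Returns the executor for [type], or null if none is registered.
  NodeExecutor? lookup(NodeType type) => _byType[type];
}

void main() {
  final registry = SimpleExecutorRegistry()
    ..register(StartEventNodeExecutor())
    ..register(EndEventNodeExecutor());

  print(registry.lookup(NodeType.start).runtimeType); // StartEventNodeExecutor
  print(registry.lookup(NodeType.task)); // null
}
```

Keying on the executor's own `type` getter keeps registration to a single argument and avoids a mismatch between the key and the executor.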

NodeExecutor Interface

dart
abstract class NodeExecutor {
  /// The node type this executor supports
  NodeType get type;

  /// Execute the node logic
  Future<NodeResult> execute(WorkflowContext context);

  /// Handle signal received (for wait nodes)
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  );

  /// Validate node configuration
  List<String> validateNode(WorkflowNode node);
}
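
The built-in executors shown on this page override only `type` and `execute`, which suggests that the real base class supplies defaults for `onSignalReceived` and `validateNode`. The sketch below assumes exactly that and shows how an executor might report configuration problems from `validateNode`. All types are simplified stand-ins (the `type` getter is omitted and the config is a plain map), `LogNodeExecutor` is hypothetical, and treating an empty list as "valid" is an assumption.

```dart
// Hypothetical, simplified stand-ins for the engine's types.
class WorkflowNode {
  WorkflowNode(this.id, this.config);
  final String id;
  final Map<String, dynamic> config;
}

class WorkflowContext {
  WorkflowContext(this.currentNode);
  final WorkflowNode currentNode;
}

class NodeResult {
  const NodeResult(this.description);
  final String description;
}

abstract class NodeExecutor {
  Future<NodeResult> execute(WorkflowContext context);

  // Assumed defaults so that simple executors only override execute().
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  ) async =>
      const NodeResult('signal ignored');

  List<String> validateNode(WorkflowNode node) => const [];
}

/// Example executor that requires a non-empty `message` in the node config.
class LogNodeExecutor extends NodeExecutor {
  @override
  Future<NodeResult> execute(WorkflowContext context) async =>
      NodeResult('logged: ${context.currentNode.config['message']}');

  @override
  List<String> validateNode(WorkflowNode node) {
    final message = node.config['message'];
    return [
      if (message is! String || message.isEmpty)
        'Node ${node.id}: "message" must be a non-empty string',
    ];
  }
}

Future<void> main() async {
  final executor = LogNodeExecutor();

  // One validation error: the config has no message.
  print(executor.validateNode(WorkflowNode('n1', {})));

  final result = await executor.execute(
    WorkflowContext(WorkflowNode('n2', {'message': 'hello'})),
  );
  print(result.description); // logged: hello
}
```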

Executor Resolution

Node executors are resolved automatically when loading workflows:

dart
// When you load a workflow, executors are resolved
final workflow = engine.loadWorkflow(jsonData);

// Each node now has its executor attached
for (final node in workflow.nodes) {
  print('${node.name}: ${node.executor?.runtimeType}');
  // Start: StartEventNodeExecutor
  // Validate: TaskNodeExecutor
  // etc.
}

The engine uses the NodeExecutorRegistry to match each node's type field to the appropriate executor.

WorkflowContext

Context provided to executors:

dart
class WorkflowContext {
  final Workflow workflow;
  final WorkflowInstance instance;
  final WorkflowNode currentNode;
  final WorkflowToken currentToken;
  final List<WorkflowEdge> incomingEdges;
  final List<WorkflowEdge> outgoingEdges;

  /// Current workflow output (accumulated state)
  Map<String, dynamic> get output => instance.output;
}

Built-in Executors

StartEventNodeExecutor

dart
class StartEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.start;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final outgoing = context.outgoingEdges;
    if (outgoing.isEmpty) {
      return FailWorkflowResult.validation('Start node has no outgoing edges');
    }
    return ContinueResult.single(outgoing.first.targetNodeId);
  }
}

EndEventNodeExecutor

dart
class EndEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.end;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    return CompleteWorkflowResult(output: context.output);
  }
}

TaskNodeExecutor

dart
class TaskNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.task;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    // Get typed configuration
    final config = context.currentNode.config as TaskNodeConfiguration;

    // Get executor from registry by schemaType
    final executor = registry.tasks.createRequired(config.schemaType);

    // Create execution context
    final executionContext = ExecutionContext(
      workflowContext: context,
      input: context.output,
      currentNode: context.currentNode,
    );

    // Execute task
    final result = await executor.execute(executionContext);

    // Handle result
    switch (result) {
      case TaskSuccess(:final output):
        return ContinueResult.single(
          context.outgoingEdges.first.targetNodeId,
          output: output,
        );
      case TaskFailure(:final errorType, :final message):
        return FailWorkflowResult(
          errorType: errorType,
          message: message,
        );
    }
  }
}

SignalEventNodeExecutor

dart
class SignalEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.signalWait;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    // Typed config access
    final config = context.currentNode.config as SignalWaitNodeConfiguration;

    return WaitForSignalResult(
      signalName: config.signalName,
      storeAs: config.storeAs,
    );
  }

  @override
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  ) async {
    final config = context.currentNode.config as SignalWaitNodeConfiguration;
    final output = config.storeAs != null
        ? {config.storeAs!: payload}
        : payload ?? {};

    return ContinueResult.single(
      context.outgoingEdges.first.targetNodeId,
      output: output,
    );
  }
}
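
The effect of `storeAs` on the output is easiest to see in isolation. This sketch lifts the expression from `onSignalReceived` into a standalone function; `shapeSignalOutput` is a hypothetical helper name, not part of the engine.

```dart
/// Mirrors the expression in onSignalReceived: wrap the payload under
/// [storeAs] when it is set, otherwise pass the payload through unchanged.
Map<String, dynamic> shapeSignalOutput(
  String? storeAs,
  Map<String, dynamic>? payload,
) =>
    storeAs != null ? {storeAs: payload} : payload ?? {};

void main() {
  final payload = {'approved': true};

  print(shapeSignalOutput('callbackData', payload));
  // {callbackData: {approved: true}}

  print(shapeSignalOutput(null, payload)); // {approved: true}
  print(shapeSignalOutput(null, null)); // {}
  print(shapeSignalOutput('callbackData', null)); // {callbackData: null}
}
```

Note the last case: with `storeAs` set and no payload, the key is still written with a null value.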

NodeResult Types

Executors return a NodeResult that tells the engine what to do next:

| Result Type            | Engine Action                            |
|------------------------|------------------------------------------|
| ContinueResult         | Move token to target node(s)             |
| WaitForSignalResult    | Pause, set status to waitingForSignal    |
| WaitForUserTaskResult  | Create user task, then wait              |
| WaitForJoinResult      | Wait for parallel branches               |
| CompleteWorkflowResult | Mark workflow as completed               |
| FailWorkflowResult     | Mark workflow as failed                  |
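
One way to picture the table is as an exhaustive switch over a sealed result hierarchy. The sketch below is an illustration under assumptions, not the engine's dispatch code: the hierarchy is trimmed to four result types, the constructors and field names are simplified stand-ins, and `describeAction` is a hypothetical function.

```dart
// Hypothetical, trimmed-down result hierarchy; field names are assumptions.
sealed class NodeResult {
  const NodeResult();
}

class ContinueResult extends NodeResult {
  const ContinueResult(this.targetNodeIds);
  final List<String> targetNodeIds;
}

class WaitForSignalResult extends NodeResult {
  const WaitForSignalResult(this.signalName);
  final String signalName;
}

class CompleteWorkflowResult extends NodeResult {
  const CompleteWorkflowResult();
}

class FailWorkflowResult extends NodeResult {
  const FailWorkflowResult(this.message);
  final String message;
}

/// Describes what an engine loop might do for each result.
/// The switch is exhaustive because NodeResult is sealed.
String describeAction(NodeResult result) => switch (result) {
      ContinueResult(:final targetNodeIds) =>
        'move token to ${targetNodeIds.join(', ')}',
      WaitForSignalResult(:final signalName) =>
        'pause until signal "$signalName"',
      CompleteWorkflowResult() => 'mark workflow completed',
      FailWorkflowResult(:final message) => 'mark workflow failed: $message',
    };

void main() {
  print(describeAction(const ContinueResult(['review'])));
  print(describeAction(const WaitForSignalResult('external_callback')));
  print(describeAction(const FailWorkflowResult('no outgoing edges')));
}
```

With a sealed base class, adding a new result type turns every switch that does not handle it into a compile error, which is a useful property for an engine loop.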

ContinueResult

dart
// Single target
return ContinueResult.single('nextNode', output: {'data': 'value'});

// Multiple targets (for parallel fork)
return ContinueResult(
  targetNodeIds: ['branchA', 'branchB', 'branchC'],
  output: {'forkedAt': DateTime.now().toIso8601String()},
);

WaitForSignalResult

dart
return WaitForSignalResult(
  signalName: 'external_callback',
  storeAs: 'callbackData',
);

WaitForJoinResult

dart
// Parallel gateway join waiting for branches
return WaitForJoinResult(
  waitingForBranches: ['branchA', 'branchB'],
);
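
How a join decides between waiting and continuing is not shown on this page. As a rough sketch, a join could track which expected branches have arrived and keep waiting while any are outstanding. `JoinTracker` and `arrive` are hypothetical names, and the bookkeeping below is an assumption about one possible approach rather than the engine's implementation.

```dart
/// Hypothetical join bookkeeping: tracks which expected branches have arrived.
class JoinTracker {
  JoinTracker(Iterable<String> expectedBranches)
      : _expected = Set.of(expectedBranches);

  final Set<String> _expected;
  final Set<String> _arrived = {};

  /// Records an arrival and returns the branches still outstanding.
  Set<String> arrive(String branchId) {
    if (!_expected.contains(branchId)) {
      throw ArgumentError.value(branchId, 'branchId', 'not part of this join');
    }
    _arrived.add(branchId);
    return _expected.difference(_arrived);
  }
}

void main() {
  final join = JoinTracker(['branchA', 'branchB']);

  // One branch outstanding: an executor would keep waiting here.
  print(join.arrive('branchA')); // {branchB}

  // Nothing outstanding: an executor could continue past the join.
  print(join.arrive('branchB')); // {}
}
```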

Custom Node Executors

Create custom executors for specialized node types:

dart
class CustomTimerNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.timerWait;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final config = context.currentNode.config as TimerWaitNodeConfiguration;

    // Compute the fire time from the configured timer type
    final fireAt = config.timerType == TimerType.duration
        ? DateTime.now().add(config.duration!)
        : config.dateTime!;

    // timerService is assumed to be a scheduling service available to the
    // executor (for example, injected through its constructor)
    await timerService.schedule(
      workflowInstanceId: context.instance.id,
      signalName: 'timer_fired',
      fireAt: fireAt,
    );

    return WaitForSignalResult(
      signalName: 'timer_fired',
    );
  }
}

// Register custom executor via WorkflowDescriptor
final descriptor = WorkflowDescriptor(
  title: 'Custom Executors',
  nodeExecutors: [CustomTimerNodeExecutor()],
);

final engine = WorkflowEngine(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
  storage: storage,
);

TypedNodeExecutor

For type-safe access to node configurations, use TypedNodeExecutor<TConfig>:

dart
abstract class TypedNodeExecutor<TConfig extends NodeConfiguration>
    extends NodeExecutor {
  /// Deserialize configuration from raw JSON map
  TConfig fromConfig(Map<String, dynamic> config);

  /// Type-safe execute method with typed configuration
  Future<NodeResult> executeTyped(TConfig config, WorkflowContext context);
}
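
The listing above does not show how `execute` relates to `executeTyped`. A plausible reading is that the base class decodes the raw configuration once and delegates. The sketch below assumes that bridge: the types are simplified stand-ins, `rawConfig` is a hypothetical field holding the node's raw JSON, and `DelayConfig` and `DelayExecutor` are invented for illustration.

```dart
// Hypothetical stand-ins; the real classes carry more members.
abstract class NodeConfiguration {
  const NodeConfiguration();
}

class WorkflowContext {
  WorkflowContext(this.rawConfig);
  final Map<String, dynamic> rawConfig; // assumed raw JSON of the current node
}

class NodeResult {
  const NodeResult(this.description);
  final String description;
}

abstract class NodeExecutor {
  Future<NodeResult> execute(WorkflowContext context);
}

abstract class TypedNodeExecutor<TConfig extends NodeConfiguration>
    extends NodeExecutor {
  TConfig fromConfig(Map<String, dynamic> config);

  Future<NodeResult> executeTyped(TConfig config, WorkflowContext context);

  // Assumed bridge: decode once, then hand the typed config to executeTyped.
  @override
  Future<NodeResult> execute(WorkflowContext context) =>
      executeTyped(fromConfig(context.rawConfig), context);
}

class DelayConfig extends NodeConfiguration {
  const DelayConfig(this.seconds);
  final int seconds;
}

class DelayExecutor extends TypedNodeExecutor<DelayConfig> {
  @override
  DelayConfig fromConfig(Map<String, dynamic> config) =>
      DelayConfig(config['seconds'] as int);

  @override
  Future<NodeResult> executeTyped(
    DelayConfig config,
    WorkflowContext context,
  ) async =>
      NodeResult('delay ${config.seconds}s');
}

Future<void> main() async {
  final result = await DelayExecutor().execute(WorkflowContext({'seconds': 5}));
  print(result.description); // delay 5s
}
```

Callers keep using the untyped `execute` entry point, while subclasses only ever see the decoded `TConfig`.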

Example: Custom Gateway Executor

dart
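// Requires package:json_annotation and a `part` directive for the generated
// *.g.dart file (produced by running build_runner with json_serializable).
@JsonSerializable()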
class PriorityGatewayConfig extends NodeConfiguration {
  static const schemaTypeName = 'config.gateway.priority';

  const PriorityGatewayConfig({
    this.priorityField = 'priority',
    this.defaultRoute,
    this.storeAs,
  });

  factory PriorityGatewayConfig.fromJson(Map<String, dynamic> json) =>
      _$PriorityGatewayConfigFromJson(json);

  final String priorityField;
  final String? defaultRoute;

  @override
  final String? storeAs;

  @override
  String? get schemaType => schemaTypeName;

  @override
  Map<String, dynamic> toJson() => _$PriorityGatewayConfigToJson(this);
}

class PriorityGatewayExecutor extends TypedNodeExecutor<PriorityGatewayConfig> {
  @override
  NodeType get type => NodeType.oneOf;

  @override
  PriorityGatewayConfig fromConfig(Map<String, dynamic> config) =>
      PriorityGatewayConfig.fromJson(config);

  @override
  Future<NodeResult> executeTyped(
    PriorityGatewayConfig config,
    WorkflowContext context,
  ) async {
    // Read the routing value from the accumulated output
    final priority =
        (context.output[config.priorityField] as String?)?.toLowerCase();

    // Find the edge whose label matches. Skip the search when no priority is
    // set, so that an unlabeled edge is not matched by null == null.
    if (priority != null) {
      for (final edge in context.outgoingEdges) {
        if (edge.label?.toLowerCase() == priority) {
          return ContinueResult.single(edge.targetNodeId);
        }
      }
    }

    // Fall back to the default route, then to the first edge
    final targetId =
        config.defaultRoute ?? context.outgoingEdges.first.targetNodeId;
    return ContinueResult.single(targetId);
  }
}

// Register the custom executor together with its configuration type
final descriptor = WorkflowDescriptor(
  title: 'Custom Gateway',
  nodeExecutors: [PriorityGatewayExecutor()],
  nodeConfigurations: [
    TypeDescriptor<NodeConfiguration>(
      schemaType: PriorityGatewayConfig.schemaTypeName,
      fromJson: PriorityGatewayConfig.fromJson,
    ),
  ],
);

Benefits of TypedNodeExecutor

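| Feature       | NodeExecutor       | TypedNodeExecutor |
|---------------|--------------------|-------------------|
| Config access | config['key'] cast | config.key        |
| Type safety   | Runtime casting    | Compile-time      |
| IDE support   | Limited            | Full autocomplete |
| Refactoring   | Error-prone        | Safe              |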

When to Use

  • Use TypedNodeExecutor for:
    • Complex configurations with many fields
    • Custom node types with specific settings
    • Production code requiring type safety
  • Use NodeExecutor for:
    • Simple nodes with minimal config
    • Prototyping and testing
    • One-off custom executors

Type-Safe Configuration Access

With the unified model, node configurations are already type-safe. Access them via pattern matching:

dart
// In your custom executor
@override
Future<NodeResult> execute(WorkflowContext context) async {
  final node = context.currentNode;

  switch (node.config) {
    case TaskNodeConfiguration config:
      print('Task schemaType: ${config.schemaType}');
      print('Store as: ${config.storeAs}');

    case UserTaskNodeConfiguration config:
      print('User task title: ${config.title}');
      print('Assigned to: ${config.assignToRole}');

    case SignalWaitNodeConfiguration config:
      print('Waiting for signal: ${config.signalName}');

    case GatewayNodeConfiguration config:
      print('Gateway with ${config.outputPorts?.length ?? 0} ports');

    case TimerWaitNodeConfiguration config:
      print('Timer type: ${config.timerType}');

    case SubflowNodeConfiguration config:
      print('Subflow: ${config.workflowCode}');

    case EmptyNodeConfiguration _:
      print('Start/End node');
  }

  // Continue execution...
  return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
}
