Skip to content

Task Executors

Task executors execute automated work for task nodes.

Executor Model

TaskExecutor Interface

dart
abstract class TaskExecutor implements SchemaItem {
  /// Schema type identifier for registry lookup (e.g., 'task.entity.validate')
  String get schemaType;

  /// Human-readable name
  String get name;

  /// Optional description
  String get description => '';

  /// Execute the task logic
  Future<TaskResult> execute(ExecutionContext context);
}

ExecutionContext

A read-only context providing access to all execution state. Executors return results with effects instead of calling mutation methods directly.

Data Model

The context provides three levels of data access:

PropertyDescription
inputOutput from the previous node (your primary data source)
workflowInputOriginal input when workflow started (immutable)
accumulatedAll output accumulated during workflow execution

Core Properties

dart
class ExecutionContext {
  /// The executable workflow with nodes and edges
  final Workflow workflow;

  /// The current workflow instance
  final WorkflowInstance workflowInstance;

  /// The node being executed
  final WorkflowNode currentNode;

  /// The token that triggered this execution
  final WorkflowToken currentToken;

  /// Current accumulated output (read-only)
  final Map<String, dynamic> output;

  /// Executor-specific configuration
  final Map<String, dynamic> config;

  /// Signal name for user tasks
  final String? signalName;
}

Data Access Methods

dart
// ═══════════════════════════════════════════════════════════════════
// INPUT - Data from previous node (your primary data source)
// ═══════════════════════════════════════════════════════════════════

T? get<T>(String path);              // Get value by key or path
T getRequired<T>(String path);       // Throws if missing

// ═══════════════════════════════════════════════════════════════════
// WORKFLOW INPUT - Original input when workflow started (immutable)
// ═══════════════════════════════════════════════════════════════════

Map<String, dynamic> get workflowInput;  // Original workflow input
T? getInitial<T>(String path);           // Get from workflowInput
T getInitialRequired<T>(String path);    // Throws if missing

// ═══════════════════════════════════════════════════════════════════
// ACCUMULATED - All output from workflow execution
// ═══════════════════════════════════════════════════════════════════

Map<String, dynamic> get accumulated;    // All accumulated output
T? getAny<T>(String path);               // Get from accumulated

// ═══════════════════════════════════════════════════════════════════
// CONFIG - Node configuration
// ═══════════════════════════════════════════════════════════════════

T? getConfig<T>(String key);             // Get config value
T getConfigRequired<T>(String key);      // Throws if missing
dart
// Navigation
List<WorkflowEdge> get outgoingEdges;
List<WorkflowEdge> get incomingEdges;
WorkflowNode? getNode(String nodeId);

// Retry information
int get attemptNumber;   // 1-based (1 = first try, 2 = first retry)
bool get isRetry;        // True if attemptNumber > 1

TaskResult

Tasks return one of two result types:

TaskSuccess

dart
class TaskSuccess extends TaskResult {
  final Map<String, dynamic> output;
  final String? outputPortId;            // Optional port for routing
  final List<WorkflowEffect> effects;    // Declarative side effects

  TaskSuccess({
    this.output = const {},
    this.outputPortId,
    this.effects = const [],
  });
}

TaskFailure

dart
class TaskFailure extends TaskResult {
  final ErrorType errorType;
  final String message;
  final bool isRetryable;
  final Map<String, dynamic>? details;

  TaskFailure({
    required this.errorType,
    required this.message,
    this.isRetryable = true,  // Default is retryable
    this.details,
  });

  // Factory constructors
  factory TaskFailure.permanent({...});
  factory TaskFailure.validation(String message, {...});
  factory TaskFailure.internal(String message, {...});
  factory TaskFailure.timeout(String message, {...});
}

Implementing a Task Executor

Basic Example

dart
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.notification.sendEmail';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => SendEmailTaskExecutor(),
    title: 'Send Email',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  @override
  String get description => 'Sends an email notification';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final to = context.get<String>('recipientEmail');
    final subject = context.get<String>('emailSubject');
    final body = context.get<String>('emailBody');

    // Get design-time configuration
    final template = context.getConfig<String>('template');

    // Validate
    if (to == null) {
      return TaskFailure.validation('Missing required field: recipientEmail');
    }

    try {
      await emailService.send(
        to: to,
        subject: subject ?? 'Notification',
        body: body,
        template: template,
      );

      return TaskSuccess(output: {
        'sentAt': DateTime.now().toIso8601String(),
        'recipient': to,
      });
    } catch (e) {
      return TaskFailure(
        errorType: ErrorType.internal,
        message: 'Failed to send email: $e',
        isRetryable: true,
      );
    }
  }
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Email Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);

// Create context with descriptors
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

Idempotent Task Executor

dart
class CreateOrderTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.order.create';
  final OrderRepository orderRepository;

  CreateOrderTaskExecutor(this.orderRepository);

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Create Order';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final orderId = context.getRequired<String>('orderId');
    final items = context.getRequired<List>('items');

    // IDEMPOTENT: Check if already created
    final existing = await orderRepository.findById(orderId);
    if (existing != null) {
      return TaskSuccess(output: {
        'orderId': existing.id,
        'status': 'already_exists',
        'createdAt': existing.createdAt.toIso8601String(),
      });
    }

    // Create new order
    final order = await orderRepository.create(
      id: orderId,
      items: items,
    );

    return TaskSuccess(output: {
      'orderId': order.id,
      'status': 'created',
      'createdAt': order.createdAt.toIso8601String(),
    });
  }
}

Task with Effects

dart
class ProcessApprovalExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.approval.process';

  @override
  String get name => 'Process Approval';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final decision = context.getRequired<String>('decision');
    final approvedBy = context.getRequired<String>('approvedBy');

    // Return output with declarative effects
    return TaskSuccess(
      output: {
        'processed': true,
        'decision': decision,
        'processedAt': DateTime.now().toIso8601String(),
      },
      effects: [
        // Cancel any pending user tasks
        const CancelUserTasksEffect(),
        // Record audit event
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_processed',
            data: {'decision': decision, 'approvedBy': approvedBy},
          ),
        ),
      ],
    );
  }
}

Task with External API

dart
class FetchPricingTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.pricing.fetch';
  final PricingApiClient apiClient;

  FetchPricingTaskExecutor(this.apiClient);

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Fetch Pricing';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final productIds = context.getRequired<List<String>>('productIds');

    // Check retry info
    if (context.isRetry) {
      print('Retry attempt ${context.attemptNumber}');
    }

    try {
      final pricing = await apiClient.getPricing(productIds);

      return TaskSuccess(output: {
        'pricing': pricing.map((p) => p.toJson()).toList(),
        'fetchedAt': DateTime.now().toIso8601String(),
      });
    } on TimeoutException {
      return TaskFailure.timeout('Pricing API timeout');
    } on ApiException catch (e) {
      return TaskFailure(
        errorType: ErrorType.activity,
        message: 'Pricing API error: ${e.message}',
        isRetryable: e.statusCode >= 500,
      );
    }
  }
}

Using in Workflows

Inline Execute

dart
builder.task(
  'validateData',
  name: 'Validate Data',
  execute: (ctx) async {
    final data = ctx.getRequired<Map>('data');
    // Inline logic
    return TaskSuccess(output: {'validated': true});
  },
);

Executor-Based

dart
// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Notification Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);

// Use in workflow
builder.task(
  'sendNotification',
  name: 'Send Notification',
  executor: SendEmailTaskExecutor(),
);

Error Handling

Validation Errors

dart
final amount = context.get<num>('amount');
if (amount == null) {
  return TaskFailure.validation('Amount is required');
}

Retryable Errors

dart
try {
  await externalService.call();
} catch (e) {
  return TaskFailure(
    errorType: ErrorType.activity,
    message: 'Service unavailable',
    isRetryable: true,  // Engine may retry
  );
}

Using Factory Constructors

dart
// Validation error (not retryable by default)
return TaskFailure.validation('Invalid input');

// Internal error (not retryable by default)
return TaskFailure.internal('Unexpected error');

// Timeout error (retryable by default)
return TaskFailure.timeout('Service timeout');

// Permanent failure (never retry)
return TaskFailure.permanent(
  errorType: ErrorType.validation,
  message: 'Data corruption detected',
);

TypedTaskExecutor

For type-safe input/output handling with compile-time safety:

dart
/// Type-safe base class for task executors
abstract class TypedTaskExecutor<TInput, TOutput> extends TaskExecutor {
  /// Deserialize input from workflow variables
  TInput fromInput(Map<String, dynamic> input);

  /// Serialize output to workflow variables
  Map<String, dynamic> toOutput(TOutput output);

  /// Type-safe execute method
  Future<TOutput> executeTyped(TInput input, ExecutionContext context);

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    try {
      final input = fromInput(context.input);
      final output = await executeTyped(input, context);
      return TaskSuccess(output: toOutput(output));
    } on FormatException catch (e) {
      return TaskFailure.validation('Invalid input format: ${e.message}');
    } catch (e) {
      return TaskFailure.internal('Task execution failed: $e');
    }
  }
}

Example with JsonSerializable

Use @JsonSerializable from json_annotation for input/output models:

dart
import 'package:json_annotation/json_annotation.dart';

part 'validate_input.g.dart';

@JsonSerializable()
class ValidateInput {
  final String entityId;
  final Map<String, dynamic> entityData;
  final List<String> requiredFields;

  ValidateInput({
    required this.entityId,
    required this.entityData,
    required this.requiredFields,
  });

  factory ValidateInput.fromJson(Map<String, dynamic> json) =>
      _$ValidateInputFromJson(json);

  Map<String, dynamic> toJson() => _$ValidateInputToJson(this);
}

@JsonSerializable()
class ValidateOutput {
  final bool isValid;
  final List<String> errors;
  final String entityId;

  ValidateOutput({
    required this.isValid,
    required this.errors,
    required this.entityId,
  });

  factory ValidateOutput.fromJson(Map<String, dynamic> json) =>
      _$ValidateOutputFromJson(json);

  Map<String, dynamic> toJson() => _$ValidateOutputToJson(this);
}

// Type-safe executor
class ValidateEntityExecutor
    extends TypedTaskExecutor<ValidateInput, ValidateOutput> {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  @override
  ValidateInput fromInput(Map<String, dynamic> input) =>
      ValidateInput.fromJson(input);

  @override
  Map<String, dynamic> toOutput(ValidateOutput output) =>
      output.toJson();

  @override
  Future<ValidateOutput> executeTyped(
    ValidateInput input,
    ExecutionContext context,
  ) async {
    // Fully typed access to input!
    final errors = <String>[];

    for (final field in input.requiredFields) {
      if (!input.entityData.containsKey(field)) {
        errors.add('Missing required field: $field');
      }
    }

    return ValidateOutput(
      isValid: errors.isEmpty,
      errors: errors,
      entityId: input.entityId,
    );
  }
}

Benefits of TypedTaskExecutor

AspectTaskExecutorTypedTaskExecutor
Input accesscontext.get<T>('field')input.field
Type safetyRuntime checksCompile-time checks
RefactoringError-proneIDE-supported
Auto-completeNoneFull support
Error handlingManualAutomatic deserialization errors

When to Use

  • Use TypedTaskExecutor when:

    • Complex input/output structures
    • Multiple required fields
    • Need strong type guarantees
    • Building reusable executors
  • Use basic TaskExecutor when:

    • Simple input (1-2 fields)
    • Quick inline tasks
    • Prototyping

Best Practices

  1. Keep tasks focused - Single responsibility
  2. Make tasks idempotent - Safe to retry
  3. Validate inputs early - Fail fast with clear errors
  4. Use meaningful output keys - Document the output schema
  5. Handle errors gracefully - Return appropriate TaskFailure
  6. Use dependency injection - For testability
  7. Use TypedTaskExecutor for complex input - Compile-time type safety
  8. Use effects for side effects - Keep executors pure by returning effects
  9. Use get<T> for previous node output - This is your primary data source
  10. Use getInitial<T> for original workflow input - For configuration that persists

Data Access Patterns

dart
// PRIMARY: Get data from previous node
final entityId = context.get<String>('entityId');

// REQUIRED: Throws if missing
final userId = context.getRequired<String>('userId');

// ORIGINAL: Get from workflow input (immutable)
final tenantId = context.getInitial<String>('tenantId');

// ANYWHERE: Get from accumulated output
final approval = context.getAny<Map>('level1Approval');

// CONFIG: Get node configuration
final template = context.getConfig<String>('template');

// NESTED: Use dot notation for nested paths
final decision = context.get<String>('approval.decision');

Next Steps