Task Nodes

Task nodes execute automated work in a workflow. They're the workhorses of business process automation.

Definition

Inline Execute Function

For simple tasks, define the logic inline:

dart
builder.task(
  'validateRequest',
  name: 'Validate Request',
  execute: (ExecutionContext ctx) async {
    // Use getInitial<T> for original workflow input
    final entityId = ctx.getInitial<String>('entityId');
    if (entityId == null) {
      throw Exception('entityId is required');
    }
    return {'validated': true};  // Auto-wrapped in TaskSuccess
  },
);

Executor-Based

For reusable logic, use an executor class:

dart
// Define the executor
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.email.send';

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Required data from previous node output (getRequired throws if missing)
    final to = context.getRequired<String>('recipientEmail');
    // Required node configuration
    final template = context.getConfigRequired<String>('template');

    await emailService.send(to: to, template: template);
    return TaskSuccess(output: {'sentAt': DateTime.now().toIso8601String()});
  }

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (_) => SendEmailTaskExecutor(),
    title: 'Send Email',
  );
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Email Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);

// Use in workflow
builder.task(
  'notifyApprover',
  name: 'Notify Approver',
  executor: SendEmailTaskExecutor(),
);

ExecutionContext

The engine passes an ExecutionContext to every task, whether it is an inline execute function or an executor class:

dart
class ExecutionContext {
  final Workflow workflow;                // The workflow definition
  final WorkflowInstance workflowInstance; // Current instance
  final WorkflowNode currentNode;         // The node being executed
  final Map<String, dynamic> input;       // Previous node output
  final Map<String, dynamic> config;      // Node configuration

  // Typed accessors for previous node output
  T? get<T>(String path);           // Get from previous node output
  T getRequired<T>(String path);    // Throws if missing

  // Original workflow input (immutable)
  T? getInitial<T>(String path);

  // Accumulated output from all nodes
  T? getAny<T>(String path);

  // Node configuration
  T? getConfig<T>(String key);
  T getConfigRequired<T>(String key);
}
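
How the typed accessors resolve a path is not spelled out here. As a rough sketch, assuming the context walks nested maps one dot-separated segment at a time and treats a missing or wrongly typed value as absent, the lookup could behave like the code below. `lookup` and `lookupRequired` are hypothetical stand-ins written for illustration, not the library's API.

```dart
// Hypothetical stand-ins for get<T> and getRequired<T>; not the library's code.

/// Walks [data] along a dot-separated [path]. Returns the value if it
/// exists and has type [T]; otherwise returns null.
T? lookup<T>(Map<String, dynamic> data, String path) {
  Object? current = data;
  for (final segment in path.split('.')) {
    if (current is Map<String, dynamic> && current.containsKey(segment)) {
      current = current[segment];
    } else {
      return null; // A segment is missing or the parent is not a map.
    }
  }
  return current is T ? current : null;
}

/// Like [lookup], but throws instead of returning null.
T lookupRequired<T extends Object>(Map<String, dynamic> data, String path) {
  final value = lookup<T>(data, path);
  if (value == null) {
    throw StateError('Missing or wrongly typed value at "$path"');
  }
  return value;
}
```

With `{'userData': {'name': 'Ada'}}` as the data, `lookup<String>(data, 'userData.name')` returns `'Ada'`, `lookup<String>(data, 'userData.email')` returns `null`, and `lookupRequired<String>(data, 'userData.email')` throws.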

TaskResult

Tasks return one of two result types:

TaskSuccess

dart
return TaskSuccess(output: {
  'processedAt': DateTime.now().toIso8601String(),
  'recordId': newRecord.id,
});

TaskFailure

dart
return TaskFailure(
  errorType: ErrorType.validation,
  message: 'Invalid input: missing required field',
  isRetryable: false,
);

return TaskFailure(
  errorType: ErrorType.internal,
  message: 'Database connection failed',
  isRetryable: true,  // Engine may retry
);
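
The engine's actual retry policy is not described in this section. The sketch below only illustrates how an `isRetryable` flag can drive a retry loop. The result classes are simplified stand-ins (no `errorType`) so the block compiles on its own, and `runWithRetry` and `maxAttempts` are hypothetical names.

```dart
// Simplified stand-ins; the real TaskSuccess and TaskFailure come from
// the workflow library and carry more fields.
abstract class TaskResult {}

class TaskSuccess extends TaskResult {
  TaskSuccess({required this.output});
  final Map<String, dynamic> output;
}

class TaskFailure extends TaskResult {
  TaskFailure({required this.message, required this.isRetryable});
  final String message;
  final bool isRetryable;
}

/// Hypothetical retry loop: re-runs [task] only while it reports a
/// retryable failure and attempts remain.
Future<TaskResult> runWithRetry(
  Future<TaskResult> Function() task, {
  int maxAttempts = 3,
}) async {
  var attempt = 0;
  while (true) {
    attempt++;
    final result = await task();
    final canRetry = result is TaskFailure && result.isRetryable;
    // Return on success, on a non-retryable failure, or when out of attempts.
    if (!canRetry || attempt >= maxAttempts) return result;
  }
}
```

Under these assumptions, a task that returns a failure with `isRetryable: false` runs exactly once, while a retryable failure is attempted up to `maxAttempts` times before the last failure is returned.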

Output Namespacing

Use storeAs to namespace task output:

dart
builder.task(
  'fetchUser',
  name: 'Fetch User',
  storeAs: 'userData',  // Output stored under 'userData' key
  execute: (ctx) async {
    // Use get<T> for previous node output
    final userId = ctx.get<String>('userId')!;
    final user = await userService.get(userId);
    return TaskSuccess(output: user.toJson());
  },
);

// Later, access with dot-notation: ctx.get<String>('userData.name')
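
To see why namespacing helps, here is a minimal sketch of a merge step. It assumes that output without `storeAs` is merged flat into the accumulated data, so a later task can overwrite an earlier key. That merge behavior and the `mergeOutput` helper are assumptions made for illustration, not the engine's documented implementation.

```dart
/// Hypothetical merge step: folds one task's [output] into the
/// [accumulated] workflow data, nesting it under [storeAs] when given.
Map<String, dynamic> mergeOutput(
  Map<String, dynamic> accumulated,
  Map<String, dynamic> output, {
  String? storeAs,
}) {
  return {
    ...accumulated,
    // Namespaced: one new key holding the whole output map.
    // Flat: every output key lands at the top level and may overwrite.
    if (storeAs != null) storeAs: output else ...output,
  };
}
```

Merging `{'name': 'Ada'}` into `{'name': 'request-42'}` without `storeAs` replaces `name`. With `storeAs: 'userData'`, the original `name` survives and the new value sits at `userData.name`.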

Idempotent Tasks

Tasks should be idempotent, so that re-running one after a crash or a retry does not repeat its side effects:

dart
class CreateOrderTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.order.create';

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Create Order';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final orderId = context.getRequired<String>('orderId');

    // IDEMPOTENT: Check if already created
    final existing = await orderRepository.findById(orderId);
    if (existing != null) {
      return TaskSuccess(output: {
        'orderId': orderId,
        'status': 'already_exists',
      });
    }

    // Create new order
    final order = await orderRepository.create(orderId, context.input);
    return TaskSuccess(output: {
      'orderId': order.id,
      'status': 'created',
    });
  }
}
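
The check-then-create pattern above can be exercised without the workflow engine. The sketch below uses a hypothetical in-memory repository and a plain function, `createOrderOnce`, that mirrors the executor's logic but returns a map instead of a `TaskSuccess`. Both names are illustrative only.

```dart
// In-memory stand-in for the orderRepository used above (hypothetical).
class InMemoryOrderRepository {
  final Map<String, Map<String, dynamic>> _orders = {};

  Future<Map<String, dynamic>?> findById(String id) async => _orders[id];

  Future<void> create(String id, Map<String, dynamic> data) async {
    _orders[id] = {...data, 'id': id};
  }

  int get count => _orders.length;
}

/// Same check-then-create logic as the executor, returning a plain map
/// so the sketch has no library dependency.
Future<Map<String, dynamic>> createOrderOnce(
  InMemoryOrderRepository repo,
  String orderId,
  Map<String, dynamic> input,
) async {
  // Idempotency check: a second run finds the order and skips creation.
  if (await repo.findById(orderId) != null) {
    return {'orderId': orderId, 'status': 'already_exists'};
  }
  await repo.create(orderId, input);
  return {'orderId': orderId, 'status': 'created'};
}
```

Calling `createOrderOnce` twice with the same `orderId` reports `created` and then `already_exists`, and the repository still holds a single order. The check and the create are separate steps here, so a real repository would likely also need a uniqueness constraint to cover concurrent executions.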

Error Handling

Retryable Errors

dart
try {
  await externalService.call();
  return TaskSuccess(output: {...});
} on TimeoutException {
  // TimeoutException is defined in dart:async.
  // Any other exception propagates unchanged.
  return TaskFailure(
    errorType: ErrorType.timeout,
    message: 'Service timeout',
    isRetryable: true,
  );
}

Non-Retryable Errors

dart
if (!isValid(input)) {
  return TaskFailure(
    errorType: ErrorType.validation,
    message: 'Invalid input',
    isRetryable: false,  // Don't retry validation failures
  );
}

Common Task Patterns

Data Transformation

dart
builder.task('transformData', execute: (ctx) async {
  // Use get<T> for previous node output
  final input = ctx.get<Map<String, dynamic>>('rawData')!;
  final transformed = transformService.process(input);
  return {'transformedData': transformed};  // Auto-wrapped in TaskSuccess
});

External API Call

dart
builder.task('fetchExternalData', execute: (ctx) async {
  // Use get<T> for previous node output
  final id = ctx.get<String>('id')!;
  final response = await apiClient.get('/data/$id');
  return response.data;  // Auto-wrapped in TaskSuccess
});

Database Operation

dart
builder.task('updateStatus', execute: (ctx) async {
  // Use get<T> for previous node output
  final entityId = ctx.get<String>('entityId')!;
  final newStatus = ctx.get<String>('newStatus')!;
  await repository.updateStatus(entityId, newStatus);
  return {'updatedAt': DateTime.now().toIso8601String()};  // Auto-wrapped in TaskSuccess
});

Notification

dart
builder.task('sendNotification', execute: (ctx) async {
  // Use get<T> for previous node output
  final userId = ctx.get<String>('userId')!;
  await notificationService.send(
    userId: userId,
    message: 'Your request has been processed',
  );
  return {'notified': true};  // Auto-wrapped in TaskSuccess
});

Best Practices

  1. Keep tasks focused - One task, one responsibility
  2. Make tasks idempotent - Safe to retry
  3. Use storeAs - Avoid output key collisions
  4. Handle errors gracefully - Return appropriate TaskFailure
  5. Log important actions - For debugging and audit
