Task Executors
Task executors run the automated work for task nodes.
TaskExecutor Interface
dart
abstract class TaskExecutor implements SchemaItem {
  /// Schema type identifier for registry lookup (e.g., 'task.entity.validate')
  String get schemaType;
  /// Human-readable name
  String get name;
  /// Optional description
  String get description => '';
  /// Execute the task logic
  Future<TaskResult> execute(ExecutionContext context);
}
ExecutionContext
A read-only context that provides access to all execution state. Executors do not call mutation methods directly; instead, they return results that carry effects.
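To illustrate the idea of returning effects rather than mutating state, here is a minimal, self-contained sketch. The types below (Effect, SetVariable, LogMessage, Result) and the applyEffects function are hypothetical stand-ins invented for this example, not the library's actual effect or result classes. The point is only that the executor-like function reads state and describes changes, while a separate engine-like function applies them.

```dart
// Hypothetical stand-ins for illustration; not the library's real types.
abstract class Effect {
  const Effect();
}

class SetVariable extends Effect {
  final String key;
  final Object? value;
  const SetVariable(this.key, this.value);
}

class LogMessage extends Effect {
  final String message;
  const LogMessage(this.message);
}

class Result {
  final Map<String, dynamic> output;
  final List<Effect> effects;
  const Result({this.output = const {}, this.effects = const []});
}

/// Executor-like function: reads state and describes changes, never mutates.
Result approve(Map<String, dynamic> state) {
  final approver = (state['approvedBy'] as String?) ?? 'unknown';
  return Result(
    output: {'approved': true},
    effects: [
      SetVariable('status', 'approved'),
      LogMessage('Approved by $approver'),
    ],
  );
}

/// Engine-like function: the only place where changes are applied.
/// Returns a new map and leaves the original state untouched.
Map<String, dynamic> applyEffects(
  Map<String, dynamic> state,
  List<Effect> effects,
  List<String> log,
) {
  final next = Map<String, dynamic>.of(state);
  for (final effect in effects) {
    if (effect is SetVariable) {
      next[effect.key] = effect.value;
    } else if (effect is LogMessage) {
      log.add(effect.message);
    }
  }
  return next;
}

void main() {
  final state = Map<String, dynamic>.unmodifiable({'approvedBy': 'alice'});
  final log = <String>[];
  final result = approve(state);
  final next = applyEffects(state, result.effects, log);
  print(next['status']); // approved
  print(log); // [Approved by alice]
  print(state.containsKey('status')); // false
}
```

Because the executor-like function only returns data, it can be called repeatedly (for example, on a retry) without changing anything until the effects are applied.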
Data Model
The context provides three levels of data access:
| Property | Description |
|---|---|
| input | Output from the previous node (your primary data source) |
| workflowInput | Original input when workflow started (immutable) |
| accumulated | All output accumulated during workflow execution |
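The difference between the three levels is easiest to see on a small example. The sketch below uses a hypothetical MiniContext class built on plain maps, with accessor names that mirror the ones used in this document. It does not reproduce the library's ExecutionContext, and the merge rule for accumulated (later node output overwrites earlier keys) is an assumption made for the illustration.

```dart
// Hypothetical stand-in for illustration; not the library's ExecutionContext.
class MiniContext {
  final Map<String, dynamic> input; // output of the previous node
  final Map<String, dynamic> workflowInput; // input when the workflow started
  final Map<String, dynamic> accumulated; // everything produced so far

  const MiniContext({
    required this.input,
    required this.workflowInput,
    required this.accumulated,
  });

  T? get<T>(String key) => input[key] as T?;
  T? getInitial<T>(String key) => workflowInput[key] as T?;
  T? getAny<T>(String key) => accumulated[key] as T?;
}

/// Simulates two nodes that ran in sequence and returns the context a
/// third node would see. Assumption: accumulated output is a simple merge
/// in which later keys overwrite earlier ones.
MiniContext contextForThirdNode() {
  final workflowInput = {'tenantId': 't-1', 'orderId': 'o-42'};
  final node1Output = {'orderId': 'o-42', 'validated': true};
  final node2Output = {'total': 99.5};
  return MiniContext(
    input: node2Output,
    workflowInput: workflowInput,
    accumulated: {...node1Output, ...node2Output},
  );
}

void main() {
  final ctx = contextForThirdNode();
  print(ctx.get<double>('total')); // 99.5 (from the previous node)
  print(ctx.get<bool>('validated')); // null (produced two nodes back)
  print(ctx.getAny<bool>('validated')); // true (found in accumulated)
  print(ctx.getInitial<String>('tenantId')); // t-1 (original input)
}
```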
Core Properties
dart
class ExecutionContext {
  /// The executable workflow with nodes and edges
  final Workflow workflow;
  /// The current workflow instance
  final WorkflowInstance workflowInstance;
  /// The node being executed
  final WorkflowNode currentNode;
  /// The token that triggered this execution
  final WorkflowToken currentToken;
  /// Current accumulated output (read-only)
  final Map<String, dynamic> output;
  /// Executor-specific configuration
  final Map<String, dynamic> config;
  /// Signal name for user tasks
  final String? signalName;
}
Data Access Methods
dart
// ═══════════════════════════════════════════════════════════════════
// INPUT - Data from previous node (your primary data source)
// ═══════════════════════════════════════════════════════════════════
T? get<T>(String path); // Get value by key or path
T getRequired<T>(String path); // Throws if missing
// ═══════════════════════════════════════════════════════════════════
// WORKFLOW INPUT - Original input when workflow started (immutable)
// ═══════════════════════════════════════════════════════════════════
Map<String, dynamic> get workflowInput; // Original workflow input
T? getInitial<T>(String path); // Get from workflowInput
T getInitialRequired<T>(String path); // Throws if missing
// ═══════════════════════════════════════════════════════════════════
// ACCUMULATED - All output from workflow execution
// ═══════════════════════════════════════════════════════════════════
Map<String, dynamic> get accumulated; // All accumulated output
T? getAny<T>(String path); // Get from accumulated
// ═══════════════════════════════════════════════════════════════════
// CONFIG - Node configuration
// ═══════════════════════════════════════════════════════════════════
T? getConfig<T>(String key); // Get config value
T getConfigRequired<T>(String key); // Throws if missing
Navigation and Retry Helpers
dart
// Navigation
List<WorkflowEdge> get outgoingEdges;
List<WorkflowEdge> get incomingEdges;
WorkflowNode? getNode(String nodeId);
// Retry information
int get attemptNumber; // 1-based (1 = first try, 2 = first retry)
bool get isRetry; // True if attemptNumber > 1
TaskResult
Tasks return one of two result types:
TaskSuccess
dart
class TaskSuccess extends TaskResult {
  final Map<String, dynamic> output;
  final String? outputPortId; // Optional port for routing
  final List<WorkflowEffect> effects; // Declarative side effects
  TaskSuccess({
    this.output = const {},
    this.outputPortId,
    this.effects = const [],
  });
}
TaskFailure
dart
class TaskFailure extends TaskResult {
  final ErrorType errorType;
  final String message;
  final bool isRetryable;
  final Map<String, dynamic>? details;
  TaskFailure({
    required this.errorType,
    required this.message,
    this.isRetryable = true, // Default is retryable
    this.details,
  });
  // Factory constructors
  factory TaskFailure.permanent({...});
  factory TaskFailure.validation(String message, {...});
  factory TaskFailure.internal(String message, {...});
  factory TaskFailure.timeout(String message, {...});
}
Implementing a Task Executor
Basic Example
dart
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.notification.sendEmail';
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => SendEmailTaskExecutor(),
    title: 'Send Email',
  );
  @override
  String get schemaType => _schemaType;
  @override
  String get name => 'Send Email';
  @override
  String get description => 'Sends an email notification';
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final to = context.get<String>('recipientEmail');
    final subject = context.get<String>('emailSubject');
    final body = context.get<String>('emailBody');
    // Get design-time configuration
    final template = context.getConfig<String>('template');
    // Validate
    if (to == null) {
      return TaskFailure.validation('Missing required field: recipientEmail');
    }
    try {
      // emailService is not declared in this snippet; it is assumed to be
      // available in scope (for example, a top-level or injected service)
      await emailService.send(
        to: to,
        subject: subject ?? 'Notification',
        body: body,
        template: template,
      );
      return TaskSuccess(output: {
        'sentAt': DateTime.now().toIso8601String(),
        'recipient': to,
      });
    } catch (e) {
      return TaskFailure(
        errorType: ErrorType.internal,
        message: 'Failed to send email: $e',
        isRetryable: true,
      );
    }
  }
}
// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Email Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);
// Create context with descriptors
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);
// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();
Idempotent Task Executor
dart
class CreateOrderTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.order.create';
  final OrderRepository orderRepository;
  CreateOrderTaskExecutor(this.orderRepository);
  @override
  String get schemaType => _schemaType;
  @override
  String get name => 'Create Order';
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final orderId = context.getRequired<String>('orderId');
    final items = context.getRequired<List>('items');
    // IDEMPOTENT: Check if already created
    final existing = await orderRepository.findById(orderId);
    if (existing != null) {
      return TaskSuccess(output: {
        'orderId': existing.id,
        'status': 'already_exists',
        'createdAt': existing.createdAt.toIso8601String(),
      });
    }
    // Create new order
    final order = await orderRepository.create(
      id: orderId,
      items: items,
    );
    return TaskSuccess(output: {
      'orderId': order.id,
      'status': 'created',
      'createdAt': order.createdAt.toIso8601String(),
    });
  }
}
Task with Effects
dart
class ProcessApprovalExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.approval.process';
  @override
  String get name => 'Process Approval';
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final decision = context.getRequired<String>('decision');
    final approvedBy = context.getRequired<String>('approvedBy');
    // Return output with declarative effects
    return TaskSuccess(
      output: {
        'processed': true,
        'decision': decision,
        'processedAt': DateTime.now().toIso8601String(),
      },
      effects: [
        // Cancel any pending user tasks
        const CancelUserTasksEffect(),
        // Record audit event
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_processed',
            data: {'decision': decision, 'approvedBy': approvedBy},
          ),
        ),
      ],
    );
  }
}
Task with External API
dart
import 'dart:async'; // Provides TimeoutException
class FetchPricingTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.pricing.fetch';
  final PricingApiClient apiClient;
  FetchPricingTaskExecutor(this.apiClient);
  @override
  String get schemaType => _schemaType;
  @override
  String get name => 'Fetch Pricing';
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final productIds = context.getRequired<List<String>>('productIds');
    // Check retry info
    if (context.isRetry) {
      print('Retry attempt ${context.attemptNumber}');
    }
    try {
      final pricing = await apiClient.getPricing(productIds);
      return TaskSuccess(output: {
        'pricing': pricing.map((p) => p.toJson()).toList(),
        'fetchedAt': DateTime.now().toIso8601String(),
      });
    } on TimeoutException {
      return TaskFailure.timeout('Pricing API timeout');
    } on ApiException catch (e) {
      // Retry only server-side (5xx) failures; client errors will not succeed on retry
      return TaskFailure(
        errorType: ErrorType.activity,
        message: 'Pricing API error: ${e.message}',
        isRetryable: e.statusCode >= 500,
      );
    }
  }
}
Using in Workflows
Inline Execute
dart
builder.task(
  'validateData',
  name: 'Validate Data',
  execute: (ctx) async {
    final data = ctx.getRequired<Map>('data');
    // Inline logic
    return TaskSuccess(output: {'validated': true});
  },
);
Executor-Based
dart
// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Notification Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);
// Use in workflow
builder.task(
  'sendNotification',
  name: 'Send Notification',
  executor: SendEmailTaskExecutor(),
);
Error Handling
Validation Errors
dart
final amount = context.get<num>('amount');
if (amount == null) {
  return TaskFailure.validation('Amount is required');
}
Retryable Errors
dart
try {
  await externalService.call();
} catch (e) {
  return TaskFailure(
    errorType: ErrorType.activity,
    message: 'Service unavailable',
    isRetryable: true, // Engine may retry
  );
}
Using Factory Constructors
dart
// Validation error (not retryable by default)
return TaskFailure.validation('Invalid input');
// Internal error (not retryable by default)
return TaskFailure.internal('Unexpected error');
// Timeout error (retryable by default)
return TaskFailure.timeout('Service timeout');
// Permanent failure (never retry)
return TaskFailure.permanent(
  errorType: ErrorType.validation,
  message: 'Data corruption detected',
);
TypedTaskExecutor
For type-safe input/output handling with compile-time safety:
dart
/// Type-safe base class for task executors
abstract class TypedTaskExecutor<TInput, TOutput> extends TaskExecutor {
  /// Deserialize input from workflow variables
  TInput fromInput(Map<String, dynamic> input);
  /// Serialize output to workflow variables
  Map<String, dynamic> toOutput(TOutput output);
  /// Type-safe execute method
  Future<TOutput> executeTyped(TInput input, ExecutionContext context);
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    try {
      final input = fromInput(context.input);
      final output = await executeTyped(input, context);
      return TaskSuccess(output: toOutput(output));
    } on FormatException catch (e) {
      return TaskFailure.validation('Invalid input format: ${e.message}');
    } catch (e) {
      return TaskFailure.internal('Task execution failed: $e');
    }
  }
}
Example with JsonSerializable
Use @JsonSerializable from json_annotation for input/output models:
dart
import 'package:json_annotation/json_annotation.dart';
part 'validate_input.g.dart';
@JsonSerializable()
class ValidateInput {
  final String entityId;
  final Map<String, dynamic> entityData;
  final List<String> requiredFields;
  ValidateInput({
    required this.entityId,
    required this.entityData,
    required this.requiredFields,
  });
  factory ValidateInput.fromJson(Map<String, dynamic> json) =>
      _$ValidateInputFromJson(json);
  Map<String, dynamic> toJson() => _$ValidateInputToJson(this);
}
@JsonSerializable()
class ValidateOutput {
  final bool isValid;
  final List<String> errors;
  final String entityId;
  ValidateOutput({
    required this.isValid,
    required this.errors,
    required this.entityId,
  });
  factory ValidateOutput.fromJson(Map<String, dynamic> json) =>
      _$ValidateOutputFromJson(json);
  Map<String, dynamic> toJson() => _$ValidateOutputToJson(this);
}
// Type-safe executor
class ValidateEntityExecutor
    extends TypedTaskExecutor<ValidateInput, ValidateOutput> {
  static const _schemaType = 'task.entity.validate';
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );
  @override
  String get schemaType => _schemaType;
  @override
  String get name => 'Validate Entity';
  @override
  ValidateInput fromInput(Map<String, dynamic> input) =>
      ValidateInput.fromJson(input);
  @override
  Map<String, dynamic> toOutput(ValidateOutput output) =>
      output.toJson();
  @override
  Future<ValidateOutput> executeTyped(
    ValidateInput input,
    ExecutionContext context,
  ) async {
    // Fully typed access to input!
    final errors = <String>[];
    for (final field in input.requiredFields) {
      if (!input.entityData.containsKey(field)) {
        errors.add('Missing required field: $field');
      }
    }
    return ValidateOutput(
      isValid: errors.isEmpty,
      errors: errors,
      entityId: input.entityId,
    );
  }
}
Benefits of TypedTaskExecutor
| Aspect | TaskExecutor | TypedTaskExecutor |
|---|---|---|
| Input access | context.get<T>('field') | input.field |
| Type safety | Runtime checks | Compile-time checks |
| Refactoring | Error-prone | IDE-supported |
| Auto-complete | None | Full support |
| Error handling | Manual | Automatic deserialization errors |
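The first two rows of the table can be made concrete with a small comparison. The sketch below is self-contained and uses a hypothetical OrderInput class with a hand-written fromJson that throws FormatException on bad input, which is the exception type that the TypedTaskExecutor.execute method shown earlier maps to a validation failure. The helper names totalUntyped and totalTyped are invented for this example.

```dart
// Hypothetical input model for illustration.
class OrderInput {
  final String orderId;
  final int quantity;
  final double unitPrice;

  const OrderInput({
    required this.orderId,
    required this.quantity,
    required this.unitPrice,
  });

  /// Validates every field once, at the boundary.
  factory OrderInput.fromJson(Map<String, dynamic> json) {
    final orderId = json['orderId'];
    final quantity = json['quantity'];
    final unitPrice = json['unitPrice'];
    if (orderId is! String) {
      throw const FormatException('orderId must be a string');
    }
    if (quantity is! int) {
      throw const FormatException('quantity must be an integer');
    }
    if (unitPrice is! num) {
      throw const FormatException('unitPrice must be a number');
    }
    return OrderInput(
      orderId: orderId,
      quantity: quantity,
      unitPrice: unitPrice.toDouble(),
    );
  }
}

/// Untyped style: every access repeats the key and the cast, and a typo
/// in a key only shows up at runtime.
double totalUntyped(Map<String, dynamic> input) {
  final quantity = input['quantity'] as int;
  final unitPrice = (input['unitPrice'] as num).toDouble();
  return quantity * unitPrice;
}

/// Typed style: field names and types are checked by the compiler.
double totalTyped(OrderInput input) => input.quantity * input.unitPrice;

void main() {
  final raw = {'orderId': 'o-1', 'quantity': 3, 'unitPrice': 2.5};
  print(totalUntyped(raw)); // 7.5
  print(totalTyped(OrderInput.fromJson(raw))); // 7.5

  try {
    OrderInput.fromJson({'orderId': 'o-1', 'quantity': 'three'});
  } on FormatException catch (e) {
    print(e.message); // quantity must be an integer
  }
}
```

Generated deserializers may signal bad input with error types other than FormatException, so it is worth checking which branch of execute your own models actually reach.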
When to Use
Use TypedTaskExecutor when:
- Input or output structures are complex
- There are multiple required fields
- You need strong type guarantees
- You are building reusable executors
Use the basic TaskExecutor when:
- Input is simple (1-2 fields)
- The task is a quick inline task
- You are prototyping
Best Practices
- Keep tasks focused - Single responsibility
- Make tasks idempotent - Safe to retry
- Validate inputs early - Fail fast with clear errors
- Use meaningful output keys - Document the output schema
- Handle errors gracefully - Return appropriate TaskFailure
- Use dependency injection - For testability
- Use TypedTaskExecutor for complex input - Compile-time type safety
- Use effects for side effects - Keep executors pure by returning effects
- Use get<T> for previous node output - This is your primary data source
- Use getInitial<T> for original workflow input - For configuration that persists
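Two of the practices above, idempotency and dependency injection, reinforce each other: once the repository is injected, a retry can be simulated in a plain test without any engine. The sketch below is self-contained. OrderStore, InMemoryOrderStore, and createOrder are hypothetical names for this example and only stand in for the repository and executor body from the Idempotent Task Executor section.

```dart
// Hypothetical abstractions for illustration; not part of the library.
abstract class OrderStore {
  Future<Map<String, dynamic>?> findById(String id);
  Future<Map<String, dynamic>> create(String id, List<String> items);
}

/// In-memory fake that records how many orders were actually created.
class InMemoryOrderStore implements OrderStore {
  final _orders = <String, Map<String, dynamic>>{};
  int createCalls = 0;

  @override
  Future<Map<String, dynamic>?> findById(String id) async => _orders[id];

  @override
  Future<Map<String, dynamic>> create(String id, List<String> items) async {
    createCalls++;
    final order = <String, dynamic>{'id': id, 'items': items};
    _orders[id] = order;
    return order;
  }
}

/// Executor-like body: the store is passed in, and an existing order
/// short-circuits the call so that a retry does not create a duplicate.
Future<Map<String, dynamic>> createOrder(
  OrderStore store,
  String orderId,
  List<String> items,
) async {
  final existing = await store.findById(orderId);
  if (existing != null) {
    return {'orderId': orderId, 'status': 'already_exists'};
  }
  await store.create(orderId, items);
  return {'orderId': orderId, 'status': 'created'};
}

Future<void> main() async {
  final store = InMemoryOrderStore();
  final first = await createOrder(store, 'o-1', ['apple']);
  final retry = await createOrder(store, 'o-1', ['apple']);
  print(first['status']); // created
  print(retry['status']); // already_exists
  print(store.createCalls); // 1
}
```

A check followed by a create is not atomic, so with concurrent executions a real store would still need its own uniqueness guarantee.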
Data Access Patterns
dart
// PRIMARY: Get data from previous node
final entityId = context.get<String>('entityId');
// REQUIRED: Throws if missing
final userId = context.getRequired<String>('userId');
// ORIGINAL: Get from workflow input (immutable)
final tenantId = context.getInitial<String>('tenantId');
// ANYWHERE: Get from accumulated output
final approval = context.getAny<Map>('level1Approval');
// CONFIG: Get node configuration
final template = context.getConfig<String>('template');
// NESTED: Use dot notation for nested paths
final decision = context.get<String>('approval.decision');
Next Steps
- User Task Executors - Human task executors
- Workflow Effects - Effect types reference
- Idempotency - Crash recovery
- Error Handling - Error patterns