Signals
Signals are external messages that resume waiting workflows. They're the primary mechanism for async operations and external integrations.
Sending Signals
dart
await engine.sendSignal(
  workflowInstanceId: instanceId,
  node: 'awaitApproval', // The signal wait node ID
  payload: {
    'decision': 'approved',
    'approvedBy': 'approver@example.com', // Placeholder address
    'comments': 'Looks good!',
    'approvedAt': DateTime.now().toIso8601String(),
  },
);

Typed Signal Payloads
For type safety, use @JsonSerializable models for signal payloads:
dart
import 'package:json_annotation/json_annotation.dart';

part 'approval_payload.g.dart';

@JsonSerializable()
class ApprovalPayload {
  final String decision;
  final String approvedBy;
  final String? comments;
  final DateTime approvedAt;

  ApprovalPayload({
    required this.decision,
    required this.approvedBy,
    this.comments,
    required this.approvedAt,
  });

  factory ApprovalPayload.fromJson(Map<String, dynamic> json) =>
      _$ApprovalPayloadFromJson(json);

  Map<String, dynamic> toJson() => _$ApprovalPayloadToJson(this);
}

// Usage with typed payload
final payload = ApprovalPayload(
  decision: 'approved',
  approvedBy: currentUserId,
  comments: 'Looks good!',
  approvedAt: DateTime.now(),
);

await engine.sendSignal(
  workflowInstanceId: instanceId,
  node: 'awaitApproval',
  payload: payload.toJson(), // Serialize to JSON
);

Reading Typed Payloads in Executors
dart
class ProcessApprovalExecutor
    extends TypedTaskExecutor<ApprovalPayload, ProcessResult> {
  @override
  String get schemaType => 'task.approval.process';

  @override
  String get name => 'Process Approval';

  // The signal payload is read from the key it was stored under (storeAs).
  @override
  ApprovalPayload fromInput(Map<String, dynamic> input) =>
      ApprovalPayload.fromJson(
        input['approvalDecision'] as Map<String, dynamic>,
      );

  @override
  Map<String, dynamic> toOutput(ProcessResult output) => output.toJson();

  @override
  Future<ProcessResult> executeTyped(
    ApprovalPayload input,
    ExecutionContext context,
  ) async {
    // Fully typed access to payload!
    // Use getAny<T> to access data from earlier nodes
    final entityId = context.getAny<String>('entityId')!;

    if (input.decision == 'approved') {
      await entityService.approve(entityId);
    }

    return ProcessResult(
      processed: true,
      processedBy: input.approvedBy,
      processedAt: DateTime.now(),
    );
  }
}

Signal Processing Flow
Step Details:
- engine.sendSignal() called with workflowInstanceId, node, and payload
- Load workflow instance from storage
- Validate: status must be waitingForSignal, signal name must match waiting node
- Find waiting token at signal wait node
- Get node executor for signal wait node
- Call executor.onSignalReceived(context, signalName, payload)
- Handler returns NodeResult, payload stored at storeAs path
- Engine processes result, moves token
- Continue workflow execution
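The steps above can be pictured with a toy, in-memory model. This is only a sketch under stated assumptions, not the engine's implementation: ToyEngine, SignalWaitNode, and WorkflowInstance are hypothetical stand-ins, the executor and onSignalReceived call are collapsed into a direct write to the storeAs key, the method is synchronous (the real sendSignal is awaited), and returning false for a rejected signal is an illustrative choice.

```dart
// Hypothetical stand-ins for illustration only; not the engine's real types.
enum WorkflowStatus { running, waitingForSignal, completed }

class WorkflowNotFoundException implements Exception {
  final String instanceId;
  WorkflowNotFoundException(this.instanceId);

  @override
  String toString() => 'WorkflowNotFoundException: $instanceId';
}

/// A signal wait node: where a token waits and where the payload is stored.
class SignalWaitNode {
  final String id;
  final String storeAs;
  final String? next; // Assumed to be another signal wait node, or null.
  const SignalWaitNode(this.id, {required this.storeAs, this.next});
}

class WorkflowInstance {
  final String id;
  WorkflowStatus status;
  String? tokenAt; // Node ID where the single token is waiting.
  final Map<String, Object?> output = {};
  WorkflowInstance(this.id, {required this.status, this.tokenAt});
}

class ToyEngine {
  final Map<String, WorkflowInstance> _storage = {};
  final Map<String, SignalWaitNode> _nodes = {};

  void addNode(SignalWaitNode node) => _nodes[node.id] = node;
  void save(WorkflowInstance instance) => _storage[instance.id] = instance;

  /// Returns true if the signal resumed the workflow, false if it was ignored.
  bool sendSignal({
    required String workflowInstanceId,
    required String node,
    required Map<String, Object?> payload,
  }) {
    // Load the workflow instance from storage.
    final instance = _storage[workflowInstanceId];
    if (instance == null) throw WorkflowNotFoundException(workflowInstanceId);

    // Validate the status and find the waiting token at the target node.
    if (instance.status != WorkflowStatus.waitingForSignal) return false;
    if (instance.tokenAt != node) return false; // No matching token: ignored.

    final waitNode = _nodes[node];
    if (waitNode == null) return false;

    // Store the payload at the storeAs key.
    instance.output[waitNode.storeAs] = payload;

    // Move the token and continue (here: wait at the next node or finish).
    instance.tokenAt = waitNode.next;
    instance.status = waitNode.next == null
        ? WorkflowStatus.completed
        : WorkflowStatus.waitingForSignal;
    return true;
  }
}
```

In this model, a signal addressed to a node the token has not reached yet leaves the instance untouched, which mirrors the "silently ignored" behavior described under Signal Validation.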
Signal Sources
Webhook Handlers
dart
// POST /webhooks/payment
Future<Response> handlePaymentWebhook(Request request) async {
  final event = PaymentEvent.fromJson(await request.json());
  final instanceId = event.metadata['workflowInstanceId'];
  final nodeId = event.metadata['nodeId']; // Store this when starting workflow

  await engine.sendSignal(
    workflowInstanceId: instanceId,
    node: nodeId, // The signal wait node ID (e.g., 'awaitPayment')
    payload: {
      'transactionId': event.transactionId,
      'amount': event.amount,
      'currency': event.currency,
      'paidAt': event.timestamp,
    },
  );

  return Response.ok();
}

User Actions
dart
// User completes a task in UI
Future<void> submitApproval(
  String instanceId,
  String nodeId, // The signal wait or user task node ID
  String decision,
  String? comments,
) async {
  await engine.sendSignal(
    workflowInstanceId: instanceId,
    node: nodeId, // e.g., 'awaitApproval'
    payload: {
      'decision': decision,
      'comments': comments,
      'submittedBy': currentUserId,
      'submittedAt': DateTime.now().toIso8601String(),
    },
  );
}

Scheduled Jobs
dart
// Timer service sends signal when time expires
class TimerService {
  Future<void> processExpiredTimers() async {
    final expiredTimers = await timerRepository.findExpired();

    for (final timer in expiredTimers) {
      await engine.sendSignal(
        workflowInstanceId: timer.workflowInstanceId,
        node: timer.nodeId, // The node waiting for this signal
        payload: {
          'expired': true,
          'expiredAt': DateTime.now().toIso8601String(),
        },
      );

      await timerRepository.markProcessed(timer.id);
    }
  }
}

External System Callbacks
dart
// POST /callbacks/verification
Future<Response> handleVerificationCallback(Request request) async {
  final result = VerificationResult.fromJson(await request.json());

  await engine.sendSignal(
    workflowInstanceId: result.referenceId,
    node: result.nodeId, // The signal wait node ID from the callback data
    payload: {
      'verified': result.success,
      'verificationId': result.id,
      'failureReason': result.failureReason,
    },
  );

  return Response.ok();
}

Signal Payload Storage
The signal payload is stored in the workflow output under the key given by storeAs:
dart
// Workflow definition
builder.signalWait(
  'awaitApproval',
  signal: 'approval_decision',
  storeAs: 'level1Decision', // Payload stored here
);

// Signal sent using the node ID
await engine.sendSignal(
  workflowInstanceId: instanceId,
  node: 'awaitApproval', // The node ID, NOT the signal name
  payload: {'decision': 'approved', 'comments': 'OK'},
);

// After signal, workflow output contains:
// output['level1Decision'] = {'decision': 'approved', 'comments': 'OK'}

Signal Validation
The engine validates signals:
dart
try {
  await engine.sendSignal(
    workflowInstanceId: instanceId,
    node: 'awaitApproval',
    payload: {'decision': 'approved'},
  );
} on WorkflowNotFoundException {
  // Instance doesn't exist
}
// Note: If no matching token is found at the node, the signal
// is silently ignored (no exception thrown)

Signal Idempotency
Implement idempotency in signal sources:
dart
// POST /webhooks/payment
Future<Response> handlePaymentWebhook(Request request) async {
  final event = PaymentEvent.fromJson(await request.json());

  // Check if already processed
  final processed = await processedSignals.exists(event.id);
  if (processed) {
    return Response.ok(); // Already handled
  }

  await engine.sendSignal(...);

  // Mark as processed
  await processedSignals.add(event.id);

  return Response.ok();
}

Multiple Signals
A workflow can wait for signals at multiple points:
dart
builder
    .signalWait('awaitSubmission', signal: 'draft_submitted', storeAs: 'submission')
    .signalWait('awaitLevel1', signal: 'level1_decision', storeAs: 'level1')
    .signalWait('awaitLevel2', signal: 'level2_decision', storeAs: 'level2');

Each signal wait is processed sequentially as the workflow progresses. Send signals to each node using its node ID:
dart
await engine.sendSignal(workflowInstanceId: id, node: 'awaitSubmission', payload: {...});
// Later, after workflow progresses...
await engine.sendSignal(workflowInstanceId: id, node: 'awaitLevel1', payload: {...});

Signal vs Events
| Aspect | Signal | Event |
|---|---|---|
| Direction | External → Engine | Engine → External |
| Purpose | Resume workflow | Audit/notification |
| Effect | Changes workflow state | Read-only record |
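The difference in direction and effect can be illustrated with a very small model. Everything here is a hypothetical sketch (TinyEngine, WorkflowEvent, and the events getter are invented for illustration, and this page does not describe how the engine actually exposes events): signals go in and mutate state, events come out as a read-only record.

```dart
/// Outbound, read-only record of something that already happened.
/// Hypothetical type, for illustration only.
class WorkflowEvent {
  final String type;
  final String instanceId;
  final DateTime at;
  const WorkflowEvent(this.type, this.instanceId, this.at);
}

/// Hypothetical engine reduced to the two directions in the table.
class TinyEngine {
  final Set<String> _waiting = {};
  final List<WorkflowEvent> _log = [];

  /// Engine -> external: callers receive an unmodifiable copy of the log.
  List<WorkflowEvent> get events => List.unmodifiable(_log);

  bool isWaiting(String instanceId) => _waiting.contains(instanceId);

  void startWaiting(String instanceId) {
    _waiting.add(instanceId);
    _log.add(WorkflowEvent('waiting', instanceId, DateTime.now()));
  }

  /// External -> engine: a signal changes workflow state.
  /// Returns false when the instance was not waiting.
  bool sendSignal(String instanceId) {
    final resumed = _waiting.remove(instanceId);
    if (resumed) {
      _log.add(WorkflowEvent('resumed', instanceId, DateTime.now()));
    }
    return resumed;
  }
}
```

Reading events never alters the workflow, while a signal that is accepted both changes state and produces a new event for auditing.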
Best Practices
- Use descriptive signal names - order_payment_completed not signal1
- Include timestamps - For audit and debugging
- Validate payloads - In downstream task executors
- Implement idempotency - Handle duplicate signals
- Log all signals - For debugging and compliance
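As one way to approach the "validate payloads" practice, a downstream executor could check the raw map before acting on it. The sketch below is a hand-written validator under assumptions: parseApprovalPayload, ApprovalDecision, InvalidSignalPayload, and the allowed decision values (in particular 'rejected') are hypothetical and not part of the engine's API. It collects all problems instead of failing on the first one.

```dart
/// Hypothetical exception carrying every validation problem found.
class InvalidSignalPayload implements Exception {
  final List<String> errors;
  InvalidSignalPayload(this.errors);

  @override
  String toString() => 'InvalidSignalPayload: ${errors.join('; ')}';
}

/// Hypothetical validated form of the approval payload.
class ApprovalDecision {
  final String decision;
  final String approvedBy;
  final String? comments;
  final DateTime approvedAt;

  ApprovalDecision({
    required this.decision,
    required this.approvedBy,
    this.comments,
    required this.approvedAt,
  });
}

// Assumed set of valid decisions; adjust to the workflow's real values.
const allowedDecisions = {'approved', 'rejected'};

/// Validates a raw signal payload and returns a typed value,
/// or throws InvalidSignalPayload listing every problem.
ApprovalDecision parseApprovalPayload(Map<String, dynamic> payload) {
  final errors = <String>[];

  final decision = payload['decision'];
  if (decision is! String || !allowedDecisions.contains(decision)) {
    errors.add('decision must be one of $allowedDecisions');
  }

  final approvedBy = payload['approvedBy'];
  if (approvedBy is! String || approvedBy.isEmpty) {
    errors.add('approvedBy must be a non-empty string');
  }

  final comments = payload['comments'];
  if (comments != null && comments is! String) {
    errors.add('comments must be a string when present');
  }

  final rawApprovedAt = payload['approvedAt'];
  final approvedAt =
      rawApprovedAt is String ? DateTime.tryParse(rawApprovedAt) : null;
  if (approvedAt == null) {
    errors.add('approvedAt must be an ISO 8601 timestamp');
  }

  if (errors.isNotEmpty) throw InvalidSignalPayload(errors);

  return ApprovalDecision(
    decision: decision as String,
    approvedBy: approvedBy as String,
    comments: comments as String?,
    approvedAt: approvedAt!,
  );
}
```

Reporting all errors at once tends to make malformed webhook or UI payloads easier to debug than a single cast failure deep inside an executor.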
Next Steps
- Storage Adapters - Persistence
- Signal Wait - Signal wait nodes
- Error Handling - Error patterns