Skip to content

Signals

Signals are external messages that resume waiting workflows. They're the primary mechanism for async operations and external integrations.

Sending Signals

dart
await engine.sendSignal(
  workflowInstanceId: instanceId,
  node: 'awaitApproval',  // The signal wait node ID
  payload: {
    'decision': 'approved',
    'approvedBy': '[email protected]',
    'comments': 'Looks good!',
    'approvedAt': DateTime.now().toIso8601String(),
  },
);

Typed Signal Payloads

For type safety, use @JsonSerializable models for signal payloads:

dart
import 'package:json_annotation/json_annotation.dart';

part 'approval_payload.g.dart';

@JsonSerializable()
class ApprovalPayload {
  final String decision;
  final String approvedBy;
  final String? comments;
  final DateTime approvedAt;

  ApprovalPayload({
    required this.decision,
    required this.approvedBy,
    this.comments,
    required this.approvedAt,
  });

  factory ApprovalPayload.fromJson(Map<String, dynamic> json) =>
      _$ApprovalPayloadFromJson(json);

  Map<String, dynamic> toJson() => _$ApprovalPayloadToJson(this);
}

// Usage with typed payload
final payload = ApprovalPayload(
  decision: 'approved',
  approvedBy: currentUserId,
  comments: 'Looks good!',
  approvedAt: DateTime.now(),
);

await engine.sendSignal(
  workflowInstanceId: instanceId,
  node: 'awaitApproval',
  payload: payload.toJson(),  // Serialize to JSON
);

Reading Typed Payloads in Executors

dart
class ProcessApprovalExecutor extends TypedTaskExecutor<ApprovalPayload, ProcessResult> {
  @override
  String get schemaType => 'task.approval.process';

  @override
  String get name => 'Process Approval';

  @override
  ApprovalPayload fromInput(Map<String, dynamic> input) =>
      ApprovalPayload.fromJson(input['approvalDecision']);

  @override
  Map<String, dynamic> toOutput(ProcessResult output) => output.toJson();

  @override
  Future<ProcessResult> executeTyped(
    ApprovalPayload input,
    ExecutionContext context,
  ) async {
    // Fully typed access to payload!
    // Use getAny<T> to access data from earlier nodes
    final entityId = context.getAny<String>('entityId')!;

    if (input.decision == 'approved') {
      await entityService.approve(entityId);
    }

    return ProcessResult(
      processed: true,
      processedBy: input.approvedBy,
      processedAt: DateTime.now(),
    );
  }
}

Signal Processing Flow

Step Details:

  1. engine.sendSignal() called with workflowInstanceId, node, and payload
  2. Load workflow instance from storage
  3. Validate: status must be waitingForSignal, signal name must match waiting node
  4. Find waiting token at signal wait node
  5. Get node executor for signal wait node
  6. Call executor.onSignalReceived(context, signalName, payload)
  7. Handler returns NodeResult, payload stored at storeAs path
  8. Engine processes result, moves token
  9. Continue workflow execution

Signal Sources

Webhook Handlers

dart
// POST /webhooks/payment
Future<Response> handlePaymentWebhook(Request request) async {
  final event = PaymentEvent.fromJson(await request.json());
  final instanceId = event.metadata['workflowInstanceId'];
  final nodeId = event.metadata['nodeId'];  // Store this when starting workflow

  await engine.sendSignal(
    workflowInstanceId: instanceId,
    node: nodeId,  // The signal wait node ID (e.g., 'awaitPayment')
    payload: {
      'transactionId': event.transactionId,
      'amount': event.amount,
      'currency': event.currency,
      'paidAt': event.timestamp,
    },
  );

  return Response.ok();
}

User Actions

dart
// User completes a task in UI
Future<void> submitApproval(
  String instanceId,
  String nodeId,  // The signal wait or user task node ID
  String decision,
  String? comments,
) async {
  await engine.sendSignal(
    workflowInstanceId: instanceId,
    node: nodeId,  // e.g., 'awaitApproval'
    payload: {
      'decision': decision,
      'comments': comments,
      'submittedBy': currentUserId,
      'submittedAt': DateTime.now().toIso8601String(),
    },
  );
}

Scheduled Jobs

dart
// Timer service sends signal when time expires
class TimerService {
  Future<void> processExpiredTimers() async {
    final expiredTimers = await timerRepository.findExpired();

    for (final timer in expiredTimers) {
      await engine.sendSignal(
        workflowInstanceId: timer.workflowInstanceId,
        node: timer.nodeId,  // The node waiting for this signal
        payload: {
          'expired': true,
          'expiredAt': DateTime.now().toIso8601String(),
        },
      );

      await timerRepository.markProcessed(timer.id);
    }
  }
}

External System Callbacks

dart
// POST /callbacks/verification
Future<Response> handleVerificationCallback(Request request) async {
  final result = VerificationResult.fromJson(await request.json());

  await engine.sendSignal(
    workflowInstanceId: result.referenceId,
    node: result.nodeId,  // The signal wait node ID from the callback data
    payload: {
      'verified': result.success,
      'verificationId': result.id,
      'failureReason': result.failureReason,
    },
  );

  return Response.ok();
}

Signal Payload Storage

Payload is stored at the storeAs path:

dart
// Workflow definition
builder.signalWait('awaitApproval',
  signal: 'approval_decision',
  storeAs: 'level1Decision',  // Payload stored here
);

// Signal sent using the node ID
await engine.sendSignal(
  workflowInstanceId: instanceId,
  node: 'awaitApproval',  // The node ID, NOT the signal name
  payload: {'decision': 'approved', 'comments': 'OK'},
);

// After signal, workflow output contains:
// output['level1Decision'] = {'decision': 'approved', 'comments': 'OK'}

Signal Validation

The engine validates signals:

dart
try {
  await engine.sendSignal(
    workflowInstanceId: instanceId,
    node: 'awaitApproval',
    payload: {'decision': 'approved'},
  );
} on WorkflowNotFoundException {
  // Instance doesn't exist
}
// Note: If no matching token is found at the node, the signal
// is silently ignored (no exception thrown)

Signal Idempotency

Implement idempotency in signal sources:

dart
// POST /webhooks/payment
Future<Response> handlePaymentWebhook(Request request) async {
  final event = PaymentEvent.fromJson(await request.json());

  // Check if already processed
  final processed = await processedSignals.exists(event.id);
  if (processed) {
    return Response.ok();  // Already handled
  }

  await engine.sendSignal(...);

  // Mark as processed
  await processedSignals.add(event.id);

  return Response.ok();
}

Multiple Signals

A workflow can wait for signals at multiple points:

dart
builder
    .signalWait('awaitSubmission', signal: 'draft_submitted', storeAs: 'submission')
    .signalWait('awaitLevel1', signal: 'level1_decision', storeAs: 'level1')
    .signalWait('awaitLevel2', signal: 'level2_decision', storeAs: 'level2');

Each signal wait is processed sequentially as the workflow progresses. Send signals to each node using its node ID:

dart
await engine.sendSignal(workflowInstanceId: id, node: 'awaitSubmission', payload: {...});
// Later, after workflow progresses...
await engine.sendSignal(workflowInstanceId: id, node: 'awaitLevel1', payload: {...});

Signal vs Events

AspectSignalEvent
DirectionExternal → EngineEngine → External
PurposeResume workflowAudit/notification
EffectChanges workflow stateRead-only record

Best Practices

  1. Use descriptive signal names - order_payment_completed not signal1
  2. Include timestamps - For audit and debugging
  3. Validate payloads - In downstream task executors
  4. Implement idempotency - Handle duplicate signals
  5. Log all signals - For debugging and compliance

Next Steps