Skip to content

Idempotency

Patterns for crash recovery and safe retries.

Why Idempotency Matters

Workflows can be interrupted at any point:

  • Server crashes
  • Network failures
  • Database errors
  • Process restarts

The engine resumes workflows from their last saved position, which means executors may be re-executed. Idempotent executors produce the same result whether executed once or multiple times.

Task Executor Idempotency

Check-Before-Create Pattern

dart
class CreateOrderTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.createOrder';

  @override
  String get name => 'Create Order';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final orderId = context.getRequired<String>('orderId');

    // IDEMPOTENT: Check if already exists
    final existing = await orderRepository.findById(orderId);
    if (existing != null) {
      // Return existing result - same as if we created it
      return TaskSuccess(output: {
        'orderId': existing.id,
        'status': 'exists',
        'createdAt': existing.createdAt.toIso8601String(),
      });
    }

    // Create new order - use input from previous node
    final order = await orderRepository.create(orderId, context.input);
    return TaskSuccess(output: {
      'orderId': order.id,
      'status': 'created',
      'createdAt': order.createdAt.toIso8601String(),
    });
  }
}

Idempotency Key Pattern

dart
class PaymentTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.payment';

  @override
  String get name => 'Process Payment';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Use workflow instance ID + node ID as idempotency key
    final idempotencyKey = '${context.workflowInstance.id}_${context.currentNode.id}';

    // Check if payment already processed
    final existing = await paymentService.findByIdempotencyKey(idempotencyKey);
    if (existing != null) {
      return TaskSuccess(output: {
        'paymentId': existing.id,
        'status': 'already_processed',
      });
    }

    // Process payment with idempotency key
    final payment = await paymentService.charge(
      amount: context.getRequired<num>('amount'),
      idempotencyKey: idempotencyKey,
    );

    return TaskSuccess(output: {
      'paymentId': payment.id,
      'status': 'processed',
    });
  }
}

Record Progress Pattern

dart
class BulkProcessTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.bulkProcess';

  @override
  String get name => 'Bulk Process Items';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final items = context.getRequired<List>('items');
    final processedIds = Set<String>.from(context.get<List>('processedIds') ?? []);

    for (final item in items) {
      final itemId = item['id'] as String;

      // Skip already processed items
      if (processedIds.contains(itemId)) {
        continue;
      }

      // Process item
      await processItem(item);
      processedIds.add(itemId);

      // Save progress (for long-running tasks)
      // This allows resume from last processed item
    }

    return TaskSuccess(output: {
      'processedIds': processedIds.toList(),
      'totalProcessed': processedIds.length,
    });
  }
}

User Task Idempotency

The engine handles user task idempotency automatically:

dart
// In the engine's user task node handling
Future<NodeResult> executeUserTaskNode(WorkflowContext context) async {
  final nodeId = context.currentNode.id;
  final instanceId = context.instance.id;

  // IDEMPOTENT: Check for existing active task
  final existing = await storage.userTaskInstances.findActive(instanceId, nodeId);
  if (existing != null) {
    // Task already exists, just wait for it
    return WaitForSignalResult(signalName: existing.signalName);
  }

  // Create new task
  final task = await createUserTask(context);
  return WaitForSignalResult(signalName: task.signalName);
}

Signal Idempotency

Prevent duplicate signal processing:

dart
// In your webhook handler
@PostMapping('/webhooks/payment')
Future<Response> handlePaymentWebhook(PaymentEvent event) async {
  // Use event ID as idempotency key
  final eventId = event.id;

  // Check if already processed
  if (await processedEvents.contains(eventId)) {
    return Response.ok();  // Already handled, ignore
  }

  try {
    await engine.sendSignal(
      workflowInstanceId: event.metadata['workflowInstanceId'],
      signalName: 'payment_completed',
      payload: event.data,
    );

    // Mark as processed
    await processedEvents.add(eventId);
  } catch (e) {
    // Log but don't re-add to processed
    logger.error('Failed to process event: $eventId', e);
  }

  return Response.ok();
}

Database Operations

Upsert Pattern

dart
class UpdateStatusTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.updateStatus';

  @override
  String get name => 'Update Status';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final entityId = context.getRequired<String>('entityId');
    final newStatus = context.getRequired<String>('status');

    // IDEMPOTENT: Upsert instead of insert
    await db.execute('''
      INSERT INTO entity_status (entity_id, status, updated_at)
      VALUES (@entityId, @status, @now)
      ON CONFLICT (entity_id) DO UPDATE
      SET status = @status, updated_at = @now
    ''', {
      'entityId': entityId,
      'status': newStatus,
      'now': DateTime.now(),
    });

    return TaskSuccess(output: {'statusUpdated': true});
  }
}

Conditional Update Pattern

dart
class IncrementCounterTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.incrementCounter';

  @override
  String get name => 'Increment Counter';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final entityId = context.getRequired<String>('entityId');
    final expectedVersion = context.getRequired<int>('version');

    // IDEMPOTENT: Only update if version matches
    final updated = await db.execute('''
      UPDATE entities
      SET counter = counter + 1, version = version + 1
      WHERE id = @id AND version = @version
    ''', {
      'id': entityId,
      'version': expectedVersion,
    });

    if (updated == 0) {
      // Already incremented (version changed)
      final current = await db.query('SELECT * FROM entities WHERE id = @id', {'id': entityId});
      return TaskSuccess(output: {
        'counter': current.first['counter'],
        'version': current.first['version'],
        'alreadyIncremented': true,
      });
    }

    return TaskSuccess(output: {'incremented': true});
  }
}

External Service Calls

Pass-Through Idempotency Key

dart
class ExternalApiTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.externalApi';

  @override
  String get name => 'External API Call';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final idempotencyKey = '${context.workflowInstance.id}_${context.currentNode.id}';

    // External service uses idempotency key
    final response = await httpClient.post(
      '/api/process',
      headers: {'Idempotency-Key': idempotencyKey},
      body: context.input,  // Pass previous node output as request body
    );

    return TaskSuccess(output: response.data);
  }
}

Testing Idempotency

dart
test('task executor is idempotent', () async {
  final executor = CreateOrderTaskExecutor();
  final context = createTestContext(input: {'orderId': 'ORD-1'});

  // First execution
  final result1 = await executor.execute(context);
  expect(result1, isA<TaskSuccess>());

  // Second execution (simulating retry)
  final result2 = await executor.execute(context);
  expect(result2, isA<TaskSuccess>());

  // Results should be equivalent
  expect((result2 as TaskSuccess).output['orderId'], equals('ORD-1'));

  // Only one order should exist
  final orders = await orderRepository.findAll();
  expect(orders.length, equals(1));
});

Best Practices

  1. Always check before create - Avoid duplicates
  2. Use idempotency keys - Unique identifiers for operations
  3. Prefer upserts - Over separate check-then-insert
  4. Track processed items - For bulk operations
  5. Test retry scenarios - Verify idempotency works

Next Steps