Skip to content

Storage Adapters

Storage adapters abstract persistence, allowing the workflow engine to work with any database.

WorkflowStorage Interface

dart
abstract class WorkflowStorage {
  /// Workflow storage (stores RawWorkflow, not typed Workflow)
  WorkflowRepository get workflows;

  /// Running workflow instances
  WorkflowInstanceRepository get instances;

  /// Human tasks (inbox items)
  UserTaskInstanceRepository get userTaskInstances;

  /// Audit trail (event log)
  WorkflowEventRepository get events;

  /// Execute operations in a transaction
  Future<T> transaction<T>(Future<T> Function() operation);

  /// Initialize connections
  Future<void> initialize();

  /// Clean up resources
  Future<void> dispose();

  /// Check if storage is healthy and accessible
  Future<bool> healthCheck();

  /// Get storage statistics
  Future<StorageStats> getStats();
}

Design Philosophy

Storage handles raw data; the engine handles typed deserialization:

  • RawWorkflow - Storage model containing JSON template, metadata
  • Workflow - Typed model with resolved NodeConfiguration and ConditionExecutors
  • WorkflowDeserializationContext - Provided to engine for type resolution

This separation allows storage to remain a pure data layer while the engine handles all type-safe business logic.

Repository Interfaces

WorkflowRepository

Manages workflow definitions using RawWorkflow (raw storage model):

dart
abstract class WorkflowRepository {
  /// Get raw workflow by ID
  Future<RawWorkflow?> getById(String id);

  /// Get raw workflow by code (optionally with specific version)
  Future<RawWorkflow?> getByCode(String code, {int? version});

  /// Get all versions of a workflow
  Future<List<RawWorkflow>> getAllVersions(String code);

  /// List workflows with pagination
  Future<List<RawWorkflow>> list({int limit, int offset, bool activeOnly});

  /// Search by name or code
  Future<List<RawWorkflow>> search(String query, {int limit});

  /// Create a new workflow
  Future<RawWorkflow> create(RawWorkflow workflow);

  /// Update an existing workflow
  Future<RawWorkflow> update(RawWorkflow workflow);

  /// Delete a workflow
  Future<bool> delete(String code, {int? version});

  /// Activate/deactivate
  Future<bool> activate(String code, {int? version});
  Future<bool> deactivate(String code, {int? version});

  /// Check if code exists
  Future<bool> codeExists(String code);

  /// Get next version number
  Future<int> getNextVersion(String code);
}

The engine deserializes RawWorkflow to typed Workflow using its deserialization context.

WorkflowInstanceRepository

dart
abstract class WorkflowInstanceRepository {
  Future<WorkflowInstance?> getById(String id);
  Future<List<WorkflowInstance>> query(WorkflowInstanceQuery query);
  Future<List<WorkflowInstance>> listByDefinition(String workflowId, {int limit, int offset});
  Future<List<WorkflowInstance>> listByStatus(WorkflowStatus status, {int limit, int offset});
  Future<List<WorkflowInstance>> listActive({int limit, int offset});
  Future<List<WorkflowInstance>> getByCorrelationId(String correlationId);
  Future<List<WorkflowInstance>> getChildInstances(String parentInstanceId);
  Future<WorkflowInstance> create(WorkflowInstance instance);
  Future<WorkflowInstance> update(WorkflowInstance instance);
  Future<bool> updateStatus(String id, WorkflowStatus status);
  Future<bool> updateOutput(String id, Map<String, dynamic> output);
  Future<bool> updateTokens(String id, List<WorkflowToken> tokens);
  Future<bool> delete(String id);
  Future<Map<WorkflowStatus, int>> countByStatus();
  Future<List<WorkflowInstance>> getStaleRunningInstances({required Duration staleThreshold});
}

UserTaskInstanceRepository

dart
abstract class UserTaskInstanceRepository {
  Future<UserTaskInstance?> getById(String id);
  Future<List<UserTaskInstance>> getByWorkflowInstanceId(String workflowInstanceId);
  Future<List<UserTaskInstance>> getByStatus(TaskStatus status);
  Future<List<UserTaskInstance>> getPendingByRoleId(String roleId);
  Future<List<UserTaskInstance>> getPendingByRoleIds(List<String> roleIds);
  Future<List<UserTaskInstance>> getPendingByUserId(String userId);
  Future<List<UserTaskInstance>> getPendingByGroupId(String groupId);
  Future<UserTaskInstance> create(UserTaskInstance task);
  Future<UserTaskInstance> update(UserTaskInstance task);
  Future<void> delete(String id);
  Future<UserTaskInstance> claim(String taskId, String userId);
  Future<UserTaskInstance> complete(String taskId, String userId, Map<String, dynamic> output);
  Future<UserTaskInstance> cancel(String taskId);
  Future<int> cancelByWorkflowInstanceId(String workflowInstanceId);
  Future<UserTaskInstance?> findActive({required String workflowInstanceId, required String nodeId});
  Future<List<UserTaskInstance>> search({String? workflowInstanceId, String? assignedToUserId, ...});
}

WorkflowEventRepository

dart
abstract class WorkflowEventRepository {
  Future<WorkflowEvent> append(WorkflowEvent event);
  Future<List<WorkflowEvent>> appendAll(List<WorkflowEvent> events);
  Future<WorkflowEvent?> getById(String id);
  Future<List<WorkflowEvent>> getByWorkflowInstanceId(
    String workflowInstanceId, {
    List<WorkflowEventType>? eventTypes,
    int? limit,
    int? offset,
  });
  Future<List<WorkflowEvent>> query(WorkflowEventQuery query);
  Future<WorkflowEvent?> getLatestByWorkflowInstanceId(String workflowInstanceId);
  Future<List<WorkflowEvent>> getByNodeId(String workflowInstanceId, String nodeId);
  Future<List<WorkflowEvent>> getByTaskId(String taskId);
  Future<List<WorkflowEvent>> getSummary(String workflowInstanceId);
  Future<int> deleteByWorkflowInstanceId(String workflowInstanceId);
  Future<int> pruneOlderThan(DateTime cutoff);
}

In-Memory Storage

For testing and development, use the built-in InMemoryStorage:

dart
// Create context with your descriptors
final context = RegistryDeserializationContext(
  descriptors: [
    DefaultWorkflowDescriptor(),
    myAppDescriptor,
  ],
);

// Create in-memory storage with the context
final storage = InMemoryStorage(context: context);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: storage,
);
await engine.initialize();

WARNING

InMemoryStorage loses all data when the process restarts. Use only for testing and development.

Implementing a Custom Storage

PostgreSQL Example

dart
class PostgresWorkflowStorage implements WorkflowStorage {
  final PostgresPool _pool;

  late final PostgresWorkflowRepository _workflows;
  late final PostgresInstanceRepository _instances;
  late final PostgresUserTaskRepository _userTasks;
  late final PostgresEventRepository _events;

  PostgresWorkflowStorage(String connectionString)
      : _pool = PostgresPool(connectionString);

  @override
  WorkflowRepository get workflows => _workflows;

  @override
  WorkflowInstanceRepository get instances => _instances;

  @override
  UserTaskInstanceRepository get userTaskInstances => _userTasks;

  @override
  WorkflowEventRepository get events => _events;

  @override
  Future<void> initialize() async {
    await _pool.open();
    _workflows = PostgresWorkflowRepository(_pool);
    _instances = PostgresInstanceRepository(_pool);
    _userTasks = PostgresUserTaskRepository(_pool);
    _events = PostgresEventRepository(_pool);
  }

  @override
  Future<T> transaction<T>(Future<T> Function() operation) async {
    return await _pool.runTx((session) async {
      return await operation();
    });
  }

  @override
  Future<void> dispose() async {
    await _pool.close();
  }

  @override
  Future<bool> healthCheck() async {
    try {
      await _pool.query('SELECT 1');
      return true;
    } catch (_) {
      return false;
    }
  }

  @override
  Future<StorageStats> getStats() async {
    final defCount = await _pool.query('SELECT COUNT(*) FROM workflows');
    final instCount = await _pool.query('SELECT COUNT(*) FROM workflow_instances');
    final activeCount = await _pool.query(
      "SELECT COUNT(*) FROM workflow_instances WHERE status NOT IN ('completed', 'failed', 'cancelled')"
    );
    return StorageStats(
      totalDefinitions: defCount.first['count'] as int,
      totalInstances: instCount.first['count'] as int,
      activeInstances: activeCount.first['count'] as int,
    );
  }
}

Using Your Custom Storage

dart
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), myDescriptor],
);

final storage = PostgresWorkflowStorage(connectionString);

final engine = WorkflowEngine(
  context: context,
  storage: storage,
);
await engine.initialize();

Repository Implementation

dart
class PostgresInstanceRepository extends WorkflowInstanceRepository {
  final PostgresPool _pool;

  PostgresInstanceRepository(this._pool);

  @override
  Future<WorkflowInstance?> getById(String id) async {
    final result = await _pool.query(
      'SELECT * FROM workflow_instances WHERE id = @id',
      {'id': id},
    );

    if (result.isEmpty) return null;
    return _mapToInstance(result.first);
  }

  @override
  Future<List<WorkflowInstance>> listByStatus(
    WorkflowStatus status, {
    int limit = 100,
    int offset = 0,
  }) async {
    final result = await _pool.query(
      'SELECT * FROM workflow_instances WHERE status = @status LIMIT @limit OFFSET @offset',
      {'status': status.name, 'limit': limit, 'offset': offset},
    );

    return result.map(_mapToInstance).toList();
  }

  @override
  Future<WorkflowInstance> create(WorkflowInstance instance) async {
    await _pool.execute('''
      INSERT INTO workflow_instances (id, workflow_id, status, input, output, tokens, created_at)
      VALUES (@id, @defId, @status, @input, @output, @tokens, @createdAt)
    ''', {
      'id': instance.id,
      'defId': instance.workflowId,
      'status': instance.status.name,
      'input': jsonEncode(instance.input),
      'output': jsonEncode(instance.output),
      'tokens': jsonEncode(instance.tokens.map((t) => t.toJson()).toList()),
      'createdAt': instance.createdAt,
    });

    return instance;
  }

  @override
  Future<WorkflowInstance> update(WorkflowInstance instance) async {
    await _pool.execute('''
      UPDATE workflow_instances
      SET status = @status, output = @output, tokens = @tokens, completed_at = @completedAt
      WHERE id = @id
    ''', {
      'id': instance.id,
      'status': instance.status.name,
      'output': jsonEncode(instance.output),
      'tokens': jsonEncode(instance.tokens.map((t) => t.toJson()).toList()),
      'completedAt': instance.completedAt,
    });

    return instance;
  }

  WorkflowInstance _mapToInstance(Map<String, dynamic> row) {
    return WorkflowInstance(
      id: row['id'],
      workflowId: row['workflow_id'],
      status: WorkflowStatus.values.byName(row['status']),
      input: jsonDecode(row['input']),
      output: jsonDecode(row['output']),
      tokens: (jsonDecode(row['tokens']) as List)
          .map((t) => WorkflowToken.fromJson(t))
          .toList(),
      createdAt: row['created_at'],
      completedAt: row['completed_at'],
    );
  }
}

Database Schema

PostgreSQL Schema Example

sql
CREATE TABLE workflows (
  id VARCHAR(255) PRIMARY KEY,
  code VARCHAR(100) NOT NULL,
  name VARCHAR(255) NOT NULL,
  version INT NOT NULL DEFAULT 1,
  nodes JSONB NOT NULL,
  edges JSONB NOT NULL,
  is_active BOOLEAN NOT NULL DEFAULT true,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE workflow_instances (
  id VARCHAR(255) PRIMARY KEY,
  workflow_id VARCHAR(255) NOT NULL,
  status VARCHAR(50) NOT NULL,
  input JSONB NOT NULL,
  output JSONB NOT NULL,
  tokens JSONB NOT NULL,
  created_at TIMESTAMP NOT NULL,
  completed_at TIMESTAMP,
  FOREIGN KEY (workflow_id) REFERENCES workflows(id)
);

CREATE TABLE user_task_instances (
  id VARCHAR(255) PRIMARY KEY,
  workflow_instance_id VARCHAR(255) NOT NULL,
  node_id VARCHAR(255) NOT NULL,
  signal_name VARCHAR(255) NOT NULL,
  schema_type VARCHAR(100) NOT NULL,
  title VARCHAR(500) NOT NULL,
  description TEXT,
  assigned_to_role_id VARCHAR(255),
  assigned_to_user_id VARCHAR(255),
  status VARCHAR(50) NOT NULL,
  priority VARCHAR(50) NOT NULL,
  input JSONB NOT NULL,
  output JSONB,
  due_date TIMESTAMP,
  created_at TIMESTAMP NOT NULL,
  completed_at TIMESTAMP,
  FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instances(id)
);

CREATE TABLE workflow_events (
  id VARCHAR(255) PRIMARY KEY,
  workflow_instance_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(100) NOT NULL,
  node_id VARCHAR(255),
  token_id VARCHAR(255),
  data JSONB,
  created_at TIMESTAMP NOT NULL,
  FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instances(id)
);

-- Indexes
CREATE INDEX idx_instances_status ON workflow_instances(status);
CREATE INDEX idx_instances_definition ON workflow_instances(workflow_id);
CREATE INDEX idx_tasks_status ON user_task_instances(status);
CREATE INDEX idx_tasks_role ON user_task_instances(assigned_to_role_id);
CREATE INDEX idx_tasks_user ON user_task_instances(assigned_to_user_id);
CREATE INDEX idx_events_instance ON workflow_events(workflow_instance_id);

Transactions

Use transactions for atomic operations:

dart
await storage.transaction(() async {
  final instance = await storage.instances.getById(instanceId);
  if (instance != null) {
    await storage.instances.updateOutput(instanceId, {'key': 'value'});
    await storage.events.append(WorkflowEvent(...));
  }
});

Best Practices

  1. Use transactions - For multi-step updates
  2. Index frequently queried columns - Status, role, user
  3. Store JSON efficiently - Use JSONB in PostgreSQL
  4. Clean up old data - Archive completed workflows
  5. Monitor performance - Query execution times

Next Steps