Skip to content

WorkflowStorage

Abstract interface for workflow persistence.

Overview

WorkflowStorage defines the contract for persisting workflow definitions, instances, execution tokens, user tasks, and events. Implementations provide the actual storage mechanism (PostgreSQL, MySQL, MongoDB, etc.).

Design Philosophy

Storage handles raw persistence only; deserialization is the engine's responsibility:

  • Stores raw workflow data ([RawWorkflow]) for create/update operations
  • The engine is responsible for deserializing to typed [Workflow] for execution
  • Deserialization context is provided to the engine, not storage

This separation allows:

  • Storage to remain a pure data layer
  • Different deserialization strategies for different environments
  • Clear boundaries between persistence and business logic

Interface

dart
/// Contract for persisting workflow data.
///
/// Groups the four repositories with transaction, lifecycle, and health
/// operations. Concrete backends implement this interface.
abstract class WorkflowStorage {
  // Repositories

  /// Workflow definitions.
  WorkflowRepository get workflows;

  /// Workflow instances.
  WorkflowInstanceRepository get instances;

  /// User task instances.
  UserTaskInstanceRepository get userTaskInstances;

  /// Workflow events.
  WorkflowEventRepository get events;

  // Transaction support

  /// Runs [operation] as a transaction and completes with its result.
  ///
  /// The exact guarantees are up to the implementation.
  Future<T> transaction<T>(Future<T> Function() operation);

  // Lifecycle

  /// Prepares the storage for use (for example, opens a connection).
  Future<void> initialize();

  /// Releases the resources held by the storage.
  Future<void> dispose();

  // Health and stats

  /// Whether the storage backend is currently healthy.
  Future<bool> healthCheck();

  /// Returns statistics about the stored data.
  Future<StorageStats> getStats();
}

Repository Interfaces

The WorkflowStorage exposes four main repositories for workflow persistence.

WorkflowRepository

Manages workflow definitions using [RawWorkflow] - the raw storage model.

dart
/// Repository for workflow definitions, stored as [RawWorkflow].
///
/// A workflow is addressed either by its unique ID or by its code plus an
/// optional version.
abstract class WorkflowRepository {
  /// Creates a new workflow.
  Future<RawWorkflow> create(RawWorkflow workflow);

  /// Updates an existing workflow.
  Future<RawWorkflow> update(RawWorkflow workflow);

  /// Gets a workflow by its unique ID (UUID).
  Future<RawWorkflow?> getById(String id);

  /// Gets a workflow by [code], optionally specifying a [version].
  ///
  /// If [version] is not provided, returns the latest version.
  Future<RawWorkflow?> getByCode(String code, {int? version});

  /// Gets all versions of the workflow with the given [code].
  Future<List<RawWorkflow>> getAllVersions(String code);

  /// Lists workflows, paged by [limit] and [offset].
  ///
  /// [activeOnly] presumably restricts the result to active workflows —
  /// confirm against the implementation.
  Future<List<RawWorkflow>> list({
    int limit = 100,
    int offset = 0,
    bool activeOnly = false,
  });

  /// Searches workflows by name or code, returning at most [limit] results.
  Future<List<RawWorkflow>> search(String query, {int limit = 100});

  /// Deletes a workflow by [code], optionally specifying a [version].
  Future<bool> delete(String code, {int? version});

  /// Activates a workflow by [code], optionally specifying a [version].
  Future<bool> activate(String code, {int? version});

  /// Deactivates a workflow by [code], optionally specifying a [version].
  Future<bool> deactivate(String code, {int? version});

  /// Whether a workflow with the given [code] exists.
  Future<bool> codeExists(String code);

  /// Gets the next version number for the workflow [code].
  Future<int> getNextVersion(String code);
}

WorkflowInstanceRepository

Manages running workflow instances.

dart
/// Query parameters for listing workflow instances
/// Filter and paging parameters for [WorkflowInstanceRepository.query].
///
/// Only the fields are listed here; the constructor is omitted. Nullable
/// fields are presumably ignored when null — confirm against the
/// implementation.
class WorkflowInstanceQuery {
  /// ID of the workflow the instances belong to.
  final String? workflowId;

  /// Statuses to match.
  final List<WorkflowStatus>? statuses;

  /// Correlation ID to match.
  final String? correlationId;

  /// ID of the parent instance to match.
  final String? parentInstanceId;

  /// Bounds on the creation time (inclusivity not specified here).
  final DateTime? createdAfter;
  final DateTime? createdBefore;

  /// Paging controls.
  final int limit;
  final int offset;
}

/// Repository for workflow instances.
abstract class WorkflowInstanceRepository {
  /// Creates an instance.
  Future<WorkflowInstance> create(WorkflowInstance instance);

  /// Updates an instance.
  Future<WorkflowInstance> update(WorkflowInstance instance);

  /// Gets an instance by [id].
  Future<WorkflowInstance?> getById(String id);

  /// Queries instances with flexible filtering.
  Future<List<WorkflowInstance>> query(WorkflowInstanceQuery query);

  // Convenience methods

  /// Lists the instances of the workflow identified by [workflowId].
  Future<List<WorkflowInstance>> listByDefinition(String workflowId, {int limit, int offset});

  /// Lists the instances that are in the given [status].
  Future<List<WorkflowInstance>> listByStatus(WorkflowStatus status, {int limit, int offset});

  /// Lists active instances (which statuses count as active is not
  /// specified here).
  Future<List<WorkflowInstance>> listActive({int limit, int offset});

  /// Gets the instances that carry [correlationId].
  Future<List<WorkflowInstance>> getByCorrelationId(String correlationId);

  /// Gets the child instances of [parentInstanceId].
  Future<List<WorkflowInstance>> getChildInstances(String parentInstanceId);

  // Partial updates

  /// Updates only the status of instance [id].
  Future<bool> updateStatus(String id, WorkflowStatus status);

  /// Merges [output] into the output of instance [id].
  ///
  /// The merge semantics (shallow or deep) are not specified here.
  Future<bool> mergeOutput(String id, Map<String, dynamic> output);

  /// Updates only the tokens of instance [id].
  Future<bool> updateTokens(String id, List<WorkflowToken> tokens);

  // Recovery support

  /// Gets running instances considered stale according to [staleThreshold],
  /// returning at most [limit].
  Future<List<WorkflowInstance>> getStaleRunningInstances({
    required Duration staleThreshold,
    int limit = 100,
  });

  /// Deletes instance [id].
  Future<bool> delete(String id);

  /// Counts instances grouped by status.
  Future<Map<WorkflowStatus, int>> countByStatus();
}

UserTaskInstanceRepository

Manages user task instances (inbox items).

dart
/// Repository for user task instances (inbox items).
abstract class UserTaskInstanceRepository {
  /// Creates a user task.
  Future<UserTaskInstance> create(UserTaskInstance task);

  /// Gets a user task by [id].
  Future<UserTaskInstance?> getById(String id);

  /// Finds an active (non-completed) user task for the given workflow
  /// instance and node.
  ///
  /// Used for idempotency checks.
  Future<UserTaskInstance?> findActive({
    required String workflowInstanceId,
    required String nodeId,
  });

  /// Queries user tasks by the given optional filters.
  ///
  /// Null filters are presumably ignored — confirm against the
  /// implementation.
  Future<List<UserTaskInstance>> query({
    String? workflowInstanceId,
    String? assignedToUserId,
    String? assignedToRoleId,
    TaskStatus? status,
    String? schemaType,
    int? limit,
    int? offset,
  });

  /// Updates a user task.
  Future<void> update(UserTaskInstance task);

  /// Deletes the user task [id].
  Future<void> delete(String id);

  /// Cancels all pending user tasks for a workflow instance.
  ///
  /// Called when the workflow is cancelled or fails.
  Future<void> cancelByWorkflowInstanceId(String workflowInstanceId);

  /// Completes the user task [taskId] with [output].
  Future<void> complete(String taskId, Map<String, dynamic> output);
}

WorkflowEventRepository

Manages workflow events (audit trail).

dart
/// Repository for workflow events (audit trail).
///
/// The interface only appends and reads events; it declares no update or
/// delete operation.
abstract class WorkflowEventRepository {
  /// Appends [event].
  Future<void> append(WorkflowEvent event);

  /// Gets the events of instance [instanceId], optionally capped at [limit].
  Future<List<WorkflowEvent>> getByInstanceId(String instanceId, {int? limit});

  /// Queries events by the given optional filters.
  ///
  /// Null filters are presumably ignored, and the inclusivity of [after] and
  /// [before] is not specified here — confirm against the implementation.
  Future<List<WorkflowEvent>> query({
    String? workflowInstanceId,
    String? eventType,
    DateTime? after,
    DateTime? before,
    int? limit,
    int? offset,
  });
}

Data Models

WorkflowInstance

dart
/// A workflow instance as persisted by storage.
///
/// Only the fields are listed here; constructors and methods are omitted.
class WorkflowInstance {
  /// Unique ID of this instance.
  final String id;
  final String workflowId;           // ID of the Workflow
  final String workflowCode;         // Code of the Workflow
  final int workflowVersion;         // Version of the Workflow
  final WorkflowStatus status;

  /// Input and output data of the instance.
  final Map<String, dynamic> input;
  final Map<String, dynamic> output;

  /// Optional correlation ID; instances can be looked up by it.
  final String? correlationId;
  final Map<String, dynamic> metadata;
  final List<WorkflowToken> tokens;  // Current execution tokens
  final DateTime createdAt;
  final DateTime updatedAt;

  /// Completion time; null when not set.
  final DateTime? completedAt;

  /// Error message; null when not set.
  final String? errorMessage;
}

WorkflowToken

Tokens track the execution position within the workflow graph.

dart
/// Tracks an execution position within the workflow graph.
///
/// Method bodies and constructors are omitted in this listing.
class WorkflowToken {
  final String id;

  /// Node the token is currently at.
  final String currentNodeId;

  /// Node the token was at before, if any.
  final String? previousNodeId;
  final TokenState state;          // active, waiting, consumed, dropped
  final String? parentTokenId;     // For parallel splits
  final String? branchId;          // For join tracking
  final DateTime createdAt;

  // State transition methods
  // Each returns a WorkflowToken; presumably a copy in the named state —
  // confirm against the implementation.
  WorkflowToken toWaiting();
  WorkflowToken toConsumed();
  WorkflowToken toDropped();

  // State checks

  /// Whether the token is in [TokenState.active].
  bool get isActive => state == TokenState.active;

  /// Whether the token is in a terminal state (consumed or dropped).
  bool get isTerminal => state == TokenState.consumed || state == TokenState.dropped;
}

/// States of a [WorkflowToken].
///
/// `consumed` and `dropped` are the terminal states.
enum TokenState {
  active,    // Moving through nodes
  waiting,   // Paused at wait node
  consumed,  // Terminal: reached end or merged
  dropped,   // Terminal: discarded late arrival
}

UserTaskInstance

dart
/// A user task instance (inbox item).
///
/// Only the fields declared on this subclass are listed; the inherited ones
/// are named in the trailing comment.
class UserTaskInstance extends TaskInstance {
  final String title;
  final String? description;

  /// Assignment targets; each may be null.
  final String? assignedToUserId;
  final String? assignedToRoleId;
  final String? assignedToGroupId;

  /// Name of an associated signal, if any.
  final String? signalName;
  final UserTaskPriority priority;

  /// Due time, if any.
  final DateTime? dueAt;

  /// Claim information; null when not set.
  final String? claimedBy;
  final DateTime? claimedAt;

  /// Completion information; null when not set.
  final String? completedBy;
  final DateTime? completedAt;

  // Inherited from TaskInstance:
  // id, workflowInstanceId, nodeId, schemaType, status, input, output,
  // createdAt, updatedAt, startedAt, finishedAt, metadata
}

/// Status values of a task instance.
enum TaskStatus {
  pending,     // Created but not started
  running,     // Currently executing
  completed,   // Completed successfully
  failed,      // Failed (may be retried)
  cancelled,   // Cancelled
  timedOut,    // Timed out
}

/// Priority levels of a user task.
enum UserTaskPriority {
  low,
  normal,
  high,
  urgent,
}

PostgreSQL Implementation

Reference implementation using PostgreSQL:

Schema

sql
-- Workflow definitions
-- One row per (definition_id, version) pair, enforced by the UNIQUE
-- constraint. The definition itself is stored as a JSONB document.
CREATE TABLE workflows (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  definition_id VARCHAR(255) NOT NULL,
  version INTEGER NOT NULL DEFAULT 1,
  code VARCHAR(50) NOT NULL,
  name VARCHAR(255) NOT NULL,
  description TEXT,
  definition JSONB NOT NULL,
  is_active BOOLEAN NOT NULL DEFAULT true,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE(definition_id, version)
);

-- Workflow instances
-- input and output default to an empty JSON object; metadata,
-- correlation_id, error_message and completed_at are nullable.
CREATE TABLE workflow_instances (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  definition_id VARCHAR(255) NOT NULL,
  definition_version INTEGER NOT NULL,
  status VARCHAR(50) NOT NULL DEFAULT 'pending',
  input JSONB NOT NULL DEFAULT '{}',
  output JSONB NOT NULL DEFAULT '{}',
  correlation_id VARCHAR(255),
  metadata JSONB,
  error_message TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMPTZ
);

-- Lookup indexes for filtering by status, correlation ID and definition.
CREATE INDEX idx_instances_status ON workflow_instances(status);
CREATE INDEX idx_instances_correlation ON workflow_instances(correlation_id);
CREATE INDEX idx_instances_definition ON workflow_instances(definition_id);

-- Execution tokens
-- Rows are removed together with their instance (ON DELETE CASCADE).
-- parent_token_id is a self-reference to another token row.
-- NOTE(review): the WorkflowToken model also declares previousNodeId and
-- branchId, which have no column here — confirm how they are persisted.
CREATE TABLE workflow_tokens (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
  current_node_id VARCHAR(255) NOT NULL,
  status VARCHAR(50) NOT NULL DEFAULT 'active',
  parent_token_id UUID REFERENCES workflow_tokens(id),
  local_data JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Lookup indexes for fetching tokens by instance and by status.
CREATE INDEX idx_tokens_instance ON workflow_tokens(instance_id);
CREATE INDEX idx_tokens_status ON workflow_tokens(status);

-- User tasks
-- Rows are removed together with their workflow instance (ON DELETE CASCADE).
-- NOTE(review): the UserTaskInstance model also lists assignedToGroupId,
-- signalName, claimedBy/claimedAt, output, updatedAt, startedAt, finishedAt
-- and metadata, none of which has a column of that name here — confirm the
-- mapping (for example whether `response` holds the task output).
CREATE TABLE user_task_instances (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
  node_id VARCHAR(255) NOT NULL,
  schema_type VARCHAR(100) NOT NULL,
  title VARCHAR(500) NOT NULL,
  description TEXT,
  assigned_to_user VARCHAR(255),
  assigned_to_role VARCHAR(255),
  priority VARCHAR(50) NOT NULL DEFAULT 'normal',
  status VARCHAR(50) NOT NULL DEFAULT 'pending',
  input JSONB NOT NULL DEFAULT '{}',
  response JSONB,
  completed_by VARCHAR(255),
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  due_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ
);

-- Lookup indexes for filtering by workflow instance, status, user and role.
CREATE INDEX idx_user_tasks_instance ON user_task_instances(workflow_instance_id);
CREATE INDEX idx_user_tasks_status ON user_task_instances(status);
CREATE INDEX idx_user_tasks_user ON user_task_instances(assigned_to_user);
CREATE INDEX idx_user_tasks_role ON user_task_instances(assigned_to_role);

-- Signal subscriptions
-- An instance can hold at most one subscription per signal name, enforced by
-- UNIQUE(instance_id, signal_name). Rows are removed together with their
-- instance (ON DELETE CASCADE).
CREATE TABLE signal_subscriptions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
  node_id VARCHAR(255) NOT NULL,
  signal_name VARCHAR(255) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE(instance_id, signal_name)
);

-- Lookup index for finding subscriptions by signal name.
CREATE INDEX idx_signals_name ON signal_subscriptions(signal_name);

Implementation

dart
/// [WorkflowStorage] backed by a single PostgreSQL connection.
///
/// Call [initialize] before using any repository and [dispose] when done.
class PostgresWorkflowStorage implements WorkflowStorage {
  final PostgreSQLConnection _connection;

  /// Creates the storage from a PostgreSQL [connectionString] URI.
  ///
  /// The connection is only configured here; it is opened by [initialize].
  PostgresWorkflowStorage({required String connectionString})
      : _connection = PostgreSQLConnection.fromUri(Uri.parse(connectionString));

  /// Opens the database connection.
  @override
  Future<void> initialize() async {
    await _connection.open();
  }

  /// Closes the database connection.
  @override
  Future<void> dispose() async {
    await _connection.close();
  }

  // Repositories are created lazily on first access and share the connection.

  @override
  late final WorkflowRepository workflows = PostgresWorkflowRepository(_connection);

  @override
  late final WorkflowInstanceRepository instances = PostgresInstanceRepository(_connection);

  @override
  late final UserTaskInstanceRepository userTaskInstances = PostgresUserTaskRepository(_connection);

  @override
  late final WorkflowEventRepository events = PostgresEventRepository(_connection);

  /// Runs [operation] inside a database transaction and returns its result.
  @override
  Future<T> transaction<T>(Future<T> Function() operation) async {
    // NOTE(review): the transaction context `ctx` is not forwarded to
    // [operation]; confirm that repository queries issued inside it really
    // execute within this transaction rather than on the outer connection.
    return await _connection.transaction((ctx) => operation());
  }

  /// Whether the database answers a trivial query.
  ///
  /// Returns `false` instead of throwing when the probe fails, so callers
  /// can poll it safely.
  @override
  Future<bool> healthCheck() async {
    try {
      // A round trip proves the connection is open and the server responds.
      await _connection.query('SELECT 1');
      return true;
    } on Exception {
      return false;
    }
  }

  @override
  Future<StorageStats> getStats() async {
    // Query counts from database
    return StorageStats(...);
  }
}

In-Memory Implementation

For testing and development, use the built-in InMemoryStorage:

dart
import 'package:vyuh_workflow_engine/storage/storage.dart';

// Create in-memory storage with a deserialization context.
// `myDescriptor` stands for a descriptor defined by your application.
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), myDescriptor],
);
final storage = InMemoryStorage(context: context);

// Use with the engine; the same context instance is passed to both.
final engine = WorkflowEngine(
  context: context,
  storage: storage,
);

The InMemoryStorage class is included in the package and provides:

  • Full repository implementations
  • Transaction support (simulated)
  • All query operations
  • Automatic cleanup

Custom Implementation

Create your own storage implementation:

dart
/// Sketch of a MongoDB-backed [WorkflowStorage].
///
/// Deliberately incomplete: `transaction`, `healthCheck` and `getStats` still
/// have to be implemented before this class satisfies the interface.
class MongoWorkflowStorage implements WorkflowStorage {
  final MongoClient _client;
  final String _databaseName;

  /// Creates the storage for [databaseName] on the server at [connectionUri].
  MongoWorkflowStorage({
    required String connectionUri,
    required String databaseName,
  }) : _client = MongoClient(connectionUri),
       _databaseName = databaseName;

  /// Connects the client.
  @override
  Future<void> initialize() async {
    await _client.connect();
  }

  /// Closes the client.
  @override
  Future<void> dispose() async {
    await _client.close();
  }

  // Each repository is created lazily on first access and receives the
  // database handle returned by `_client.db(_databaseName)`.

  @override
  late final WorkflowRepository workflows =
      MongoWorkflowRepository(_client.db(_databaseName));

  @override
  late final WorkflowInstanceRepository instances =
      MongoInstanceRepository(_client.db(_databaseName));

  @override
  late final UserTaskInstanceRepository userTaskInstances =
      MongoUserTaskRepository(_client.db(_databaseName));

  @override
  late final WorkflowEventRepository events =
      MongoEventRepository(_client.db(_databaseName));

  // ... implement transaction, healthCheck, getStats
}

Best Practices

1. Use Transactions

dart
/// Marks a workflow instance as completed and retires its live tokens.
///
/// Both updates run in one transaction, so the instance is never left
/// `completed` while its tokens are still active or waiting.
Future<void> completeWorkflow(String instanceId, Map<String, dynamic> output) async {
  await db.transaction((txn) async {
    await txn.execute(
      'UPDATE workflow_instances '
      'SET status = @status, output = @output, '
      'completed_at = NOW(), updated_at = NOW() '
      'WHERE id = @id',
      {'id': instanceId, 'status': 'completed', 'output': jsonEncode(output)},
    );
    // Tokens have no `completed` state: `consumed` is the terminal state for
    // a token that reached the end. Tokens already in a terminal state
    // (`consumed` or `dropped`) are left untouched.
    await txn.execute(
      'UPDATE workflow_tokens SET status = @status, updated_at = NOW() '
      "WHERE instance_id = @id AND status NOT IN ('consumed', 'dropped')",
      {'id': instanceId, 'status': 'consumed'},
    );
  });
}

2. Handle Concurrency

dart
/// Atomically claims a pending, unassigned user task for [userId].
///
/// Returns `true` when this call won the claim. Returns `false` when the task
/// does not exist, is no longer pending, or is already assigned to a user.
Future<bool> claimTask(String taskId, String userId) async {
  // The WHERE clause is the concurrency guard: of several simultaneous
  // claims, only the first still sees `assigned_to_user IS NULL`. The status
  // stays `pending` because `assigned` is not a TaskStatus value.
  final result = await db.execute('''
    UPDATE user_task_instances
    SET assigned_to_user = @userId
    WHERE id = @taskId AND status = 'pending' AND assigned_to_user IS NULL
  ''', {'taskId': taskId, 'userId': userId});

  return result.affectedRows > 0;
}

3. Index for Query Patterns

sql
-- Frequent queries
-- Composite index for filtering instances by status with created_at in
-- descending order.
CREATE INDEX idx_instances_status_created
  ON workflow_instances(status, created_at DESC);

-- Partial index: only rows with status = 'pending' are indexed, so it serves
-- role lookups over pending tasks.
CREATE INDEX idx_tasks_role_status
  ON user_task_instances(assigned_to_role, status)
  WHERE status = 'pending';

See Also