WorkflowStorage
Abstract interface for workflow persistence.
Overview
WorkflowStorage defines the contract for persisting workflow definitions, instances, tokens, and user tasks. Implementations provide the actual storage mechanism (PostgreSQL, MySQL, MongoDB, etc.).
Design Philosophy
Storage handles raw persistence only; deserialization is the engine's responsibility:
- Stores raw workflow data ([RawWorkflow]) for create/update operations
- The engine is responsible for deserializing to typed [Workflow] for execution
- Deserialization context is provided to the engine, not storage
This separation allows:
- Storage to remain a pure data layer
- Different deserialization strategies for different environments
- Clear boundaries between persistence and business logic
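A minimal sketch of this split, assuming the deserialization context exposes a deserialize method (the method name is illustrative and may differ in your setup):
// Storage hands back raw data only.
final RawWorkflow? raw = await storage.workflows.getByCode('leave-request');

// The engine, not storage, turns raw data into a typed Workflow using the
// deserialization context it was configured with. `deserialize` is an
// assumed method name for illustration.
final Workflow workflow = context.deserialize(raw!);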
Interface
abstract class WorkflowStorage {
// Repositories
WorkflowRepository get workflows;
WorkflowInstanceRepository get instances;
UserTaskInstanceRepository get userTaskInstances;
WorkflowEventRepository get events;
// Transaction support
Future<T> transaction<T>(Future<T> Function() operation);
// Lifecycle
Future<void> initialize();
Future<void> dispose();
// Health and stats
Future<bool> healthCheck();
Future<StorageStats> getStats();
}
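A sketch of typical usage against a concrete storage instance (`instanceId` is a placeholder):
await storage.initialize();

// Group related writes so they commit or roll back together.
await storage.transaction(() async {
  await storage.instances.mergeOutput(instanceId, {'result': 'approved'});
  await storage.userTaskInstances.cancelByWorkflowInstanceId(instanceId);
});

// Lightweight liveness probe, e.g. for a /health endpoint.
final healthy = await storage.healthCheck();

await storage.dispose();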
Repository Interfaces
The WorkflowStorage exposes four main repositories for workflow persistence.
WorkflowRepository
Manages workflow definitions using [RawWorkflow] - the raw storage model.
abstract class WorkflowRepository {
/// Create a new workflow
Future<RawWorkflow> create(RawWorkflow workflow);
/// Update an existing workflow
Future<RawWorkflow> update(RawWorkflow workflow);
/// Get a workflow by its unique ID (UUID)
Future<RawWorkflow?> getById(String id);
/// Get a workflow by code, optionally specifying a version
/// If version is not provided, returns the latest version
Future<RawWorkflow?> getByCode(String code, {int? version});
/// Get all versions of a workflow by code
Future<List<RawWorkflow>> getAllVersions(String code);
/// List all workflows
Future<List<RawWorkflow>> list({
int limit = 100,
int offset = 0,
bool activeOnly = false,
});
/// Search workflows by name or code
Future<List<RawWorkflow>> search(String query, {int limit = 100});
/// Delete a workflow by code, optionally specifying a version
Future<bool> delete(String code, {int? version});
/// Activate/deactivate a workflow
Future<bool> activate(String code, {int? version});
Future<bool> deactivate(String code, {int? version});
/// Check if a workflow code exists
Future<bool> codeExists(String code);
/// Get the next version number for a workflow code
Future<int> getNextVersion(String code);
}
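A sketch of how the versioning methods combine when publishing a new revision; the [RawWorkflow] constructor fields and `updatedDefinitionJson` are assumed for illustration:
// Publish a new version of an existing workflow definition.
final nextVersion = await storage.workflows.getNextVersion('leave-request');
final raw = RawWorkflow(
  code: 'leave-request',
  version: nextVersion,
  name: 'Leave Request',
  definition: updatedDefinitionJson, // raw JSON payload (placeholder)
);
await storage.workflows.create(raw);

// Callers that do not pin a version always get the latest one.
final latest = await storage.workflows.getByCode('leave-request');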
WorkflowInstanceRepository
Manages running workflow instances.
/// Query parameters for listing workflow instances
class WorkflowInstanceQuery {
final String? workflowId;
final List<WorkflowStatus>? statuses;
final String? correlationId;
final String? parentInstanceId;
final DateTime? createdAfter;
final DateTime? createdBefore;
final int limit;
final int offset;
}
abstract class WorkflowInstanceRepository {
Future<WorkflowInstance> create(WorkflowInstance instance);
Future<WorkflowInstance> update(WorkflowInstance instance);
Future<WorkflowInstance?> getById(String id);
/// Query with flexible filtering
Future<List<WorkflowInstance>> query(WorkflowInstanceQuery query);
/// Convenience methods
Future<List<WorkflowInstance>> listByDefinition(String workflowId, {int limit, int offset});
Future<List<WorkflowInstance>> listByStatus(WorkflowStatus status, {int limit, int offset});
Future<List<WorkflowInstance>> listActive({int limit, int offset});
Future<List<WorkflowInstance>> getByCorrelationId(String correlationId);
Future<List<WorkflowInstance>> getChildInstances(String parentInstanceId);
/// Partial updates
Future<bool> updateStatus(String id, WorkflowStatus status);
Future<bool> mergeOutput(String id, Map<String, dynamic> output);
Future<bool> updateTokens(String id, List<WorkflowToken> tokens);
/// Recovery support
Future<List<WorkflowInstance>> getStaleRunningInstances({
required Duration staleThreshold,
int limit = 100,
});
Future<bool> delete(String id);
Future<Map<WorkflowStatus, int>> countByStatus();
}
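A sketch of querying and the recovery path; the WorkflowInstanceQuery constructor shape is assumed to mirror its fields, and `workflowId` is a placeholder:
// Flexible filtering via the query object (constructor shape assumed here).
final recent = await storage.instances.query(WorkflowInstanceQuery(
  workflowId: workflowId,
  createdAfter: DateTime.now().subtract(const Duration(days: 7)),
  limit: 50,
  offset: 0,
));

// Recovery sweep: instances that have not progressed for 30 minutes.
final stale = await storage.instances.getStaleRunningInstances(
  staleThreshold: const Duration(minutes: 30),
);

// Dashboard-style aggregate.
final counts = await storage.instances.countByStatus();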
UserTaskInstanceRepository
Manages user task instances (inbox items).
abstract class UserTaskInstanceRepository {
Future<UserTaskInstance> create(UserTaskInstance task);
Future<UserTaskInstance?> getById(String id);
/// Find an active (non-completed) user task for idempotency checks
Future<UserTaskInstance?> findActive({
required String workflowInstanceId,
required String nodeId,
});
Future<List<UserTaskInstance>> query({
String? workflowInstanceId,
String? assignedToUserId,
String? assignedToRoleId,
TaskStatus? status,
String? schemaType,
int? limit,
int? offset,
});
Future<void> update(UserTaskInstance task);
Future<void> delete(String id);
/// Cancel all pending user tasks for a workflow instance
/// Called when workflow is cancelled or fails
Future<void> cancelByWorkflowInstanceId(String workflowInstanceId);
/// Complete a user task with output
Future<void> complete(String taskId, Map<String, dynamic> output);
}
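A sketch of the idempotency check and completion flow the engine can build on this interface (`instanceId`, `task`, and `taskId` are placeholders):
// Before creating a new inbox item, check whether one already exists
// for this node of this instance (idempotent re-entry).
final existing = await storage.userTaskInstances.findActive(
  workflowInstanceId: instanceId,
  nodeId: 'approve-request',
);
if (existing == null) {
  await storage.userTaskInstances.create(task); // `task` built by the engine
}

// Later, when the user submits the form, record their output.
await storage.userTaskInstances.complete(taskId, {'approved': true});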
WorkflowEventRepository
Manages workflow events (audit trail).
abstract class WorkflowEventRepository {
Future<void> append(WorkflowEvent event);
Future<List<WorkflowEvent>> getByInstanceId(String instanceId, {int? limit});
Future<List<WorkflowEvent>> query({
String? workflowInstanceId,
String? eventType,
DateTime? after,
DateTime? before,
int? limit,
int? offset,
});
}
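A sketch of appending to and reading the audit trail; the WorkflowEvent constructor fields shown here are assumed for illustration:
// Append-only audit trail: one event per significant state change.
await storage.events.append(WorkflowEvent(
  workflowInstanceId: instanceId,
  eventType: 'node.completed',
  // remaining fields depend on the WorkflowEvent model
));

// Read back this instance's history (at most 200 events).
final history = await storage.events.getByInstanceId(instanceId, limit: 200);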
Data Models
WorkflowInstance
class WorkflowInstance {
final String id;
final String workflowId; // ID of the Workflow
final String workflowCode; // Code of the Workflow
final int workflowVersion; // Version of the Workflow
final WorkflowStatus status;
final Map<String, dynamic> input;
final Map<String, dynamic> output;
final String? correlationId;
final Map<String, dynamic> metadata;
final List<WorkflowToken> tokens; // Current execution tokens
final DateTime createdAt;
final DateTime updatedAt;
final DateTime? completedAt;
final String? errorMessage;
}
WorkflowToken
Tokens track the execution position within the workflow graph.
class WorkflowToken {
final String id;
final String currentNodeId;
final String? previousNodeId;
final TokenState state; // active, waiting, consumed, dropped
final String? parentTokenId; // For parallel splits
final String? branchId; // For join tracking
final DateTime createdAt;
// State transition methods
WorkflowToken toWaiting();
WorkflowToken toConsumed();
WorkflowToken toDropped();
// State checks
bool get isActive => state == TokenState.active;
bool get isTerminal => state == TokenState.consumed || state == TokenState.dropped;
}
enum TokenState {
active, // Moving through nodes
waiting, // Paused at wait node
consumed, // Terminal: reached end or merged
dropped, // Terminal: discarded late arrival
}
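A sketch of how the engine might drive these transitions; the WorkflowToken constructor call is illustrative, only the transition methods and state checks above come from the model:
// A token waits while its node is blocked (e.g. at a user task)...
var token = WorkflowToken(
  id: tokenId, // placeholder
  currentNodeId: 'approve-request',
  state: TokenState.active,
  createdAt: DateTime.now(),
);
token = token.toWaiting();

// ...and is consumed once it reaches an end node or merges at a join.
if (!token.isTerminal) {
  token = token.toConsumed();
}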
UserTaskInstance
class UserTaskInstance extends TaskInstance {
final String title;
final String? description;
final String? assignedToUserId;
final String? assignedToRoleId;
final String? assignedToGroupId;
final String? signalName;
final UserTaskPriority priority;
final DateTime? dueAt;
final String? claimedBy;
final DateTime? claimedAt;
final String? completedBy;
final DateTime? completedAt;
// Inherited from TaskInstance:
// id, workflowInstanceId, nodeId, schemaType, status, input, output,
// createdAt, updatedAt, startedAt, finishedAt, metadata
}
enum TaskStatus {
pending, // Created but not started
running, // Currently executing
completed, // Completed successfully
failed, // Failed (may be retried)
cancelled, // Cancelled
timedOut, // Timed out
}
enum UserTaskPriority {
low,
normal,
high,
urgent,
}
PostgreSQL Implementation
Reference implementation using PostgreSQL:
Schema
-- Workflow definitions
CREATE TABLE workflows (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
definition_id VARCHAR(255) NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
code VARCHAR(50) NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
definition JSONB NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(definition_id, version)
);
-- Workflow instances
CREATE TABLE workflow_instances (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
definition_id VARCHAR(255) NOT NULL,
definition_version INTEGER NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
input JSONB NOT NULL DEFAULT '{}',
output JSONB NOT NULL DEFAULT '{}',
correlation_id VARCHAR(255),
metadata JSONB,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
CREATE INDEX idx_instances_status ON workflow_instances(status);
CREATE INDEX idx_instances_correlation ON workflow_instances(correlation_id);
CREATE INDEX idx_instances_definition ON workflow_instances(definition_id);
-- Execution tokens
CREATE TABLE workflow_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
current_node_id VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'active',
parent_token_id UUID REFERENCES workflow_tokens(id),
local_data JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_tokens_instance ON workflow_tokens(instance_id);
CREATE INDEX idx_tokens_status ON workflow_tokens(status);
-- User tasks
CREATE TABLE user_task_instances (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
node_id VARCHAR(255) NOT NULL,
schema_type VARCHAR(100) NOT NULL,
title VARCHAR(500) NOT NULL,
description TEXT,
assigned_to_user VARCHAR(255),
assigned_to_role VARCHAR(255),
priority VARCHAR(50) NOT NULL DEFAULT 'normal',
status VARCHAR(50) NOT NULL DEFAULT 'pending',
input JSONB NOT NULL DEFAULT '{}',
response JSONB,
completed_by VARCHAR(255),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
due_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
CREATE INDEX idx_user_tasks_instance ON user_task_instances(workflow_instance_id);
CREATE INDEX idx_user_tasks_status ON user_task_instances(status);
CREATE INDEX idx_user_tasks_user ON user_task_instances(assigned_to_user);
CREATE INDEX idx_user_tasks_role ON user_task_instances(assigned_to_role);
-- Signal subscriptions
CREATE TABLE signal_subscriptions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
node_id VARCHAR(255) NOT NULL,
signal_name VARCHAR(255) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(instance_id, signal_name)
);
CREATE INDEX idx_signals_name ON signal_subscriptions(signal_name);
Implementation
class PostgresWorkflowStorage implements WorkflowStorage {
final PostgreSQLConnection _connection;
PostgresWorkflowStorage({required String connectionString})
: _connection = PostgreSQLConnection.fromUri(Uri.parse(connectionString));
@override
Future<void> initialize() async {
await _connection.open();
}
@override
Future<void> dispose() async {
await _connection.close();
}
@override
late final WorkflowRepository workflows = PostgresWorkflowRepository(_connection);
@override
late final WorkflowInstanceRepository instances = PostgresInstanceRepository(_connection);
@override
late final UserTaskInstanceRepository userTaskInstances = PostgresUserTaskRepository(_connection);
@override
late final WorkflowEventRepository events = PostgresEventRepository(_connection);
@override
Future<T> transaction<T>(Future<T> Function() operation) async {
return await _connection.transaction((ctx) => operation());
}
@override
Future<bool> healthCheck() async {
  // Check database connectivity with a trivial round-trip query
  try {
    await _connection.query('SELECT 1');
    return true;
  } catch (_) {
    return false;
  }
}
@override
Future<StorageStats> getStats() async {
// Query counts from database
return StorageStats(...);
}
}
In-Memory Implementation
For testing and development, use the built-in InMemoryStorage:
import 'package:vyuh_workflow_engine/storage/storage.dart';
// Create in-memory storage with a deserialization context
final context = RegistryDeserializationContext(
descriptors: [DefaultWorkflowDescriptor(), myDescriptor],
);
final storage = InMemoryStorage(context: context);
// Use with the engine
final engine = WorkflowEngine(
context: context,
storage: storage,
);
The InMemoryStorage class is included in the package and provides:
- Full repository implementations
- Transaction support (simulated)
- All query operations
- Automatic cleanup
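A test sketch exercising the in-memory repositories directly; the WorkflowInstance constructor call and WorkflowStatus.running value are assumed for illustration:
import 'package:test/test.dart';
import 'package:vyuh_workflow_engine/storage/storage.dart';

void main() {
  test('instances round-trip through InMemoryStorage', () async {
    final context = RegistryDeserializationContext(
      descriptors: [DefaultWorkflowDescriptor()],
    );
    final storage = InMemoryStorage(context: context);
    await storage.initialize();

    // Field values are illustrative; the constructor is assumed to take
    // named parameters matching the fields listed in Data Models.
    final created = await storage.instances.create(WorkflowInstance(
      id: 'inst-1',
      workflowId: 'wf-1',
      workflowCode: 'leave-request',
      workflowVersion: 1,
      status: WorkflowStatus.running,
      input: {'days': 3},
      output: const {},
      metadata: const {},
      tokens: const [],
      createdAt: DateTime.now(),
      updatedAt: DateTime.now(),
    ));

    expect(await storage.instances.getById(created.id), isNotNull);
    await storage.dispose();
  });
}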
Custom Implementation
Create your own storage implementation:
class MongoWorkflowStorage implements WorkflowStorage {
final MongoClient _client;
final String _databaseName;
MongoWorkflowStorage({
required String connectionUri,
required String databaseName,
}) : _client = MongoClient(connectionUri),
_databaseName = databaseName;
@override
Future<void> initialize() async {
await _client.connect();
}
@override
Future<void> dispose() async {
await _client.close();
}
@override
late final WorkflowRepository workflows =
MongoWorkflowRepository(_client.db(_databaseName));
@override
late final WorkflowInstanceRepository instances =
MongoInstanceRepository(_client.db(_databaseName));
@override
late final UserTaskInstanceRepository userTaskInstances =
MongoUserTaskRepository(_client.db(_databaseName));
@override
late final WorkflowEventRepository events =
MongoEventRepository(_client.db(_databaseName));
// ... implement transaction, healthCheck, getStats
}
Best Practices
1. Use Transactions
Future<void> completeWorkflow(String instanceId, Map<String, dynamic> output) async {
await db.transaction((txn) async {
await txn.execute(
'UPDATE workflow_instances SET status = @status, output = @output WHERE id = @id',
{'id': instanceId, 'status': 'completed', 'output': jsonEncode(output)},
);
await txn.execute(
'UPDATE workflow_tokens SET status = @status WHERE instance_id = @id',
{'id': instanceId, 'status': 'completed'},
);
});
}
2. Handle Concurrency
Future<bool> claimTask(String taskId, String userId) async {
final result = await db.execute('''
UPDATE user_task_instances
SET assigned_to_user = @userId, status = 'assigned'
WHERE id = @taskId AND status = 'pending'
''', {'taskId': taskId, 'userId': userId});
return result.affectedRows > 0;
}
3. Index for Query Patterns
-- Frequent queries
CREATE INDEX idx_instances_status_created
ON workflow_instances(status, created_at DESC);
CREATE INDEX idx_tasks_role_status
ON user_task_instances(assigned_to_role, status)
WHERE status = 'pending';
See Also
- Storage Adapters - Implementation guide
- WorkflowEngine - Engine API
- Architecture - System overview