Storage Adapters
Storage adapters abstract persistence, allowing the workflow engine to work with any database.
WorkflowStorage Interface
dart
/// Single entry point to the persistence layer used by the workflow engine.
///
/// Groups one repository per stored entity together with transaction,
/// lifecycle and diagnostics operations.
abstract class WorkflowStorage {
  /// Repository of workflow definitions.
  ///
  /// Definitions are stored as `RawWorkflow`, not as the typed `Workflow`.
  WorkflowRepository get workflows;

  /// Repository of running workflow instances.
  WorkflowInstanceRepository get instances;

  /// Repository of human tasks (inbox items).
  UserTaskInstanceRepository get userTaskInstances;

  /// Repository of the audit trail (event log).
  WorkflowEventRepository get events;

  /// Executes [operation] in a transaction and completes with its result.
  Future<T> transaction<T>(Future<T> Function() operation);

  /// Initializes connections.
  Future<void> initialize();

  /// Cleans up resources.
  Future<void> dispose();

  /// Checks whether the storage is healthy and accessible.
  Future<bool> healthCheck();

  /// Returns storage statistics.
  Future<StorageStats> getStats();
}
Design Philosophy
Storage handles raw data; the engine handles typed deserialization:
- RawWorkflow - Storage model containing JSON template, metadata
- Workflow - Typed model with resolved NodeConfiguration and ConditionExecutors
- WorkflowDeserializationContext - Provided to engine for type resolution
This separation allows storage to remain a pure data layer while the engine handles all type-safe business logic.
Repository Interfaces
WorkflowRepository
Manages workflow definitions using RawWorkflow (raw storage model):
dart
/// Persistence contract for workflow definitions.
///
/// Every method exchanges `RawWorkflow` values (the raw storage model).
abstract class WorkflowRepository {
  // ---- Reads --------------------------------------------------------------

  /// Returns the raw workflow with the given [id].
  Future<RawWorkflow?> getById(String id);

  /// Returns the raw workflow stored under [code], optionally for one
  /// specific [version].
  Future<RawWorkflow?> getByCode(String code, {int? version});

  /// Returns every version of the workflow stored under [code].
  Future<List<RawWorkflow>> getAllVersions(String code);

  /// Lists workflows with pagination.
  ///
  /// NOTE(review): the named parameters are non-nullable but carry neither
  /// `required` nor a default value, so the defaults appear to be elided in
  /// this listing. Confirm them against the real interface.
  Future<List<RawWorkflow>> list({int limit, int offset, bool activeOnly});

  /// Searches workflows by name or code.
  ///
  /// NOTE(review): the default for [limit] appears to be elided as well.
  Future<List<RawWorkflow>> search(String query, {int limit});

  /// Checks whether [code] exists.
  Future<bool> codeExists(String code);

  /// Returns the next version number for [code].
  Future<int> getNextVersion(String code);

  // ---- Writes -------------------------------------------------------------

  /// Creates a new workflow.
  Future<RawWorkflow> create(RawWorkflow workflow);

  /// Updates an existing workflow.
  Future<RawWorkflow> update(RawWorkflow workflow);

  /// Deletes the workflow stored under [code], optionally one [version].
  Future<bool> delete(String code, {int? version});

  /// Activates the workflow stored under [code], optionally one [version].
  Future<bool> activate(String code, {int? version});

  /// Deactivates the workflow stored under [code], optionally one [version].
  Future<bool> deactivate(String code, {int? version});
}
The engine deserializes RawWorkflow to typed Workflow using its deserialization context.
WorkflowInstanceRepository
dart
/// Persistence contract for workflow instances.
///
/// The paged listing methods declare `limit = 100` and `offset = 0`. These
/// are the same defaults used by the PostgreSQL `listByStatus` override shown
/// in this guide; without a default (or `required`) a non-nullable named
/// parameter does not compile under null safety.
abstract class WorkflowInstanceRepository {
  // ---- Lookups ------------------------------------------------------------

  /// Returns the instance with the given [id], or `null`.
  Future<WorkflowInstance?> getById(String id);

  /// Returns the instances selected by [query].
  Future<List<WorkflowInstance>> query(WorkflowInstanceQuery query);

  /// Returns one page of the instances belonging to [workflowId].
  Future<List<WorkflowInstance>> listByDefinition(
    String workflowId, {
    int limit = 100,
    int offset = 0,
  });

  /// Returns one page of the instances whose status is [status].
  Future<List<WorkflowInstance>> listByStatus(
    WorkflowStatus status, {
    int limit = 100,
    int offset = 0,
  });

  /// Returns one page of the active instances.
  Future<List<WorkflowInstance>> listActive({
    int limit = 100,
    int offset = 0,
  });

  /// Returns the instances carrying [correlationId].
  Future<List<WorkflowInstance>> getByCorrelationId(String correlationId);

  /// Returns the child instances of [parentInstanceId].
  Future<List<WorkflowInstance>> getChildInstances(String parentInstanceId);

  // ---- Mutations ----------------------------------------------------------

  /// Stores a new [instance].
  Future<WorkflowInstance> create(WorkflowInstance instance);

  /// Stores the current state of an existing [instance].
  Future<WorkflowInstance> update(WorkflowInstance instance);

  /// Updates only the status of the instance with the given [id].
  Future<bool> updateStatus(String id, WorkflowStatus status);

  /// Updates only the output of the instance with the given [id].
  Future<bool> updateOutput(String id, Map<String, dynamic> output);

  /// Updates only the tokens of the instance with the given [id].
  Future<bool> updateTokens(String id, List<WorkflowToken> tokens);

  /// Deletes the instance with the given [id].
  Future<bool> delete(String id);

  // ---- Monitoring ---------------------------------------------------------

  /// Returns the number of instances per status.
  Future<Map<WorkflowStatus, int>> countByStatus();

  /// Returns the running instances considered stale for [staleThreshold].
  Future<List<WorkflowInstance>> getStaleRunningInstances({
    required Duration staleThreshold,
  });
}
UserTaskInstanceRepository
dart
/// Persistence contract for user task instances.
abstract class UserTaskInstanceRepository {
  // ---- Lookups ------------------------------------------------------------

  /// Returns the task with the given [id].
  Future<UserTaskInstance?> getById(String id);

  /// Returns the tasks of the workflow instance [workflowInstanceId].
  Future<List<UserTaskInstance>> getByWorkflowInstanceId(
    String workflowInstanceId,
  );

  /// Returns the tasks whose status is [status].
  Future<List<UserTaskInstance>> getByStatus(TaskStatus status);

  /// Returns the pending tasks for the role [roleId].
  Future<List<UserTaskInstance>> getPendingByRoleId(String roleId);

  /// Returns the pending tasks for the roles in [roleIds].
  Future<List<UserTaskInstance>> getPendingByRoleIds(List<String> roleIds);

  /// Returns the pending tasks for the user [userId].
  Future<List<UserTaskInstance>> getPendingByUserId(String userId);

  /// Returns the pending tasks for the group [groupId].
  Future<List<UserTaskInstance>> getPendingByGroupId(String groupId);

  /// Returns the active task for [nodeId] within [workflowInstanceId].
  Future<UserTaskInstance?> findActive({
    required String workflowInstanceId,
    required String nodeId,
  });

  /// Searches tasks by the given filters.
  ///
  /// The trailing `...` marks further filters omitted from this listing.
  Future<List<UserTaskInstance>> search({String? workflowInstanceId, String? assignedToUserId, ...});

  // ---- Mutations ----------------------------------------------------------

  /// Stores a new [task].
  Future<UserTaskInstance> create(UserTaskInstance task);

  /// Stores the current state of an existing [task].
  Future<UserTaskInstance> update(UserTaskInstance task);

  /// Deletes the task with the given [id].
  Future<void> delete(String id);

  // ---- Task lifecycle -----------------------------------------------------

  /// Claims [taskId] for [userId].
  Future<UserTaskInstance> claim(String taskId, String userId);

  /// Completes [taskId] on behalf of [userId] with the given [output].
  Future<UserTaskInstance> complete(
    String taskId,
    String userId,
    Map<String, dynamic> output,
  );

  /// Cancels [taskId].
  Future<UserTaskInstance> cancel(String taskId);

  /// Cancels the tasks of [workflowInstanceId] and returns a count.
  Future<int> cancelByWorkflowInstanceId(String workflowInstanceId);
}
WorkflowEventRepository
dart
/// Persistence contract for the workflow event log.
abstract class WorkflowEventRepository {
  // ---- Appending ----------------------------------------------------------

  /// Appends a single [event].
  Future<WorkflowEvent> append(WorkflowEvent event);

  /// Appends every event in [events].
  Future<List<WorkflowEvent>> appendAll(List<WorkflowEvent> events);

  // ---- Reading ------------------------------------------------------------

  /// Returns the event with the given [id].
  Future<WorkflowEvent?> getById(String id);

  /// Returns the events of [workflowInstanceId].
  ///
  /// [eventTypes], [limit] and [offset] are optional and may be omitted.
  Future<List<WorkflowEvent>> getByWorkflowInstanceId(
    String workflowInstanceId, {
    List<WorkflowEventType>? eventTypes,
    int? limit,
    int? offset,
  });

  /// Returns the events selected by [query].
  Future<List<WorkflowEvent>> query(WorkflowEventQuery query);

  /// Returns the latest event of [workflowInstanceId].
  Future<WorkflowEvent?> getLatestByWorkflowInstanceId(
    String workflowInstanceId,
  );

  /// Returns the events of [nodeId] within [workflowInstanceId].
  Future<List<WorkflowEvent>> getByNodeId(
    String workflowInstanceId,
    String nodeId,
  );

  /// Returns the events of the task [taskId].
  Future<List<WorkflowEvent>> getByTaskId(String taskId);

  /// Returns the summary events of [workflowInstanceId].
  Future<List<WorkflowEvent>> getSummary(String workflowInstanceId);

  // ---- Retention ----------------------------------------------------------

  /// Deletes the events of [workflowInstanceId] and returns a count.
  Future<int> deleteByWorkflowInstanceId(String workflowInstanceId);

  /// Prunes events older than [cutoff] and returns a count.
  Future<int> pruneOlderThan(DateTime cutoff);
}
In-Memory Storage
For testing and development, use the built-in InMemoryStorage:
dart
// Step 1: build the deserialization context from your descriptors.
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), myAppDescriptor],
);

// Step 2: hand that context to the in-memory storage.
final storage = InMemoryStorage(context: context);

// Step 3: wire context and storage into the engine, then initialize it.
final engine = WorkflowEngine(context: context, storage: storage);
await engine.initialize();
WARNING
InMemoryStorage loses all data when the process restarts. Use only for testing and development.
Implementing a Custom Storage
PostgreSQL Example
dart
/// [WorkflowStorage] implementation backed by a [PostgresPool].
class PostgresWorkflowStorage implements WorkflowStorage {
  /// Creates a storage whose pool targets [connectionString].
  ///
  /// The pool is only constructed here; [initialize] opens it.
  PostgresWorkflowStorage(String connectionString)
      : _pool = PostgresPool(connectionString);

  final PostgresPool _pool;

  // The repositories are built lazily on first access. They used to be
  // assigned inside [initialize], which made a second [initialize] call fail
  // with a LateInitializationError because a `late final` field can only be
  // assigned once.
  late final _workflows = PostgresWorkflowRepository(_pool);
  late final _instances = PostgresInstanceRepository(_pool);
  late final _userTasks = PostgresUserTaskRepository(_pool);
  late final _events = PostgresEventRepository(_pool);

  @override
  WorkflowRepository get workflows => _workflows;

  @override
  WorkflowInstanceRepository get instances => _instances;

  @override
  UserTaskInstanceRepository get userTaskInstances => _userTasks;

  @override
  WorkflowEventRepository get events => _events;

  /// Opens the connection pool.
  @override
  Future<void> initialize() async {
    await _pool.open();
  }

  /// Runs [operation] inside `PostgresPool.runTx` and returns its result.
  ///
  /// NOTE(review): the `session` supplied by `runTx` is not forwarded to
  /// [operation], and the repositories are constructed with `_pool`. Confirm
  /// that `PostgresPool` routes statements issued during [operation] into
  /// the open transaction; otherwise they execute outside of it.
  @override
  Future<T> transaction<T>(Future<T> Function() operation) async {
    return await _pool.runTx((session) async {
      return await operation();
    });
  }

  /// Closes the connection pool.
  @override
  Future<void> dispose() async {
    await _pool.close();
  }

  /// Returns `true` when a trivial `SELECT 1` succeeds and `false` when it
  /// throws.
  @override
  Future<bool> healthCheck() async {
    try {
      await _pool.query('SELECT 1');
      return true;
    } catch (_) {
      // Deliberately best-effort: any failure is reported as "unhealthy".
      return false;
    }
  }

  /// Counts workflow definitions, all instances and the instances whose
  /// status is not `completed`, `failed` or `cancelled`.
  @override
  Future<StorageStats> getStats() async {
    // The three counts do not depend on each other, so they are issued
    // together instead of awaiting one round trip after another.
    final counts = await Future.wait([
      _pool.query('SELECT COUNT(*) FROM workflows'),
      _pool.query('SELECT COUNT(*) FROM workflow_instances'),
      _pool.query(
        'SELECT COUNT(*) FROM workflow_instances '
        "WHERE status NOT IN ('completed', 'failed', 'cancelled')",
      ),
    ]);
    return StorageStats(
      totalDefinitions: counts[0].first['count'] as int,
      totalInstances: counts[1].first['count'] as int,
      activeInstances: counts[2].first['count'] as int,
    );
  }
}
Using Your Custom Storage
dart
// Deserialization context shared with the engine.
final context = RegistryDeserializationContext(
  descriptors: [
    DefaultWorkflowDescriptor(),
    myDescriptor,
  ],
);

// The custom storage only needs the connection string.
final storage = PostgresWorkflowStorage(connectionString);

// The engine is wired exactly as with the in-memory storage.
final engine = WorkflowEngine(context: context, storage: storage);
await engine.initialize();
Repository Implementation
dart
/// PostgreSQL implementation of [WorkflowInstanceRepository].
///
/// This excerpt implements only `getById`, `listByStatus`, `create` and
/// `update`; the other members of the interface are not shown.
class PostgresInstanceRepository extends WorkflowInstanceRepository {
  PostgresInstanceRepository(this._pool);

  final PostgresPool _pool;

  /// Returns the instance stored under [id], or `null` when no row matches.
  @override
  Future<WorkflowInstance?> getById(String id) async {
    final rows = await _pool.query(
      'SELECT * FROM workflow_instances WHERE id = @id',
      {'id': id},
    );
    return rows.isEmpty ? null : _mapToInstance(rows.first);
  }

  /// Returns one page of the instances whose status is [status].
  ///
  /// Rows are ordered by creation time, then id, so that consecutive pages
  /// neither overlap nor skip rows.
  @override
  Future<List<WorkflowInstance>> listByStatus(
    WorkflowStatus status, {
    int limit = 100,
    int offset = 0,
  }) async {
    // Without ORDER BY the row order of a LIMIT/OFFSET query is unspecified,
    // which makes pagination nondeterministic.
    final rows = await _pool.query(
      'SELECT * FROM workflow_instances WHERE status = @status '
      'ORDER BY created_at, id LIMIT @limit OFFSET @offset',
      {'status': status.name, 'limit': limit, 'offset': offset},
    );
    return rows.map(_mapToInstance).toList();
  }

  /// Inserts [instance] and returns it unchanged.
  @override
  Future<WorkflowInstance> create(WorkflowInstance instance) async {
    await _pool.execute('''
      INSERT INTO workflow_instances (id, workflow_id, status, input, output, tokens, created_at)
      VALUES (@id, @defId, @status, @input, @output, @tokens, @createdAt)
    ''', {
      'id': instance.id,
      'defId': instance.workflowId,
      'status': instance.status.name,
      'input': jsonEncode(instance.input),
      'output': jsonEncode(instance.output),
      'tokens': _encodeTokens(instance),
      'createdAt': instance.createdAt,
    });
    return instance;
  }

  /// Writes the status, output, tokens and completion time of [instance] to
  /// the row with the same id, then returns [instance] unchanged.
  @override
  Future<WorkflowInstance> update(WorkflowInstance instance) async {
    await _pool.execute('''
      UPDATE workflow_instances
      SET status = @status, output = @output, tokens = @tokens, completed_at = @completedAt
      WHERE id = @id
    ''', {
      'id': instance.id,
      'status': instance.status.name,
      'output': jsonEncode(instance.output),
      'tokens': _encodeTokens(instance),
      'completedAt': instance.completedAt,
    });
    return instance;
  }

  /// Serializes the tokens of [instance] to a JSON array string.
  String _encodeTokens(WorkflowInstance instance) =>
      jsonEncode(instance.tokens.map((t) => t.toJson()).toList());

  /// Builds a [WorkflowInstance] from one `workflow_instances` row.
  ///
  /// NOTE(review): assumes the driver returns the JSONB columns as JSON text;
  /// if it already decodes them, the `jsonDecode` calls must go — confirm.
  WorkflowInstance _mapToInstance(Map<String, dynamic> row) {
    final tokens = (jsonDecode(row['tokens']) as List)
        .map((t) => WorkflowToken.fromJson(t))
        .toList();
    return WorkflowInstance(
      id: row['id'],
      workflowId: row['workflow_id'],
      // The status column holds the enum value's name (see create/update).
      status: WorkflowStatus.values.byName(row['status']),
      input: jsonDecode(row['input']),
      output: jsonDecode(row['output']),
      tokens: tokens,
      createdAt: row['created_at'],
      completedAt: row['completed_at'],
    );
  }
}
Database Schema
PostgreSQL Schema Example
sql
-- Workflow definitions: one row per version of a workflow code.
CREATE TABLE workflows (
  id VARCHAR(255) PRIMARY KEY,
  code VARCHAR(100) NOT NULL,
  name VARCHAR(255) NOT NULL,
  version INT NOT NULL DEFAULT 1,
  nodes JSONB NOT NULL,
  edges JSONB NOT NULL,
  is_active BOOLEAN NOT NULL DEFAULT true,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  -- The repository API addresses a definition by code plus version, so the
  -- pair must be unique. The index backing this constraint also serves
  -- lookups by code.
  UNIQUE (code, version)
);
-- Workflow instances: one row per instance, keyed by id.
CREATE TABLE workflow_instances (
id VARCHAR(255) PRIMARY KEY,
-- Points at workflows(id); enforced by the foreign key below.
workflow_id VARCHAR(255) NOT NULL,
-- Stored as text; the repository example writes the status enum's name.
status VARCHAR(50) NOT NULL,
-- JSON documents; the repository example writes them with jsonEncode.
input JSONB NOT NULL,
output JSONB NOT NULL,
tokens JSONB NOT NULL,
created_at TIMESTAMP NOT NULL,
-- Nullable: no value is supplied when the row is inserted.
completed_at TIMESTAMP,
FOREIGN KEY (workflow_id) REFERENCES workflows(id)
);
-- User task instances: one row per task, keyed by id.
CREATE TABLE user_task_instances (
id VARCHAR(255) PRIMARY KEY,
-- Points at workflow_instances(id); enforced by the foreign key below.
workflow_instance_id VARCHAR(255) NOT NULL,
node_id VARCHAR(255) NOT NULL,
signal_name VARCHAR(255) NOT NULL,
schema_type VARCHAR(100) NOT NULL,
title VARCHAR(500) NOT NULL,
description TEXT,
-- Both assignment columns are nullable.
assigned_to_role_id VARCHAR(255),
assigned_to_user_id VARCHAR(255),
status VARCHAR(50) NOT NULL,
priority VARCHAR(50) NOT NULL,
-- input is mandatory, output is nullable.
input JSONB NOT NULL,
output JSONB,
due_date TIMESTAMP,
created_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instances(id)
);
-- Workflow events: one row per event, keyed by id.
CREATE TABLE workflow_events (
id VARCHAR(255) PRIMARY KEY,
-- Points at workflow_instances(id); enforced by the foreign key below.
workflow_instance_id VARCHAR(255) NOT NULL,
event_type VARCHAR(100) NOT NULL,
-- node_id, token_id and data are nullable.
node_id VARCHAR(255),
token_id VARCHAR(255),
data JSONB,
created_at TIMESTAMP NOT NULL,
FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instances(id)
);
-- Indexes on the columns the repositories filter by.
CREATE INDEX idx_instances_status ON workflow_instances(status);
CREATE INDEX idx_instances_definition ON workflow_instances(workflow_id);
CREATE INDEX idx_tasks_status ON user_task_instances(status);
CREATE INDEX idx_tasks_role ON user_task_instances(assigned_to_role_id);
CREATE INDEX idx_tasks_user ON user_task_instances(assigned_to_user_id);
-- PostgreSQL does not index the referencing column of a foreign key on its
-- own, and the task repository looks tasks up (and cancels them) by workflow
-- instance, so this column needs an explicit index.
CREATE INDEX idx_tasks_instance ON user_task_instances(workflow_instance_id);
CREATE INDEX idx_events_instance ON workflow_events(workflow_instance_id);
Transactions
Use transactions for atomic operations:
dart
// Both writes are issued from within a single storage.transaction call.
await storage.transaction(() async {
  final instance = await storage.instances.getById(instanceId);
  // Nothing to update when the instance does not exist.
  if (instance == null) return;

  await storage.instances.updateOutput(instanceId, {'key': 'value'});
  await storage.events.append(WorkflowEvent(...));
});
Best Practices
- Use transactions - For multi-step updates
- Index frequently queried columns - Status, role, user
- Store JSON efficiently - Use JSONB in PostgreSQL
- Clean up old data - Archive completed workflows
- Monitor performance - Query execution times
Next Steps
- Patterns - Workflow patterns
- Best Practices - Design guidance
- Error Handling - Error patterns