Loading & Registration Guide
This guide explains the complete sequence from workflow definition to execution, covering where registration happens and how the different APIs connect.
Overview
The workflow engine uses a two-phase registration model:
- Executor Registration - Registers task, user task, and condition executors via
WorkflowDescriptor - Workflow Registration - Registers workflow definitions for execution
This separation allows executors to be reusable across many workflows, while workflows themselves can come from different sources.
The Complete Sequence
Phase 1: Engine Setup
Creating the Context and Engine
The engine is created with a deserialization context and storage:
// 1. Create deserialization context with all descriptors
final context = RegistryDeserializationContext(
descriptors: [
DefaultWorkflowDescriptor(), // Built-in node executors
myAppDescriptor, // Your custom executors
],
);
// 2. Create storage with the context
final storage = InMemoryStorage(context: context);
// 3. Create engine with context and storage
final engine = WorkflowEngine(
context: context,
storage: storage,
);The RegistryDeserializationContext creates its internal registry lazily on first use. The context is responsible for type resolution during deserialization.
Initializing the Engine
await engine.initialize();During initialization, the engine:
- Initializes the storage layer
- Optionally recovers any stale workflows from a previous crash
Note: The registry is initialized lazily by the context when first needed for type resolution.
Descriptor Order Matters
Later registrations of the same schemaType override earlier ones. Place DefaultWorkflowDescriptor() first if you want your custom executors to override defaults.
Phase 2: Workflow Loading
Workflows can come from three sources:
Source A: JSON (External)
Load workflows from JSON files, APIs, or databases:
final json = await fetchWorkflowJson();
final workflow = engine.loadWorkflow(json);When you call loadWorkflow(), the engine:
- Uses its deserialization context (
context.run()) to establish type resolution scope - Deserializes the JSON into a typed
Workflowobject - Resolves
NodeConfigurationtypes based on nodeType (task, userTask, signalWait, etc.) - Resolves
ConditionExecutorinstances for gateway edges using schemaType - Validates the workflow structure
- Resolves node executors from the NodeExecutorRegistry
Source B: Code (WorkflowBuilder)
Build workflows programmatically with executors attached directly:
final workflow = WorkflowBuilder('APPROVAL', 'Approval Workflow')
.start('begin')
.task('validate',
name: 'Validate Request',
executor: ValidateRequestExecutor(),
)
.userTask('approval',
name: 'Manager Approval',
executor: ApprovalTaskExecutor(),
signal: 'approvalDecision',
)
.end('complete')
.connect('begin', 'validate')
.connect('validate', 'approval')
.connect('approval', 'complete')
.build();Builder-created workflows already have executors attached, so no registry lookup is needed.
Source C: Storage (Lazy Loading)
Workflows can be loaded automatically from storage when starting an instance:
final instance = await engine.startWorkflow(
workflowId: 'approval-workflow',
input: {'entityId': 'DOC-123'},
);If the workflow isn't already in memory, the engine will:
- Check the in-memory cache first
- If not found, load from storage by code (latest version) or by UUID
- Resolve executors from the registry
- Cache the workflow for future use
- Then proceed with execution
Phase 3: Workflow Registration
Explicit Registration
After loading a workflow from JSON, you must register it for execution:
final workflow = engine.loadWorkflow(jsonData);
engine.registerWorkflow(workflow);Registration validates the workflow structure, resolves any remaining executors, and caches the workflow in memory by both its ID and code for flexible lookup.
Combined Load + Register
For convenience, combine both operations:
final workflow = engine.loadAndRegisterWorkflow(jsonData);
await engine.startWorkflow(workflowCode: workflow.code);Registration vs. Persistence
| Method | Purpose | Storage |
|---|---|---|
registerWorkflow() | In-memory cache for execution | Memory only |
saveWorkflow() | Persist to storage adapter | Database/file |
loadAndRegisterWorkflow() | Load JSON + cache in memory | Memory only |
You might need BOTH for production scenarios:
// Load and register for immediate execution
final workflow = engine.loadAndRegisterWorkflow(jsonData);
// Also persist for future server restarts
await engine.saveWorkflow(workflow);Phase 4: Execution
Starting a Workflow
final instance = await engine.startWorkflow(
workflowCode: 'approval-workflow', // Can be code or UUID
version: 2, // Optional: specific version
input: {'entityId': 'DOC-123'},
);The engine looks up workflows in this order:
- In-memory cache - Fastest path if already registered
- Storage by code + version - If version is specified
- Storage by code (latest) - Gets the most recent active version
- Storage by UUID - Direct lookup by workflow ID
Caching Behavior
Once loaded, workflows are cached by multiple keys for flexible lookup:
- By UUID (e.g.,
550e8400-e29b-41d4-a716-446655440000) - By code (e.g.,
approval-workflow) - By code + version (e.g.,
approval-workflow:v2)
This means all of these will find the same workflow:
engine.getWorkflow('550e8400-e29b-41d4-a716-446655440000');
engine.getWorkflow('approval-workflow');
engine.startWorkflow(workflowCode: 'approval-workflow', version: 2);Complete Example
void main() async {
// ═══════════════════════════════════════════════════════════════════
// PHASE 1: Engine Setup
// ═══════════════════════════════════════════════════════════════════
final myDescriptor = WorkflowDescriptor(
title: 'My Application Executors',
tasks: [
ValidateRequestExecutor.typeDescriptor,
SendNotificationExecutor.typeDescriptor,
],
userTasks: [
ApprovalTaskExecutor.typeDescriptor,
],
conditions: [
RequiresNextLevelCondition.typeDescriptor,
],
);
// Create deserialization context with all descriptors
final context = RegistryDeserializationContext(
descriptors: [
DefaultWorkflowDescriptor(),
myDescriptor,
],
);
// Create engine with context and storage
final engine = WorkflowEngine(
context: context,
storage: PostgresWorkflowStorage(connectionString),
);
await engine.initialize();
// ═══════════════════════════════════════════════════════════════════
// PHASE 2 & 3: Load and Register Workflows
// ═══════════════════════════════════════════════════════════════════
// Option A: From JSON file
final jsonWorkflow = await File('workflows/approval.json').readAsString();
final approvalWorkflow = engine.loadAndRegisterWorkflow(
jsonDecode(jsonWorkflow) as Map<String, dynamic>,
);
// Option B: From builder
final simpleWorkflow = WorkflowBuilder('SIMPLE', 'Simple Workflow')
.start('begin')
.task('process', name: 'Process', executor: ProcessExecutor())
.end('done')
.connect('begin', 'process')
.connect('process', 'done')
.build();
engine.registerWorkflow(simpleWorkflow);
// Persist workflows for future restarts
await engine.saveWorkflow(approvalWorkflow);
await engine.saveWorkflow(simpleWorkflow);
// ═══════════════════════════════════════════════════════════════════
// PHASE 4: Execute Workflows
// ═══════════════════════════════════════════════════════════════════
final instance = await engine.startWorkflow(
workflowCode: 'APPROVAL', // Use workflowCode for semantic lookup
input: {'documentId': 'DOC-001'},
);
final status = await engine.getWorkflowInstance(instance.id);
print('Workflow status: ${status?.status}');
await engine.dispose();
}Troubleshooting
"Workflow not found" Error
Causes:
- Workflow not registered - call
registerWorkflow()first - Wrong identifier - check if using code vs UUID
- Not in storage - if relying on lazy loading, ensure
saveWorkflow()was called
Fix:
final workflow = engine.loadWorkflow(jsonData);
engine.registerWorkflow(workflow); // Don't forget this!
await engine.startWorkflow(workflowCode: workflow.code);"Executor not found" Error
Causes:
- Descriptor not included in context construction
schemaTypemismatch between JSON and executor- Forgot to call
engine.initialize()
Fix:
final context = RegistryDeserializationContext(
descriptors: [
DefaultWorkflowDescriptor(),
myDescriptor, // Must contain the missing executor
],
);
final engine = WorkflowEngine(
context: context,
storage: InMemoryStorage(context: context),
);
await engine.initialize(); // Don't forget this!Type Resolution Issues
If NodeConfiguration types aren't resolving correctly, verify that:
- The
nodeTypein your JSON matches a supported node type (task, userTask, signalWait, etc.) - For tasks/userTasks, the
schemaTypein the config matches your executor's schemaType - You're using
engine.loadWorkflow()(which sets up the deserialization context) rather than callingWorkflow.fromJson()directly
Summary
| Step | Method | What Happens |
|---|---|---|
| 1. Context | RegistryDeserializationContext(descriptors: [...]) | Creates context with descriptors |
| 2. Construct | WorkflowEngine(context: ctx, storage: s) | Creates engine |
| 3. Initialize | engine.initialize() | Initializes storage |
| 4. Load | engine.loadWorkflow(json) | Deserializes and resolves types |
| 5. Register | engine.registerWorkflow(wf) | Caches in memory for execution |
| 6. Persist | engine.saveWorkflow(wf) | Saves to storage (optional) |
| 7. Execute | engine.startWorkflow(...) | Creates instance and runs |
The key insight: The context holds all type descriptors; the engine uses the context for all type resolution.
Next Steps
- Type Registries - Deep dive into type resolution
- Engine Lifecycle - Full lifecycle details
- Storage Adapters - Persistence options