Skip to content

Parallel Tasks Workflow

A complete example demonstrating parallel execution patterns.

Overview

This workflow demonstrates:

  • Parallel gateway for concurrent execution
  • Multiple independent branches
  • Join gateway to synchronize completion
  • Aggregating results from parallel branches

Workflow Diagram

                              [Start]

                            [Initialize]

                         [Fork: All Reviews]
                    ↙          ↓          ↘
        [Technical]      [Legal]      [Finance]
        [Review]         [Review]     [Review]
                    ↘          ↓          ↙
                        [Join: Wait All]

                        [Evaluate Results]

                        [Route Decision]
                    ↙                    ↘
            [Approve]                [Reject]
                 ↓                        ↓
           [End: Approved]         [End: Rejected]

Complete Implementation

dart
import 'package:vyuh_workflow_engine/vyuh_workflow_engine.dart';

final parallelReviewWorkflow = WorkflowBuilder(
  'PARALLEL-REVIEW',
  'Parallel Review Workflow',
  description: 'Multiple reviewers evaluate a request in parallel',
)
    .start('begin')

    // Step 1: Initialize request
    .task('initialize',
      execute: (ctx) async {
        // Use getInitial<T> for original workflow input
        final requestId = ctx.getInitialRequired<String>('requestId');
        final submittedBy = ctx.getInitialRequired<String>('submittedBy');
        // Fetch request (simulated)
        // final request = await requestService.getById(requestId);

        return {
          'requestId': requestId,
          'requestType': 'Feature Request', // request.type
          'requestTitle': 'Sample Request', // request.title
          'requestDetails': 'Request details here', // request.details
          'submittedBy': submittedBy,
          'submittedAt': DateTime.now().toIso8601String(),
          'requiredReviews': ['technical', 'legal', 'finance'],
        };
      },
    )

    // Step 2: Fork to parallel reviews
    .allOf('forkReviews', [
      'prepareTechnicalReview',
      'prepareLegalReview',
      'prepareFinanceReview',
    ])

    // Branch A: Technical Review
    .task('prepareTechnicalReview',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final requestTitle = ctx.getAny<String>('requestTitle') ?? 'Request';
        return {
          'reviewType': 'technical',
          'reviewTitle': 'Technical Review: $requestTitle',
          'reviewCriteria': ['Technical feasibility', 'Architecture alignment', 'Security considerations', 'Performance impact'],
        };
      },
    )
    .userTask('technicalReview',
      signal: 'technical_review_decision',
      schemaType: 'userTask.review',
      title: 'Technical Review: {{requestTitle}}',
      description: 'Please review the technical aspects of this request.',
      assignToRole: 'technical_reviewers',
      storeAs: 'technicalReviewResult',
    )

    // Branch B: Legal Review
    .task('prepareLegalReview',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final requestTitle = ctx.getAny<String>('requestTitle') ?? 'Request';
        return {
          'reviewType': 'legal',
          'reviewTitle': 'Legal Review: $requestTitle',
          'reviewCriteria': ['Regulatory compliance', 'Contract implications', 'Liability assessment', 'Privacy requirements'],
        };
      },
    )
    .userTask('legalReview',
      signal: 'legal_review_decision',
      schemaType: 'userTask.review',
      title: 'Legal Review: {{requestTitle}}',
      description: 'Please review the legal aspects of this request.',
      assignToRole: 'legal_reviewers',
      storeAs: 'legalReviewResult',
    )

    // Branch C: Finance Review
    .task('prepareFinanceReview',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final requestTitle = ctx.getAny<String>('requestTitle') ?? 'Request';
        return {
          'reviewType': 'finance',
          'reviewTitle': 'Finance Review: $requestTitle',
          'reviewCriteria': ['Budget availability', 'Cost-benefit analysis', 'ROI projection', 'Financial risk'],
        };
      },
    )
    .userTask('financeReview',
      signal: 'finance_review_decision',
      schemaType: 'userTask.review',
      title: 'Finance Review: {{requestTitle}}',
      description: 'Please review the financial aspects of this request.',
      assignToRole: 'finance_reviewers',
      storeAs: 'financeReviewResult',
    )

    // Step 3: Join - Wait for all reviews (implicit at evaluateResults)
    // Note: The parallel gateway handles joining automatically

    // Step 4: Evaluate combined results
    .task('evaluateResults',
      execute: (ctx) async {
        // Use getAny<T> for stored user task results from parallel branches
        final technicalResult = ctx.getAny<Map<String, dynamic>>('technicalReviewResult');
        final legalResult = ctx.getAny<Map<String, dynamic>>('legalReviewResult');
        final financeResult = ctx.getAny<Map<String, dynamic>>('financeReviewResult');

        // Check approvals
        final technicalApproved = technicalResult?['approved'] == true;
        final legalApproved = legalResult?['approved'] == true;
        final financeApproved = financeResult?['approved'] == true;
        final allApproved = technicalApproved && legalApproved && financeApproved;

        // Collect rejection reasons
        final rejectionReasons = <String>[];
        if (!technicalApproved) rejectionReasons.add('Technical: ${technicalResult?['reason'] ?? 'Not approved'}');
        if (!legalApproved) rejectionReasons.add('Legal: ${legalResult?['reason'] ?? 'Not approved'}');
        if (!financeApproved) rejectionReasons.add('Finance: ${financeResult?['reason'] ?? 'Not approved'}');

        // Calculate overall score
        final scores = [technicalResult?['score'] as int? ?? 0, legalResult?['score'] as int? ?? 0, financeResult?['score'] as int? ?? 0];
        final averageScore = scores.reduce((a, b) => a + b) / scores.length;

        return {
          'allApproved': allApproved,
          'technicalApproved': technicalApproved,
          'legalApproved': legalApproved,
          'financeApproved': financeApproved,
          'rejectionReasons': rejectionReasons,
          'averageScore': averageScore,
          'reviewSummary': {
            'technical': {'approved': technicalApproved, 'reviewer': technicalResult?['completedBy'], 'score': technicalResult?['score']},
            'legal': {'approved': legalApproved, 'reviewer': legalResult?['completedBy'], 'score': legalResult?['score']},
            'finance': {'approved': financeApproved, 'reviewer': financeResult?['completedBy'], 'score': financeResult?['score']},
          },
        };
      },
    )

    // Step 5: Route based on evaluation
    .oneOf('routeDecision', [
      Branch.whenFn(
        (o) => o['allApproved'] == true,
        then: 'processApproval',
      ),
      Branch.otherwise(then: 'processRejection'),
    ])

    // Step 6a: Process approval
    .task('processApproval',
      execute: (ctx) async {
        // Update status (simulated)
        // await requestService.updateStatus(requestId, 'approved');

        return {
          'finalDecision': 'approved',
          'approvedAt': DateTime.now().toIso8601String(),
        };
      },
    )

    // Step 6b: Process rejection
    .task('processRejection',
      execute: (ctx) async {
        // Update status (simulated)
        // await requestService.updateStatus(requestId, 'rejected');

        return {
          'finalDecision': 'rejected',
          'rejectedAt': DateTime.now().toIso8601String(),
        };
      },
    )

    // End nodes
    .end('approved', name: 'Approved')
    .end('rejected', name: 'Rejected')

    // Flow edges - Main flow
    .connect('begin', 'initialize')
    .connect('initialize', 'forkReviews')

    // Note: allOf() already creates edges to all target nodes

    // Technical branch
    .connect('prepareTechnicalReview', 'technicalReview')
    .connect('technicalReview', 'evaluateResults')

    // Legal branch
    .connect('prepareLegalReview', 'legalReview')
    .connect('legalReview', 'evaluateResults')

    // Finance branch
    .connect('prepareFinanceReview', 'financeReview')
    .connect('financeReview', 'evaluateResults')

    // After all parallel branches join at evaluateResults
    .connect('evaluateResults', 'routeDecision')
    .connect('processApproval', 'approved')
    .connect('processRejection', 'rejected')

    .build();

Token Behavior

With parallel gateways, tokens split and merge:

dart
// At fork: 1 token → 3 tokens
// Token 1 → technicalReview branch
// Token 2 → legalReview branch
// Token 3 → financeReview branch

// At join: 3 tokens → 1 token
// Join waits for all tokens to arrive
// Merges output from all branches

Partial Completion Handling

Handle scenarios where some reviews may not complete:

dart
builder
    // Timeout for reviews
    .task('checkTimeout',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final submittedAt = ctx.getAny<String>('submittedAt');
        final startedAt = DateTime.parse(submittedAt!);
        final deadline = startedAt.add(Duration(days: 5));
        final now = DateTime.now();

        return {'isOverdue': now.isAfter(deadline), 'daysRemaining': deadline.difference(now).inDays};
      },
    )

    // Use anyOf gateway for racing parallel branches (first to complete wins)
    .anyOf('raceReviews', [
      'technicalReview',
      'legalReview',
      'financeReview',
    ])

Alternative: Sequential Reviews with Priority

dart
builder
    // Process reviews in priority order
    .task('determineReviewOrder',
      execute: (ctx) async {
        // Use get<T> for previous node output
        final requestType = ctx.get<String>('requestType');

        // Different order based on request type
        List<String> order;
        if (requestType == 'LEGAL_CONTRACT') {
          order = ['legal', 'finance', 'technical'];
        } else if (requestType == 'BUDGET_REQUEST') {
          order = ['finance', 'technical', 'legal'];
        } else {
          order = ['technical', 'legal', 'finance'];
        }

        return {'reviewOrder': order, 'currentReviewIndex': 0};
      },
    )

    // Loop through reviews sequentially
    .oneOf('checkMoreReviews', [
      Branch.whenFn((o) => o['currentReviewIndex'] < 3, then: 'nextReview'),
      Branch.otherwise(then: 'evaluateResults'),
    ]);

Testing

dart
group('Parallel Review Workflow', () {
  test('approves when all reviews pass', () async {
    final instance = await engine.startWorkflow(
      workflowId: 'parallel-review',
      input: {
        'requestId': 'REQ-001',
        'submittedBy': '[email protected]',
      },
    );

    // Complete all three reviews with approval
    await completeReview(instance.id, 'technical_reviewers', approved: true, score: 85);
    await completeReview(instance.id, 'legal_reviewers', approved: true, score: 90);
    await completeReview(instance.id, 'finance_reviewers', approved: true, score: 80);

    final completed = await engine.getWorkflowInstance(instance.id);
    expect(completed.status, WorkflowStatus.completed);
    expect(completed.output['finalDecision'], 'approved');
    expect(completed.output['averageScore'], 85);
  });

  test('rejects when any review fails', () async {
    final instance = await engine.startWorkflow(
      workflowId: 'parallel-review',
      input: {
        'requestId': 'REQ-002',
        'submittedBy': '[email protected]',
      },
    );

    // Technical and finance approve, legal rejects
    await completeReview(instance.id, 'technical_reviewers', approved: true, score: 85);
    await completeReview(instance.id, 'legal_reviewers', approved: false, score: 40,
      reason: 'Regulatory compliance concerns');
    await completeReview(instance.id, 'finance_reviewers', approved: true, score: 80);

    final completed = await engine.getWorkflowInstance(instance.id);
    expect(completed.status, WorkflowStatus.completed);
    expect(completed.output['finalDecision'], 'rejected');
    expect(completed.output['rejectionReasons'], contains(startsWith('Legal:')));
  });

  test('reviews can complete in any order', () async {
    final instance = await engine.startWorkflow(
      workflowId: 'parallel-review',
      input: {
        'requestId': 'REQ-003',
        'submittedBy': '[email protected]',
      },
    );

    // Complete in different order than branches defined
    await completeReview(instance.id, 'finance_reviewers', approved: true, score: 88);
    await completeReview(instance.id, 'technical_reviewers', approved: true, score: 92);
    await completeReview(instance.id, 'legal_reviewers', approved: true, score: 85);

    final completed = await engine.getWorkflowInstance(instance.id);
    expect(completed.status, WorkflowStatus.completed);
    expect(completed.output['allApproved'], true);
  });
});

Future<void> completeReview(
  String instanceId,
  String role, {
  required bool approved,
  required int score,
  String? reason,
}) async {
  final tasks = await engine.getUserTasks(
    workflowInstanceId: instanceId,
    assignedToRole: role,
    status: UserTaskStatus.pending,
  );

  await engine.completeUserTask(
    taskId: tasks.first.id,
    completedBy: '$role@company.com',
    response: {
      'approved': approved,
      'score': score,
      if (reason != null) 'reason': reason,
    },
  );
}

Performance Considerations

Parallel execution benefits:

  • Faster completion - Reviews happen simultaneously
  • Better resource utilization - Multiple reviewers work concurrently
  • Reduced wait time - No sequential bottlenecks

Considerations:

  • All branches must complete before join
  • Output from all branches is merged
  • Memory usage scales with branch count

Next Steps