Workflow-specific parallelism

I have two workflows:
Workflow 1: Download a video and its metadata from AWS S3.
Workflow 2: Read the video, decode it, apply image processing, draw on the video, encode it, and write it to a video file.

Workflow 1 can (and should) run in parallel, since it is I/O-bound.
Workflow 2 should not run in parallel. It should process videos one at a time.

Workflow 2 should apply backpressure to workflow 1: workflow 1 should stop downloading once N videos are waiting, and resume only when the number of unprocessed videos drops below N.

Is this a suitable use case for Temporal? How should I architect it?

Thanks.
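
The backpressure requirement in this question can be illustrated without any Temporal APIs. The following is only a minimal sketch in plain TypeScript, assuming hypothetical `download` and `processVideo` functions that stand in for the two workflows (in Temporal they would presumably be activities or child workflows). It keeps a sliding window of at most N unprocessed videos and processes them strictly one at a time.

```typescript
// Hypothetical stand-in for a downloaded video; not a Temporal type.
type Video = { id: number };

/**
 * Downloads videos ahead of a strictly sequential processor, keeping at most
 * `maxBuffered` videos that are downloading, downloaded, or being processed.
 * Error handling is omitted to keep the sketch short.
 */
async function runPipeline(
  ids: number[],
  maxBuffered: number,
  download: (id: number) => Promise<Video>,
  processVideo: (video: Video) => Promise<void>,
): Promise<void> {
  const inFlight: Promise<Video>[] = [];
  let next = 0;

  // Start new downloads until the window is full or no ids remain.
  const refill = (): void => {
    while (inFlight.length < maxBuffered && next < ids.length) {
      inFlight.push(download(ids[next++]));
    }
  };

  refill();
  while (inFlight.length > 0) {
    // Wait for the oldest download, then process it before starting more.
    const video = await inFlight.shift()!;
    await processVideo(video);
    // Processing freed one slot, so one more download may start.
    refill();
  }
}
```

Downloads overlap with each other and with processing, but a new download only starts after a processed video frees a slot. Videos are processed in the order of `ids`, so one slow download delays everything behind it; that is a simplification of this sketch, not a requirement of the approach.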

What is the maximum rate (workflows/second) of execution of these workflows per single host?

If I were to code this, I would create a parent workflow that implements a semaphore (for simpler cases) or a token bucket to control the number of child workflows (or activities). An example of how this can be done with Temporal is the mutex sample in the temporalio/samples-go repository on GitHub (samples-go/mutex at main).
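
To make the semaphore idea concrete, here is a minimal in-memory sketch in plain TypeScript. It is not the code from the linked sample; `PromiseSemaphore` and `runLimited` are hypothetical names, and in a real parent workflow the `task` callback would presumably wrap a child workflow or activity call.

```typescript
// Counting semaphore built on Promises (hypothetical helper, no Temporal APIs).
class PromiseSemaphore {
  private waiters: Array<() => void> = [];

  constructor(private free: number) {}

  /** Resolves once a slot is available and has been taken. */
  async acquire(): Promise<void> {
    if (this.free > 0) {
      this.free--;
      return;
    }
    await new Promise<void>(resolve => this.waiters.push(resolve));
  }

  /** Hands the slot to the oldest waiter, or returns it to the pool. */
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next();
    } else {
      this.free++;
    }
  }
}

/** Runs `task` while holding a slot, releasing it even if the task throws. */
async function runLimited<T>(sem: PromiseSemaphore, task: () => Promise<T>): Promise<T> {
  await sem.acquire();
  try {
    return await task();
  } finally {
    sem.release();
  }
}
```

With `new PromiseSemaphore(1)` this degenerates into a mutex, which would match the "one by one" requirement for the processing workflow.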

For running activities in parallel, see the splitmerge-future sample in temporalio/samples-go on GitHub (samples-go/splitmerge-future at main).
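
The split/merge pattern itself is small. As a rough TypeScript analogue (not the Go sample's code, which uses futures), one could start all pieces of work at once and merge the results when every piece has finished. `splitMerge`, `work`, and `merge` are hypothetical names for this sketch; in a workflow, `work` would presumably call an activity.

```typescript
/**
 * Runs `work` on every chunk concurrently and merges the partial results.
 * Rejects as soon as any chunk fails, which is how Promise.all behaves.
 */
async function splitMerge<T, R>(
  chunks: T[],
  work: (chunk: T) => Promise<R>,
  merge: (parts: R[]) => R,
): Promise<R> {
  // Promise.all preserves input order, so parts[i] belongs to chunks[i].
  const parts = await Promise.all(chunks.map(chunk => work(chunk)));
  return merge(parts);
}
```

This suits the download side of the question, where the work is I/O-bound and the pieces do not depend on each other.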

Hi! Sorry for not replying here sooner. I forgot about this question until a notification popped up in my email.

The first workflow is bounded by the speed of the second, and the second will most likely process about 5 workflows per minute.

I was also considering the mutex-based approach because of its straightforward nature, but I am interested in how the community has handled these scenarios. Thanks for the demo links, they are very helpful!

In our case, I needed to control the number of parallel executions based on the billing plan. We used the TypeScript SDK. For the semaphore, we leveraged RxJS:

import { BehaviorSubject, filter, map, Observable, take, tap } from 'rxjs';

/**
 * Given a number, creates a semaphore that can be used to limit the number of concurrent executions.
 *
 * @export
 * @class Semaphore
 */
export class Semaphore {
  /**
   * Reflects the number of available semaphore slots.
   *
   * @private
   * @type {number}
   * @memberof Semaphore
   */
  private _counter: number;

  /**
   * Available number of slots in the semaphore as an observable
   *
   * @private
   * @type {BehaviorSubject<number>}
   * @memberof Semaphore
   */
  private _counter$: BehaviorSubject<number>;

  /**
   * Restricts the number of slots to be greater than 0.
   *
   * @private
   * @type {Observable<number>}
   * @memberof Semaphore
   */
  private _isAvailable: Observable<number>;

  public get counter() {
    return this._counter;
  }

  /**
   * Creates an instance of Semaphore.
   * @param {number} max Maximum number of concurrent executions.
   * @memberof Semaphore
   */
  constructor(public max: number) {
    this._counter = max;
    this._counter$ = new BehaviorSubject(this._counter);
    this._isAvailable = this._counter$.pipe(filter(() => this._counter > 0));
  }

  /**
   * Acquires a slot on the semaphore.
   *
   * @return {*}  {Observable<Semaphore>}
   * @memberof Semaphore
   */
  public acquire(): Observable<Semaphore> {
    return this._isAvailable.pipe(
      take(1),
      tap(() => {
        this._counter--;
        this._counter$.next(this._counter);
      }),
      map(() => this),
    );
  }

  /**
   * Releases a slot on the semaphore.
   *
   * @memberof Semaphore
   */
  public release() {
    this._counter++;
    this._counter$.next(this._counter);
  }

  /**
   * Resizes the semaphore.
   *
   * @param {number} max New maximum number of concurrent executions.
   * @memberof Semaphore
   */
  public resize(max: number) {
    this._counter += max - this.max;
    this.max = max;
    this._counter$.next(this._counter);
  }
}
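
One detail of `resize` is worth spelling out: it shifts the available-slot counter by the change in `max`, so shrinking the semaphore while tasks are running can push the counter below zero. The bookkeeping-only sketch below (a hypothetical `SlotCounter` with no RxJS and no waiting) walks through that arithmetic.

```typescript
// Models only the counter arithmetic of the Semaphore above.
class SlotCounter {
  available: number;

  constructor(public max: number) {
    this.available = max;
  }

  /** Takes a slot if one is free; returns false otherwise. */
  tryAcquire(): boolean {
    if (this.available <= 0) return false;
    this.available--;
    return true;
  }

  release(): void {
    this.available++;
  }

  /** Same arithmetic as `resize` above: shift the counter by the change in max. */
  resize(max: number): void {
    this.available += max - this.max;
    this.max = max;
  }
}

const slots = new SlotCounter(3);
slots.tryAcquire();
slots.tryAcquire();
slots.tryAcquire(); // three tasks running, available = 0
slots.resize(1); // available = 0 + (1 - 3) = -2
const afterResize = slots.available;
slots.release();
slots.release(); // two tasks finished, available = 0
const blocked = !slots.tryAcquire(); // still no free slot
slots.release(); // third task finished, available = 1
const granted = slots.tryAcquire(); // the single remaining slot is taken
```

Running tasks are not interrupted by a downsize. New acquisitions simply wait until enough releases bring the counter back above zero, which is the effect of the `filter(() => this._counter > 0)` check in the RxJS version.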

and then the controller worked as below:

import { TaskEnvironment, TaskResult, TestExecutionResultStatus, TaskPlan } from '@ctrlplane/common/models';
import { Semaphore } from '@ctrlplane/common/utils/semaphore';
import { logger } from '@ctrlplane/common/workflows';
import { executeChild, proxyActivities, setHandler, startChild } from '@temporalio/workflow';
import {
  bufferToggle,
  distinctUntilChanged,
  filter,
  from,
  map,
  merge,
  mergeMap,
  ReplaySubject,
  share,
  Subject,
  take,
  takeUntil,
  tap,
  windowToggle,
} from 'rxjs';
import type * as activities from './activities';
import { UpdateEnvCtrlWorkflow } from './signals';

const { runTest: runTask, terminateEnvironmentTests } = proxyActivities<typeof activities>({
  startToCloseTimeout: '60 minutes',
});

/**
 * Environment Controller Workflow is the parent workflow responsible for managing the number of parallel tasks
 * executions for a given customer.
 *
 * NOTE: The workflow is meant only to be started with `signalWithStart`.
 *
 * @param {TaskEnvironment} environment - The environment to create
 * @return {Promise<TaskResult[]>}
 */
export const EnvCtrlWorkflow = async (environment: TaskEnvironment): Promise<TaskResult[]> => {
  return new Promise(resolve => {
    /**
     * Semaphore to control the number of parallel tests
     */
    const semaphore = new Semaphore(environment.maxParallism);

    /**
     * Task result accumulator
     */
    const results: TaskResult[] = [];

    /**
     * The total number of tasks to run
     */
    let total = 0;
    let paused = 0;
    let terminationCounter = 0;

    /**
     * Main Execution Queue
     */
    const queue$ = new ReplaySubject<TaskPlan>();

    /**
     * Waiting Queue. By default, all tests are pushed to this queue.
     */
    const waiting$ = new ReplaySubject<TaskPlan>();

    /**
     * Stream of results
     */
    const result$ = new ReplaySubject<TaskResult>();

    /**
     * Signal to end the workflow
     */
    const end$ = new ReplaySubject<void>();

    /**
     * Signals for pause and resume.
     */
    const pause$ = new Subject<boolean>();

    const _pause$ = pause$.pipe(distinctUntilChanged(), share());
    const _on$ = _pause$.pipe(
      filter(v => !v),
      tap(() => (total += paused)),
      tap(() => (paused = 0)),
      tap(() => logger.info(`Resuming ...`)),
    );
    const _off$ = _pause$.pipe(
      filter(v => !!v),
      tap(() => logger.info(`Pausing ...`)),
    );

    /**
     * Utility Functions
     */

    const _skipTest = (plan: TaskPlan) =>
      from(executeChild(SkipTaskWorkflow, { workflowId: `${plan.id}`, args: [plan] }));

    const _runTest = (plan: TaskPlan) =>
      from(executeChild(RunTaskWorkflow, { workflowId: `${plan.id}`, args: [plan] }));

    /**
     * Workflow Execution Logic
     */

    // Initiating the pause and resume logic
    merge(
      waiting$.pipe(
        bufferToggle(_off$, () => _on$),
        mergeMap(x => x),
      ),
      waiting$.pipe(
        windowToggle(_on$, () => _off$),
        mergeMap(x => x),
      ),
    )
      .pipe(tap(plan => queue$.next(plan)))
      .subscribe();

    pause$.next(true);
    pause$.next(false);

    queue$
      .pipe(
        mergeMap(plan => semaphore.acquire().pipe(map(() => plan))),
        tap(plan => logger.info(`[${plan.id}] Slot Acquired`)),
        mergeMap(plan => (paused ? _skipTest(plan) : _runTest(plan))),
        tap(result => result$.next(result)),
        tap(() => semaphore.release()),
        tap(result => logger.info(`[${result.id}] [${result.status}] Slot Released ...`)),
        takeUntil(end$),
      )
      .subscribe();

    result$
      .pipe(
        tap(result => results.push(result)),
        tap(() => !paused && total && total === results.length && end$.next()),
        tap(() => paused && total && total === results.length && pause$.next(false)),
        takeUntil(end$),
      )
      .subscribe();

    end$
      .pipe(
        take(1),
        tap(() => logger.info(`Finished`)),
      )
      .subscribe(() => resolve(results));

    /**
     * NOTE: This workflow is only meant to be started with `signalWithStart`.
     */
    setHandler(UpdateEnvCtrlWorkflow, async signal => {
      logger.info(`Received Signal ..`);
      logger.info(`Resizing Semaphore from ${semaphore.max} to ${signal.maxParallism}`);
      semaphore.resize(signal.maxParallism);

      if (paused === 0 && total === 0 && results.length === 0) {
        total += signal.tests.length;
      } else {
        if (paused === 0 && signal.continue) {
          total += signal.tests.length;
        } else {
          logger.info(`Skipping tests ...`);
          paused += signal.tests.length;
          pause$.next(true); // This will stop the `queue`.
          startChild(TerminateEnvironmentTasksWorkflow, {
            workflowId: `${environment.id}-${terminationCounter}`, // TODO: We need to come up with a better way to generate the ID
            args: [environment],
          });
          terminationCounter++;
        }
      }

      for (const plan of signal.tests) {
        logger.info(`[${plan.id}] Adding to Queue`);
        waiting$.next(plan);
      }
    });
  });
};

/**
 * Run The Test Workflow
 *
 * @param {TaskPlan} plan - The test plan to run
 * @returns {Promise<TaskResult>} The test execution result
 */
export const RunTaskWorkflow = async (plan: TaskPlan): Promise<TaskResult> => {
  logger.info(`Running Test ...`);
  const result = await runTask(plan);
  return result;
};

/**
 * Skip Running the tests. We just update the status of the test to `skipped`.
 *
 * @param {TaskPlan} plan - The test plan to run
 * @returns {Promise<TaskResult>} The test execution result
 */
export const SkipTaskWorkflow = async (plan: TaskPlan): Promise<TaskResult> => {
  logger.info(`Skipping Test ...`);
  // An async function already wraps its return value in a Promise.
  return { id: plan.id, status: TestExecutionResultStatus.SKIPPED };
};

/**
 * Terminates all tests running in environment
 *
 * @param {TaskEnvironment} environment - The environment which we want to terminate
 * @returns
 */
export const TerminateEnvironmentTasksWorkflow = async (environment: TaskEnvironment): Promise<void> => {
  logger.info(`Terminating All Tests ...`);
  return terminateEnvironmentTests(environment);
};

We have three streams: queue$, which queues the tasks to be scheduled; result$, where we push the results; and end$, which we listen to for the end (or kill) signal.

The RxJS subscriptions control the logic.

I don’t know about the rest of the community. If you want, we can schedule a call. Drop me a line at ysf @ breu . io