In our case, I needed to control the number of parallel executions based on the billing plan. We used the TypeScript SDK. For the semaphore, we leveraged RxJS:
import { BehaviorSubject, filter, map, Observable, take, tap } from 'rxjs';
/**
 * Counting semaphore built on RxJS that caps how many executions may hold a slot at once.
 *
 * Subscribe to `acquire()` to take a slot and call `release()` to hand it back.
 *
 * @export
 * @class Semaphore
 */
export class Semaphore {
  /**
   * How many slots are free right now.
   */
  private free: number;

  /**
   * Broadcasts the free-slot count every time it changes.
   */
  private free$: BehaviorSubject<number>;

  /**
   * View of `free$` that only lets a notification through while at least one slot is free.
   */
  private whenFree$: Observable<number>;

  /**
   * Creates an instance of Semaphore with every slot free.
   *
   * @param max Maximum number of concurrent executions.
   */
  constructor(public max: number) {
    this.free = max;
    this.free$ = new BehaviorSubject(max);
    // The predicate checks the live field rather than the emitted value, so a
    // notification is dropped if the slot count has already fallen to zero.
    this.whenFree$ = this.free$.pipe(filter(() => this.free > 0));
  }

  /**
   * Current number of free slots.
   */
  public get counter() {
    return this.free;
  }

  /**
   * Acquires a slot on the semaphore.
   *
   * The returned observable emits this semaphore once, after a slot has been
   * taken, and then completes.
   *
   * @return {*} {Observable<Semaphore>}
   */
  public acquire(): Observable<Semaphore> {
    const takeSlot = () => {
      this.free -= 1;
      this.free$.next(this.free);
    };

    return this.whenFree$.pipe(
      take(1),
      tap(takeSlot),
      map(() => this),
    );
  }

  /**
   * Releases a slot on the semaphore and notifies pending acquirers.
   */
  public release() {
    this.free += 1;
    this.free$.next(this.free);
  }

  /**
   * Resizes the semaphore, shifting the free-slot count by the change in capacity.
   *
   * @param {number} max New maximum number of concurrent executions.
   */
  public resize(max: number) {
    const delta = max - this.max;
    this.max = max;
    this.free += delta;
    this.free$.next(this.free);
  }
}
And then the controller worked as below:
import { TaskEnvironment, TaskResult, TestExecutionResultStatus, TaskPlan } from '@ctrlplane/common/models';
import { Semaphore } from '@ctrlplane/common/utils/semaphore';
import { logger } from '@ctrlplane/common/workflows';
import { executeChild, proxyActivities, setHandler, startChild } from '@temporalio/workflow';
import {
bufferToggle,
distinctUntilChanged,
filter,
from,
map,
merge,
mergeMap,
ReplaySubject,
share,
Subject,
take,
takeUntil,
tap,
windowToggle,
} from 'rxjs';
import type * as activities from './activities';
import { UpdateEnvCtrlWorkflow } from './signals';
// Activity stubs used by the workflows below, created with a 60-minute
// start-to-close timeout.
const activityStubs = proxyActivities<typeof activities>({ startToCloseTimeout: '60 minutes' });
const runTask = activityStubs.runTest;
const terminateEnvironmentTests = activityStubs.terminateEnvironmentTests;
/**
 * Environment Controller Workflow is the parent workflow responsible for managing the number of parallel task
 * executions for a given customer.
 *
 * Task plans arrive through the `UpdateEnvCtrlWorkflow` signal, are gated by a pause/resume toggle, and are then
 * started as child workflows once a semaphore slot is available.
 *
 * NOTE: The workflow is meant only to be started with `signalWithStart`.
 *
 * @param environment - The environment whose tasks are being controlled
 * @returns Resolves with every collected task result once the number of results reaches the expected total while
 * not paused. Rejects if any of the internal RxJS pipelines errors (for example when a child workflow promise
 * rejects).
 */
export const EnvCtrlWorkflow = async (environment: TaskEnvironment): Promise<TaskResult[]> => {
  return new Promise((resolve, reject) => {
    /**
     * Semaphore to control the number of parallel tests
     */
    const semaphore = new Semaphore(environment.maxParallism);
    /**
     * Task result accumulator
     */
    const results: TaskResult[] = [];
    /**
     * The total number of tasks to run
     */
    let total = 0;
    /**
     * Number of plans received while paused; folded into `total` on resume.
     */
    let paused = 0;
    /**
     * Suffix used to build the workflow id of each termination child workflow.
     */
    let terminationCounter = 0;
    /**
     * Main Execution Queue
     */
    const queue$ = new ReplaySubject<TaskPlan>();
    /**
     * Waiting Queue. By default, all tests are pushed to this queue.
     */
    const waiting$ = new ReplaySubject<TaskPlan>();
    /**
     * Stream of results
     */
    const result$ = new ReplaySubject<TaskResult>();
    /**
     * Signal to end the workflow
     */
    const end$ = new ReplaySubject<void>();
    /**
     * Signals for pause (`true`) and resume (`false`).
     */
    const pause$ = new Subject<boolean>();
    const _pause$ = pause$.pipe(distinctUntilChanged(), share());
    // Resume: move the plans counted while paused into the expected total.
    const _on$ = _pause$.pipe(
      filter(v => !v),
      tap(() => (total += paused)),
      tap(() => (paused = 0)),
      tap(() => logger.info(`Resuming ...`)),
    );
    const _off$ = _pause$.pipe(
      filter(v => !!v),
      tap(() => logger.info(`Pausing ...`)),
    );
    /**
     * Utility Functions
     */
    const _skipTest = (plan: TaskPlan) =>
      from(executeChild(SkipTaskWorkflow, { workflowId: `${plan.id}`, args: [plan] }));
    const _runTest = (plan: TaskPlan) =>
      from(executeChild(RunTaskWorkflow, { workflowId: `${plan.id}`, args: [plan] }));
    /**
     * Workflow Execution Logic
     */
    // Initiating the pause and resume logic: plans arriving while paused are buffered until the
    // next resume, plans arriving while running pass straight through. Both feed `queue$`.
    merge(
      waiting$.pipe(
        bufferToggle(_off$, () => _on$),
        mergeMap(x => x),
      ),
      waiting$.pipe(
        windowToggle(_on$, () => _off$),
        mergeMap(x => x),
      ),
    )
      .pipe(tap(plan => queue$.next(plan)))
      .subscribe({ error: reject });
    // Toggle pause then resume once so the pass-through window is open from the start.
    pause$.next(true);
    pause$.next(false);
    queue$
      .pipe(
        mergeMap(plan => semaphore.acquire().pipe(map(() => plan))),
        tap(plan => logger.info(`[${plan.id}] Slot Acquired`)),
        // Plans that obtain a slot while `paused` is non-zero go to the skip workflow instead of running.
        mergeMap(plan => (paused ? _skipTest(plan) : _runTest(plan))),
        tap(result => result$.next(result)),
        tap(() => semaphore.release()),
        tap(result => logger.info(`[${result.id}] [${result.status}] Slot Released ...`)),
        takeUntil(end$),
      )
      // A rejected child workflow promise errors this pipeline; without an error handler the
      // workflow promise would never settle, so propagate the error to the caller.
      .subscribe({ error: reject });
    result$
      .pipe(
        tap(result => results.push(result)),
        // Not paused and every expected result is in: finish the workflow.
        tap(() => !paused && total && total === results.length && end$.next()),
        // Paused and every previously expected result is in: resume so the buffered plans get queued.
        tap(() => paused && total && total === results.length && pause$.next(false)),
        takeUntil(end$),
      )
      .subscribe({ error: reject });
    end$
      .pipe(
        take(1),
        tap(() => logger.info(`Finished`)),
      )
      .subscribe({ next: () => resolve(results), error: reject });
    /**
     * NOTE: This workflow is only meant to be started with `signalWithStart`.
     */
    setHandler(UpdateEnvCtrlWorkflow, async signal => {
      logger.info(`Received Signal ..`);
      logger.info(`Resizing Sempahore to ${semaphore.max} -> ${signal.maxParallism}`);
      semaphore.resize(signal.maxParallism);
      if (paused === 0 && total === 0 && results.length === 0) {
        // First batch: nothing has been queued or finished yet.
        total += signal.tests.length;
      } else {
        if (paused === 0 && signal.continue) {
          // Not paused and the signal asks to continue: just extend the expected total.
          total += signal.tests.length;
        } else {
          logger.info(`Skipping tests ...`);
          paused += signal.tests.length;
          pause$.next(true); // This will stop the `queue`.
          // NOTE(review): the promise returned by `startChild` is neither awaited nor handled here;
          // confirm whether a failed start should be surfaced.
          startChild(TerminateEnvironmentTasksWorkflow, {
            workflowId: `${environment.id}-${terminationCounter}`, // TODO: We need to comeup with a better way to generate ID
            args: [environment],
          });
          terminationCounter++;
        }
      }
      for (const plan of signal.tests) {
        logger.info(`[${plan.id}] Adding to Queue`);
        waiting$.next(plan);
      }
    });
  });
};
/**
 * Run The Test Workflow
 *
 * Child workflow that logs and then hands the plan to the `runTask` activity.
 *
 * @param plan - The test plan to run
 * @returns The test execution result produced by the activity
 */
export const RunTaskWorkflow = async (plan: TaskPlan): Promise<TaskResult> => {
  logger.info(`Running Test ...`);
  return runTask(plan);
};
/**
 * Skip running the test. No activity is invoked; the result simply carries the plan id with status `SKIPPED`.
 *
 * @param plan - The test plan to skip
 * @returns A result for the plan marked as skipped
 */
export const SkipTaskWorkflow = async (plan: TaskPlan): Promise<TaskResult> => {
  logger.info(`Skipping Test ...`);
  // An async function already wraps its return value in a promise, so no explicit Promise is needed.
  return { id: plan.id, status: TestExecutionResultStatus.SKIPPED };
};
/**
 * Child workflow that logs and then delegates to the `terminateEnvironmentTests` activity.
 *
 * @param environment - The environment whose tests should be terminated
 * @returns Settles once the activity call has settled
 */
export const TerminateEnvironmentTasksWorkflow = async (environment: TaskEnvironment): Promise<void> => {
  logger.info(`Terminating All Tests ...`);
  await terminateEnvironmentTests(environment);
};
We have three streams: queue$, which queues the tasks to be scheduled; result$, where we push the results; and end$, which a listener watches for the end (or kill) signal.
The RxJS subscribers control the logic.
I don’t know about the rest of the community. If you want, we can schedule a call. Drop me a line at ysf @ breu . io