Hi everyone,
I’m designing a workflow that contains multiple internal flows running in parallel (for example, flowA and flowB). Each flow has multiple activities, and each one needs to wait for a specific signal before continuing to the next activity.
What I want to confirm is:
1. Can multiple internal flows inside the same workflow share the same state?
The behavior I want is that flowA and flowB both read and update a shared state object inside the workflow.
Example:
-
each flow updates
globalCounter -
each maintains its own step state (e.g.,
STEP1_DONE,WAITING_FOR_SIGNAL) -
signal handlers update the same state object that both flows observe
2. Is this pattern recommended/deterministic?
My workflow does something like:
-
flowAruns activities A1 → wait for signal → A2 → A3 -
flowBruns B1 → wait for signal → B2 → B3 -
both run in parallel using
wf.execute() -
both wait using
wf.condition() -
both update the same in-memory state object
3. Is there a preferred Temporal pattern for this scenario?
Is it okay that both flows mutate the same state object?
Should I instead isolate state per flow and use some orchestrating state machine on top?
Or is the shared in-workflow state approach considered normal and safe?
Any best practices or recommended patterns would be really helpful.
Thanks!
import * as wf from '@temporalio/workflow';
import * as activities from '../activities';
import { signalForA, signalForB } from '../signals';
const act = wf.proxyActivities<typeof activities>({
startToCloseTimeout: '10 minutes'
});
export async function ParallelFlowsWorkflow() {
// -------------------------
// SHARED STATE
// -------------------------
const state: SharedState = {
flowA: { step: 'INIT' },
flowB: { step: 'INIT' },
globalCounter: 0
};
// -------------------------
// SIGNAL HANDLERS
// -------------------------
wf.setHandler(signalForA, (payload: string) => {
state.flowA.signalPayload = payload;
state.flowA.step = 'WAITING_FOR_SIGNAL';
});
wf.setHandler(signalForB, (payload: number) => {
state.flowB.signalPayload = payload;
state.flowB.step = 'WAITING_FOR_SIGNAL';
});
// -------------------------
// FLOW A
// -------------------------
const flowA = wf.execute(async () => {
// Step A1 (activity)
await act.doStepA1();
state.flowA.step = 'STEP1_DONE';
state.globalCounter++; // shared state update
// Wait for signalForA
await wf.condition(() => state.flowA.step === 'WAITING_FOR_SIGNAL');
// Step A2 (depends on signal)
await act.doStepA2();
state.globalCounter += 10;
state.flowA.step = 'STEP2_DONE';
// Step A3
await act.doStepA3();
state.flowA.step = 'DONE';
state.globalCounter += 100;
});
// -------------------------
// FLOW B
// -------------------------
const flowB = wf.execute(async () => {
// Step B1
await act.doStepB1();
state.flowB.step = 'STEP1_DONE';
state.globalCounter += 2;
// Wait for signalForB
await wf.condition(() => state.flowB.step === 'WAITING_FOR_SIGNAL');
// Step B2 (depends on signal)
await act.doStepB2();
state.globalCounter += 20;
state.flowB.step = 'STEP2_DONE';
// Step B3
await act.doStepB3();
state.flowB.step = 'DONE';
state.globalCounter += 200;
});
// -------------------------
// WAIT FOR BOTH FLOWS
// -------------------------
await Promise.all([flowA, flowB]);
return state;
}