I have an “always running” workflow that holds a state like a semaphore.
Before every Activity execution that depends on this semaphore state, I have a Workflow code snippet that loops “forever” until the semaphore is “green”:
Pseudo-code:
export async function waitForGreenState(): Promise<SemaphoreState> {
const { getSemaphoreStateActivity } = proxyActivities<typeof keepSessionActivities>({ /* ... */ })
while (true) {
const semaphoreState = await getSemaphoreStateActivity()
if (semaphoreState.state === State.Active) return semaphoreState
await sleep("30s")
}
}
export async function getSemaphoreStateActivity(): Promise<SemaphoreState> {
const client = await getClient()
const handle = client.workflow.getHandle("semaphore-state-workflow-id")
return await handle.query(getCurrentState)
}
So I can use this helper function like this (also pseudo-code):
export async function myWorkflow(inputParams: Parameters): Promise<workflowResult> {
const { jobActivity } = proxyActivities<typeof activities>({
/* ... */
startToCloseTimeout: "10m",
retry: {
initialInterval: "1m",
maximumAttempts: 5,
},
})
const semaphoreState = await waitForGreenState()
return await jobActivity(inputParams, semaphoreState)
}
The thing is, when jobActivity fails, the current state on SemaphoreState may have changed already, so I need to wait for a new semaphoreState before calling the jobActivity again.
I’ve tried something like this:
// ...
const semaphoreState = await waitForGreenState()
await condition(() => semaphoreState.state === State.Active)
return await jobActivity(inputParams, semaphoreState)
// ...
but once the code goes through the condition for the first time, it only retries from the return line disregarding the await codition() so it doesn’t help in this application…
I’m refraining from making the query from within jobActivity itself because I’m afraid of startToCloseTimeout issues since the SemaphoreState can take even days…
What would be a safer or “best” approach for requirements like this?