I have an “always running” workflow that holds a state like a semaphore. Before every Activity execution that depends on this semaphore state, I have a Workflow code snippet that loops “forever” until the semaphore is “green”:
Pseudo-code:
// Workflow-side helper: polls the semaphore until it is Active ("green").
export async function waitForGreenState(): Promise<SemaphoreState> {
  const { getSemaphoreStateActivity } = proxyActivities<typeof keepSessionActivities>({ /* ... */ })
  while (true) {
    const semaphoreState = await getSemaphoreStateActivity()
    if (semaphoreState.state === State.Active) return semaphoreState
    // Not green yet: wait on a durable timer, then poll again.
    await sleep("30s")
  }
}

// Activity: queries the long-running semaphore workflow for its current state.
export async function getSemaphoreStateActivity(): Promise<SemaphoreState> {
  const client = await getClient()
  const handle = client.workflow.getHandle("semaphore-state-workflow-id")
  return await handle.query(getCurrentState)
}
So I can use this helper function like this (also pseudo-code):
export async function myWorkflow(inputParams: Parameters): Promise<workflowResult> {
  const { jobActivity } = proxyActivities<typeof activities>({
    /* ... */
    startToCloseTimeout: "10m",
    retry: {
      initialInterval: "1m",
      maximumAttempts: 5,
    },
  })
  const semaphoreState = await waitForGreenState()
  return await jobActivity(inputParams, semaphoreState)
}
The thing is, when jobActivity fails, the semaphore state may already have changed, so I need to wait for a fresh semaphoreState before calling jobActivity again.
I’ve tried something like this:
// ...
const semaphoreState = await waitForGreenState()
await condition(() => semaphoreState.state === State.Active)
return await jobActivity(inputParams, semaphoreState)
// ...
but once the code has passed the condition the first time, the retries only re-run from the return line and skip the await condition(), so it doesn’t help in this application…
I’m refraining from making the query from within jobActivity itself because I’m afraid of startToCloseTimeout issues, since waiting on the SemaphoreState can take days…
What would be a safer or “best” approach for requirements like this?
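One direction that might fit, offered only as a sketch: move the retry loop into Workflow code so that the semaphore is re-read before every attempt, instead of relying on the Activity retry policy (which re-runs only the Activity with the arguments it was first given). The helper below is hypothetical and deliberately free of SDK imports; runWithFreshState, its parameter names, and the default values are all assumptions made for illustration, with the gate, the job, and the sleep function passed in by the caller.

```typescript
// Hypothetical helper, not part of the Temporal SDK.
type Gate<S> = () => Promise<S>;            // e.g. waitForGreenState
type Job<S, R> = (state: S) => Promise<R>;  // e.g. (s) => jobActivity(inputParams, s)
type Sleep = (ms: number) => Promise<void>; // e.g. sleep from @temporalio/workflow

async function runWithFreshState<S, R>(
  waitForGate: Gate<S>,
  job: Job<S, R>,
  sleep: Sleep,
  maxAttempts = 5,     // assumed to mirror maximumAttempts from the question
  backoffMs = 60_000,  // assumed to mirror initialInterval: "1m"
): Promise<R> {
  if (maxAttempts < 1) throw new RangeError("maxAttempts must be >= 1");
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Re-read the semaphore before every attempt, not only the first one.
    const state = await waitForGate();
    try {
      return await job(state);
    } catch (err) {
      lastError = err;
      // Back off between attempts, but not after the final one.
      if (attempt < maxAttempts) await sleep(backoffMs);
    }
  }
  // All attempts failed: surface the most recent error.
  throw lastError;
}
```

Inside myWorkflow this could be called as runWithFreshState(waitForGreenState, (s) => jobActivity(inputParams, s), sleep), with jobActivity proxied using retry: { maximumAttempts: 1 } so that the server does not retry it with a stale state. The wait for green stays in Workflow code, so it would not count against the Activity's startToCloseTimeout.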