Wait for condition between retries

I have an “always running” workflow that holds a state like a semaphore.

Before every Activity execution that depends on this semaphore state, I have a Workflow code snippet that loops “forever” until the semaphore is “green”:

Pseudo-code:

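// workflows.ts
import { proxyActivities, sleep } from "@temporalio/workflow"
import type * as keepSessionActivities from "./activities"
import { State, type SemaphoreState } from "./types" // shared types (assumed module)

// Workflow-side helper: polls the semaphore workflow until it reports Active
export async function waitForGreenState(): Promise<SemaphoreState> {
  const { getSemaphoreStateActivity } = proxyActivities<typeof keepSessionActivities>({ /* ... */ })

  while (true) {
    const semaphoreState = await getSemaphoreStateActivity()
    if (semaphoreState.state === State.Active) return semaphoreState
    await sleep("30s") // durable timer between polls
  }
}

// activities.ts
import type { SemaphoreState } from "./types"
import { getCurrentState } from "./semaphore-workflow" // query definition (assumed module)
import { getClient } from "./client" // own helper returning a connected Client (assumed module)

// Queries the long-running semaphore workflow for its current state
export async function getSemaphoreStateActivity(): Promise<SemaphoreState> {
  const client = await getClient()
  const handle = client.workflow.getHandle("semaphore-state-workflow-id")
  return await handle.query(getCurrentState)
}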

So I can use this helper function like this (also pseudo-code):

import { proxyActivities } from "@temporalio/workflow"
import type * as activities from "./activities"

export async function myWorkflow(inputParams: Parameters): Promise<workflowResult> {
  const { jobActivity } = proxyActivities<typeof activities>({
    /* ... */
    startToCloseTimeout: "10m",
    retry: {
      initialInterval: "1m",
      maximumAttempts: 5, // retries happen inside the jobActivity call, driven by this policy
    },
  })

  const semaphoreState = await waitForGreenState()

  return await jobActivity(inputParams, semaphoreState)
}

The thing is, by the time jobActivity fails, the semaphore state may already have changed, so I need to wait for the semaphore to be green again before jobActivity is retried.

I’ve tried something like this:

  // ...
  const semaphoreState = await waitForGreenState()
  await condition(() => semaphoreState.state === State.Active)
  return await jobActivity(inputParams, semaphoreState)
  // ...

but once execution gets past the condition the first time, the retries of a failed jobActivity happen inside that single activity call (driven by its retry policy), so the await condition() is never evaluated again and doesn’t help in this case…
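One way to get the wait re-evaluated between attempts is to move the retry loop into workflow code: disable server-side retries on jobActivity (retry: { maximumAttempts: 1 }) and run "wait for green, then attempt" in a loop. Below is a minimal sketch of such a loop, written against injected functions so it doesn't depend on the SDK; runWithGate and GateOptions are hypothetical names. In myWorkflow you would pass waitForGreenState as waitUntilReady, (s) => jobActivity(inputParams, s) as attempt, and () => sleep("1m") as pause. Note that each attempt then becomes a separate activity execution in the workflow history, and a real version would probably rethrow cancellations (for example using isCancellation from @temporalio/workflow) rather than retry them.

```typescript
// Hypothetical options for a "gate, then attempt" retry loop.
export interface GateOptions<S, T> {
  waitUntilReady: () => Promise<S> // e.g. waitForGreenState
  attempt: (state: S) => Promise<T> // e.g. (s) => jobActivity(inputParams, s)
  pause: () => Promise<void> // e.g. () => sleep("1m")
  maxAttempts: number // must be at least 1
}

export async function runWithGate<S, T>(opts: GateOptions<S, T>): Promise<T> {
  let lastError: unknown = new Error("maxAttempts must be at least 1")
  for (let i = 1; i <= opts.maxAttempts; i++) {
    // The gate runs before every attempt, so a retry never starts
    // until the semaphore is green again.
    const state = await opts.waitUntilReady()
    try {
      return await opts.attempt(state)
    } catch (err) {
      lastError = err
      if (i < opts.maxAttempts) await opts.pause() // back off before the next round
    }
  }
  throw lastError // every attempt failed: surface the last error
}
```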

I’m refraining from making the query from within jobActivity itself because I’m afraid of startToCloseTimeout issues, since the semaphore can stay red for days…

What would be a safer or “best” approach for requirements like this?

I wouldn’t poll from the workflow; see “What is the best practice for a polling activity?” (reply #2 by maxim).

A common approach for this kind of thing is to have myWorkflow send a “requestLock” signal to the semaphore workflow, with its own workflow ID as an argument, and then wait for an incoming “lockGranted” signal.

The semaphore workflow maintains a list of pending requests. When the semaphore becomes green, the semaphore workflow sends the “lockGranted” signal to the first workflow in the queue, then marks the semaphore as red.

When myWorkflow no longer needs to hold the semaphore, it sends a “lockReleased” signal to the semaphore workflow, which marks the semaphore as green.
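The bookkeeping the semaphore workflow does in this pattern can be sketched as a small queue. This is a hypothetical, SDK-independent core (SemaphoreQueue and its method names are not taken from the linked example): in the semaphore workflow, the “requestLock” and “lockReleased” signal handlers (registered with setHandler) would call requestLock and releaseLock, and whenever a call returns a workflow ID, the workflow would send “lockGranted” to it with getExternalWorkflowHandle(id).signal(...).

```typescript
// Hypothetical core of the semaphore workflow: green = nobody holds the lock.
export class SemaphoreQueue {
  private holder: string | undefined = undefined
  private readonly waiting: string[] = []

  // "requestLock" handler: returns the workflow ID to send "lockGranted" to, if any.
  requestLock(workflowId: string): string | undefined {
    this.waiting.push(workflowId)
    return this.grantNext()
  }

  // "lockReleased" handler: only the current holder can release the lock.
  releaseLock(workflowId: string): string | undefined {
    if (this.holder !== workflowId) return undefined
    this.holder = undefined // semaphore is green again
    return this.grantNext()
  }

  // If green, grant to the first waiter; granting turns the semaphore red.
  private grantNext(): string | undefined {
    if (this.holder !== undefined || this.waiting.length === 0) return undefined
    this.holder = this.waiting.shift()
    return this.holder
  }

  get currentHolder(): string | undefined {
    return this.holder
  }

  get pending(): readonly string[] {
    return this.waiting
  }
}
```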

See here for an example of implementing this in TypeScript.

For context, this polling is done every 30s now.

I want to avoid a “subscriber” pattern because the workflow with ID semaphore-state-workflow-id can be stopped at any time and restarted with the same ID, so we would lose track of which other workflows are waiting for the signal.

I also want to avoid making the query from the jobActivity itself (with a heartbeatTimeout set), because if no workers are available (which is possible in our current setup), the heartbeatTimeout might fire spuriously.

In our use case, timing matters less: we only care that workers are running and that the semaphore is green. If either is not the case, we can safely wait instead of failing prematurely.

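To poll every 30 seconds, you can have the activity fail while the semaphore isn’t green and retry it with a 30-second retry interval. Activity retries aren’t recorded as separate events in the workflow history, so this avoids growing the history indefinitely if polling takes a long time.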
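A minimal sketch of that suggestion, assuming the query from getSemaphoreStateActivity is injected as fetchState (createPollingActivities, waitForGreenStateActivity, pollingOptions and the State values are hypothetical). Each activity attempt performs one poll and throws while the semaphore isn’t green; a plain Error is retryable by default, so Temporal schedules the next attempt after initialInterval. backoffCoefficient: 1 keeps the interval constant, and leaving out maximumAttempts means unlimited attempts.

```typescript
// Hypothetical stand-ins for the question's types; the real values may differ.
export enum State {
  Active = "ACTIVE",
  Inactive = "INACTIVE",
}

export interface SemaphoreState {
  state: State
}

// Activities built with an injected dependency; fetchState would wrap the
// query that getSemaphoreStateActivity performs in the question.
export function createPollingActivities(fetchState: () => Promise<SemaphoreState>) {
  return {
    // One poll per attempt. Throwing fails this attempt, and the retry
    // policy decides when the next attempt (the next poll) runs.
    async waitForGreenStateActivity(): Promise<SemaphoreState> {
      const current = await fetchState()
      if (current.state !== State.Active) {
        throw new Error(`semaphore is ${current.state}, polling again later`)
      }
      return current
    },
  }
}

// Options meant to be passed to proxyActivities in the workflow.
export const pollingOptions = {
  startToCloseTimeout: "10s", // a single poll should be quick
  retry: {
    initialInterval: "30s", // first retry after 30 seconds...
    backoffCoefficient: 1, // ...and every 30 seconds after that
    // maximumAttempts omitted: unlimited attempts by default
  },
} as const
```

In the workflow this would be used as proxyActivities<ReturnType<typeof createPollingActivities>>(pollingOptions), replacing the while loop in waitForGreenState with a single awaited activity call.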

But then I shouldn’t set maximumAttempts (or should set it to 0), right?

Just don’t set it. Personally, I would never use maximumAttempts; a ScheduleToClose timeout makes much more sense to me.

I get your point and agree with it, but in our context the requirement is the exact opposite 😅

In our use case, how long the workflow takes to complete is secondary; we’d rather know early that our activity is failing and that we need to take action. So the activities are written with a “try the best you can, but if it’s not possible, fail fast” mindset: if we try a few times and it still isn’t working, we need to intervene manually, because the activity is “highly dependent on the semaphore”, if you will. It’s a quirky use case.

Thanks a lot for the inputs, by the way!

I think a timeout is more predictable than a retry count, and it can be as short as your use case requires.
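Applied to jobActivity, that could look like the options below (a hypothetical sketch; jobActivityOptions is an illustrative name and "30m" an arbitrary budget). startToCloseTimeout still bounds each attempt, while scheduleToCloseTimeout bounds the total time across all attempts, including the waits between them, so the activity fails once the budget is used up rather than after a fixed number of tries.

```typescript
// Hypothetical options for jobActivity: a time budget instead of an attempt count.
export const jobActivityOptions = {
  startToCloseTimeout: "10m", // limit for a single attempt (as in the question)
  scheduleToCloseTimeout: "30m", // limit for all attempts combined, retry waits included
  retry: {
    initialInterval: "1m",
    // maximumAttempts omitted: retrying stops once scheduleToCloseTimeout elapses
  },
} as const
```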