Workflow.await() method when unblocking condition depends upon an external call like MongoDB

Hi,

I have a use case where I need to use the Workflow.await(condition) method, but the unblock condition depends on an external call to Redis or MongoDB. I want to understand how this method works: will it keep querying MongoDB until the unblock condition is met? If so, how can I add a delay to the condition check, for example querying MongoDB only every 2 or 5 minutes?

Can you give more details about your workflow? Are you calling the database in your workflow?

Workflow.await(condition)

The condition is evaluated only when the workflow execution receives new events (for example, when a signal is delivered to the workflow execution or an async activity completes).
It is not evaluated periodically. So from the worker's perspective, a workflow execution awaiting a condition can wait indefinitely without consuming worker resources, since the worker can evict that workflow's thread from memory to let other executions make progress.
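A toy, stdlib-only Java simulation of this evaluation model may help. It is not the Temporal SDK (in real workflow code the blocking call is `Workflow.await(...)` from `io.temporal.workflow.Workflow`), and the classes `ToyWorkflowRuntime` and `SignalCounterDemo` are hypothetical. The sketch assumes the simplified rule stated above: the condition is checked once when the await starts and afterwards only when an event is delivered, never on a timer.

```java
import java.util.function.BooleanSupplier;

// Toy model of an awaited condition driven purely by events.
// Hypothetical class for illustration; NOT the Temporal SDK.
class ToyWorkflowRuntime {
    private BooleanSupplier condition;   // condition the "workflow" is blocked on
    private boolean unblocked = false;
    int evaluations = 0;                 // how many times the condition was checked

    // Register the condition and check it once immediately.
    void await(BooleanSupplier condition) {
        this.condition = condition;
        evaluate();
    }

    // Called whenever a new event (signal, activity completion) arrives.
    // This is the ONLY place the condition is re-evaluated; there is no timer.
    void deliverEvent(Runnable eventHandler) {
        eventHandler.run();              // e.g. a signal handler mutating workflow fields
        if (!unblocked && condition != null) {
            evaluate();
        }
    }

    private void evaluate() {
        evaluations++;
        unblocked = condition.getAsBoolean();
    }

    boolean isUnblocked() {
        return unblocked;
    }
}

// Hypothetical workflow state: a field counting received signals.
class SignalCounterDemo {
    int received = 0;
    final int expected;

    SignalCounterDemo(int expected) {
        this.expected = expected;
    }
}
```

For example, after `rt.await(() -> wf.received >= wf.expected)` the condition has been checked once; it is checked again only on each `deliverEvent(...)` call, and nothing runs while the execution sits idle between events.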

Hi,
This is what we observed. Our condition was something like:
Workflow.await(Duration.ofHours(1), () -> redis.get("some_key") != null);
We saw that the Redis call was being invoked continuously by the await method to check whether the workflow could complete.

Basically, our use case is this: we have n links to send to an external party for processing, which results in n activities in our workflow.
For each link, we receive a webhook from the external party with the status of that link. Every time we receive a webhook, we send a signal to the original workflow.
The workflow is kept alive with Workflow.await(), which blocks until n signals have been received.
Our question is whether to use a local map variable to count how many signals have been received, or persistent storage like Redis or MongoDB. If we use a local variable and have 5 workers running, will they all see the same map or different maps? What is the correct way to track the n signals so that the workflow can be unblocked?

@tihomir @jreynolds23 Can you guys please help us with this?

You should use a map scoped to the workflow. In the Java SDK, this is a field of the workflow implementation class.

We do something similar for parallel human tasks: the activity that creates the external task returns a reference to the ticket, and this reference is stored in the map. The signals update this map, and the await condition checks the aggregate state of the map.
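A minimal sketch of that pattern in plain Java, assuming ticket references are strings and that each signal carries the reference of the ticket it completes. `TicketTracker` and `TicketStatus` are hypothetical names; in a Temporal Java workflow the map would be a field of the workflow implementation class, updated by a `@SignalMethod` handler, and `allDone()` would be the condition passed to `Workflow.await(...)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical status values for an external ticket.
enum TicketStatus { PENDING, DONE }

// Toy sketch of the "map of pending tickets" pattern; not a Temporal API.
class TicketTracker {
    // Workflow-scoped state: ticket reference -> status.
    private final Map<String, TicketStatus> tickets = new LinkedHashMap<>();

    // Record the reference returned by the activity that created the external task.
    void onTicketCreated(String ticketRef) {
        tickets.put(ticketRef, TicketStatus.PENDING);
    }

    // Signal handler: mark the referenced ticket as done.
    // Unknown references are ignored in this sketch.
    void onStatusSignal(String ticketRef) {
        tickets.computeIfPresent(ticketRef, (k, v) -> TicketStatus.DONE);
    }

    // Aggregate condition an await would block on.
    // False while no tickets exist yet, so it cannot pass prematurely.
    boolean allDone() {
        return !tickets.isEmpty()
                && tickets.values().stream().allMatch(s -> s == TicketStatus.DONE);
    }

    long pendingCount() {
        return tickets.values().stream().filter(s -> s == TicketStatus.PENDING).count();
    }
}
```

Keying by ticket reference rather than keeping a bare counter means a webhook delivered twice for the same link marks that ticket done only once, so it cannot unblock the workflow early.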

@Aditya_Anand Can you please provide an example? Any reference would be helpful.

Thanks

Workflows must never call any external service directly. So, the code you posted has undefined behavior. Why are you not using a local variable instead?

We don't want to use a local variable because we have multiple instances, and the callback from the external party can arrive at any of them. We're worried about consistency.

You absolutely can use a local variable for this. The callback is sent to a specific workflow instance as a signal addressed by a WorkflowID. Temporal takes care of the rest.

If we use a local variable, and we have 5 workers running, will they all see the same map or different maps?

A local variable (or a field of the workflow class) is scoped to a specific workflow instance. Each workflow instance has its own copy, independent of the number of workers you are running.
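The scoping rule can be pictured with a small hypothetical model in plain Java: state is kept per workflow ID, and a signal addressed to one ID updates only that instance's copy. `ApprovalState` and `ToySignalRouter` are illustrative names, not Temporal classes. In Temporal itself, each signal is recorded in the history of the workflow execution it is addressed to, and whichever worker runs that execution sees it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-instance workflow state (the "local variable" from the thread).
class ApprovalState {
    int approvals = 0;
}

// Toy model of signal delivery by workflow ID; not the Temporal SDK.
class ToySignalRouter {
    private final Map<String, ApprovalState> byWorkflowId = new HashMap<>();

    // Each started workflow gets its own, independent state object.
    void startWorkflow(String workflowId) {
        byWorkflowId.put(workflowId, new ApprovalState());
    }

    // A signal updates exactly one instance: the one with this workflow ID.
    void signal(String workflowId) {
        ApprovalState state = byWorkflowId.get(workflowId);
        if (state == null) {
            throw new IllegalArgumentException("no such workflow: " + workflowId);
        }
        state.approvals++;
    }

    int approvals(String workflowId) {
        return byWorkflowId.get(workflowId).approvals;
    }
}
```

Two packets started as separate workflows never see each other's counters, which is why the number of worker processes does not affect the count.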

We have n links to be sent for processing to an external party. This results in n activities from our workflow.

What's the maximum value of n? I'm trying to understand whether batching would be needed here.
Using local variables would be the simplest option. Another approach is to do this asynchronously; there is a sample here if it helps, which waits for 3 "packet approvals" before unblocking.

Thanks. For now n is relatively small, let's say 10.
The example matches our use case exactly: n is the number of approvals we need for a packet (a batch of n files) to be marked as approved.

Can you tell us, in general, how Temporal makes these local variables visible across workers?
I observed that even for simple workflow code like the following:
for (int i = 0; i < 3; i++) {
    // print i
}
Temporal is able to share the value of 'i' across workers: if one worker dies after the i = 1 iteration is done, another available worker picks up from i = 2 instead of restarting from i = 0. This is fascinating.

Preserving the state of your execution across process crashes is the main value of Temporal. We call it durable execution.
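A heavily simplified toy model of replay-based recovery, in plain Java, may make this less magical. It is a hypothetical illustration, not how the Temporal SDK is implemented: the history stores each completed activity result, and a new worker re-runs the workflow code from the top, returning recorded results instead of executing those activities again. The loop variable `i` is rebuilt by re-running the code, not copied between workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical durable history: results of completed activities, in order.
class ToyHistory {
    final List<String> completedResults = new ArrayList<>();
}

// Toy worker that replays recorded results before executing new activities.
class ToyWorker {
    private final ToyHistory history;
    int activitiesExecuted = 0;   // real executions performed by THIS worker
    private int cursor = 0;       // position in the history during replay

    ToyWorker(ToyHistory history) {
        this.history = history;
    }

    // Replays a recorded result if one exists; otherwise executes and records it.
    String callActivity(IntFunction<String> activity, int arg) {
        if (cursor < history.completedResults.size()) {
            return history.completedResults.get(cursor++);   // replay, no side effect
        }
        String result = activity.apply(arg);                 // real execution
        activitiesExecuted++;
        history.completedResults.add(result);
        cursor++;
        return result;
    }

    // The loop from the question. crashAfter simulates the process dying once
    // this worker has executed that many activities; pass -1 to never crash.
    List<String> runLoop(int crashAfter) {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            if (activitiesExecuted == crashAfter) {
                throw new IllegalStateException("worker crashed");
            }
            results.add(callActivity(n -> "printed " + n, i));
        }
        return results;
    }
}
```

Running `runLoop(2)` on one worker crashes after the i = 0 and i = 1 activities; a second `ToyWorker` sharing the same `ToyHistory` then replays those two and executes only the i = 2 activity. The same mechanism is why workflow code must not read Redis or MongoDB directly: replay only works if the code makes the same decisions every time, and an external read can return a different value.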

This presentation explains how Temporal works and how it recovers the state. The explanation starts at 25:55.