How does signalWithStart prevent race conditions with a DynamicWorkflow?

Hi All

I tried to search the forum to see if this has already been asked, but I didn’t see any exact matches…apologies if this is a repeat.

What I am wondering is how to prevent race conditions when using signalWithStart, at least when DynamicWorkflows are used with the java-sdk. I am less clear on whether this is a problem in other SDKs. The scenario I have in mind is similar to the pattern in the golang-sdk “Mutex” sample (samples-go/mutex in temporalio/samples-go on GitHub), where a workflow is designed to be started with signalWithStart and runs in a loop until the signal pool is drained, at which point the workflow exits.

Consider the following pattern:

  • Client “C1” starts workflow “W” with signalWithStart(“foo”)
  • W execution begins on a worker, checks to see if there are any pending messages, and receives “foo” from C1.
  • W processes “foo”, and loops back to check if there are more signals. It finds none, so the conditional breaks the loop and starts its journey to exit the execution
  • Meanwhile, Client “C2” calls signalWithStart(“bar”), and Temporal sees W is already running so it queues the signal “bar” for delivery to W.
  • W completes its exit, and the workflow concludes while leaving “bar” not processed.

I imagine that at least in certain SDKs, like golang, the ReceiveAsync call serves as a type of “delivery confirmation” where Temporal would in fact understand that “bar” was never delivered and do something like start a new W execution, delivering “bar” as the first signal.

However, when using the Java SDK's DynamicWorkflow and RegisterListener calls, it's unclear how to coordinate so that the scenario outlined above cannot happen. Are there “delivery confirmation” APIs I should be calling to prevent this?

Thank you,
-Greg

There are two separate issues here.

  1. It is not possible to call RegisterListener before DynamicWorkflow.execute is called.
  2. The race condition between workflow completion and receiving a signal.

Issue (1) is indeed a problem and will be solved by issue #865.

Issue (2) is not a problem. Temporal gives signal handler threads higher priority than the main workflow thread, so every buffered signal is guaranteed to be delivered before the main workflow thread gets a chance to run. That is why the other SDKs have no need for an equivalent of ReceiveAsync.

Thanks @maxim. One thing I wanted to clarify from your statement. You mentioned it works because the signals are delivered before the main thread “runs” but is that just the first entry to the function, or is it on any await-able pause?

Follow up question: if the signal arrives after the last await but before the workflow exits, can the RegisterListener callback fire or would that be like an implicit ContinueAsNew+Reposting of the signal?

You mentioned it works because the signals are delivered before the main thread “runs” but is that just the first entry to the function, or is it on any await-able pause?

It is on every workflow task, which is the same as on each new batch of signals.

Follow up question: if the signal arrives after the last await but before the workflow exits, can the RegisterListener callback fire or would that be like an implicit ContinueAsNew+Reposting of the signal?

If the signal arrives after the last await, the workflow code state is rolled back (aka transactional memory) to that await, and the signal handler is called before the await is unblocked.
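
The two answers above can be pictured with a small toy model in plain Java. This is only a sketch of the ordering described in this thread, not Temporal SDK code: the class SignalOrderingModel and its methods signal and runWorkflowTask are made up for illustration, and a "workflow task" is reduced to a single method call. It replays the C1/C2 scenario from the original question, with "bar" arriving while the workflow is on its way out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: none of these types or methods come from the Temporal SDK.
class SignalOrderingModel {
    private final Queue<String> buffered = new ArrayDeque<>(); // signals waiting for delivery
    private final List<String> processed = new ArrayList<>();  // signals the workflow handled
    private boolean closed = false;

    // A client signals the workflow; the signal is buffered while the run is open.
    boolean signal(String name) {
        if (closed) {
            return false; // the run is over; this model does not start a new one
        }
        buffered.add(name);
        return true;
    }

    // One simulated workflow task. lateSignal (may be null) models a signal that
    // arrives after the main code has decided to exit but before the run closes.
    void runWorkflowTask(String lateSignal) {
        // 1. Handlers run first: every buffered signal is delivered.
        List<String> inbox = new ArrayList<>();
        while (!buffered.isEmpty()) {
            inbox.add(buffered.poll());
        }
        // 2. Only then does the main code run and process what the handlers collected.
        processed.addAll(inbox);
        // 3. The main code finds nothing more and tries to complete; a signal may race in.
        if (lateSignal != null) {
            signal(lateSignal);
        }
        // 4. Completion is accepted only if no new signal is buffered. Otherwise it is
        //    discarded, and the next task delivers the signal before the main code resumes.
        if (buffered.isEmpty()) {
            closed = true;
        }
    }

    List<String> processed() {
        return processed;
    }

    boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        SignalOrderingModel w = new SignalOrderingModel();
        w.signal("foo");          // C1
        w.runWorkflowTask("bar"); // C2's signal races with the exit
        w.runWorkflowTask(null);  // the retried task sees "bar" first
        System.out.println(w.processed() + " closed=" + w.isClosed());
    }
}
```

In this model the first task cannot close the run because "bar" is still buffered, so the second task handles it and only then completes. That is the behaviour described above, reduced to a single thread.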

Great! It sounds like I am indeed all set.

Tangentially, I actually don’t understand the bit in Issue #865. I regularly use signalWithStart with DynamicWorkflow/RegisterListener and it seems to work fine: i.e. the workflow sees the signal delivered and works as I expect.

Now that you have explained the rollback-after-await, I suppose what I could be seeing is that Temporal always starts my workflow twice: first without signals delivered, and a second time with them. Is that possible?

In any case, thank you to the team/community for Temporal. It is an awesome framework. I think you guys are going to be huge.

-Greg

Tangentially, I actually don’t understand the bit in Issue #865. I regularly use signalWithStart with DynamicWorkflow/RegisterListener and it seems to work fine: i.e. the workflow sees the signal delivered and works as I expect.

Imagine a workflow that counts the number of signals received before the first workflow task executes. The workflow would be implemented like:

  @WorkflowInterface
  public interface Counter {
    @WorkflowMethod
    int getCount();

    @SignalMethod
    void increment();

  }

  public class CounterImpl implements Counter {

    private int counter;
    @Override
    public int getCount() {
      return counter;
    }

    @Override
    public void increment() {
      counter++;
    }
  }

Because increment() is always called for each signal before the main thread runs, and because of the rollback mechanism I described above, this workflow returns the correct number of signals in all cases.

Note that there is no way to implement such a workflow as a DynamicWorkflow. The problem is that registerListener can be called only from the main workflow thread, so by the time the signal callback could be invoked, the main workflow thread has already returned 0.

  public static class DynamicCounterImpl implements DynamicWorkflow {

    private int counter;
    @Override
    public Object execute(EncodedValues args) {
      // SignalCounter is not shown in this thread; presumably a signal interface declaring increment().
      Workflow.registerListener(new SignalCounter() {
        @Override
        public void increment() {
          counter++;
        }
      });
      return counter; // always returns 0
    }
  }

So the solution is to add an init() method to DynamicWorkflow. The working solution is going to look like:

  public static class DynamicCounterImpl implements DynamicWorkflow {

    private int counter;

    public void init() {
      Workflow.registerListener(
          new SignalCounter() {
            @Override
            public void increment() {
              counter++;
            }
          });
    }

    @Override
    public Object execute(EncodedValues args) {
      return counter;
    }
  }

init() is called before any other thread runs. Then increment() is called for each signal, since signal threads have the higher priority, and finally execute returns the correct counter value.
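
The difference between the two DynamicCounterImpl variants comes down to ordering, which can be sketched in plain Java without the SDK. Everything here is hypothetical: InitOrderingModel, Run and simulate are illustration-only names, and the three phases (optional init, signal callbacks, main code) are simply executed one after another to mimic the priorities described above.

```java
// Toy model only: nothing here comes from the Temporal SDK.
class InitOrderingModel {

    // Stand-in for the state of one workflow run.
    static final class Run {
        Runnable listener; // the registered signal callback, or null if none yet
        int counter;
    }

    // Runs the three phases in the order described in the thread and returns
    // what the main code ("execute") would return.
    static int simulate(boolean registerInInit, int bufferedSignals) {
        Run run = new Run();

        // Phase 1: init(), if the hook exists, runs before anything else.
        if (registerInInit) {
            run.listener = () -> run.counter++;
        }

        // Phase 2: signal callbacks run before the main code.
        for (int i = 0; i < bufferedSignals; i++) {
            if (run.listener != null) {
                run.listener.run();
            }
            // With no listener registered yet there is nothing to call.
        }

        // Phase 3: the main code. Without init(), this is the earliest point
        // at which the listener can be registered, which is too late.
        if (!registerInInit) {
            run.listener = () -> run.counter++;
        }
        return run.counter;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false, 3)); // prints 0: listener registered in execute
        System.out.println(simulate(true, 3));  // prints 3: listener registered in init
    }
}
```

The model deliberately ignores what happens to signals that find no listener; it only shows why the value returned by the main code differs between the two variants.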

Now that you have explained the rollback-after-await, I suppose what I could be seeing is that Temporal always starts my workflow twice: first without signals delivered, and a second time with them. Is that possible?

The replay is forced only if there is a conflict between closing of a workflow and a new signal.

Ah, thank you for the explanation. Now that I understand it better, I think the way I built the Clojure SDK inadvertently works around the current limitation. That is because it's built more like a message queue and less like a callback, and receiving a message requires an await. I suspect that having the workflow await in this message-queue-like flow means it ends up properly coordinated with the signal, despite the lack of an init() mechanism.

In any case, I will keep an eye on the development of that fix/feature and incorporate it as appropriate.

Thanks again.

-Greg
