How do Signals and workflow stored state interact?

Hello,

I’d like to implement a workflow on top of a business entity. Clients can schedule work and can poll to check whether their work has completed. All clients should be able to schedule work in parallel, but the work itself should be serialized to control the load and concurrency on the underlying systems.
I found a recommendation to use a semaphore workflow for this, and to use signals to schedule work in such a workflow.

Could you please advise on how such a semaphore workflow should be implemented? In particular, how are signals and workflow state preserved in case of worker or client failures?

Let’s consider the example below:

// Pending work, keyed by requestId. This is ordinary workflow state.
Map<String, String> pendingWork = new HashMap<>();

@SignalMethod
void scheduleWork(String requestId, String payloadFile) {
    pendingWork.put(requestId, payloadFile);
}

@WorkflowMethod
void performWork(Map<String, String> work) {
    pendingWork.putAll(work);

    // Process a single item per run to serialize the work.
    Map.Entry<String, String> item = pendingWork.entrySet().iterator().next();
    pendingWork.remove(item.getKey());

    if (!pendingWork.isEmpty()) {
        // Start a new run, carrying over the remaining work.
        Workflow.continueAsNew(pendingWork);
    } else {
        // End workflow
    }
}

Work scheduling is done with WorkflowClient.signalWithStart().
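For concreteness, here is a rough sketch of the client side I have in mind (a recent Java SDK is assumed; the SampleWorkflow interface, the Scheduler class, the workflow id, and the “sample-queue” task queue name are just placeholders):

import io.temporal.client.BatchRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.HashMap;

public class Scheduler {
    public static void main(String[] args) {
        // Connects to a local Temporal service; adjust for your deployment.
        WorkflowClient client =
                WorkflowClient.newInstance(WorkflowServiceStubs.newLocalServiceStubs());

        // One workflow per business entity; the workflow id keys the serialization.
        SampleWorkflow workflow = client.newWorkflowStub(
                SampleWorkflow.class,
                WorkflowOptions.newBuilder()
                        .setWorkflowId("entity-123-work-queue")
                        .setTaskQueue("sample-queue")
                        .build());

        // signalWithStart atomically starts the workflow if it is not running and
        // delivers the signal; if it is already running, only the signal is delivered.
        BatchRequest request = client.newSignalWithStartRequest();
        request.add(workflow::performWork, new HashMap<String, String>()); // workflow method + start args
        request.add(workflow::scheduleWork, "request-1", "payload-1.json"); // signal method + args
        client.signalWithStart(request);
    }
}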

  1. What happens to the pendingWork variable if the worker machine dies after scheduleWork() was called twice but before performWork() has executed? When the workflow is executed again, will the signals be replayed?
  2. What happens if there is a race condition between Workflow.continueAsNew() and signalWithStart()? Will the signal be delivered to the new workflow runId, or will it get lost?
  3. What happens if there is a race condition between the workflow ending (no new work) and signalWithStart()? Will the signal start a new workflow if the workflow just finished because no new work was detected, or will it get lost (as the signal might be attributed to the prior runId)?
  1. What happens to the pendingWork variable if the worker machine dies after scheduleWork() was called twice but before performWork() has executed? When the workflow is executed again, will the signals be replayed?

The correct mental model is not to think about replay (which you called “executed again”) at all when writing workflow code. The only requirement it imposes is that the workflow code must be deterministic. Otherwise, you can think of workflow state as always durable. So, to answer your question: nothing happens from the workflow implementer’s point of view. The workflow continues on a different worker, and the pendingWork map still contains the same entries the signals already stored.

  2. What happens if there is a race condition between Workflow.continueAsNew() and signalWithStart()? Will the signal be delivered to the new workflow runId, or will it get lost?

The race condition is handled seamlessly. The workflow state will be rolled back to the state it had before the signal was received, and the signal will be delivered.

  3. What happens if there is a race condition between the workflow ending (no new work) and signalWithStart()? Will the signal start a new workflow if the workflow just finished because no new work was detected, or will it get lost (as the signal might be attributed to the prior runId)?

The same as in #2. The state will be rolled back, the signal will be delivered, and the workflow will execute according to your business logic, taking the new signal into account.


Thank you, #2 and #3 make sense.
For #1: how is the state of the WorkflowImpl instance saved? I imagine it is not serialized and stored in the Temporal backend after each new signal is delivered.
If a @SignalMethod executes and modifies the WorkflowImpl (say, adds new data to an instance map), what is actually recorded in the Temporal backend?
How is that instance state restored on a different machine in the event of a machine crash?
And, finally, how does Temporal know whether a signal should be delivered again, given that Temporal has no knowledge of how the data from the signal is used? Is it just a matter of replaying everything from the workflow execution start (all signals, all activities)?

Unfortunately, the backends I deal with are not always idempotent. Therefore, I assume I need to write at least some code to make sure that the same signal is not processed twice when it is delivered again after a crash. To do that, I’m trying to understand “the magic” behind the scenes.


The short answer is that the workflow code is replayed from the beginning, and it is assumed to be deterministic. The service records events such as activity completions and signals, and replays them back to the code on recovery. See this video, which goes over the recovery logic.

If a @SignalMethod executes and modifies the WorkflowImpl (say, adds new data to an instance map), what is actually recorded in the Temporal backend?

Only the signal itself is recorded.

And, finally, how does Temporal know whether a signal should be delivered again, given that Temporal has no knowledge of how the data from the signal is used? Is it just a matter of replaying everything from the workflow execution start (all signals, all activities)?

Yes, it is a matter of replaying everything from the start.

Unfortunately, the backends I deal with are not always idempotent. Therefore, I assume I need to write at least some code to make sure that the same signal is not processed twice when it is delivered again after a crash. To do that, I’m trying to understand “the magic” behind the scenes.

You don’t need to understand how recovery works. Even a single process without fault tolerance has to dedupe external requests, as callers can fail and retry delivery. So deduping signals is part of the application logic and should live in the workflow code itself.
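For example, here is a minimal sketch of what that dedup could look like in the workflow from the first post (the seenRequestIds field name is just an illustration). Because it is ordinary workflow state, it is restored on recovery together with pendingWork, with no extra persistence code:

// Ordinary workflow state: restored on replay just like pendingWork.
Set<String> seenRequestIds = new HashSet<>();
Map<String, String> pendingWork = new HashMap<>();

@SignalMethod
void scheduleWork(String requestId, String payloadFile) {
    // Set.add() returns false if the id was already seen, so duplicates are dropped here.
    if (!seenRequestIds.add(requestId)) {
        return;
    }
    pendingWork.put(requestId, payloadFile);
}

One caveat with this sketch: if the workflow calls continueAsNew, seenRequestIds has to be passed to the new run (or pruned according to some retention rule), otherwise the dedup history is lost across runs.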


Got it, thank you.

Hi,

I have a related question on this topic/example. If a workflow signal method is called from within the workflow itself, does it change the workflow state in the same way as when it is called via a client stub from another workflow or process?

For example, suppose the OP’s example is modified as follows to contain two concurrent threads:

boolean exit = false;
Promise<Boolean> concurrentProcessResult;
Map<String, String> pendingWork = new HashMap<>();

@SignalMethod
void scheduleWork(String requestId, String payloadFile) {
    pendingWork.put(requestId, payloadFile);
}

@SignalMethod
void exitWorkflow() {
    exit = true;
}

@WorkflowMethod
void performWork(Map<String, String> work) {

    pendingWork.putAll(work);

    // Concurrent workflow thread that may add work via the internal signal method.
    concurrentProcessResult = Async.function(() -> {
        while (!exit) {
            ...
            if (someCondition)
                scheduleWork(...);
            ...
        }
        return true;
    });

    // Main thread: drain pendingWork, blocking while it is empty.
    while (!exit) {
        if (pendingWork.isEmpty()) {
            Workflow.await(() -> !pendingWork.isEmpty());
        }
        Map.Entry<String, String> item = pendingWork.entrySet().iterator().next();
        ... // do something
        pendingWork.remove(item.getKey());
    }

    concurrentProcessResult.get();

}

Assuming the main thread’s execution is held at Workflow.await(() -> !pendingWork.isEmpty()): after the concurrent thread calls the internal signal method scheduleWork, would that trigger the main thread to re-evaluate the condition in Workflow.await(() -> !pendingWork.isEmpty())?

A signal method is just a regular method that the Temporal SDK uses to deliver signals. So, calling it directly is no different from calling any other workflow method.

Thank you Maxim. So a signal method invocation from within the workflow (internal) will register as an event, such that during playback it is sequenced correctly along with the same signal invoked from external sources via workflow client stubs? In other words, suppose the signal invocation sequence is as follows:

External at time T1
Internal at time T2
External at time T3
External at time T4

Will both concurrent threads, when checking the content/size of the pendingWork map at various points in time, see the same contents during playback as they did during the original execution? The reason I am asking: suppose one thread is calculation-intensive, so during the original execution it accessed the pendingWork map at time T2.3; during playback, however, the system is busier and takes much longer to reach the same point of execution (say, after T4). How does Temporal ensure that the content of the pendingWork map does not include the entries from T3 and T4 during playback? Should access to the pendingWork map be wrapped inside a side-effect call or a @QueryMethod?

Btw, I know I should offload the calculation-intensive tasks to activities, so the example above does not follow best practices, but I would really appreciate it if you could explain/confirm how Temporal ensures the playback sequence is the same as the original execution.

The Temporal Java SDK uses cooperative multi-threading, so only a single thread executes at a time, and a thread has to yield explicitly (by calling one of the Temporal APIs). As a result, the sequencing of callbacks and method-call executions is not affected by how long each part of the code takes to execute.
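As a small illustration, here is a sketch assuming the usual io.temporal.workflow imports (Async, Promise, Workflow) and that the code lives inside a workflow method; the order list is only there to show the sequencing:

List<String> order = new ArrayList<>();

// The new workflow thread is created here but does not run until the caller yields.
Promise<Void> side = Async.procedure(() -> {
    order.add("side thread ran");
});

order.add("main before yield");
Workflow.sleep(Duration.ofSeconds(1)); // explicit yield point: the side thread runs here
order.add("main after yield");

side.get();
// order is always [main before yield, side thread ran, main after yield],
// no matter how long each piece takes in wall-clock time or how busy the worker is.

The same applies to signal callbacks: they run at well-defined yield points, so the sequencing observed during the original execution is reproduced exactly during replay.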


Thank you again, much appreciated!