Say I have a workflow for a product’s inventory. The workflow receives signals to add to or reduce the inventory. After the inventory changes, it needs to call an activity to update an external system. We can only set the total in the external system. The activity may fail, time out, or just take an unpredictable amount of time. Now, if two signals arrive at the workflow in quick succession, how do I make sure the activity runs one call at a time rather than in parallel?
Just process signals one by one. In Go it is very natural as signals are delivered through a channel. In Java, a blocking queue (created through Workflow.newQueue) can be used to buffer signals for processing.
Is there any Java example of this? I did not find any documentation.
Also curious, why isn’t this part of the framework? Is it possible to have something like @ParallelSignalMethod and @OneByOneSignalMethod?
Why does the framework take care of workflow/activity task queues, but require users to queue up signals themselves?
I’m also having a hard time understanding how exactly a channel or a blocking queue works in the Temporal context. I probably don’t even understand them in a local multithreaded context, so I would like to avoid using them completely if possible.
Yes, it is conceptually possible to have something like @OneByOneSignalMethod. But since the whole thing can be implemented in a few lines of code, it is not really needed.
Here is an example that doesn’t even need a blocking queue:
public static class GreetingWorkflowImpl implements GreetingWorkflow {
    // Signals only append to this queue; the workflow method drains it.
    private final Queue<String> messageQueue = new ArrayDeque<>();
    private boolean exit = false;

    @Override
    public void getGreetings() {
        List<String> receivedMessages = new ArrayList<>(10);
        while (true) {
            // Block until there is a message to process or exit was requested.
            Workflow.await(() -> !messageQueue.isEmpty() || exit);
            if (messageQueue.isEmpty() && exit) {
                break;
            }
            String message = messageQueue.poll();
            // process message
            ...
        }
    }

    @Override
    public void waitForName(String name) {
        // Signal handler: enqueue only, no processing here.
        messageQueue.add("Hello " + name + "!");
    }

    @Override
    public void exit() {
        exit = true;
    }
}
The whole sample is here.
Thank you very much!
So it seems to me that a @SignalMethod should only change the data/state of the workflow execution, and the @WorkflowMethod detects the change and then performs the logic or calls activities. This way the signals are processed one by one in strict order.
What if there are multiple @SignalMethods with different input types, and all of them need to be processed one by one? Would I need a queue that can hold different input types and also somehow identify the signal, and then a big switch statement inside the while loop of the @WorkflowMethod to handle the different signal types?
If there were a @OneByOneSignalMethod, we could write the logic for each signal separately in its own method instead of putting it all in the @WorkflowMethod, and the framework would ensure that the workflow is not doing anything else while the signal is processed.
Is there anything I’m missing or mistaken in my thoughts above?
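For illustration, the single-queue idea described in this question does not necessarily need a big switch statement. Here is a minimal plain-Java sketch, assuming each signal handler enqueues a small command object (here a lambda) and the workflow loop applies the commands in arrival order. The class name, the three handler names, and the pushedTotals list are hypothetical stand-ins; in a real workflow the drain loop would sit behind Workflow.await, as in the sample above, and the "push" would be the activity call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.IntUnaryOperator;

class InventoryQueueSketch {
    // Each handler only enqueues a command (old total -> new total).
    private final Queue<IntUnaryOperator> pending = new ArrayDeque<>();
    private int total = 0;
    // Stand-in for the external system: records every total pushed to it.
    final List<Integer> pushedTotals = new ArrayList<>();

    // Hypothetical signal handlers with different input types.
    void addStock(int quantity) { pending.add(t -> t + quantity); }
    void removeStock(int quantity) { pending.add(t -> t - quantity); }
    void recount(String countedTotal) { pending.add(t -> Integer.parseInt(countedTotal)); }

    // Body of the workflow loop: apply commands strictly in arrival order.
    void drain() {
        while (!pending.isEmpty()) {
            total = pending.poll().applyAsInt(total);
            pushedTotals.add(total); // in a workflow: call the activity here
        }
    }

    public static void main(String[] args) {
        InventoryQueueSketch w = new InventoryQueueSketch();
        w.addStock(5);
        w.removeStock(2);
        w.recount("10");
        w.drain();
        System.out.println(w.pushedTotals); // prints [5, 3, 10]
    }
}
```

Because every signal type reduces to the same command shape, the loop needs no knowledge of which signal produced a given entry.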
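You don’t need an annotation. What you are asking for is essentially a synchronized block that works inside the workflow. You can easily mimic this with Workflow.await and a variable:
// true means the lock is free; false means a signal handler holds it.
boolean signalLock = true;

@Override
public void waitForName(String name) {
    // Wait until no other signal handler is running, then take the lock.
    Workflow.await(() -> signalLock);
    signalLock = false;
    try {
        // process signal
        ...
    } finally {
        // Always release the lock, even if processing throws.
        signalLock = true;
    }
}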
Obviously the above can be easily encapsulated in a nice class.
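One possible shape for such a class, as a rough sketch rather than anything from the SDK: SignalLock and runExclusive are hypothetical names, and the await function is injected so the snippet compiles without Temporal on the classpath. Inside a workflow the assumption is that you would pass Workflow::await as the awaiter.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

class SignalLock {
    // Blocks until the given condition is true.
    // Assumption: inside a workflow this would be Workflow::await.
    private final Consumer<Supplier<Boolean>> awaiter;
    private boolean free = true;

    SignalLock(Consumer<Supplier<Boolean>> awaiter) {
        this.awaiter = awaiter;
    }

    // Runs body only after every earlier holder has released the lock.
    void runExclusive(Runnable body) {
        awaiter.accept(() -> free);
        free = false;
        try {
            body.run();
        } finally {
            free = true; // release even if body throws
        }
    }

    boolean isFree() {
        return free;
    }

    public static void main(String[] args) {
        // Stand-in awaiter for a local run: it cannot block, so it only
        // verifies that the condition already holds.
        SignalLock lock = new SignalLock(cond -> {
            if (!cond.get()) {
                throw new IllegalStateException("would block");
            }
        });
        lock.runExclusive(() -> System.out.println("free inside body: " + lock.isFree()));
        System.out.println("free afterwards: " + lock.isFree());
    }
}
```

Each signal method would then wrap its logic in lock.runExclusive(() -> { ... }), keeping the per-signal code in its own method.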
Thank you Maxim!
Your sample works very well. I can force the workflow to process one signal at a time and still keep the code separated in each signal method.
I still feel this is a common use case and should be in the framework annotation but I’m just one data point and a happy data point with this solution.
Does this mean that if my code has multiple Workflow.await() calls waiting, they will be unblocked one by one when the variable changes?
Like in this example.
- I have Signal1 which grabs a signalLock
- Two other signals are waiting on Workflow.await()
- Signal1 releases signalLock.
How does Workflow.await() know that it should continue execution, and what happens if Workflow.await() in both of those signal code blocks resumes in parallel at exactly the same time?
Should signalLock be something like an AtomicBoolean, using getAndSet() or similar when the condition is checked in those await() blocks?
Temporal uses cooperative multi-threading to execute workflow code deterministically. In practice, only a single thread is running at a time. Therefore, no explicit synchronization is needed when accessing shared variables. To be precise, explicit synchronization is prohibited as it can interfere with the deterministic dispatcher.
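To make the three-signal scenario above concrete, here is a toy model in plain Java. It is not Temporal's dispatcher and uses no Temporal APIs; the class, the Waiter type, and runUntilAllBlocked are all invented for illustration. It only assumes what the answer states: one piece of code runs at a time, and a parked await has its condition checked right before it resumes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

class CooperativeAwaitSketch {
    // Code parked on an await: body runs once condition holds.
    static final class Waiter {
        final BooleanSupplier condition;
        final Runnable body;
        Waiter(BooleanSupplier condition, Runnable body) {
            this.condition = condition;
            this.body = body;
        }
    }

    private boolean signalLock = false; // false: Signal1 holds the lock
    private final List<Waiter> waiters = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    // Toy dispatcher: resumes ready waiters strictly one at a time,
    // checking each condition immediately before running its body.
    private void runUntilAllBlocked() {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<Waiter> it = waiters.iterator(); it.hasNext();) {
                Waiter w = it.next();
                if (w.condition.getAsBoolean()) {
                    it.remove();
                    w.body.run();
                    progressed = true;
                }
            }
        }
    }

    // Models a signal handler blocked on await(() -> signalLock).
    private void park(String name) {
        waiters.add(new Waiter(() -> signalLock, () -> {
            signalLock = false; // take the lock before anyone else can run
            log.add(name + " acquired lock");
        }));
    }

    static List<String> simulate() {
        CooperativeAwaitSketch s = new CooperativeAwaitSketch();
        s.park("Signal2");
        s.park("Signal3");
        s.runUntilAllBlocked(); // nothing runs: the lock is held
        s.signalLock = true;    // Signal1 releases the lock
        s.runUntilAllBlocked(); // only Signal2 gets through
        s.log.add("Signal2 released lock");
        s.signalLock = true;
        s.runUntilAllBlocked(); // now Signal3 proceeds
        return s.log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model, by the time Signal3's condition is checked, Signal2 has already set signalLock back to false, so a plain boolean is enough and nothing like getAndSet() is required.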