With respect to Batch job | Cadence, do we have an implementation which showcases a batch job with Cadence?
My use case includes: for a given input, pull data rows through an API call, build new data points, and write them to a persistent store.
Unfortunately we don't have a sample that demonstrates a batch job yet.
The design heavily depends on the use case. The following information is needed to give a better answer:
Given these requirements, I would implement all the processing in a single activity. This activity would:
- Process results locally, possibly using parallel threads
The workflow would invoke this activity with an appropriate retry policy to ensure that it is retried if its worker dies.
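As a rough illustration of such a retry policy, a workflow could configure the activity stub like the following. This is a hedged sketch against the Temporal Java SDK; the interface and method names (BatchWorkflow, BatchActivities, processBatch) are hypothetical:

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class BatchWorkflowImpl implements BatchWorkflow {
    // BatchWorkflow, BatchActivities, and processBatch are made-up names.
    private final BatchActivities activities =
        Workflow.newActivityStub(
            BatchActivities.class,
            ActivityOptions.newBuilder()
                // Generous timeout: the activity does all the processing.
                .setStartToCloseTimeout(Duration.ofHours(1))
                // Heartbeats let the service detect a dead worker quickly.
                .setHeartbeatTimeout(Duration.ofMinutes(1))
                .setRetryOptions(
                    RetryOptions.newBuilder()
                        .setInitialInterval(Duration.ofSeconds(5))
                        .setMaximumAttempts(0) // 0 = no limit on attempts
                        .build())
                .build());

    @Override
    public void run(String input) {
        // Retried automatically by the service if the worker dies mid-execution.
        activities.processBatch(input);
    }
}
```

The exact timeouts and intervals are placeholders; the point is that the retry behavior lives in the activity options, not in the workflow logic.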
What's the reason for processing locally? Is it required for the workflow to function correctly, or is this an optimization of sorts?
It is locally to an activity worker which makes sense for that particular use case. What are your requirements?
Thank you for the very quick reply!
Use case: The workflow will be started with 1 to 50 messages to be processed. Each message will be used as input for 1 to 3 activities (depending on the exact message content) and for signaling a workflow.
Nice to haves:
I'll try to answer the questions you gave the previous poster, too.
I am considering splitting the N messages into separate child workflows, so if a workflow has to restart due to activity failure, only N/(num child workflows) messages will be re-processed. But, from what I saw in the docs, this does not appear to be the intended use for child workflows.
I am thinking I may be overly concerned with activity failures causing the workflow to retry. My understanding is that this type of activity failure shouldn't ever happen due to an intermittent downstream issue, meaning it would likely be caused by buggy code. In the buggy code case, the solution would be to upload a new version of the Activity code with the bug fixed, right? Assuming that this is done within the allowed retry period, the workflow wouldn't have to retry, which means I don't have to put much effort into planning around this type of failure.
- Parallel processing of the messages. However, I couldn't find information in the docs about how Temporal handles replaying events/activities if the order they happen in is non-deterministic.
I think there is some confusion about the determinism requirement. The workflow code has to be deterministic, which means it has to end up in the same state given the same execution history. It doesn't imply that inputs to the workflow or activity implementations should be deterministic. Besides ensuring that workflow code follows some basic rules (using workflow time instead of system time, not relying on random number generators, and putting external API calls in activities), there is no need to think about determinism while designing workflows.
I was wondering if "Process results locally, possibly using parallel threads" was related to how to handle non-deterministic ordering of activity execution. I've been looking into Flux for the parallel processing: Flux (reactor-core 3.4.5).
Activities can contain any code, including non-deterministic code. In your case, the number of rows is so small that it wouldn't require any batching or running multiple simultaneous tasks from a single activity. So no additional frameworks are needed.
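For larger batches, this kind of local parallelism would live inside the activity implementation, where ordinary Java concurrency is allowed (unlike in workflow code) and no extra frameworks are required. A minimal self-contained sketch, where toUpperCase stands in for real per-row work such as an API call plus a write:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of parallelism *inside an activity implementation*, where plain
// Java threads are fine. Names and pool size are illustrative only.
public class BatchProcessor {
    public static List<String> processAll(List<String> rows) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String row : rows) {
                // Stand-in for real per-row processing.
                futures.add(pool.submit(() -> row.toUpperCase()));
            }
            List<String> out = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    out.add(f.get()); // preserves submission order
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a", "b"))); // prints [A, B]
    }
}
```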
- If an activity for message N fails and the entire workflow must retry, messages 1 to N-1 wouldn't have to be re-processed.
There is no need for a workflow to retry on a single message failure. Each activity can be retried individually as long as needed. Retrying the whole workflow is an anti-pattern in Temporal. There are very few use cases when it is needed.
I am considering splitting the N messages into separate child workflows, so if a workflow has to restart due to activity failure, only N/(num child workflows) messages will be re-processed. But, from what I saw in the docs, this does not appear to be the intended use for child workflows.
There is no need to fail a workflow on an activity failure, so restarting workflows on activity failures is not needed. Given that the number of items is so low, I would implement the whole logic in a single workflow that invokes activities directly, without child workflows.
I am thinking I may be overly concerned with activity failures causing the workflow to retry. My understanding is that this type of activity failure shouldn't ever happen due to an intermittent downstream issue, meaning it would likely be caused by buggy code. In the buggy code case, the solution would be to upload a new version of the Activity code with the bug fixed, right? Assuming that this is done within the allowed retry period, the workflow wouldn't have to retry, which means I don't have to put much effort into planning around this type of failure.
Yes, I think you are thinking about this problem in a nonoptimal way. Any unknown failure of an activity causes its retry, so you can fix bugs while it is being retried. This assumes that your retry options are not limiting the activity retry duration too much.
Okay, let me check my understanding. Say I have 2 messages that are going through a pipeline of activities, which we'll call A and B. The only ordering requirement is that each message must be processed by activity A before activity B. So if I have messages 1 and 2, these are both valid orders of execution:
1A, 2A, 1B, 2B
1A, 1B, 2A, 2B
it has to end up in the same state given the same execution history.
Is this requirement met in this case? If 1A and 2A are executed, 1B and 2B always remain to be executed. However, it's not guaranteed that 1A and 2A will always be the first two activities executed.
If this is a valid Temporal use, can you explain how Temporal handles recovering workflow state in this scenario? I looked around in the docs and couldnât figure it out.
The workflow execution history records what happened in each case. Your scenarios will generate different execution histories. Note that the messages (in the form of signals) are also recorded in the history, so there is no ambiguity about their ordering during replay.
Is this requirement met in this case? If 1A and 2A are executed, 1B and 2B always remain to be executed. However, it's not guaranteed that 1A and 2A will always be the first two activities executed.
I think the confusion is about what replay means. Replay only replays what already happened, and it is based on the persistent event history. In your question, you are asking about what will happen in the future, which is not deterministic and doesn't need to be.
Watch this video that explains how recovery happens starting from 15:00.
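To make the recovery idea concrete, here is a toy model of replay. This is a deliberate oversimplification of what the SDK actually does, and all the names are made up: on the first execution, activity results are appended to an event history; on replay, the same deterministic code reads the results back from the history instead of re-running the activities, so it rebuilds identical workflow state.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of replay (an oversimplified sketch, not the real SDK).
public class ReplayDemo {
    public static List<String> history = new ArrayList<>();
    public static int cursor = 0;

    public static String activity(String name, boolean replaying) {
        if (replaying) {
            return history.get(cursor++);   // result is taken from history
        }
        String result = name + "-result";   // "real" activity execution
        history.add(result);                // ...recorded for future replays
        return result;
    }

    public static String workflow(boolean replaying) {
        String a = activity("stepA", replaying);
        String b = activity("stepB", replaying);
        return a + "|" + b;                 // the workflow's resulting state
    }

    public static void main(String[] args) {
        String firstRun = workflow(false);  // original run, records history
        cursor = 0;
        String replayed = workflow(true);   // replay consumes the history
        System.out.println(firstRun.equals(replayed)); // prints true
    }
}
```

The key property: because the workflow code is deterministic, replaying it against the recorded history yields exactly the state it had before the worker crashed.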
I watched the video, and I think I am able to ask my question more accurately now. My confusion comes from not understanding the activity history deduping logic and how exactly the event history replay works. Given this pseudocode with the input of ["A", "B"], what could happen?
@WorkflowMethod
public void processValues(List<String> values) {
    ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
    for (String val : values) {
        taskExecutor.submit(getProcessCallable(val));
    }
    // ... logic to prevent workflow from closing until
    // all callables are done
}

private Callable<Void> getProcessCallable(String value) {
    return () -> {
        activity1.process(value);
        activity2.process(value);
        return null;
    };
}
Let's say the first time the workflow runs, the history is:
activity1("A") started
activity1("B") started
activity1("A") completed
activity1("B") completed
activity2("A") started
Worker crashes
Another worker picks up the workflow, and starts the replay. From what I understand, the worker executes the code, and whenever it hits a line that would produce event history, Temporal checks if the event history matches what the current line of code would produce (deduping). If it does, Temporal walks the event history to determine what that line of code will return, and execution proceeds without running the activity. If the event history does not match what this code would produce, Temporal fails with a "non-deterministic" error (Versioning | Temporal documentation).
Due to "A" and "B" being processed at the same time in parallel threads, it seems like there is no guarantee that the code will execute in the same order when the workflow is replaying. For example, if "Thread B" gets to the line activity1.process(value) first, that wouldn't match the event history, and would cause an error, right? The same thing could happen with the activity2 line, where the worker goes to execute activity2.process("B") before activity2.process("A"): activity2("B") is expected in the history, but it's actually activity2("A").
I hope I explained myself better this time. I have a few summary questions.
You absolutely correctly identified the problem of multithreaded code not being deterministic. Temporal solves this by prohibiting the use of Java threads or any other classes like ExecutorService inside workflow code. The only supported way to create threads inside a workflow is through the Async class.
The threads created through the Async class, as well as threads created by the framework to execute callbacks (like signal handlers), use cooperative multithreading and always execute one by one in the exact same order. This way their replay is always deterministic.
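Sketched against the Temporal Java SDK (and assuming activity stubs activity1 and activity2 like in the pseudocode above), the ExecutorService version could be rewritten with Async roughly like this:

```java
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: a fragment of a workflow implementation, same shape as the
// pseudocode above, but using Temporal's cooperative workflow threads.
@Override
public void processValues(List<String> values) {
    List<Promise<Void>> results = new ArrayList<>();
    for (String val : values) {
        // Each Async.procedure runs in a deterministic workflow thread.
        results.add(Async.procedure(() -> {
            activity1.process(val);
            activity2.process(val);
        }));
    }
    // Blocks the main workflow thread until every branch completes,
    // replacing the "prevent workflow from closing" placeholder logic.
    Promise.allOf(results).get();
}
```

Because these threads are scheduled cooperatively and in a fixed order, replaying this code against the event history is deterministic even though the activities themselves complete in arbitrary order.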