Batch-Jobs in Cadence

With respect to Batch job | Cadence: do we have an implementation that showcases a batch job with Cadence?

My use case: for a given input, pull data rows through an API call, build new data points, and write them to a persistent store.

Unfortunately, we don’t have a sample that demonstrates a batch job yet.

The design heavily depends on the use case. The following information is needed to give a better answer:

  • Is API call result paginated or streamed?
  • What is the maximum number of rows in the result?
  • What is the maximum size of the result data?
  • How much processing does each row require?
  • Does processing a row require external API calls?
  • What is the longest time processing a single row can take?
  • Is it OK to block processing if some row cannot be processed due to downstream dependency failures?
  • How big is the output?
  • Is API call result paginated or streamed?
    paginated
  • What is the maximum number of rows in the result?
    20
  • What is the maximum size of the result data?
    around 2mb
  • How much processing does each row require?
    data manipulation or a basic rule engine
  • Does processing a row require external API calls?
    yes
  • What is the longest time processing a single row can take?
    around 5 seconds
  • Is it OK to block processing if some row cannot be processed due to downstream dependency failures?
    Yes
  • How big is the output?
    The output would be smaller than the data requested

Given these requirements, I would implement all the processing in a single activity. This activity would:

  1. Paginate through results
  2. Heartbeat the progress (page token) back to Temporal
  3. In case of failure, restart execution from the last heartbeated page token
  4. Process results locally, possibly using parallel threads
  5. In case of failure, keep retrying locally

The workflow would invoke this activity with an appropriate retry policy to ensure that it is retried if its worker dies.
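
For illustration, here is a minimal sketch of such a heartbeating activity using the Temporal Java SDK. The BatchActivity interface and the fetchPage/processRow helpers (with their Page/Row types) are hypothetical stand-ins for the paginated API call and the per-row processing:

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;

public class BatchActivityImpl implements BatchActivity { // hypothetical interface
    @Override
    public void processAll(String query) {
        ActivityExecutionContext ctx = Activity.getExecutionContext();
        // On a retry, resume from the last page token reported via heartbeat.
        String pageToken = ctx.getHeartbeatDetails(String.class).orElse(null);
        do {
            Page page = fetchPage(query, pageToken); // hypothetical paginated API call
            for (Row row : page.rows()) {
                processRow(row); // hypothetical per-row processing; retries locally
            }
            pageToken = page.nextToken();
            // Record progress; the token is returned by getHeartbeatDetails on retry.
            ctx.heartbeat(pageToken);
        } while (pageToken != null);
    }
}

Note that a heartbeat timeout has to be configured in the activity’s options for the server to detect a dead worker and schedule the retry.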

  • Process results locally, possibly using parallel threads

What’s the reason for processing locally? Is it required for the workflow to function correctly, or is this an optimization of sorts?

It is local to an activity worker, which makes sense for that particular use case. What are your requirements?

Thank you for the very quick reply!

Use case: The workflow will be started with 1 to 50 messages to be processed. Each message will be used as input for 1 to 3 activities (depending on the exact message content) and for signaling a workflow.

Nice to haves:

  • Parallel processing of the messages. However, I couldn’t find information in the docs about how Temporal handles replaying events/activities if the order in which they happen is non-deterministic. I was wondering if “Process results locally, possibly using parallel threads” was related to how to handle non-deterministic ordering of activity execution. I’ve been looking into Flux for the parallel processing: Flux (reactor-core 3.4.5).
  • If an activity for message N fails and the entire workflow must retry, messages 1 to N-1 wouldn’t have to be re-processed.

I’ll try to answer the questions you gave the previous poster, too.

  • What is the maximum number of rows in the workflow input?
    50
  • What is the maximum size of the workflow input?
    30kb
  • How much processing does each row require?
    DB read/write, 0 to 2 API calls, and signaling a workflow
  • Does processing a row require external API calls?
    yes
  • What is the longest time processing a single row can take?
    around 6 seconds, barring downstream dependency failures.
  • Is it OK to block processing if some row cannot be processed due to downstream dependency failures?
    Yes, but if it is possible to avoid blocking, I’d like to explore that solution.
  • How big is the output?
    Very small, essentially just a “SUCCESS”

I am considering splitting the N messages into separate child workflows, so if a workflow has to restart due to activity failure, only N/(num child workflows) messages will be re-processed. But, from what I saw in the docs, this does not appear to be the intended use for child workflows.

I am thinking I may be overly concerned with activity failures causing the workflow to retry. My understanding is that this type of activity failure shouldn’t ever happen due to an intermittent downstream issue, meaning it would likely be caused by buggy code. In the buggy code case, the solution would be to upload a new version of the Activity code with the bug fixed, right? Assuming that this is done within the allowed retry period, the workflow wouldn’t have to retry, which means I don’t have to put much effort into planning around this type of failure.

  • Parallel processing of the messages. However, I couldn’t find information in the docs about how Temporal handles replaying events/activities if the order they happen is non-deterministic.

I think there is some confusion about the determinism requirement. The workflow code has to be deterministic, which means it has to end up in the same state given the same execution history. It doesn’t imply that inputs to the workflow or activity implementations have to be deterministic. Besides ensuring that workflow code follows some basic rules (use workflow time instead of system time, don’t rely on random, put external API calls in activities), there is no need to think about determinism while designing workflows.
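
For reference, here is a minimal sketch of what those basic rules look like in workflow code, using the substitutes provided by the Temporal Java SDK (the surrounding class is just a container for the example):

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import io.temporal.workflow.Workflow;

public class DeterminismExamples {
    // Call only from inside workflow code: use the Workflow-provided
    // equivalents of non-deterministic JDK calls.
    void examples() {
        long now = Workflow.currentTimeMillis();  // not System.currentTimeMillis()
        Random random = Workflow.newRandom();     // not new Random()
        UUID id = Workflow.randomUUID();          // not UUID.randomUUID()
        Workflow.sleep(Duration.ofSeconds(10));   // not Thread.sleep(...)
        // External API calls belong in activities, never in workflow code.
    }
}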

  • I was wondering if “Process results locally, possibly using parallel threads” was related to how to handle non-deterministic ordering of activity execution. I’ve been looking into Flux for the parallel processing: Flux (reactor-core 3.4.5).

Activities can contain any code, including non-deterministic code. In your case, the number of rows is so small that it wouldn’t require any batching or running multiple simultaneous tasks from a single activity. So no additional frameworks are needed.

  • If an activity for message N fails and the entire workflow must retry, messages 1 to N-1 wouldn’t have to be re-processed.

There is no need for a workflow to retry on a single message failure. Each activity can be retried individually for as long as needed. Retrying the whole workflow is an anti-pattern in Temporal; there are very few use cases where it is needed.
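
For example, the retry policy is configured per activity when creating the activity stub inside the workflow. Here is a minimal sketch using the Temporal Java SDK, where MessageWorkflowImpl and MyActivities are hypothetical placeholder names:

import java.time.Duration;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;

public class MessageWorkflowImpl implements MessageWorkflow { // hypothetical
    // Each activity invocation is retried per this policy; the workflow
    // itself never needs to be retried.
    private final MyActivities activities = Workflow.newActivityStub(
        MyActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .setRetryOptions(RetryOptions.newBuilder()
                .setInitialInterval(Duration.ofSeconds(1))
                .setBackoffCoefficient(2.0)
                .setMaximumInterval(Duration.ofMinutes(5))
                // No setMaximumAttempts(): retry for as long as needed.
                .build())
            .build());
}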

I am considering splitting the N messages into separate child workflows, so if a workflow has to restart due to activity failure, only N/(num child workflows) messages will be re-processed. But, from what I saw in the docs, this does not appear to be the intended use for child workflows.

There is no need to fail a workflow on an activity failure, so restarting workflows on activity failures is not needed. Given that the number of items is so low, I would implement the whole logic in a single workflow that invokes activities directly, without child workflows.

I am thinking I may be overly concerned with activity failures causing the workflow to retry. My understanding is that this type of activity failure shouldn’t ever happen due to an intermittent downstream issue, meaning it would likely be caused by buggy code. In the buggy code case, the solution would be to upload a new version of the Activity code with the bug fixed, right? Assuming that this is done within the allowed retry period, the workflow wouldn’t have to retry, which means I don’t have to put much effort into planning around this type of failure.

Yes, I think you are thinking about this problem in a suboptimal way. Any unknown failure of an activity causes it to be retried, so you can fix bugs while it is being retried. This assumes that your retry options don’t limit the activity retry duration too much.

Okay, let me check my understanding. Say I have 2 messages that are going through a pipeline of activities, which we’ll call A and B. The only ordering requirement is that each message must be processed by activity A before activity B. So if I have messages 1 and 2, these are both valid orders of execution:
1A, 2A, 1B, 2B
1A, 1B, 2A, 2B

it has to end up in the same state given the same execution history.

Is this requirement met in this case? If 1A and 2A are executed, 1B and 2B always remain to be executed. However, it’s not guaranteed that 1A and 2A will always be the first two activities executed.

If this is a valid Temporal use, can you explain how Temporal handles recovering workflow state in this scenario? I looked around in the docs and couldn’t figure it out.

The workflow execution history records what happened in each case. Your scenarios will generate different execution histories. Note that the messages (in the form of signals) are also recorded in the history, so there is no ambiguity about their ordering during replay.

Is this requirement met in this case? If 1A and 2A are executed, 1B and 2B always remain to be executed. However, it’s not guaranteed that 1A and 2A will always be the first two activities executed.

I think the confusion is about what replay means. Replay only replays what has already happened, and it is based on the persistent event history. In your question, you are asking about what will happen in the future, which is not deterministic and doesn’t need to be.

Watch this video that explains how recovery happens, starting at 15:00.

I watched the video, and I think I am able to ask my question more accurately now. My confusion comes from not understanding the activity history deduping logic and how exactly the event history replay works. Given this pseudocode with the input of ["A", "B"], what could happen?

@WorkflowMethod
public void processValues(List<String> values) {
    ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
    for (String val : values) {
        taskExecutor.submit(getProcessCallable(val));
    }
    // ... logic to prevent workflow from closing until
    // all callables are done
}

private Callable<Void> getProcessCallable(String value) {
    return () -> {
        activity1.process(value);
        activity2.process(value);
        return null;
    };
}

Let’s say the first time the workflow is running, the history is
activity1(“A”) started
activity1(“B”) started
activity1(“A”) completed
activity1(“B”) completed
activity2(“A”) started
Worker crashes

Another worker picks up the workflow and starts the replay. From what I understand, the worker executes the code, and whenever it hits a line that would produce event history, Temporal checks whether the event history matches what the current line of code would produce (deduping). If it does, Temporal walks the event history to determine what that line of code will return, and execution proceeds without running the activity. If the event history does not match what this code would produce, Temporal fails with a ‘non-deterministic’ error (Versioning | Temporal documentation).

Due to “A” and “B” being processed at the same time in parallel threads, it seems like there is no guarantee that the code will execute in the same order when the workflow is replaying. For example, if “Thread B” gets to the line activity1.process(value) first, that wouldn’t match the event history and would cause an error, right? The same thing could happen with the activity2 line: the worker goes to execute activity2.process("B") before activity2.process("A"), so the replaying code produces activity2("B") while the history actually contains activity2("A").

I hope I explained myself better this time. I have a few summary questions.

  1. Is the pseudocode above a valid Temporal use case? If so, what prevents the example I gave from causing problems?
  2. If this is a valid use case, are there potential dangers in increasing the number of elements in the input, or the number of activities in the Callables?
  3. If the pseudocode is not a valid use case, what is the correct way to accomplish this sort of parallel processing? Would I do something like this example, where activity1 runs for all of the inputs, then activity2 runs for all of the inputs? Implementing Workflows | Temporal documentation
  4. If you’re willing to explain, how does the deduping of activity calls against the event history work? What is looked at to determine that an activity doesn’t need to run?

You absolutely correctly identified the problem of multithreaded code not being deterministic. Temporal solves this by prohibiting the use of Java threads or thread-management classes like ExecutorService inside workflow code. The only supported way to create threads inside a workflow is through the Async class.

The threads created through the Async class, as well as the threads created by the framework to execute callbacks (like signal handlers), use cooperative multi-threading and always execute one by one in the exact same order. This way their replay is always deterministic.
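
As an illustration, here is how the earlier pseudocode could be rewritten with Async and Promise from the Temporal Java SDK. This is a sketch of the workflow method implementation, assuming the same activity1/activity2 stubs from the pseudocode:

import java.util.ArrayList;
import java.util.List;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;

@Override
public void processValues(List<String> values) {
    List<Promise<?>> results = new ArrayList<>();
    for (String val : values) {
        // Async.procedure starts a workflow-managed cooperative thread;
        // these threads are scheduled deterministically during replay.
        results.add(Async.procedure(() -> {
            activity1.process(val);
            activity2.process(val);
        }));
    }
    // Keeps the workflow open until every message has completed both
    // activities, replacing the "prevent workflow from closing" placeholder.
    Promise.allOf(results).get();
}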
