Getting started with workflow concepts (parallel activities tracking)

Hi there, I’m currently evaluating Temporal for use in our org. I’m super excited that I found it; it seems to be a great fit for us.
I’m still reading through the docs, and I must confess I have not yet deployed a sample app.

Looking at the concepts, I’m trying to map them to our needs. One of our main use cases is chunking a request into thousands of parallel functions (our max so far is 7k) whose results need to be assembled once they all complete (something like a map/reduce execution model).

So I have a few questions:

  1. From the workflow/child docs, it seems that there’s a max number of activities a workflow can track. What is that number? 1000?

  2. Each function is executed remotely via message passing through a Kafka cluster, and the nodes on the other end may take 2-60 minutes to send a reply. Looking at the examples, is Async.function the way to go here? I’m assuming I could loop over chunks 1…N, create a Promise for each function, and then invoke allOf to get the results of each execution. In this scenario, how do I control individual errors/retries? What if one of the chunks fails and the workflow gets stuck in a never-ready state?

Besides those basic questions, one thing I’m still not clear on is the design of the workflow. I’m used to workflows (such as activity, for instance) being composed of several smaller steps. It looks to me (and I’m assuming that I’m just not getting it yet) that a @WorkflowMethod is the single point of entry/exit of a workflow. But if I have dozens of activities within this workflow, it is not clear to me how to orchestrate them. For instance, how can the uploadActivity, once done, continue into the split activity, which once done continues to the mergeActivity and finally the notificationActivity? Could someone give a pseudo example?

Cheers

From the workflow/child docs, it seems that there’s a max number of activities a workflow can track. What is that number? 1000?

A single workflow has a limited size (50K events in its event history). Because of this limit, there is a maximum number of activities that a single workflow can execute. Note that child workflows can be used to partition the problem into smaller executions (each child workflow has its own history). For example, a workflow that executes 1,000 child workflows, each of which executes 1,000 activities, gives you 1 million activities executed without reaching the history limit.
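
To make the arithmetic concrete, here is a rough sketch of how you might size the partitioning. The three events per activity (scheduled, started, completed) is an assumption for illustration, and the estimate ignores workflow task events, so treat it as optimistic and leave headroom. HistoryBudget and its method names are hypothetical, not part of the SDK.

```java
// Rough history-budget arithmetic; the per-activity event count is an assumption.
final class HistoryBudget {
    static final int HISTORY_LIMIT = 50_000;   // event limit quoted above
    static final int EVENTS_PER_ACTIVITY = 3;  // assumed: scheduled, started, completed

    /** Activities one workflow can run while using at most the given fraction of its history. */
    static int maxActivitiesPerWorkflow(double fraction) {
        return (int) (HISTORY_LIMIT * fraction) / EVENTS_PER_ACTIVITY;
    }

    /** Number of child workflows needed to spread totalChunks at perChild activities each. */
    static int childrenNeeded(int totalChunks, int perChild) {
        return (totalChunks + perChild - 1) / perChild; // ceiling division
    }
}
```

With these assumptions, your 7k chunks at 1,000 activities per child would need 7 child workflows, and each child would use roughly 3,000 events of its own history.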

Each function is executed remotely via message passing through a Kafka cluster, and the nodes on the other end may take 2-60 minutes to send a reply. Looking at the examples, is Async.function the way to go here? I’m assuming I could loop over chunks 1…N, create a Promise for each function, and then invoke allOf to get the results of each execution. In this scenario, how do I control individual errors/retries? What if one of the chunks fails and the workflow gets stuck in a never-ready state?

You can catch ActivityFailure directly in your Async.procedure/function block, for example:

Promise<String> myPromise =
    Async.function(
        () -> {
          try {
            return activities.doSomething();
          } catch (ActivityFailure e) {
            // handle exception...
            return "someDefault";
          }
        });

Another thing you can do is check if the promise has a failure before you call .get() on it, for example:

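for (Promise<ActResult> promise : actionsPromises) {
  if (promise.getFailure() != null) {
    // inspect the failure of this promise (the cause holds the original error)
    RuntimeException failure = promise.getFailure();
    ...
  }
  ...
  promise.get();
}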

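where the value returned by getFailure, in the case of an activity failure, should be of type ActivityFailure, which includes the original failure as its cause. Note that getFailure waits for the promise to complete before it returns.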
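
To show the overall fan-out/fan-in shape with a per-chunk fallback, here is a plain-Java analogy that runs without a Temporal server. It uses java.util.concurrent.CompletableFuture as a stand-in for Temporal's Promise, so it is not workflow code: inside a workflow you would use Async.function and Promise.allOf instead. FanOutSketch and processChunk are hypothetical names, and chunk 3 fails on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy: CompletableFuture stands in for Temporal's Promise.
final class FanOutSketch {
    /** Hypothetical chunk processor; fails for chunk 3 to simulate one bad chunk. */
    static String processChunk(int chunk) {
        if (chunk == 3) {
            throw new IllegalStateException("chunk " + chunk + " failed");
        }
        return "result-" + chunk;
    }

    static List<String> runAll(int chunks) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 1; i <= chunks; i++) {
            final int chunk = i;
            futures.add(
                CompletableFuture.supplyAsync(() -> processChunk(chunk))
                    // Per-chunk fallback, so one failure cannot block the fan-in.
                    .exceptionally(e -> "default-" + chunk));
        }
        // Fan-in: wait for every chunk, then collect the results in order.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join());
        }
        return results;
    }
}
```

The point of the sketch is that every future ends with either a value or a fallback, so the final wait always returns. In Temporal, activity timeouts play a similar role: an activity that never replies eventually fails, which completes its Promise.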

For retries, you can register different activity options for each activity type using WorkflowImplementationOptions->setActivityOptions, for example:

Map<String, ActivityOptions> perTypeActivityOptions = new HashMap<>();
perTypeActivityOptions.put("typeA", ActivityOptions.newBuilder().build()); // set custom options with custom RetryOptions
perTypeActivityOptions.put("typeB", ActivityOptions.newBuilder().build()); // set custom options with custom RetryOptions
perTypeActivityOptions.put("typeC", ActivityOptions.newBuilder().build()); // set custom options with custom RetryOptions

WorkflowImplementationOptions workflowImplementationOptions =
            WorkflowImplementationOptions.newBuilder()
                    .setActivityOptions(perTypeActivityOptions)
                    .build();

...
worker.registerWorkflowImplementationTypes(workflowImplementationOptions, MyWorkflowImpl.class);

where “typeA”, “typeB”, etc. are the names of your activity types (by default, the activity method names in your activity interface with the first letter capitalized).
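
As a rough illustration of what custom RetryOptions control, the sketch below computes the wait before each retry under an exponential backoff policy. It assumes the delay before retry n is the initial interval multiplied by the backoff coefficient to the power n-1, capped at a maximum interval. BackoffSketch and delayBeforeRetry are hypothetical names used only for this arithmetic and are not SDK calls.

```java
import java.time.Duration;

// Worked backoff arithmetic under the assumed formula; not part of the Temporal SDK.
final class BackoffSketch {
    /** Delay before the given retry (1-based), capped at maxInterval. */
    static Duration delayBeforeRetry(
            Duration initial, double coefficient, Duration maxInterval, int retry) {
        double millis = initial.toMillis() * Math.pow(coefficient, retry - 1);
        return Duration.ofMillis((long) Math.min(millis, maxInterval.toMillis()));
    }
}
```

For example, with an initial interval of 1 second, a coefficient of 2.0 and a maximum interval of 100 seconds, the fourth retry waits 8 seconds and the tenth is capped at 100 seconds. For replies that take up to 60 minutes, the activity timeouts matter at least as much as the retry schedule.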

For instance, how can the uploadActivity, once done, continue into the split activity, which once done continues to the mergeActivity and finally the notificationActivity? Could someone give a pseudo example?

I assume you are asking about async function invocation here, for example via Async.function. Async.function returns a Promise on which you can call thenApply or thenCompose to chain the next step. Otherwise, you can wait for the promise to complete (by calling .get) and then invoke the next activity.
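
If the steps simply run one after another, the orchestration can be ordinary sequential code in the workflow method, since a synchronous activity call blocks until that activity completes. Here is a minimal sketch of that shape in plain Java so it runs without a Temporal server. PipelineActivities, PipelineWorkflowSketch and InMemoryActivities are hypothetical names; in a real workflow the interface would be an activity interface and the calls would go through an activity stub.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical activity interface; in Temporal the workflow would call it through a stub.
interface PipelineActivities {
    String upload(String request);
    List<String> split(String uploaded);
    String merge(List<String> chunks);
    String notifyDone(String merged);
}

final class PipelineWorkflowSketch {
    private final PipelineActivities activities;

    PipelineWorkflowSketch(PipelineActivities activities) {
        this.activities = activities;
    }

    /** Plays the role of the @WorkflowMethod: each call blocks until the step finishes. */
    String run(String request) {
        String uploaded = activities.upload(request);
        List<String> chunks = activities.split(uploaded);
        String merged = activities.merge(chunks);
        return activities.notifyDone(merged);
    }
}

// In-memory stand-in so the sketch runs without a Temporal server.
final class InMemoryActivities implements PipelineActivities {
    public String upload(String request) { return "uploaded:" + request; }
    public List<String> split(String uploaded) {
        List<String> chunks = new ArrayList<>();
        for (String part : uploaded.split(":")) { chunks.add(part); }
        return chunks;
    }
    public String merge(List<String> chunks) { return String.join("+", chunks); }
    public String notifyDone(String merged) { return "notified:" + merged; }
}
```

The workflow method stays the single entry point, and the order of the calls is the orchestration. The parallel part of your use case would sit between split and merge, where each chunk gets its own Promise.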

Thank you @tihomir, that was a very insightful response. I’m starting to grasp some of the concepts, and I will take the next few weeks to build a real POC and see how things go. I’ll come back with more questions should they arise.

Cheers