Parallel activity execution java-sdk

Hi, I am currently developing a workflow with the following use case.
I have, say, 5M records in the database to update, and I want to parallelise the update. I want to create 10 parallel activities via Promise, but have only 5 of them running concurrently. As soon as one finishes, the 6th is picked up from the remaining activities, and so on.
MaxConcurrentActivityExecutionSize is at the worker level, not the workflow level, right?
Also, the await approach below is incorrect too, right?
Workflow.await(() -> activities.stream().filter(p -> !p.isCompleted()).count() < MAX_RUNNING) - I believe this is where workflow execution will pause if the number of running activities reaches MAX_RUNNING.

I want to know the best way to handle this in Java.

In my opinion, you can start 5 activities with heartbeating and pass each one a range of records to update. Then just wait for all of these activities to finish, and you are done.

No, what I need is to control both the concurrency and the number of activities.
So I want to create, say, 10 activities from the workflow but limit concurrent execution to, say, 3.
That way I can later increase the concurrency to 5 or reduce it to 2 as required.
I.e. my workflow needs to execute 10 activities, but I only want 5 executing concurrently. I want to start 5 activities and start more as each executing activity completes.

As I see it, an activity is not a good choice for building something that waits for the next task.

What about creating a few child workflows, each of which waits for the next task or for cancellation? Each time a child workflow finishes the task it was last signaled with, it signals the parent that it is ready for a new task.

The parent workflow controls the number of such child workflows and distributes tasks among them. The parent workflow also listens for signals to resize the pool of child workflows.

OK, so you are saying we can’t use parallel activities for this use case. My use case is simple: I want to parallelize a DB operation, which per the documentation is done with Promise.allOf(list of activities to run in parallel). The extra thing I am looking for is a way to limit how many of those activities run concurrently.

I don’t want to set WorkerOptions.MaxConcurrentActivityExecutionSize, since that limits the number of activities executing at the same time per worker. So are you saying that if a workflow needs to execute 100 activities but I only want 10 executing concurrently, this is not possible with activities and has to be done with child workflows?

Hi @Monikongkona

You can have different task queues according to your needs and set MaxConcurrentActivityExecutionSize on only one of them.

You could run the activity whose concurrency you want to limit on one task queue (taskqueueA) and use another task queue for the rest of the activities. Then set up a worker listening on taskqueueA with MaxConcurrentActivityExecutionSize = x. This limits the number of concurrent Activity Executions only for activities running on that task queue (the limit applies per worker).

Hi @krocos /@antonio.perez

Going through the Promise samples on GitHub for parallel activity execution, I have a basic confusion.
Say, as in the example, I add all the composeGreeting() activity calls via Async.function as below:
    if (names != null) {
        for (String name : names) {
            promiseList.add(Async.function(activities::composeGreeting, "Hello", name));
        }
    }

Now, do the two actions below mean the same thing, or are they different?
Workflow.await(() -> promiseList.stream().filter(p -> !p.isCompleted()).count() < 2);
Promise.allOf(promiseList).get();

For this particular example, technically they are the same
(I think it maybe should be Workflow.await(() -> promiseList.stream().filter(p -> p.isCompleted()).count() == 1); instead, but that is not the point).

The reason is that the Workflow.await unblock condition is re-evaluated each time one of your async activities completes, fails after retries, or times out.
From a best-practice perspective, IMO it’s better to use Promise.allOf because it’s easier to define and more intuitive. Also, if you had multiple activity promises, the failure of a single one would be delivered to workflow code immediately (after retries), for example:

try {
  Promise.allOf(promiseList).get();
} catch (ActivityFailure e) {
  // ...
}

Whereas with Workflow.await you would not be able to catch and handle any failures.

ok thanks @tihomir
Actually, the reason behind that question is that I want parallelism, but I also want to control the number of activities running in parallel at any point in time.
So I want to create 20 parallel activities, but only 5 should run concurrently at any one time. Only after one of the 5 completes should the 6th start.

A single worker can support a large number of parallel activity executions (with the Java SDK it’s 200 by default per the WorkerOptions config).
You can use WorkerOptions.Builder.setMaxConcurrentActivityExecutionSize to limit the number of parallel activities.

So in your case for example:

Worker worker = factory.newWorker(TASK_QUEUE,
            WorkerOptions.newBuilder()
                    .setMaxConcurrentActivityExecutionSize(5)
                    // ...
                    .build());

Note that this is a per-worker configuration.

If you want to limit the number of activities that run in parallel from the same workflow, then you have to implement the logic yourself by starting 5 activities and starting a new one as soon as any of them completes. You can use Promise.anyOf to wait for a single activity completion.
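
To make the control flow concrete, here is a minimal sketch of that windowing loop. It is written in plain Java with CompletableFuture as a stand-in so it can run on its own; it is not Temporal workflow code, and workflow code must not use threads or CompletableFuture directly. In a workflow the same shape would presumably use Async.function to start each activity, Promise.anyOf(inFlight).get() to wait, and Promise.isCompleted() to find finished ones. The names SlidingWindowDemo, fakeActivity, runWindowed, and the two counters are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class SlidingWindowDemo {
    // Demo-only instrumentation: how many tasks overlap at any moment.
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxObserved = new AtomicInteger();

    // Hypothetical stand-in for an activity call; returns id * 2 after a short delay.
    static int fakeActivity(int id) {
        int now = running.incrementAndGet();
        maxObserved.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
        }
        return id * 2;
    }

    // Runs `total` tasks while keeping at most `window` of them in flight.
    static List<Integer> runWindowed(int total, int window) {
        // The pool is deliberately larger than the window, so the limit
        // is enforced by the loop below and not by the executor.
        ExecutorService pool = Executors.newFixedThreadPool(total);
        try {
            List<Integer> results = new ArrayList<>();
            List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
            int next = 0;
            while (next < total || !inFlight.isEmpty()) {
                // Top up the window with new tasks.
                while (next < total && inFlight.size() < window) {
                    final int id = next++;
                    inFlight.add(CompletableFuture.supplyAsync(() -> fakeActivity(id), pool));
                }
                // Block until at least one in-flight task has finished.
                CompletableFuture.anyOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
                // Collect results of finished tasks and free their slots.
                inFlight.removeIf(f -> {
                    if (!f.isDone()) {
                        return false;
                    }
                    results.add(f.join());
                    return true;
                });
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> results = runWindowed(20, 5);
        System.out.println(results.size() + " results, peak concurrency " + maxObserved.get());
    }
}
```

Results arrive in completion order, not start order. The sketch does not handle failures: a failed task would surface as an exception from join(), which is roughly where a workflow version would catch ActivityFailure.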

ok thanks @maxim for the reply.

Will something like this work? Basically, from my WorkflowImpl class I would call the activity in a loop.
Here windowSize is the maximum number of activities running concurrently, and size is the total number of activities to run.

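    int windowSize = 5;   // max activities in flight at once
    int size = 20;        // total activities to run
    int endIndex = Math.min(windowSize, size);
    AtomicInteger index = new AtomicInteger(0);   // number of activities started so far
    List<AccountingResult> results = new ArrayList<>();

    // Start the first window; each completion starts the next activity (see runTask).
    for (; index.get() < endIndex; index.incrementAndGet()) {
        this.runTask(index, size, results);
    }

    // Wait until every activity has reported a result. Waiting on index == size
    // would unblock when the last activity is started, not when it finishes.
    Workflow.await(() -> results.size() == size);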

My activity is AccountingActivity and it returns AccountingResult. runTask() will be called 1 by 1 from above.

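    // Starts one activity and, when it completes, starts the next one until
    // `size` activities have been started in total.
    private void runTask(AtomicInteger index, Integer size, List<AccountingResult> results) {
        Promise<AccountingResult> promise = Async.function(accountingActivity::processTransferAccounting);

        // Note: this callback only runs on success; a failed activity is not recorded here.
        promise.thenApply(result -> {
            results.add(result);
            // index counts started activities; start another if any remain.
            if (index.get() < size) {
                index.incrementAndGet();
                this.runTask(index, size, results);
            }
            return result;
        });
    }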

Yes, the idea looks fine.