How to execute activities in batch fashion in Temporal

As mentioned in this thread, I want to run 10 activities, but only 4 of them concurrently at any given time.

As you suggested there, I'll start with 4 activities and start another one whenever an activity completes.
How can I achieve this in Java code? Do I need to use Promise.anyOf to find out when any activity completes? If I do, my workflow code completes as soon as any one of the activities completes.

If I try to add a newly created promise to promiseList, it throws a ConcurrentModificationException.

I tried the code below:

int limit = 4, size = 10;

for (int i = 0; i < limit; i++) {
    promiseList.add(Async.function(this::executeBindTask, task, taskIOIterator.next()));
}

Promise.anyOf(promiseList).get();

List<Result> resultList = new ArrayList<>();
for (Promise<Result> promise : promiseList) {
    if (promise.isCompleted()) {
        resultList.add(promise.get());
        if (limit < size) {
            limit++;
            Async.function(this::executeBindTask, task, taskIOIterator.next()); // calling activity if any activity completes
        }
    }
}

Please help me achieve this in Java code. Thanks.

Make sure that your main workflow method doesn't return before all the computation is done. The simplest way to block the thread is to use Workflow.await on some variable, such as a count of unprocessed items.

I doubt your code is going to work as I believe you cannot add to the list while iterating through it.
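For reference, here is a minimal plain-Java sketch of why adding to a list inside a for-each loop fails, and one way around it (an index-based loop that re-reads the size). It does not use Temporal at all; the class and method names (ListGrowthDemo, forEachAddThrows, indexLoopAdd) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

class ListGrowthDemo {

    // Adds to the list inside a for-each loop. The loop's iterator notices the
    // structural change on its next step and throws.
    static boolean forEachAddThrows() {
        List<Integer> pending = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        try {
            for (Integer item : pending) {
                if (item == 1) {
                    pending.add(5);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Index-based loop: size() is re-read on every pass and no iterator is
    // involved, so elements appended during the loop are simply visited later.
    static List<Integer> indexLoopAdd(int limit, int size) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 1; i <= limit; i++) {
            pending.add(i);
        }
        int next = limit + 1;
        for (int i = 0; i < pending.size(); i++) {
            if (next <= size) {
                pending.add(next++);
            }
        }
        return pending;
    }
}
```

The same idea applies to a list of promises: either loop by index, or collect the new promises in a separate list and add them after the loop ends.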

Yes, I did not update my list while iterating because it was throwing a ConcurrentModificationException.

if (limit < size) {
    limit++;
    Async.function(this::executeBindTask, task, taskIOIterator.next()); // calling activity if any activity completes
}

In this code, I am not storing the promise in a variable, but I have to add it to my promise list so that it is included in Promise.anyOf, like below:

if (limit < size) {
    limit++;
    promiseList.add(Async.function(this::executeBindTask, task, taskIOIterator.next())); // calling activity if any activity completes
}

If I do not add the promise to the list, how can I call .get() on it?

Actually, my use case is simple:

I want to execute 20 activities, 4 at a time. After all 20 activities have executed, I have to collect all their results. That's it.

I think you can simplify your code by using Promise.thenApply or Promise.handle callbacks.
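A rough sketch of the callback approach, written in plain Java so it can run outside a workflow: CompletableFuture stands in for Temporal's Promise, thenAccept for Promise.thenApply, and the starter function for Async.function. SlidingWindow and starter are hypothetical names, and this is not Temporal API. Each completion records its result and starts the next task, so at most windowSize tasks are in flight.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java stand-in for the workflow logic (assumption: completions are
// delivered on one thread at a time, as workflow callbacks are).
class SlidingWindow<I, R> {
    private final Iterator<I> inputs;
    private final Function<I, CompletableFuture<R>> starter; // hypothetical hook
    private final List<R> results = new ArrayList<>();
    private int started = 0;

    SlidingWindow(Iterable<I> inputs, Function<I, CompletableFuture<R>> starter) {
        this.inputs = inputs.iterator();
        this.starter = starter;
    }

    // Start the first windowSize tasks, or fewer if the inputs run out.
    void start(int windowSize) {
        for (int i = 0; i < windowSize && inputs.hasNext(); i++) {
            startNext();
        }
    }

    private void startNext() {
        started++;
        starter.apply(inputs.next()).thenAccept(result -> {
            results.add(result);
            // A slot just freed up: start one more task if any input is left.
            if (inputs.hasNext()) {
                startNext();
            }
        });
    }

    int started() {
        return started;
    }

    List<R> results() {
        return results;
    }
}
```

In actual workflow code the same shape would presumably use Promise and Async.function, with the main method blocked by something like Workflow.await(() -> results.size() == size) so that it does not return early. CompletableFuture itself should not be used inside a workflow.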

Okay, let me try out

Thanks @maxim, it is working, but not in a few cases, as my workflow code is not deterministic.

Q1). The Javadoc says that the Workflow.await(condition) condition is called on every state transition. What does state transition mean?

As you suggested, I have a variable called count that is decremented in every promise.thenApply callback.

I have the await call below to block my code; count is initialized with listSize.
Workflow.await(()-> count == 0);

If the worker restarts in the middle of the execution, my count is re-initialized with listSize and never reaches zero, even though all my activities have completed.

Q2). What should I change to make my code deterministic?

Note: If the worker does not restart, it executes as expected.

@maxim Workflow.await blocks the workflow thread, similar to Promise.allOf(promiseList).

Am I correct?

Workflow.await blocks the current thread until the unblocking condition you specify is evaluated to true.

Okay, Promise.allOf also does the same thing, right? It blocks the current thread until all of its promises return their results.

You block/wait on the .get() I believe.

Both aren’t the same :anguished:

Promise.allOf returns a promise that you want to block on by calling its method get.

Promise.allOf(list).get();

My question is about Workflow.await(cond) versus Promise.allOf(list).get():
Q1). Both block the current workflow thread, right?

If not, please let me know the differences.

Q2). Please also share your insight on my earlier Q2 (making the workflow code deterministic).

await blocks until the condition is satisfied. Promise.allOf(list).get blocks until all the promises in the list are ready.

The count should not be reset to 0 on worker restart. Could you post the code that defines and uses the counter?

Assume the values below:
batchTask.getIoCollection().size() = 20 (size = 20)
windowSize = 4

I add my results to resultList, which is initially an empty list; once resultList reaches size, the await is released.

Previously I had a count variable that I incremented in each callback and used in the await. Since I need the resultList anyway, I removed the counter and use resultList for the check.

private Result executeBatchTask(BatchTask batchTask, TaskIO taskIO) {

        Task task = batchTask.getTask();
        AtomicInteger index = new AtomicInteger(0);
        Integer size = batchTask.getIoCollection().size();
        List<Result> resultList = Collections.synchronizedList(new ArrayList<Result>());

        Iterator<TaskIO> taskIOIterator = batchTask.getIoCollection().iterator();

        for(; index.get()<batchTask.getWindowSize(); index.incrementAndGet()){
            this.runBindTaskTask(task, taskIOIterator, index, size, resultList);
        }

        Workflow.await(()-> resultList.size() == size);

        return batchTask.handleResults(taskIO, resultList);
    }

    private void runBindTaskTask(Task task, Iterator<TaskIO> taskIOIterator, AtomicInteger index, Integer size, List<Result> resultList) {

        Promise<Result> promise = Async.function(this::runBindTaskTask, task, taskIOIterator.next());

        promise.thenApply(result->{
            resultList.add(result);
            if(index.get() != size){
                index.incrementAndGet();
                this.runBindTaskTask(task, taskIOIterator, index, size, resultList);
            }
            return result;
        });
    }

    private Result runBindTaskTask(Task task, TaskIO taskIO) {
        // activity call
        taskIO = runnableTask.run((FunctionalTask) task, taskIO);
        return Result.getSuccessResult(taskIO);
    }

@maxim Can you please give your suggestion regarding this?

According to the documentation, using Java synchronization is not allowed in workflow code. Don't use synchronizedList inside the workflow code, as it breaks this rule.

I think you might have a bug around index, as you check it for the boundary condition first and then increment it in the thenApply method.

In general the code should work after all the bugs are fixed.

Okay, let me remove synchronizedList. Can I use AtomicInteger?

Sorry, I did not understand what you are saying.

I might be wrong, but it looks like you first call an activity with index == size and then check for it.
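One way to check the boundary concern is to trace the index bookkeeping in plain Java, without Temporal. This is only a sketch: it assumes completions are handled one at a time, and IndexTrace, countStarts and nextInput are hypothetical names. nextInput mimics taskIOIterator.next() by throwing once the inputs are used up.

```java
import java.util.NoSuchElementException;

class IndexTrace {

    // Mirrors the posted bookkeeping and returns how many tasks get started.
    static int countStarts(int size, int windowSize) {
        int index = 0;
        int started = 0;
        // Initial window, as in the for loop of executeBatchTask.
        for (; index < windowSize; index++) {
            started = nextInput(started, size);
        }
        // One callback per completed task, as in thenApply.
        int completed = 0;
        while (completed < started) {
            completed++;
            if (index != size) {
                index++;
                started = nextInput(started, size);
            }
        }
        return started;
    }

    // Stand-in for iterator.next(): fails when no input is left.
    private static int nextInput(int started, int size) {
        if (started == size) {
            throw new NoSuchElementException("no inputs left");
        }
        return started + 1;
    }
}
```

Under these assumptions, countStarts(20, 4) comes out to 20, so the check-then-increment order looks consistent for size = 20 and windowSize = 4. It does fail when windowSize is larger than size, because the initial loop asks for more inputs than exist; guarding each start with taskIOIterator.hasNext() instead of comparing index to size would avoid that case.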