How to execute activities in batch fashion in Temporal

You can. But to make it more clear I would make it an int and make an field of the workflow object.

Sorry for digging this thread after long back,

I just red this line in the doc,

There is no need in explicit synchronization because multi-threaded code inside a Workflow is executed one thread at a time and under a global lock.

So I can avoid using AtomicIntger and synchronizedList in the workflow code.

May I know the reason to make an index field to field of the workflow object?

You should not use explicit synchronization in your Java Workflow code.

You can use non-static fields in your workflow definition without having to worry about isolation issues.

For static fields use io.temporal.workflow.WorkflowLocal or io.temporal.workflow.WorkflowThreadLocal depending on your use case You can use Atomic variables, but not really needed.
Don’t use synchronized lists, as that will break workflow determinism

May I know the reason to make an index field to field of the workflow object?

Can you show example? Not sure I fully understand.

Thanks for the response,

Please find the below code

My use case is to execute my tasks in a batch fashion. Though I have 20 tasks to be executed, if my batch window size is 4 then I have to execute 4 tasks at a time,

Initially, I am executing tasks based on the batch window size, while receiving a response from each task I will be incrementing my index and executing the next task.

public final class SampleWorkflowImpl implements SampleWorkflow{

@Override
public void executeWorkflow(Pilot pilot, PilotIO pilotIO)  {      
    executeTaskList(pilot.getTaskList(), pilotIO);
}


private TaskIO executeTaskList(List<Task> taskList, PilotIO pilotIO) {

    TaskIO taskIO = new TaskIO();

    for (Task task : taskList) {

      if(task instanceof BatchTask){
        executeBatchTask((BatchTask)task, taskIO, pilotIO);
        }
    }
    return taskIO;
}

private boolean executeBatchTask(BatchTask batchTask, TaskIO taskIO, PilotIO pilotIO) {

    Task task                       =   batchTask.getTask();
    List<TaskIO> taskIOList         =   IOHelper.getBatchInputs(batchTask, taskIO);
    List<TaskIO> resultTaskIOList   =   Collections.synchronizedList(new ArrayList<>());
    AtomicInteger index             =   new AtomicInteger(0);
    int windowSize                  =   batchTask.getWindowSize();
    Integer size                    =   taskIOList.size();
    int endIndex                    =   Math.min(windowSize, size);
    Iterator<TaskIO> taskIOIterator =   taskIOList.iterator();

    for(; index.get()<endIndex; index.incrementAndGet()){
        this.runBindTaskTask(task, taskIOIterator, index, size, resultTaskIOList, pilotIO);
    }

    Workflow.await(()-> index.get() == size);

    IOHelper.processBatchResults(taskIO, taskIOList);
    IOHelper.resultProcessing(batchTask, taskIO, pilotIO);

    return IOHelper.checkContinuation(batchTask, taskIO, taskIOList);

}

private void runBindTaskTask(Task task, Iterator<TaskIO> taskIOIterator, AtomicInteger index,
                             Integer size, List<TaskIO> resultList, PilotIO pilotIO) {

    Promise<TaskIO> promise = Async.function(this::runBindTaskTask, task, taskIOIterator.next(), pilotIO);

    promise.thenApply(result -> {

        resultList.add(result);
        if(index.get() != size){
            index.incrementAndGet();
            this.runBindTaskTask(task, taskIOIterator, index, size, resultList, pilotIO);
        }
        return result;
    });
}

private TaskIO runBindTaskTask(Task task, TaskIO taskIO, PilotIO pilotIO){
    IOHelper.inputProcessing(task, taskIO, pilotIO);
    return executeFunctionalTask((FunctionalTask) task, taskIO, pilotIO);
}


private TaskIO executeFunctionalTask(FunctionalTask task, TaskIO taskIO, PilotIO pilotIO){

    RetryOptions retryOptions = RetryOptions.newBuilder().build();

    if (task instanceof ErrorHandleableTask) {
        retryOptions.merge(((ErrorHandleableTask) task).getRetryOptions());
    }


    RunnableTask runnableTask = Workflow.newActivityStub(RunnableTask.class,
            ActivityOptions.newBuilder()
                    .setTaskQueue(getTaskQueueName(task))
                    .setStartToCloseTimeout(Duration.ofMinutes(1)).build());

    taskIO = runnableTask.run(task, taskIO);

    IOHelper.resultProcessing(task, taskIO, pilotIO);

    return taskIO;
}

}

I think that the use of AtomicInteger in your case is not necessary (can use Integer) as all the methods are part of the workflow definition and are not static.
Best thing is to do integration tests tho as we might not see all of your app code here.

1 Like