Parallel streams in temporal activity

How to use concurrent for loop inside a temporal activity.
Any alternative for
list.parallelStream().forEach()
which can be used to achieve parallelism inside a for loop.

Hello @Aadeesh_Jain

Activity code does not have to be deterministic, it is safe to run list.parallelStream().forEach()

Antonio

Hi @antonio.perez
I am getting this error while using parallelStream inside an activity

java.lang.Error: Called from non workflow or workflow callback thread 
io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404)
io.temporal.internal.sync.WorkflowInternal.getWorkflowOutboundInterceptor(WorkflowInternal.java:400)
io.temporal.internal.sync.WorkflowInternal.getVersion(WorkflowInternal.java:428)
io.temporal.workflow.Workflow.getVersion(Workflow.java:941)

Looks like you are using Workflow.getVersion inside the activity?

Can you share the activity code?

Thanks,
Antonio

yes @antonio.perez , i am using versioning there.
I want to change logic inside activity and do not want my existing running WFs to be affected.
Also, I have another doubt, do i need versioning in case where i am just changing activity implementation and not changing any activity signature in a WF.
Activity code looks like →

void helloActivity(){
int version = Workflow.getVersion("unzipAndConvert", Workflow.DEFAULT_VERSION, 1);
if (version == Workflow.DEFAULT_VERSION) {
  handling1
}else{
 handling2
list.parallelStream().forEach(){
-- do something
}
}
}

Hi @Aadeesh_Jain

Once your activity is completed, the result is written in the event history, and it won’t be executed again during replay.

So no, you don’t have to version the activity code. It is safe to change it. After redeploying your worker with the new changes new workflow executions will execute the new code in your activity, and as I have mentioned before, workflow executions that have already executed the activity won’t execute it again.

Let us know if you have questions.
Antonio

The general rule is: Use the Workflow class only inside the workflow code. Use the Activity class only inside the activity code.

alright!
thanks @antonio.perez @maxim for the clarification