How to use a concurrent for loop inside a Temporal activity?
Is there an alternative to
list.parallelStream().forEach()
that can be used to achieve parallelism inside a for loop?
Hello @Aadeesh_Jain
Activity code does not have to be deterministic, so it is safe to run list.parallelStream().forEach() inside an activity.
Antonio
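For illustration, here is a minimal sketch of two ways to process a list in parallel inside an activity method body. It uses plain Java only and no Temporal SDK types. The class name, the `convert` helper, and the pool size are hypothetical placeholders, not part of the original question.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical stand-in for an activity implementation class.
class ParallelActivitySketch {

    // Hypothetical per-item work; replace with the real logic.
    public static String convert(String name) {
        return name.toUpperCase();
    }

    // Option 1: parallelStream, which runs on the shared ForkJoinPool.commonPool().
    public static List<String> convertAllWithParallelStream(List<String> names) {
        return names.parallelStream()
                .map(ParallelActivitySketch::convert)
                .collect(Collectors.toList());
    }

    // Option 2: a dedicated fixed-size pool, so the degree of parallelism is explicit.
    public static List<String> convertAllWithExecutor(List<String> names, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one task per item, then wait for all of them in input order.
            List<CompletableFuture<String>> futures = names.stream()
                    .map(n -> CompletableFuture.supplyAsync(() -> convert(n), pool))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("a.zip", "b.zip", "c.zip");
        System.out.println(convertAllWithParallelStream(input)); // prints [A.ZIP, B.ZIP, C.ZIP]
        System.out.println(convertAllWithExecutor(input, 2));    // prints [A.ZIP, B.ZIP, C.ZIP]
    }
}
```

A dedicated pool may be preferable when the per-item work blocks on I/O, since parallelStream shares the common pool with everything else in the JVM. Either way, neither variant calls anything from the Workflow class.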
Hi @antonio.perez
I am getting this error while using parallelStream inside an activity:
java.lang.Error: Called from non workflow or workflow callback thread
io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404)
io.temporal.internal.sync.WorkflowInternal.getWorkflowOutboundInterceptor(WorkflowInternal.java:400)
io.temporal.internal.sync.WorkflowInternal.getVersion(WorkflowInternal.java:428)
io.temporal.workflow.Workflow.getVersion(Workflow.java:941)
It looks like you are using Workflow.getVersion inside the activity?
Can you share the activity code?
Thanks,
Antonio
Yes @antonio.perez, I am using versioning there.
I want to change the logic inside the activity without affecting my workflows that are already running.
I also have another question: do I need versioning when I am only changing the activity implementation and not the activity signature used in the workflow?
The activity code looks like this:
void helloActivity() {
    // Versioning call made from inside the activity
    int version = Workflow.getVersion("unzipAndConvert", Workflow.DEFAULT_VERSION, 1);
    if (version == Workflow.DEFAULT_VERSION) {
        // handling1
    } else {
        // handling2
        list.parallelStream().forEach(item -> {
            // do something
        });
    }
}
Once your activity has completed, its result is written to the event history, and the activity won’t be executed again during replay.
So no, you don’t have to version the activity code. It is safe to change it. After you redeploy your worker with the new changes, new workflow executions will run the new code in your activity. As I mentioned before, workflow executions that have already executed the activity won’t execute it again.
Let us know if you have questions.
Antonio
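To make the reasoning concrete, here is a toy model of that behavior. It is only a sketch and is not how the Temporal SDK is implemented: the `ReplayToyModel` class, its `history` map, and the `executeActivity` method are all hypothetical names invented for this example. It shows that once a result is recorded, a later "replay" returns the recorded value and never calls the (possibly changed) activity body.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy model only: NOT the Temporal SDK's actual implementation.
class ReplayToyModel {

    // Stand-in for the event history: activity id -> recorded result.
    private final Map<String, String> history = new HashMap<>();

    // Runs the activity body only when no result has been recorded yet;
    // otherwise returns the recorded result without invoking the body.
    public String executeActivity(String activityId, Supplier<String> activityBody) {
        return history.computeIfAbsent(activityId, id -> activityBody.get());
    }

    public static void main(String[] args) {
        ReplayToyModel run = new ReplayToyModel();
        AtomicInteger calls = new AtomicInteger();

        // First execution: the old implementation runs and its result is recorded.
        String first = run.executeActivity("unzipAndConvert", () -> {
            calls.incrementAndGet();
            return "old-result";
        });

        // "Replay" after a redeploy: the new implementation is never invoked.
        String replayed = run.executeActivity("unzipAndConvert", () -> {
            calls.incrementAndGet();
            return "new-result";
        });

        // prints "old-result old-result calls=1"
        System.out.println(first + " " + replayed + " calls=" + calls.get());
    }
}
```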
The general rule is: Use the Workflow class only inside the workflow code. Use the Activity class only inside the activity code.
Alright!
Thanks @antonio.perez and @maxim for the clarification.