Implementing modifiable Workflow definition

Hello, I am doing a POC on Temporal to see if it is something we can use. Our main requirement is that we need to define a base workflow, and then as when we require add Activities, change the sequence of steps and define timed activities.

Since Temporal follows a deterministic workflow, I was trying to use Dynamic Workflows but the documentation around Java SDK for Dynamic Workflows were lacking. Since most of the things are not supported out of the box, so I was trying to see if each workflow can mantain a list of Activities and our main function basically goes through the list and executes each task.

This is the Workflow Implementation where execute is the WorkflowMethod and addActivity is the signal method. Whenever addActivity is called we add another to the Activity to this list.

@WorkflowImpl(taskQueues = AppConstants.TaskQueues.TEST)
public class TestWorkflowImpl implements TestWorkflow {


    private List<BaseActivity> activityList = new ArrayList<BaseActivity>();
    private final Logger logger = Workflow.getLogger(TestWorkflowImpl.class);
    private String name;

    @Override
    public Object execute() {
        activityList.add(Workflow.newActivityStub(BaseTestActivityOne.class,
                ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()));

        activityList.add(Workflow.newActivityStub(BaseTestActivityThree.class,
                ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()));

        while(true){

            Workflow.await(() -> !activityList.isEmpty());

            // Define activity options and get ActivityStub
            for (BaseActivity activity : activityList) {
                activity.runMethod("Running Tasks");
               
            }
            
        }

    }

    @Override
    public void addActivity() {
        BaseActivity activityStub = Workflow.newActivityStub(BaseTestActivityTwo.class,
                ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
        activityList.add(activityStub);
    }

But I get a failure whenever I try to call addActivity. This is the error

{
  "message": "Failure handling event 29 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {WorkflowTaskStartedEventId=29, CurrentStartedEventId=29}",
  "source": "JavaSDK",
  "stackTrace": "io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:314)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:275)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:221)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:234)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:218)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:158)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:132)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:415)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:261)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:833)\n",
  "cause": {
    "message": "WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]",
    "source": "JavaSDK",
    "stackTrace": "io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:163)\nio.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:103)\nio.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:84)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:360)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:273)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:221)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:234)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:218)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:158)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:132)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:415)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:261)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:833)\n",
    "cause": {
      "source": "JavaSDK",
      "stackTrace": "java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)\njava.base/java.util.ArrayList$Itr.next(ArrayList.java:967)\ntest.kd.st6.workflows.DynamicTestWorkflowImpl.execute(DynamicTestWorkflowImpl.java:45)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:568)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:339)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:314)\n",
      "applicationFailureInfo": {
        "type": "java.util.ConcurrentModificationException"
      }
    },
    "applicationFailureInfo": {
      "type": "java.lang.RuntimeException"
    }
  },
  "applicationFailureInfo": {
    "type": "io.temporal.internal.statemachines.InternalWorkflowTaskException"
  }
}

Any help would be much appreciated. Thanks!

You don’t need a dynamic workflow if you just need to implement your workflow dynamically. DynamicWorkflow supports adding workflow types on the fly.

Your particular error “java.util.ConcurrentModificationException” comes from mutating the activityList while it is being iterated.

Thanks for the reply @maxim. Is there any way in Temporal Workflows where we can manipulate the sequence of activities in the workflow definition while the workflow is being run?

Yes. Use a queue, not the list.

Getting the same error with Queue. Tried using a Concurrent Collection in Java which gave me the results as expected. Thanks!

Don’t iterate the queue; consume from it to avoid this error.