Hello, I am doing a POC on Temporal to see if it is something we can use. Our main requirement is that we need to define a base workflow, and then as when we require add Activities, change the sequence of steps and define timed activities.
Since Temporal follows a deterministic workflow, I was trying to use Dynamic Workflows but the documentation around Java SDK for Dynamic Workflows were lacking. Since most of the things are not supported out of the box, so I was trying to see if each workflow can mantain a list of Activities and our main function basically goes through the list and executes each task.
This is the Workflow Implementation where execute is the WorkflowMethod and addActivity is the signal method. Whenever addActivity is called we add another to the Activity to this list.
@WorkflowImpl(taskQueues = AppConstants.TaskQueues.TEST)
public class TestWorkflowImpl implements TestWorkflow {
private List<BaseActivity> activityList = new ArrayList<BaseActivity>();
private final Logger logger = Workflow.getLogger(TestWorkflowImpl.class);
private String name;
@Override
public Object execute() {
activityList.add(Workflow.newActivityStub(BaseTestActivityOne.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()));
activityList.add(Workflow.newActivityStub(BaseTestActivityThree.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()));
while(true){
Workflow.await(() -> !activityList.isEmpty());
// Define activity options and get ActivityStub
for (BaseActivity activity : activityList) {
activity.runMethod("Running Tasks");
}
}
}
@Override
public void addActivity() {
BaseActivity activityStub = Workflow.newActivityStub(BaseTestActivityTwo.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
activityList.add(activityStub);
}
But I get a failure whenever I try to call addActivity. This is the error
{
"message": "Failure handling event 29 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {WorkflowTaskStartedEventId=29, CurrentStartedEventId=29}",
"source": "JavaSDK",
"stackTrace": "io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:314)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:275)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:221)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:234)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:218)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:158)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:132)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:415)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:261)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:833)\n",
"cause": {
"message": "WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]",
"source": "JavaSDK",
"stackTrace": "io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:163)\nio.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:103)\nio.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:84)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:360)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:273)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:221)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:234)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:218)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:158)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:132)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:415)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:261)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:833)\n",
"cause": {
"source": "JavaSDK",
"stackTrace": "java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)\njava.base/java.util.ArrayList$Itr.next(ArrayList.java:967)\ntest.kd.st6.workflows.DynamicTestWorkflowImpl.execute(DynamicTestWorkflowImpl.java:45)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:568)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:339)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:314)\n",
"applicationFailureInfo": {
"type": "java.util.ConcurrentModificationException"
}
},
"applicationFailureInfo": {
"type": "java.lang.RuntimeException"
}
},
"applicationFailureInfo": {
"type": "io.temporal.internal.statemachines.InternalWorkflowTaskException"
}
}
Any help would be much appreciated. Thanks!