Java SDK workflow concurrency model

Hi Temporal team!

We recently deployed a new workflow (using Java SDK 1.19.1), and some of its instances fail with an InternalWorkflowTaskException saying "Failure handling event 1434 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution." The root cause seems to be a ConcurrentModificationException thrown by a LinkedHashMap workflow field.

java.lang.RuntimeException: Failure processing workflow task. WorkflowId=<workflowId>, RunId=<runId>, Attempt=6
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:398)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:259)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:110)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.temporal.internal.statemachines.InternalWorkflowTaskException: Failure handling event 1434 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {WorkflowTaskStartedEventId=1434, CurrentStartedEventId=1434}
	at io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:257)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:236)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:208)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:224)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:208)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:156)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:131)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:96)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:407)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:317)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:259)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)
	... 3 common frames omitted
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]
	at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:152)
	at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:102)
	at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:68)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:277)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:234)
	... 13 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
	at java.base/java.util.LinkedHashMap$LinkedValueIterator.next(Unknown Source)
	at com.acme.workflow.CartWorkflowImpl.signalProcessed(CartWorkflowImpl.kt:382)
	at com.acme.workflow.CartWorkflowImpl.itemRemoved(CartWorkflowImpl.kt:347)
	at jdk.internal.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at io.temporal.internal.sync.WorkflowInternal.lambda$registerListener$155fbe99$1(WorkflowInternal.java:189)
	at io.temporal.internal.sync.SignalDispatcher.handleInterceptedSignal(SignalDispatcher.java:75)
	at io.temporal.internal.sync.SyncWorkflowContext.handleInterceptedSignal(SyncWorkflowContext.java:306)
	at io.temporal.internal.sync.BaseRootWorkflowInboundCallsInterceptor.handleSignal(BaseRootWorkflowInboundCallsInterceptor.java:52)
	at io.temporal.internal.sync.SignalDispatcher.handleSignal(SignalDispatcher.java:102)
	at io.temporal.internal.sync.SyncWorkflowContext.handleSignal(SyncWorkflowContext.java:310)
	at io.temporal.internal.sync.WorkflowExecutionHandler.handleSignal(WorkflowExecutionHandler.java:92)
	at io.temporal.internal.sync.SyncWorkflow.lambda$handleSignal$2(SyncWorkflow.java:139)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:102)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:106)
	at io.temporal.worker.ActiveThreadReportingExecutor.lambda$submit$0(ActiveThreadReportingExecutor.java:53)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	... 3 common frames omitted

Based on my observations, all failures seem to happen when the workflow receives several signals at the same time. The first failure is usually a WorkflowTaskFailed: UnhandledCommand, and afterwards it's the exception I’ve described above.

Am I doing something wrong? Should workflow state in the Java SDK be protected from concurrent modifications?

P.S. We’re running self-hosted Temporal 1.20.1.

The workflow state is protected from simultaneous modification by multiple threads. But the ConcurrentModificationException is not related to multi-threading. All it says is that a collection was modified while it was being iterated. So if one workflow thread iterates over a collection and blocks on some Temporal API (like a synchronous activity invocation), the thread is released, and another workflow thread can handle a signal that modifies the collection, which causes the exception.

Here is a ChatGPT-generated example that causes a ConcurrentModificationException with a single thread:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ConcurrentModificationExample {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("one");
        list.add("two");
        list.add("three");

        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            String element = iterator.next();
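            // Removing "one" (not "two") matters: removing the second-to-last
            // element makes hasNext() return false, so the loop would exit
            // without ever throwing.
            if (element.equals("one")) {
                list.remove(element); // structural change while the iterator is live
            }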
        }
    }
}
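
For a shape closer to the workflow in question, here is a minimal sketch in plain Java (no Temporal SDK involved) where a LinkedHashMap is modified by a callback that runs in the middle of the iteration. That is roughly what happens when a blocking call lets a signal handler run. The class name, the map contents, and the simulatedSignalHandler callback are all made up for illustration.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

public class InterleavedModificationSketch {
    // Hypothetical stand-in for a workflow field such as pendingNotifications.
    static final Map<String, String> pending = new LinkedHashMap<>();

    // Hypothetical stand-in for a signal handler that runs while the
    // iterating code is blocked.
    static void simulatedSignalHandler() {
        pending.remove("b");
    }

    // Returns true if iterating while the handler modifies the map throws.
    static boolean iterateWithInterleaving() {
        pending.clear();
        pending.put("a", "1");
        pending.put("b", "2");
        pending.put("c", "3");
        try {
            for (String value : pending.values()) {
                // In a workflow this would be a blocking call (e.g. Promise.get()),
                // at which point another workflow thread may run.
                simulatedSignalHandler();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // The iterator noticed the structural change on its next step.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + iterateWithInterleaving());
    }
}
```

Everything here runs on one thread; the exception comes from the iterator's fail-fast check, not from a data race.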

Thank you, @maxim! Yeah, that seems to be the case. The code that is throwing ConcurrentModificationException does

pendingNotifications.values.forEach { it.cancel() }

where cancel is defined as

fun cancel() {
    scope.cancel()
    promises.forEach { runCatching { it.get() } }
}

There shouldn’t be any “real” waiting there (we cancel some CancellationScopes and then consume the Promises from those scopes, just to avoid the “Promise completed with exception and was never accessed” warning). But I guess that, since the SDK treats it as a blocking call, a concurrent signal could kick in midway, right?

it.get() is a blocking call.

Thank you! I’ll fix my code to iterate over a local copy of the notifications collection.
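
The local-copy approach could look roughly like the sketch below, written in plain Java without the Temporal SDK. The class name, the map contents, and simulatedSignalHandler are hypothetical; the handler stands in for a signal that arrives while the loop body is blocked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotIterationSketch {
    // Hypothetical stand-in for a workflow field such as pendingNotifications.
    static final Map<String, String> pending = new LinkedHashMap<>();

    // Hypothetical stand-in for a signal handler that modifies the map
    // while the iterating code is blocked.
    static void simulatedSignalHandler() {
        pending.remove("b");
    }

    // Iterates over a copy of the values and returns what was visited.
    static List<String> visitAll() {
        pending.clear();
        pending.put("a", "1");
        pending.put("b", "2");
        pending.put("c", "3");

        // Copy first: later changes to the map do not affect the copy.
        List<String> snapshot = new ArrayList<>(pending.values());

        List<String> visited = new ArrayList<>();
        for (String value : snapshot) {
            simulatedSignalHandler(); // the map changes, the snapshot does not
            visited.add(value);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(visitAll() + " remaining=" + pending.keySet());
    }
}
```

Note that the snapshot can contain entries that were removed from the map in the meantime, so the loop body has to tolerate that. In Kotlin, the equivalent copy would presumably be pendingNotifications.values.toList().forEach { it.cancel() }.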

P.S. I wish there was a way to suppress the “Promise completed with exception and was never accessed” warning only for CanceledFailures so I can completely get rid of Promise.get() calls.

You can call Promise.exceptionally or Promise.handle to suppress the exception.

Thank you!