WorkflowRejectedExecutionError: what's the cause and how to solve it?

I'm seeing:
io.temporal.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2053d445[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@71f6e85c[Wrapped task = io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper@1568b1e]] rejected from java.util.concurrent.ThreadPoolExecutor@10d53f40[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 4199]
io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:263)
io.temporal.internal.sync.SyncWorkflow.lambda$start$1(SyncWorkflow.java:120)
io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Caused By: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2053d445[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@71f6e85c[Wrapped task = io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper@1568b1e]] rejected from java.util.concurrent.ThreadPoolExecutor@10d53f40[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 4199]
java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:246)
io.temporal.internal.sync.SyncWorkflow.lambda$start$1(SyncWorkflow.java:120)
io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

What's the cause, and what's the recommended way to fix it?

While a workflow task is executing, the workflow uses real Java threads from an executor. The total number of workflow threads available to all workers created by a WorkerFactory is set through the WorkerFactoryOptions.maxWorkflowThreadCount property. The default value of maxWorkflowThreadCount is 600.

The total number of workflow tasks that can be executed in parallel by a single worker is configured through WorkerOptions.maxConcurrentWorkflowTaskExecutionSize. The default value of maxConcurrentWorkflowTaskExecutionSize is 200. I want to emphasize that this is the number of workflow tasks, not the number of workflows in the system. It is possible to have millions of open workflows and process them with small task parallelism.

Note that this configuration is per worker. So if a factory is used to create multiple workers, each worker will execute up to maxConcurrentWorkflowTaskExecutionSize tasks concurrently.
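As a rough sketch of where these two settings live, assuming a WorkflowClient has already been built elsewhere. The class name, task queue names, and numbers below are hypothetical and chosen only for illustration:

```java
import io.temporal.client.WorkflowClient;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.worker.WorkerOptions;

public final class WorkerSetup {
  // Hypothetical values for illustration; tune them for your own load.
  private static final int MAX_WORKFLOW_THREADS = 1200;
  private static final int MAX_CONCURRENT_WORKFLOW_TASKS = 100;

  public static WorkerFactory createFactory(WorkflowClient client) {
    // Factory-level setting: one thread budget shared by every worker of this factory.
    WorkerFactoryOptions factoryOptions =
        WorkerFactoryOptions.newBuilder()
            .setMaxWorkflowThreadCount(MAX_WORKFLOW_THREADS)
            .build();
    WorkerFactory factory = WorkerFactory.newInstance(client, factoryOptions);

    // Worker-level setting: applies to each worker separately.
    WorkerOptions workerOptions =
        WorkerOptions.newBuilder()
            .setMaxConcurrentWorkflowTaskExecutionSize(MAX_CONCURRENT_WORKFLOW_TASKS)
            .build();

    // Two workers, so up to 2 * 100 workflow tasks may run against the 1200-thread budget.
    factory.newWorker("orders-task-queue", workerOptions);
    factory.newWorker("billing-task-queue", workerOptions);

    // Register workflow and activity implementations on the workers, then call factory.start().
    return factory;
  }
}
```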

WorkflowRejectedExecutionError is thrown when workflows that are currently executing workflow tasks need more threads than are available in the thread pool.
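The underlying rejection is plain java.util.concurrent behavior and can be reproduced without Temporal. The sketch below assumes a bounded pool with no task queue (a SynchronousQueue), which is consistent with the "queued tasks = 0" in the trace but is a guess about the SDK's internals, not something confirmed here. PoolExhaustionDemo and countRejections are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolExhaustionDemo {
  /** Submits blocking tasks to a bounded, queue-less pool and returns how many were rejected. */
  public static int countRejections(int maxThreads, int submissions) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, maxThreads, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
    CountDownLatch release = new CountDownLatch(1);
    int rejected = 0;
    try {
      for (int i = 0; i < submissions; i++) {
        try {
          // Each task holds its thread until the latch opens, like a blocked workflow thread.
          pool.execute(() -> {
            try {
              release.await();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
        } catch (RejectedExecutionException e) {
          // Default AbortPolicy: no idle thread and no room to create one.
          rejected++;
        }
      }
    } finally {
      release.countDown();
      pool.shutdown();
    }
    return rejected;
  }

  public static void main(String[] args) {
    System.out.println("rejected = " + countRejections(3, 5));
  }
}
```

With a limit of 3 threads and 5 submissions, the first three tasks each occupy a thread and the remaining two are rejected.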

If you see WorkflowRejectedExecutionError intermittently, it can be considered benign, as the workflow task is retried and the workflow continues execution without a problem. You should adjust the configuration if these errors are frequent.

Some reasons for not having enough threads:

  • The sum of maxConcurrentWorkflowTaskExecutionSize across all workers is higher than maxWorkflowThreadCount.
  • Each workflow uses many threads, so that
    <threads per workflow> * maxConcurrentWorkflowTaskExecutionSize > maxWorkflowThreadCount.
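Both conditions in the list come down to one worst-case calculation, which can be checked with a back-of-the-envelope helper. ThreadBudget and its methods are hypothetical and not part of the SDK. The threads-per-workflow figure is an estimate you would derive from your own workflow code.

```java
import java.util.List;

public class ThreadBudget {
  /** Worst case: every worker runs its full task quota and each task uses threadsPerWorkflow threads. */
  public static long worstCaseThreads(List<Integer> maxConcurrentTasksPerWorker, int threadsPerWorkflow) {
    long total = 0;
    for (int tasks : maxConcurrentTasksPerWorker) {
      total += (long) tasks * threadsPerWorkflow;
    }
    return total;
  }

  /** True when the worst-case demand stays within the factory's maxWorkflowThreadCount. */
  public static boolean fits(List<Integer> maxConcurrentTasksPerWorker, int threadsPerWorkflow,
      int maxWorkflowThreadCount) {
    return worstCaseThreads(maxConcurrentTasksPerWorker, threadsPerWorkflow) <= maxWorkflowThreadCount;
  }

  public static void main(String[] args) {
    // One worker with the default 200 tasks, 3 threads each: 200 * 3 = 600, within the default 600.
    System.out.println(fits(List.of(200), 3, 600));      // true
    // Two workers with 200 tasks each, 2 threads each: 400 * 2 = 800, over the default 600.
    System.out.println(fits(List.of(200, 200), 2, 600)); // false
  }
}
```

This is a worst-case bound. Real usage is usually lower, which is why occasional rejections can appear only under peak load.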

One common mistake that leads to excessive thread use by workflows is using the Async API incorrectly when invoking activities or child workflows. For example, the following code that calls an activity consumes a thread:

      GreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class, options);
      Promise<String> hello = Async.function(() -> activities.composeGreeting("Hello", name));

This is because a lambda function is passed to Async.function. The correct way is to pass a reference to an activity stub method. The following code is fully asynchronous and doesn’t require a thread for the activity execution:

      GreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class, options);
      Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);