WorkflowRejectedExecutionError: what’s the cause and how to solve it?

I’m seeing:
io.temporal.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2053d445[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@71f6e85c[Wrapped task = io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper@1568b1e]] rejected from java.util.concurrent.ThreadPoolExecutor@10d53f40[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 4199]
io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:263)
io.temporal.internal.sync.SyncWorkflow.lambda$start$1(SyncWorkflow.java:120)
io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Caused By: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2053d445[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@71f6e85c[Wrapped task = io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper@1568b1e]] rejected from java.util.concurrent.ThreadPoolExecutor@10d53f40[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 4199]
java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:246)
io.temporal.internal.sync.SyncWorkflow.lambda$start$1(SyncWorkflow.java:120)
io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

What’s the cause, and what’s the recommended way to fix it?

While a workflow task is executed, the workflow uses real Java threads from an executor. The total number of workflow threads available to all workers created by a WorkerFactory is set through the WorkerFactoryOptions.maxWorkflowThreadCount property. The default value of maxWorkflowThreadCount is 600.

The total number of workflow tasks that a single worker can execute in parallel is configured through WorkerOptions.maxConcurrentWorkflowTaskExecutionSize. The default value of maxConcurrentWorkflowTaskExecutionSize is 200. I want to emphasize that this is the number of workflow tasks, not the number of workflows in the system. It is possible to have millions of open workflows and process them with small task parallelism.

Note that this configuration is per worker. So if a factory is used to create multiple workers, each worker will execute up to maxConcurrentWorkflowTaskExecutionSize tasks at a time.
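As a rough sketch of where these two settings live, assuming the Temporal Java SDK is on the classpath: the class name, the task queue name, and the values 800 and 100 are placeholders chosen for illustration, not recommendations.

```java
import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.worker.WorkerOptions;

final class WorkerThreadConfig {
  // Hypothetical task queue name, used only for this example.
  private static final String TASK_QUEUE = "EXAMPLE_TASK_QUEUE";

  /** Builds a factory whose thread limits are set explicitly instead of relying on defaults. */
  static WorkerFactory newFactory(WorkflowClient client) {
    // Factory-level limit, shared by every worker created from this factory (default 600).
    WorkerFactoryOptions factoryOptions =
        WorkerFactoryOptions.newBuilder().setMaxWorkflowThreadCount(800).build();
    WorkerFactory factory = WorkerFactory.newInstance(client, factoryOptions);

    // Worker-level limit, applies to this worker only (default 200).
    WorkerOptions workerOptions =
        WorkerOptions.newBuilder().setMaxConcurrentWorkflowTaskExecutionSize(100).build();
    Worker worker = factory.newWorker(TASK_QUEUE, workerOptions);

    // Register workflow implementation types on `worker` here, then call factory.start().
    return factory;
  }
}
```

The point of the sketch is only that the thread limit is set once on the factory while the task concurrency limit is set on each worker separately.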

The WorkflowRejectedExecutionError exception is thrown when workflows that are currently executing workflow tasks need more threads than are available in the thread pool.
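To see the mechanism in isolation, here is a minimal JDK-only sketch. It assumes the workflow thread pool behaves like a bounded ThreadPoolExecutor with no task queue, which is consistent with "queued tasks = 0" and AbortPolicy in the trace above but is not copied from the SDK source. The class and method names are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolExhaustionDemo {
  /** Returns true if submitting one task more than maxThreads (>= 1) is rejected. */
  static boolean rejectsWhenFull(int maxThreads) {
    // SynchronousQueue holds no tasks: a task either gets a thread right away
    // or is rejected by the default AbortPolicy.
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(0, maxThreads, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
    CountDownLatch release = new CountDownLatch(1);
    try {
      // Occupy every thread with a task that blocks until released.
      for (int i = 0; i < maxThreads; i++) {
        pool.submit(() -> {
          release.await();
          return null;
        });
      }
      try {
        // All threads are busy and nothing can be queued, so this submit fails.
        pool.submit(() -> null);
        return false;
      } catch (RejectedExecutionException e) {
        return true;
      }
    } finally {
      // Unblock the workers and let the pool wind down.
      release.countDown();
      pool.shutdown();
    }
  }
}
```

In the SDK, the RejectedExecutionException is what ends up wrapped in WorkflowRejectedExecutionError, as the "Caused By" section of the trace shows.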

If you see WorkflowRejectedExecutionError intermittently, it can be considered benign, as the workflow task is retried and the workflow continues execution without a problem. You should adjust the configuration if these errors are frequent.

Some reasons for not having enough threads:

  • The sum of maxConcurrentWorkflowTaskExecutionSize across all workers is higher than maxWorkflowThreadCount.
  • Each workflow uses many threads, so that
    <threads per workflow> * maxConcurrentWorkflowTaskExecutionSize > maxWorkflowThreadCount.
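The two conditions above can be checked with simple arithmetic. The helper below is a hypothetical sketch, not part of the SDK: it only encodes the two inequalities from the list, and the threads-per-workflow figure is an estimate you have to supply yourself.

```java
import java.util.List;

final class WorkflowThreadBudget {
  /** Worst-case thread demand: every worker runs its maximum number of workflow tasks at once. */
  static long worstCaseThreads(List<Integer> maxConcurrentTaskSizes, int threadsPerWorkflow) {
    long total = 0;
    for (int size : maxConcurrentTaskSizes) {
      total += (long) size * threadsPerWorkflow;
    }
    return total;
  }

  /** True if the worst-case demand fits within maxWorkflowThreadCount. */
  static boolean fitsInPool(
      List<Integer> maxConcurrentTaskSizes, int threadsPerWorkflow, int maxWorkflowThreadCount) {
    return worstCaseThreads(maxConcurrentTaskSizes, threadsPerWorkflow) <= maxWorkflowThreadCount;
  }
}
```

For example, one worker configured with 400 and another left at the default 200 already add up to 600 at one thread per workflow, which is the whole default pool. At two threads per workflow the worst case is 1200, so the pool would have to grow or the task limits would have to shrink.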

One of the common mistakes that leads to excessive use of threads by workflows is not using the Async API correctly when invoking activities or child workflows. For example, the following code that calls an activity consumes a thread:

      GreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class, options);
      Promise<String> hello = Async.function(() -> activities.composeGreeting("Hello", name));

It consumes a thread because the activity is invoked inside a lambda function passed to Async.function. The correct way is to pass a reference to an activity stub method. The following code is fully asynchronous and doesn’t require a thread for the activity execution:

      GreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class, options);
      Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);

Hi @maxim,

I’m starting to notice that I’m hitting the max limit, and it’s not clear why.

    CancellationScope activityCancelableScope =
        Workflow.newCancellationScope(
            () -> {
              Promise<ActivityPayload> activityPromise =
                  stub.executeAsync(
                      methodNameDescriptor,
                      ActivityPayload.class,
                      namespace,
                      workflowId,
                      runId,
                      callbackId,
                      activityVersion,
                      args);
              activityExecutePromiseAtomic.set(activityPromise);
            });
    Promise<ActivityPayload> activityExecutePromise = activityExecutePromiseAtomic.get();
    activityExecutePromise.get();

Do you see any issue with how I defined the async task here?

My workflow itself is pretty sequential. It runs one activity at a time and, based on the result, moves to the next. Each activity just makes a request and then polls for status. I do wrap all the activity promises in a cancellation scope in case I want to signal to cancel the activity. I don’t see how I’m spinning up 600 threads…

Any insights on how to debug or troubleshoot this?

Exact error

io.temporal.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1adde4da rejected from java.util.concurrent.ThreadPoolExecutor@530cfec6[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 1371] 
io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:264)
io.temporal.internal.sync.DeterministicRunnerImpl.runUntilAllBlocked(DeterministicRunnerImpl.java:229)
io.temporal.internal.sync.SyncWorkflow.eventLoop(SyncWorkflow.java:144)
io.temporal.internal.replay.ReplayWorkflowExecutor.eventLoop(ReplayWorkflowExecutor.java:74)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler$EntityManagerListenerImpl.eventLoop(ReplayWorkflowRunTaskHandler.java:328)
io.temporal.internal.statemachines.WorkflowStateMachines.eventLoop(WorkflowStateMachines.java:388)
io.temporal.internal.statemachines.WorkflowStateMachines.access$500(WorkflowStateMachines.java:71)
io.temporal.internal.statemachines.WorkflowStateMachines$WorkflowTaskCommandsListener.workflowTaskStarted(WorkflowStateMachines.java:809)
io.temporal.internal.statemachines.WorkflowTaskStateMachine.handleCompleted(WorkflowTaskStateMachine.java:121)
io.temporal.internal.statemachines.WorkflowTaskStateMachine.handleStarted(WorkflowTaskStateMachine.java:113)
io.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:45)
io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:137)
io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)
io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:63)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:210)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

Caused By: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1adde4da rejected from java.util.concurrent.ThreadPoolExecutor@530cfec6[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 1371] 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:247)
io.temporal.internal.sync.DeterministicRunnerImpl.runUntilAllBlocked(DeterministicRunnerImpl.java:229)
io.temporal.internal.sync.SyncWorkflow.eventLoop(SyncWorkflow.java:144)
io.temporal.internal.replay.ReplayWorkflowExecutor.eventLoop(ReplayWorkflowExecutor.java:74)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler$EntityManagerListenerImpl.eventLoop(ReplayWorkflowRunTaskHandler.java:328)
io.temporal.internal.statemachines.WorkflowStateMachines.eventLoop(WorkflowStateMachines.java:388)
io.temporal.internal.statemachines.WorkflowStateMachines.access$500(WorkflowStateMachines.java:71)
io.temporal.internal.statemachines.WorkflowStateMachines$WorkflowTaskCommandsListener.workflowTaskStarted(WorkflowStateMachines.java:809)
io.temporal.internal.statemachines.WorkflowTaskStateMachine.handleCompleted(WorkflowTaskStateMachine.java:121)
io.temporal.internal.statemachines.WorkflowTaskStateMachine.handleStarted(WorkflowTaskStateMachine.java:113)
io.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:45)
io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:137)
io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)
io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:63)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:210)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

Also, are there metrics that show the thread count?

Thanks,
Derek

The posted snippet is fine. Would you take a thread dump of a workflow? You can see it through the tctl wf stack command or the stack trace tab in the UI.

@maxim

UI stack trace

3 INVALID_ARGUMENT: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 5 of 'EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT' type. IsReplaying=false, PreviousStartedEventId=0, workflowTaskStartedEventId=0, Currently Processing StartedEventId=0
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleQueryWorkflowTask(ReplayWorkflowRunTaskHandler.java:254)
        at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleQueryOnlyWorkflowTask(ReplayWorkflowTaskHandler.java:257)
        at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
        at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
        at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
        at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unexpected event:event_id: 5 event_time { seconds: 1625735388 nanos: 194379926 } event_type: EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT task_id: 106982581 workflow_execution_timed_out_event_attributes { retry_state: RETRY_STATE_TIMEOUT }
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleNonStatefulEvent(WorkflowStateMachines.java:418)
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:215)
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
        ... 11 more
[
  {
    "eventId": "1",
    "eventTime": { "seconds": "1625724588", "nanos": 188229207 },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
    "version": "0",
    "taskId": "106979945",
    "workflowExecutionStartedEventAttributes": {
      "workflowType": { "name": "DslWorkflow" },
      "parentWorkflowNamespace": "",
      "parentInitiatedEventId": "0",
      "taskQueue": {
        "name": "CI_CD_WORKFLOW_TASK_QUEUE",
        "kind": "TASK_QUEUE_KIND_NORMAL"
      },
      "input": {
        "payloads": [
          {
            "metadata": { "encoding": "anNvbi9wbGFpbg==" },
            "data": "IjVkZmQ0NWU5LTllNjgtNDYzNS05NGU2LTY0NmMzNWJkODgzZSI="
          },
          { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, "data": "<data>" }
        ]
      },
      "workflowExecutionTimeout": { "seconds": "10800", "nanos": 0 },
      "workflowRunTimeout": { "seconds": "10800", "nanos": 0 },
      "workflowTaskTimeout": { "seconds": "10", "nanos": 0 },
      "continuedExecutionRunId": "",
      "initiator": "CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED",
      "originalExecutionRunId": "e8126814-c41e-4c53-b9fa-e7e8a6689789",
      "identity": "25@ci-cd-orchestrator-574dc69b77-dpwvq",
      "firstExecutionRunId": "e8126814-c41e-4c53-b9fa-e7e8a6689789",
      "attempt": 1,
      "workflowExecutionExpirationTime": {
        "seconds": "1625735388",
        "nanos": 188000000
      },
      "cronSchedule": "",
      "firstWorkflowTaskBackoff": { "seconds": "0", "nanos": 0 }
    },
    "attributes": "workflowExecutionStartedEventAttributes"
  },
  {
    "eventId": "2",
    "eventTime": { "seconds": "1625724588", "nanos": 188274935 },
    "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
    "version": "0",
    "taskId": "106979946",
    "workflowTaskScheduledEventAttributes": {
      "taskQueue": {
        "name": "CI_CD_WORKFLOW_TASK_QUEUE",
        "kind": "TASK_QUEUE_KIND_NORMAL"
      },
      "startToCloseTimeout": { "seconds": "10", "nanos": 0 },
      "attempt": 1
    },
    "attributes": "workflowTaskScheduledEventAttributes"
  },
  {
    "eventId": "3",
    "eventTime": { "seconds": "1625724588", "nanos": 253420482 },
    "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
    "version": "0",
    "taskId": "106979951",
    "workflowTaskStartedEventAttributes": {
      "scheduledEventId": "2",
      "identity": "25@ci-cd-orchestrator-574dc69b77-dpwvq",
      "requestId": "83244fbb-d9df-479d-b0ef-c8a1fe8b06bc"
    },
    "attributes": "workflowTaskStartedEventAttributes"
  },
  {
    "eventId": "4",
    "eventTime": { "seconds": "1625724588", "nanos": 291253693 },
    "eventType": "EVENT_TYPE_WORKFLOW_TASK_FAILED",
    "version": "0",
    "taskId": "106979954",
    "workflowTaskFailedEventAttributes": {
      "scheduledEventId": "2",
      "startedEventId": "3",
      "cause": "WORKFLOW_TASK_FAILED_CAUSE_UNSPECIFIED",
      "failure": {
        "message": "java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3d1429d7 rejected from java.util.concurrent.ThreadPoolExecutor@530cfec6[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 1371]",
        "source": "JavaSDK",
        "stackTrace": "io.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:264)\nio.temporal.internal.sync.DeterministicRunnerImpl.runUntilAllBlocked(DeterministicRunnerImpl.java:229)\nio.temporal.internal.sync.SyncWorkflow.eventLoop(SyncWorkflow.java:144)\nio.temporal.internal.replay.ReplayWorkflowExecutor.eventLoop(ReplayWorkflowExecutor.java:74)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler$EntityManagerListenerImpl.eventLoop(ReplayWorkflowRunTaskHandler.java:328)\nio.temporal.internal.statemachines.WorkflowStateMachines.eventLoop(WorkflowStateMachines.java:388)\nio.temporal.internal.statemachines.WorkflowStateMachines.access$500(WorkflowStateMachines.java:71)\nio.temporal.internal.statemachines.WorkflowStateMachines$WorkflowTaskCommandsListener.workflowTaskStarted(WorkflowStateMachines.java:809)\nio.temporal.internal.statemachines.WorkflowTaskStateMachine.handleCompleted(WorkflowTaskStateMachine.java:121)\nio.temporal.internal.statemachines.WorkflowTaskStateMachine.handleStarted(WorkflowTaskStateMachine.java:113)\nio.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:45)\nio.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:137)\nio.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)\nio.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:63)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:210)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)\n
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)\njava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\njava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\njava.lang.Thread.run(Thread.java:748)\n",
        "cause": {
          "message": "Task java.util.concurrent.FutureTask@3d1429d7 rejected from java.util.concurrent.ThreadPoolExecutor@530cfec6[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 1371]",
          "source": "JavaSDK",
          "stackTrace": "java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)\njava.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)\njava.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)\njava.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)\nio.temporal.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:247)\nio.temporal.internal.sync.DeterministicRunnerImpl.runUntilAllBlocked(DeterministicRunnerImpl.java:229)\nio.temporal.internal.sync.SyncWorkflow.eventLoop(SyncWorkflow.java:144)\nio.temporal.internal.replay.ReplayWorkflowExecutor.eventLoop(ReplayWorkflowExecutor.java:74)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler$EntityManagerListenerImpl.eventLoop(ReplayWorkflowRunTaskHandler.java:328)\nio.temporal.internal.statemachines.WorkflowStateMachines.eventLoop(WorkflowStateMachines.java:388)\nio.temporal.internal.statemachines.WorkflowStateMachines.access$500(WorkflowStateMachines.java:71)\nio.temporal.internal.statemachines.WorkflowStateMachines$WorkflowTaskCommandsListener.workflowTaskStarted(WorkflowStateMachines.java:809)\nio.temporal.internal.statemachines.WorkflowTaskStateMachine.handleCompleted(WorkflowTaskStateMachine.java:121)\nio.temporal.internal.statemachines.WorkflowTaskStateMachine.handleStarted(WorkflowTaskStateMachine.java:113)\nio.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:45)\nio.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:137)\nio.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)\nio.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:63)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:210)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)\
nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)\njava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\njava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\njava.lang.Thread.run(Thread.java:748)\n",
          "applicationFailureInfo": {
            "type": "java.util.concurrent.RejectedExecutionException",
            "nonRetryable": false
          },
          "failureInfo": "applicationFailureInfo"
        },
        "applicationFailureInfo": {
          "type": "io.temporal.internal.sync.WorkflowRejectedExecutionError",
          "nonRetryable": false
        },
        "failureInfo": "applicationFailureInfo"
      },
      "identity": "25@ci-cd-orchestrator-574dc69b77-dpwvq",
      "baseRunId": "",
      "newRunId": "",
      "forkEventVersion": "0",
      "binaryChecksum": ""
    },
    "attributes": "workflowTaskFailedEventAttributes"
  },
  {
    "eventId": "5",
    "eventTime": { "seconds": "1625735388", "nanos": 194379926 },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT",
    "version": "0",
    "taskId": "106982581",
    "workflowExecutionTimedOutEventAttributes": {
      "retryState": "RETRY_STATE_TIMEOUT"
    },
    "attributes": "workflowExecutionTimedOutEventAttributes"
  }
]

Not sure if this matters, but here is the code for creating the workers. I create a worker for the workflow as well as a worker for each activity and a worker to publish metrics at the end of each workflow.

    // Register workflow
    WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient);

    WorkerOptions workflowWorkerOptions =
        WorkerOptions.newBuilder()
            .setMaxConcurrentWorkflowTaskExecutionSize(400)
            .setWorkflowPollThreadCount(4)
            .build();
    final Worker worker = workerFactory.newWorker(TASK_QUEUE, workflowWorkerOptions);
    worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class, CoolWorkflowImpl.class);

    final WorkerOptions activityWorkerOptions =
        WorkerOptions.newBuilder()
            .setActivityPollThreadCount(4)
            .setMaxConcurrentActivityExecutionSize(200)
            .build();

    for (BaseActivity activity : activities) {
      final String taskQueueName = activity.getClass().getInterfaces()[0].getSimpleName();
      final Worker activityWorker = workerFactory.newWorker(taskQueueName, activityWorkerOptions);
      activityWorker.registerActivitiesImplementations(activity);
    }

    // register activity that publish metrics
    WorkflowMetricPublisherActivity workflowMetricPublisherActivity =
        new WorkflowMetricPublisherActivityImpl(bigQueryClient, config, metricsProvider);
    final String taskQueueName =
        workflowMetricPublisherActivity.getClass().getInterfaces()[0].getSimpleName();

    final Worker metricActivityWorker =
        workerFactory.newWorker(taskQueueName, activityWorkerOptions);
    metricActivityWorker.registerActivitiesImplementations(workflowMetricPublisherActivity);

    workerFactory.start();

If you have a large number of activities, then creating a worker per activity type is very wasteful, as it creates multiple polling threads and separate thread pools for executing activities.

Would you try switching to a single worker on a single task queue?
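A single-worker setup might look roughly like the sketch below. It assumes the Temporal Java SDK is on the classpath; the class name, the method signature, and the two limits of 200 are illustrative choices, and the workflow types and activity implementations are passed in by the caller rather than taken from the code above.

```java
import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

final class SingleWorkerSetup {
  /**
   * Starts one worker that hosts all workflows and all activities on one task queue.
   * taskQueue, workflowTypes and activityImpls are supplied by the caller.
   */
  static WorkerFactory start(
      WorkflowClient client, String taskQueue, Class<?>[] workflowTypes, Object[] activityImpls) {
    WorkerFactory factory = WorkerFactory.newInstance(client);

    // One set of limits for the single worker; the values are placeholders.
    WorkerOptions options =
        WorkerOptions.newBuilder()
            .setMaxConcurrentWorkflowTaskExecutionSize(200)
            .setMaxConcurrentActivityExecutionSize(200)
            .build();

    // One worker means one set of pollers and one activity executor for everything.
    Worker worker = factory.newWorker(taskQueue, options);
    worker.registerWorkflowImplementationTypes(workflowTypes);
    worker.registerActivitiesImplementations(activityImpls);

    factory.start();
    return factory;
  }
}
```

If the workflow code sets a task queue per activity in ActivityOptions, those settings would also need to point at the shared task queue (or be left unset) for the activities to reach this worker.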

UI stack trace

This is not a stack trace. It is a workflow execution history. A stack trace shows threads. It is the third tab in the workflow view.

It was the first section I posted:

3 INVALID_ARGUMENT: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 5 of 'EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT' type. IsReplaying=false, PreviousStartedEventId=0, workflowTaskStartedEventId=0, Currently Processing StartedEventId=0
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleQueryWorkflowTask(ReplayWorkflowRunTaskHandler.java:254)
        at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleQueryOnlyWorkflowTask(ReplayWorkflowTaskHandler.java:257)
        at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
        at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
        at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
        at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unexpected event:event_id: 5 event_time { seconds: 1625735388 nanos: 194379926 } event_type: EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT task_id: 106982581 workflow_execution_timed_out_event_attributes { retry_state: RETRY_STATE_TIMEOUT }
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleNonStatefulEvent(WorkflowStateMachines.java:418)
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:215)
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
        ... 11 more

I think maybe the stack trace view in our Temporal UI is broken? I need to ask Steven to get me the trace from the tctl command, since I don’t have access.

Sure, I can switch to a single task queue, but I didn’t think it would lead to this problem. I have about 5 activities, so this creates 5 task queues that are being polled. I agree this can be wasteful. Since this creates a thread pool for each worker, can it cause the issue, beyond being wasteful? Also, what is the advantage of having multiple task queues vs. one? When would you want to go in that direction?

The output is indeed broken for timed-out workflows. The stack trace makes sense only for open workflows. Would you start a new workflow and get the stack trace while it is still running?

Strange… I get a 504 gateway timeout when I try to query it while it’s running.

It’s strange because I would expect the workflow to just end due to the WorkflowTaskFailed event. But it looks like it stays in the Running state until it times out. Is that expected?

The request
POST api/namespaces/<namespace>/workflows/3f0088b9-3f81-4eed-9e11-ffc9e3f29b4d/08dcf0f4-92d1-4d28-acee-13a2c309f3b7/query/__stack_trace
fails with an upstream request timeout.

I’m guessing I need to go through tctl to get the info.

@maxim Seems like the call is failing

$ tctl --ns <namespace> wf stack -w 3f0088b9-3f81-4eed-9e11-ffc9e3f29b4d -r 08dcf0f4-92d1-4d28-acee-13a2c309f3b7
Error: Query workflow failed.
Error Details: rpc error: code = InvalidArgument desc = io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 5 of 'EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT' type. IsReplaying=false, PreviousStartedEventId=0, workflowTaskStartedEventId=0, Currently Processing StartedEventId=0
        at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
        at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleQueryWorkflowTask(ReplayWorkflowRunTaskHandler.java:254)
        at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleQueryOnlyWorkflowTask(ReplayWorkflowTaskHandler.java:257)
        at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
        at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
        at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
        at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unexpected event:event_id: 5
event_time {
  seconds: 1625752810
  nanos: 940064538
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
task_id: 103829107
workflow_execution_timed_out_event_attributes {
  retry_state: RETRY_STATE_TIMEOUT
}

Is the worker that hosts the workflow running? It is needed for queries to work (stack is a built-in query).

The workflow was running when I ran the internal query. Sorry, the command I posted is a wrapper around the tctl command.

What I did was move everything to one worker, and now I’m not seeing the issue. If the issue appears again I can post it. Since the error has gone away, it’s hard for me to get data points now with the tctl command.

One question I posted on Slack: is it possible to get metrics on the thread count created by a worker, by an activity, and used by a worker factory? These metrics would be helpful.

Thanks,
Derek