Exception in worker: "Failure processing workflow task"

Hi,
I started evaluating Temporal using the Java SDK. I tried creating a simple test by submitting 1K workflows and starting 3 worker processes to handle the activities.
After a while I keep getting the error below. It seems I now have 1000 or more workflows, and there is some kind of loop going on that I cannot understand.
I tried tctl to terminate all of them, but I cannot find a way to select and terminate all open/running workflows.
So, my question is twofold:

  • Where is this error coming from?
  • How can I use tctl (or is there a Java API to do the same?) to terminate a set of workflows (no Elasticsearch)?

Thanks

Here is the stack trace:

19:19:18.486 [Workflow Executor taskQueue="stress1-queue", namespace="default": 305] ERROR i.t.internal.worker.PollerOptions - uncaught exception
java.lang.RuntimeException: Failure processing workflow task. WorkflowId=sendtoserver-93d3197c-4ab1-46bf-9c51-5f5ec6fd037d, RunId=1fbb478c-2c66-4f46-b499-949f19fe3923
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:337)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:275)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:79)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 8 of 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED' type. IsReplaying=true, PreviousStartedEventId=6, workflowTaskStartedEventId=15, Currently Processing StartedEventId=6
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:202)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:309)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:275)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
… 3 common frames omitted
Caused by: java.lang.IllegalStateException: Event 8 of EVENT_TYPE_ACTIVITY_TASK_SCHEDULED does not match command COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION
at io.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:263)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:199)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
… 11 common frames omitted

Such an issue can be caused by non-deterministic workflow code. Does your code follow the implementation constraints?

This is test code only; it does a Thread.sleep() to emulate a long-running call (e.g. a synchronous HTTP call). I understand that in a production situation I would use Workflow.sleep() to release the calling thread, but this is only for testing.

It might be that I changed the workflow and/or activity implementation a bit, I cannot remember, although there is no mutable state in the implementations; this might have caused it. Honestly, this is my first test code after having read all the documentation for 1-2 days, and I was hoping to understand a bit of the execution stack (threads and such) of activity/workflow workers (as well as the I/O specifics with the engine).

What is interesting, though, is that:

  1. It seems that it's all in a loop now and the worker gets this exception continuously (unless it just retries up to the max-attempts=500 I set in the options and I just need to let it run out), and the exception is not thrown from the workflow code itself.
  2. I have not been able to find a tctl command to just terminate all workflows and start afresh (so I can try to reproduce the issue or see how to avoid it).
  3. I am trying to understand what happens in a production environment in such a case and what my reaction would be.

By the way: it seems that the DB (Cassandra, default dev Docker image) takes a big CPU hit during this.

Thanks for the feedback
Costas

This is test code only; it does a Thread.sleep() to emulate a long-running call (e.g. a synchronous HTTP call). I understand that in a production situation I would use Workflow.sleep() to release the calling thread, but this is only for testing.

Thread.sleep is strictly prohibited in workflow code. It doesn't emulate a long-running call; it emulates a bug in the workflow code.
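
For illustration only, here is a minimal sketch of the two valid alternatives (the interface and class names are made up for this example and shown in one listing for brevity): put the blocking call inside an activity, which runs on a regular worker thread, and use Workflow.sleep for durable timers inside the workflow itself.

    import io.temporal.activity.ActivityInterface;
    import io.temporal.activity.ActivityOptions;
    import io.temporal.workflow.Workflow;
    import io.temporal.workflow.WorkflowInterface;
    import io.temporal.workflow.WorkflowMethod;
    import java.time.Duration;

    @ActivityInterface
    interface ServerActivities {
      void callServer();
    }

    // The blocking call belongs here: activities run on regular worker threads.
    class ServerActivitiesImpl implements ServerActivities {
      @Override
      public void callServer() {
        try {
          Thread.sleep(10_000); // fine in an activity, e.g. emulating a synchronous HTTP call
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @WorkflowInterface
    interface SendToServerWorkflow {
      @WorkflowMethod
      void send();
    }

    class SendToServerWorkflowImpl implements SendToServerWorkflow {

      private final ServerActivities activities =
          Workflow.newActivityStub(
              ServerActivities.class,
              ActivityOptions.newBuilder()
                  .setStartToCloseTimeout(Duration.ofMinutes(5))
                  .build());

      @Override
      public void send() {
        // OK: durable timer, recorded in history and replay-safe.
        Workflow.sleep(Duration.ofSeconds(10));
        // OK: the long-running/blocking work happens inside the activity.
        activities.callServer();
        // NOT OK here: Thread.sleep(...) blocks the workflow thread and breaks replay.
      }
    }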

It might be that I changed the workflow and/or activity implementation a bit

During development, make sure that you terminate all running workflows before running updated workflow code. In production, always use versioning to perform upgrades while workflows are running.
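
As a rough sketch of what versioning looks like in the Java SDK (the change id "add-second-activity" and the activity calls are placeholders for this example):

    // Workflows whose histories were recorded before the change take the old path;
    // new executions take the new one. DEFAULT_VERSION marks pre-change histories.
    int version = Workflow.getVersion("add-second-activity", Workflow.DEFAULT_VERSION, 1);
    if (version == Workflow.DEFAULT_VERSION) {
      activities.oldStep();            // placeholder activity calls for illustration
    } else {
      activities.oldStep();
      activities.newStep();            // the step added by the new build
    }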

  1. It seems that it's all in a loop now and the worker gets this exception continuously (unless it just retries up to the max-attempts=500 I set in the options and I just need to let it run out), and the exception is not thrown from the workflow code itself.

This is by design. The idea is that if workflow code is changed without versioning, workflow progress is blocked without failing the workflow. So if the broken code is rolled back, the workflow can continue execution without a problem. Failing the workflow might be catastrophic; imagine a few million multi-month workflows failing in production due to a single bad deployment.

  2. I have not been able to find a tctl command to just terminate all workflows and start afresh (so I can try to reproduce the issue or see how to avoid it).

Such a command is only supported when running with the Elasticsearch integration. Here is the relevant documentation.

  3. I am trying to understand what happens in a production environment in such a case and what my reaction would be.

The reaction would be to roll back the bad deployment and update the code with correct versioning. It might also be necessary to mark the rolled-back build as bad in order to roll back the state of workflows that made progress with it.

Thanks for the clarifications, Maxim. I will try to reset the entire Docker image and start again, since I am not sure how to get out of the mess at this point.
A final question: is there a Java API to perform the things you do with tctl? For example, to get all workflows currently 'running' and terminate each one in a loop?

Elasticsearch: I am looking at How to set up ElasticSearch?, but it seems too much at this point just to get some dev tests running.

Costas

The Web UI and tctl implement all their functionality through the Temporal service gRPC API. This API is also used by all the SDK implementations. For example, to list open workflows using the Java SDK:

    // Connects to the Temporal frontend (localhost by default) and lists open workflow executions.
    WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    ListOpenWorkflowExecutionsResponse response =
        service
            .blockingStub()
            .listOpenWorkflowExecutions(
                ListOpenWorkflowExecutionsRequest.newBuilder().setNamespace("default").build());
    System.out.println(response.getExecutionsList());
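
For the terminate-in-a-loop part of the question, the same stubs expose TerminateWorkflowExecution. A rough sketch building on the listing above (the reason string is arbitrary, and a real script would also page through the results):

    // Terminate every execution returned by the list call above.
    for (WorkflowExecutionInfo info : response.getExecutionsList()) {
      service
          .blockingStub()
          .terminateWorkflowExecution(
              TerminateWorkflowExecutionRequest.newBuilder()
                  .setNamespace("default")
                  .setWorkflowExecution(info.getExecution())
                  .setReason("dev cleanup")
                  .build());
    }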

For local development and tests you can use docker-compose-es.yml.