Getting an error in a workflow when waiting for 2 hours and querying at the same time

We have seen random errors in production when executing our workflow. It happens with a combination of an event arriving after about 2 hours and the workflow being queried at the same time.

Below is the error from the worker's event handling.

io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 9 of type 'EVENT_TYPE_WORKFLOW_TASK_COMPLETED' during execution. {PreviousStartedEventId=8, workflowTaskStartedEventId=12, Currently Processing StartedEventId=8} 
io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:222)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:201)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:176)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

Caused By: java.lang.IllegalArgumentException: Unexpected event:event_id: 9
event_time {
  seconds: 1649383700
  nanos: 584454300
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
task_id: 13631528
workflow_task_completed_event_attributes {
  scheduled_event_id: 7
  started_event_id: 8
  identity: "95641@C02DD17BMD6R"
}
 
io.temporal.internal.statemachines.WorkflowStateMachines.handleNonStatefulEvent(WorkflowStateMachines.java:494)
io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:240)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:176)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

I am able to reproduce this locally with a simple setup; the code is below (Temporal 1.14.5 with a MySQL backend, Java SDK 1.8.0). The repro waits for 2 hours and queries continuously at the same time; I am not able to reproduce it without both.

package io.temporal.samples.hello;

import java.time.Duration;
import java.util.UUID;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.*;

public class ParentChildWorkflow {

    // Define the task queue name
    static final String TASK_QUEUE = "HelloActivityTaskQueue";


    @WorkflowInterface
    public interface ParentWorkflow {

        @WorkflowMethod
        void execute();

        @QueryMethod
        String query();
    }

    @WorkflowInterface
    public interface ChildWorkflow {

        @WorkflowMethod
        void execute();
    }


    public static class ParentWorkflowImpl implements ParentWorkflow {

        @Override
        public void execute() {

            ChildWorkflow childWf =
                Workflow.newChildWorkflowStub(ChildWorkflow.class, ChildWorkflowOptions.newBuilder()
                    .setTaskQueue(TASK_QUEUE)
                    .build());
            Promise<Void> promise = Async.procedure(childWf::execute);

            promise.get();
        }

        @Override
        public String query() {
            return "hello";
        }

    }

    // Child workflow implementation: it just sleeps.
    public static class ChildWorkflowImpl implements ChildWorkflow {


        @Override
        public void execute() {

            // Cannot reproduce with 5 minutes; able to reproduce with 120 minutes. Haven't tried other durations.
            Workflow.sleep(Duration.ofMinutes(120));

        }

    }

    public static void main(String[] args) throws InterruptedException {


        WorkflowClient client = initWorker();

        ParentWorkflow workflow1 =
                client.newWorkflowStub(
                        ParentWorkflow.class,
                        WorkflowOptions.newBuilder()
                                .setWorkflowId(UUID.randomUUID().toString())
                                .setTaskQueue(TASK_QUEUE)
                                .build());

        WorkflowClient.start(workflow1::execute);

        // Query continuously. Not able to reproduce without this block.
        while (true) {
            workflow1.query();
        }


    }

    static WorkflowClient initWorker() {
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
        WorkflowClient client = WorkflowClient.newInstance(service);

        WorkerFactory factory =
                WorkerFactory.newInstance(client);
        Worker worker = factory.newWorker(TASK_QUEUE);

        worker.addWorkflowImplementationFactory(ParentWorkflow.class, () -> new ParentWorkflowImpl());
        worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -> new ChildWorkflowImpl());
        factory.start();

        return client;
    }
}

I am not sure if this is related, but when the workflow fails because of this error I also get an exception in the worker. It does not appear at the time of the failure; it shows up approximately 1 hour after the execution.

22:28:26.333 [Workflow Executor taskQueue="HelloActivityTaskQueue", namespace="default": 13] WARN  i.t.internal.worker.WorkflowWorker - Workflow task failure during replying to the server. startedEventId=0, WorkflowId=24a1d3a8-5954-4ff3-a151-523641b90530, RunId=127382c7-4cf9-436a-b3be-8128fdc21909. If seen continuously the workflow might be stuck.
io.grpc.StatusRuntimeException: NOT_FOUND: query task not found, or already expired
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondQueryTaskCompleted(WorkflowServiceGrpc.java:2966)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.sendReply(WorkflowWorker.java:363)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:254)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

worker.addWorkflowImplementationFactory(ParentWorkflow.class, () -> new ParentWorkflowImpl());
worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -> new ChildWorkflowImpl());

I believe you should just use
worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class, ChildWorkflowImpl.class);
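
For example, initWorker from your repro could look roughly like this (just a sketch of the same setup, untested):

    static WorkflowClient initWorker() {
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
        WorkflowClient client = WorkflowClient.newInstance(service);

        WorkerFactory factory = WorkerFactory.newInstance(client);
        Worker worker = factory.newWorker(TASK_QUEUE);

        // Register the implementation classes; the SDK instantiates them per workflow execution.
        worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class, ChildWorkflowImpl.class);
        factory.start();

        return client;
    }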

Promise<Void> promise = Async.procedure(childWf::execute);
promise.get();

If you want to wait for the child to complete, you could just call

childWf.execute();
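
So the parent's execute() could shrink to something like this (sketch, same ChildWorkflowOptions as above):

    @Override
    public void execute() {
        ChildWorkflow childWf =
            Workflow.newChildWorkflowStub(ChildWorkflow.class, ChildWorkflowOptions.newBuilder()
                .setTaskQueue(TASK_QUEUE)
                .build());
        // A direct call on the child stub blocks until the child workflow completes.
        childWf.execute();
    }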

WorkflowClient.start(workflow1::execute);
        //query continuously. Not able to reproduce without this block
       while(true) {
           workflow1.query();
       }

Not sure what the intent of this test is: you start your workflow execution async and then slam the frontend service with query calls forever (even after the workflow execution completes). Depending on your setup (assuming local docker here), this could run into issues on both the server side and your workers. I believe the error you are getting means the server wasn't able to dispatch the query task to your worker(s).

What is the use case where you would need such a test?
If you are trying to test polling, see this forum thread.

This is happening in our service with the combination of a query and a promise.get(), so I thought I could remove everything non-essential from our real workflow and create a simple test case.

The purpose of the test is simple: to reproduce the error that happens in our real workflow.

We are not slamming the server with queries like this in production, but I don't have an exact way to time a single query to land in the right window.
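
If it helps, a less aggressive loop might still land in the same window; something like this in main() instead of the tight loop (just a sketch, the 115-minute / 1-second values are arbitrary guesses on my part):

    // Sleep until shortly before the child's 2-hour timer fires, then query periodically.
    Thread.sleep(Duration.ofMinutes(115).toMillis());
    while (true) {
        workflow1.query();
        Thread.sleep(1000);
    }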

The error
Failure handling event 9 of type 'EVENT_TYPE_WORKFLOW_TASK_COMPLETED' during execution. {PreviousStartedEventId=8, workflowTaskStartedEventId=12, Currently Processing StartedEventId=8}

seems to be coming from io.temporal.internal.statemachines.WorkflowStateMachines.handleNonStatefulEvent(WorkflowStateMachines.java:494)

Looking into the Java SDK code, there is no case for EVENT_TYPE_WORKFLOW_TASK_COMPLETED in handleNonStatefulEvent. I am not that familiar with the internals of the SDK, so maybe it is not needed, but the Go equivalent appears to handle that case.

Is there an easy way to mock PollWorkflowTaskQueue with a series of responses? I could pin down exactly what is causing this issue if I could mock PollWorkflowTaskQueue and feed the captured Temporal responses back in order.
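
One thing I was considering (an assumption on my part, not something I have verified) is standing up a fake gRPC WorkflowService and overriding pollWorkflowTaskQueue to hand back the captured responses in order, roughly:

    import io.grpc.stub.StreamObserver;
    import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
    import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
    import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;

    // Sketch only: returns canned PollWorkflowTaskQueueResponse objects in order.
    // It would still need to be bound to an (in-process) gRPC server and the
    // worker's WorkflowServiceStubs pointed at that server.
    class FakeWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase {
        private final java.util.Iterator<PollWorkflowTaskQueueResponse> responses;

        FakeWorkflowService(java.util.List<PollWorkflowTaskQueueResponse> canned) {
            this.responses = canned.iterator();
        }

        @Override
        public void pollWorkflowTaskQueue(
                PollWorkflowTaskQueueRequest request,
                StreamObserver<PollWorkflowTaskQueueResponse> observer) {
            // Reply with the next canned response, or an empty one when exhausted.
            observer.onNext(responses.hasNext()
                ? responses.next()
                : PollWorkflowTaskQueueResponse.getDefaultInstance());
            observer.onCompleted();
        }
    }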

Hi,

First of all, thanks a lot for the reproduction and such deep details!

I think that we actually have a stable reproduction of this exception here:

I will see what’s going on there and get back here. I’m sure that while the exception is thrown and logged, the query actually returns correctly and it doesn’t affect workflow execution.

io.grpc.StatusRuntimeException: NOT_FOUND: query task not found, or already expired

It's likely unrelated and separate. Usually the Temporal server returns NOT_FOUND for valid IDs when the timeout associated with the entity has expired.

Thank you,

Yes, I do not think the "query task not found, or already expired" error is causing this either.

But when I start 3 instances of the workflow and query all 3 of them continuously, only one of them fails, and only the failed workflow gets the "query task not found, or already expired" error, not the others. It may not be the cause, but it seems to be hitting a crack that already exists.
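
(For reference, the 3-instance variant is just the main() above with three stubs, roughly like this, plus java.util.List/ArrayList imports:)

    // Fragment of main(): start three executions, then query each of them in turn.
    List<ParentWorkflow> workflows = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        ParentWorkflow wf = client.newWorkflowStub(
            ParentWorkflow.class,
            WorkflowOptions.newBuilder()
                .setWorkflowId(UUID.randomUUID().toString())
                .setTaskQueue(TASK_QUEUE)
                .build());
        WorkflowClient.start(wf::execute);
        workflows.add(wf);
    }
    while (true) {
        for (ParentWorkflow wf : workflows) {
            wf.query();
        }
    }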

What I observed in the logs is that, as soon as this error happens, the poll response contains the WORKFLOW_TASK_COMPLETED event that closes the state machine for the previous workflow task.

Here is one such poll response.

workflow_execution {
  workflow_id: "8d840b5c-81ff-4e97-ae01-4fe548b737d5"
  run_id: "514fede8-6158-4456-80bf-88527066c0e7"
}
workflow_type {
  name: "ParentWorkflow"
}
previous_started_event_id: 8
attempt: 1
history {
  events {
    event_id: 1
    event_time {
      seconds: 1649422723
      nanos: 283275000
    }
    event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
    task_id: 13631569
    workflow_execution_started_event_attributes {
      workflow_type {
        name: "ParentWorkflow"
      }
      task_queue {
        name: "HelloActivityTaskQueue"
        kind: TASK_QUEUE_KIND_NORMAL
      }
      workflow_execution_timeout {
      }
      workflow_run_timeout {
      }
      workflow_task_timeout {
        seconds: 10
      }
      original_execution_run_id: "514fede8-6158-4456-80bf-88527066c0e7"
      identity: "99141@C02DD17BMD6R"
      first_execution_run_id: "514fede8-6158-4456-80bf-88527066c0e7"
      attempt: 1
      first_workflow_task_backoff {
      }
      header {
      }
    }
  }
  events {
    event_id: 2
    event_time {
      seconds: 1649422723
      nanos: 283529100
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
    task_id: 13631570
    workflow_task_scheduled_event_attributes {
      task_queue {
        name: "HelloActivityTaskQueue"
        kind: TASK_QUEUE_KIND_NORMAL
      }
      start_to_close_timeout {
        seconds: 10
      }
      attempt: 1
    }
  }
  events {
    event_id: 3
    event_time {
      seconds: 1649422723
      nanos: 312208200
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
    task_id: 13631574
    workflow_task_started_event_attributes {
      scheduled_event_id: 2
      identity: "99141@C02DD17BMD6R"
      request_id: "6f995ba1-c0d6-41c3-8f4f-d37308fe975e"
    }
  }
  events {
    event_id: 4
    event_time {
      seconds: 1649422723
      nanos: 535516700
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
    task_id: 13631577
    workflow_task_completed_event_attributes {
      scheduled_event_id: 2
      started_event_id: 3
      identity: "99141@C02DD17BMD6R"
    }
  }
  events {
    event_id: 5
    event_time {
      seconds: 1649422723
      nanos: 535578600
    }
    event_type: EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED
    task_id: 13631578
    start_child_workflow_execution_initiated_event_attributes {
      namespace: "default"
      workflow_id: "c8b8af74-fabd-3f91-89f4-ea81e479e292"
      workflow_type {
        name: "ChildWorkflow"
      }
      task_queue {
        name: "HelloActivityTaskQueue"
      }
      workflow_execution_timeout {
      }
      workflow_run_timeout {
      }
      workflow_task_timeout {
        seconds: 10
      }
      parent_close_policy: PARENT_CLOSE_POLICY_TERMINATE
      workflow_task_completed_event_id: 4
      workflow_id_reuse_policy: WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
      header {
      }
    }
  }
  events {
    event_id: 6
    event_time {
      seconds: 1649422723
      nanos: 567649100
    }
    event_type: EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED
    task_id: 13631581
    child_workflow_execution_started_event_attributes {
      namespace: "default"
      initiated_event_id: 5
      workflow_execution {
        workflow_id: "c8b8af74-fabd-3f91-89f4-ea81e479e292"
        run_id: "c1a45a9a-2b22-4490-8712-1c1421d8b2b5"
      }
      workflow_type {
        name: "ChildWorkflow"
      }
      header {
      }
    }
  }
  events {
    event_id: 7
    event_time {
      seconds: 1649422723
      nanos: 567684600
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
    task_id: 13631582
    workflow_task_scheduled_event_attributes {
      task_queue {
        name: "99141@C02DD17BMD6R:c018d33d-ca8c-49db-8194-33d335ec6956"
        kind: TASK_QUEUE_KIND_STICKY
      }
      start_to_close_timeout {
        seconds: 10
      }
      attempt: 1
    }
  }
  events {
    event_id: 8
    event_time {
      seconds: 1649422723
      nanos: 581140400
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
    task_id: 13631586
    workflow_task_started_event_attributes {
      scheduled_event_id: 7
      identity: "c018d33d-ca8c-49db-8194-33d335ec6956"
      request_id: "a9b71cb5-a418-4dc6-bee2-762781d09662"
    }
  }
  events {
    event_id: 9
    event_time {
      seconds: 1649422723
      nanos: 615979600
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
    task_id: 13631589
    workflow_task_completed_event_attributes {
      scheduled_event_id: 7
      started_event_id: 8
      identity: "99141@C02DD17BMD6R"
    }
  }
}
query {
  query_type: "query"
}
workflow_execution_task_queue {
  name: "HelloActivityTaskQueue"
  kind: TASK_QUEUE_KIND_NORMAL
}

As you can see, event 9 (WORKFLOW_TASK_COMPLETED) closes the current workflow task state machine.

After the child workflow completes and the promise.get() unblocks, the poll response is:

workflow_execution {
  workflow_id: "8d840b5c-81ff-4e97-ae01-4fe548b737d5"
  run_id: "514fede8-6158-4456-80bf-88527066c0e7"
}
workflow_type {
  name: "ParentWorkflow"
}
previous_started_event_id: 8
started_event_id: 12
attempt: 1
history {
  events {
    event_id: 9
    event_time {
      seconds: 1649422723
      nanos: 615979600
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
    task_id: 13631589
    workflow_task_completed_event_attributes {
      scheduled_event_id: 7
      started_event_id: 8
      identity: "99141@C02DD17BMD6R"
    }
  }
  events {
    event_id: 10
    event_time {
      seconds: 1649429923
      nanos: 714445300
    }
    event_type: EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED
    task_id: 13631592
    child_workflow_execution_completed_event_attributes {
      namespace: "default"
      workflow_execution {
        workflow_id: "c8b8af74-fabd-3f91-89f4-ea81e479e292"
        run_id: "c1a45a9a-2b22-4490-8712-1c1421d8b2b5"
      }
      workflow_type {
        name: "ChildWorkflow"
      }
      initiated_event_id: 5
      started_event_id: 6
    }
  }
  events {
    event_id: 11
    event_time {
      seconds: 1649429923
      nanos: 714490100
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
    task_id: 13631593
    workflow_task_scheduled_event_attributes {
      task_queue {
        name: "99141@C02DD17BMD6R:c018d33d-ca8c-49db-8194-33d335ec6956"
        kind: TASK_QUEUE_KIND_STICKY
      }
      start_to_close_timeout {
        seconds: 10
      }
      attempt: 1
    }
  }
  events {
    event_id: 12
    event_time {
      seconds: 1649429923
      nanos: 735475200
    }
    event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
    task_id: 13631597
    workflow_task_started_event_attributes {
      scheduled_event_id: 11
      identity: "c018d33d-ca8c-49db-8194-33d335ec6956"
      request_id: "a3de3a52-b651-4b2e-8c3a-4f598d587792"
    }
  }
}
workflow_execution_task_queue {
  name: "HelloActivityTaskQueue"
  kind: TASK_QUEUE_KIND_NORMAL
}
scheduled_time {
  seconds: 1649429923
  nanos: 714490100
}
started_time {
  seconds: 1649429923
  nanos: 735475200
}
queries {
  key: "cdf86bff-2ea3-4f87-a09d-8236e183f0e7"
  value {
    query_type: "query"
  }
}

And you can see from the stack trace that the error is "Failure handling event 9". I think that, since the state machine was already closed by the earlier poll response I mentioned, and the new history again starts from event 9, the SDK is trying to close that state machine a second time and erroring out.
