We have seen random errors in production when executing our workflow. They occur when events arrive after about 2 hours while the workflow is being queried at the same time.
I am able to reproduce this locally with a simple setup (Temporal 1.14.5 with a MySQL backend, Java SDK 1.8.0); the code is below. The reproduction has to wait for 2 hours and query continuously during that time. I am not able to reproduce the error without both.
package io.temporal.samples.hello;

import java.time.Duration;
import java.util.UUID;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.*;

public class ParentChildWorkflow {

  // Define the task queue name
  static final String TASK_QUEUE = "HelloActivityTaskQueue";

  @WorkflowInterface
  public interface ParentWorkflow {
    @WorkflowMethod
    void execute();

    @QueryMethod
    String query();
  }

  @WorkflowInterface
  public interface ChildWorkflow {
    @WorkflowMethod
    void execute();
  }

  public static class ParentWorkflowImpl implements ParentWorkflow {
    @Override
    public void execute() {
      ChildWorkflow childWf =
          Workflow.newChildWorkflowStub(
              ChildWorkflow.class,
              ChildWorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
      Promise<Void> promise = Async.procedure(childWf::execute);
      promise.get();
    }

    @Override
    public String query() {
      return "hello";
    }
  }

  // Child workflow implementation; it only sleeps to simulate a long-running child.
  public static class ChildWorkflowImpl implements ChildWorkflow {
    @Override
    public void execute() {
      // Cannot reproduce with 5 minutes; can reproduce with 120 minutes. Haven't tried other durations.
      Workflow.sleep(Duration.ofMinutes(120));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    WorkflowClient client = initWorker();
    ParentWorkflow workflow1 =
        client.newWorkflowStub(
            ParentWorkflow.class,
            WorkflowOptions.newBuilder()
                .setWorkflowId(UUID.randomUUID().toString())
                .setTaskQueue(TASK_QUEUE)
                .build());
    WorkflowClient.start(workflow1::execute);
    // Query continuously. Not able to reproduce without this block.
    while (true) {
      workflow1.query();
    }
  }

  static WorkflowClient initWorker() {
    WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    WorkflowClient client = WorkflowClient.newInstance(service);
    WorkerFactory factory = WorkerFactory.newInstance(client);
    Worker worker = factory.newWorker(TASK_QUEUE);
    worker.addWorkflowImplementationFactory(ParentWorkflow.class, () -> new ParentWorkflowImpl());
    worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -> new ChildWorkflowImpl());
    factory.start();
    return client;
  }
}
I am not sure if it is related, but when the workflow fails because of this error, I get an exception in the worker. It does not occur at the time of the failure; it occurs approximately 1 hour after execution.
22:28:26.333 [Workflow Executor taskQueue="HelloActivityTaskQueue", namespace="default": 13] WARN i.t.internal.worker.WorkflowWorker - Workflow task failure during replying to the server. startedEventId=0, WorkflowId=24a1d3a8-5954-4ff3-a151-523641b90530, RunId=127382c7-4cf9-436a-b3be-8128fdc21909. If seen continuously the workflow might be stuck.
io.grpc.StatusRuntimeException: NOT_FOUND: query task not found, or already expired
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondQueryTaskCompleted(WorkflowServiceGrpc.java:2966)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.sendReply(WorkflowWorker.java:363)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:254)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
worker.addWorkflowImplementationFactory(ParentWorkflow.class, () -> new ParentWorkflowImpl());
worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -> new ChildWorkflowImpl());
I believe you should just use
worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class, ChildWorkflowImpl.class);
If you want to wait for the child to complete, you could just call
childWf.execute();
WorkflowClient.start(workflow1::execute);
// Query continuously. Not able to reproduce without this block.
while (true) {
  workflow1.query();
}
Not sure what the intent of this test is: you start your workflow execution asynchronously and then slam the frontend service with query calls forever (even after the workflow execution completes). Depending on your setup (assuming local Docker here), this could run into issues on both the server side and your workers. I believe the error you are getting means that the server wasn't able to dispatch the query task to your worker(s).
What is the use case where you would need such a test?
If you are trying to test polling see this forum thread.
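If the goal is to keep some query pressure on the workflow without an unbounded loop, a bounded and paced loop is one option. The following is only a minimal sketch in plain Java: `BoundedQueryPoller` and its parameters are hypothetical names, and the `Supplier<String>` stands in for `workflow1::query` from the reproduction.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper: runs a query a bounded number of times with a pause between calls. */
final class BoundedQueryPoller {

  /**
   * Calls the query up to maxAttempts times, sleeping for `pause` after each call.
   * Returns how many calls completed without throwing.
   */
  static int poll(Supplier<String> query, int maxAttempts, Duration pause) {
    int succeeded = 0;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        query.get();
        succeeded++;
      } catch (RuntimeException e) {
        // A failed query is reported and skipped instead of ending the loop.
        System.err.println("query attempt " + attempt + " failed: " + e.getMessage());
      }
      try {
        Thread.sleep(pause.toMillis());
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop polling early.
        Thread.currentThread().interrupt();
        break;
      }
    }
    return succeeded;
  }

  public static void main(String[] args) {
    // Stand-in for workflow1::query; replace with the real stub method in a test.
    Supplier<String> fakeQuery = () -> "hello";
    int ok = poll(fakeQuery, 5, Duration.ofMillis(10));
    System.out.println("successful queries: " + ok);
  }
}
```

The attempt count and the pause are the two knobs: together they cap how long the test keeps querying and how hard it hits the frontend service.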
This is happening in our service with the combination of a query and a promise.get(), so I thought I could strip the unrelated parts out of our real workflow and create a simple test case.
The purpose of the test is simple: to reproduce the error that happens in our real workflow.
It's not that we are slamming the service with queries like this, but I don't have a way to time the query precisely.
The error Failure handling event 9 of type 'EVENT_TYPE_WORKFLOW_TASK_COMPLETED' during execution. {PreviousStartedEventId=8, workflowTaskStartedEventId=12, Currently Processing StartedEventId=8}
seems to be coming from io.temporal.internal.statemachines.WorkflowStateMachines.handleNonStatefulEvent(WorkflowStateMachines.java:494)
Looking at the code in the Java SDK, there is no case for EVENT_TYPE_WORKFLOW_TASK_COMPLETED. I am not that familiar with the SDK internals, so maybe it is not needed, but the Go equivalent appears to handle that case.
Is there an easy way to mock PollWorkflowTaskQueue with a series of responses? I could pin down exactly what is causing this issue if I could mock PollWorkflowTaskQueue and send the mocked Temporal responses in order.
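To make the idea concrete, this is roughly the shape of what I have in mind. It is only a sketch in plain Java and does not use any Temporal test utility: `ScriptedPoller` is a hypothetical class, and the strings in `main` are placeholders for real poll responses.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical stub: hands out pre-recorded poll responses in a fixed order. */
final class ScriptedPoller<T> {
  private final Deque<T> responses;

  ScriptedPoller(List<T> script) {
    this.responses = new ArrayDeque<>(script);
  }

  /** Returns the next scripted response, or empty once the script is exhausted. */
  Optional<T> poll() {
    return Optional.ofNullable(responses.pollFirst());
  }

  /** Number of scripted responses not yet handed out. */
  int remaining() {
    return responses.size();
  }

  public static void main(String[] args) {
    // Placeholder strings; a real test would script actual poll response objects.
    ScriptedPoller<String> poller =
        new ScriptedPoller<>(List.of("task with events 1-8", "query task", "task starting at event 9"));
    Optional<String> next;
    while ((next = poller.poll()).isPresent()) {
      System.out.println("delivering: " + next.get());
    }
  }
}
```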
First of all, thanks a lot for the reproduction and such deep details!
I think that we actually have a stable reproduction of this exception here:
I will see what's going on there and get back here. I'm sure that while the exception is thrown and logged, the query actually returns correctly and it doesn't affect workflow execution.
io.grpc.StatusRuntimeException: NOT_FOUND: query task not found, or already expired
It's likely unrelated and separate. Usually, the Temporal server returns NOT_FOUND for valid IDs when the timeout associated with the entity has expired.
Yes, I do not think query task not found, or already expired is causing this.
But when I start 3 instances of the workflow and query all 3 of them continuously, only one of them fails. The failed workflow is exactly the one that gets the query task not found, or already expired error; the others do not. This may not be the cause, but it is hitting some cracks that might already exist.
What I observed in the logs is that as soon as this error happens, the Temporal poll responds with WORKFLOW_TASK_COMPLETED to close the state machine for the previous events.
You can see from the stack trace that the error is Failure handling event 9. I think that since the state machine was already closed by the previous poll I mentioned, and the recent events start again from 9, this tries to close that state machine again and errors out.
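To illustrate what I suspect, here is a toy model in plain Java. It is not the SDK's code: `ToyEventTracker` is a hypothetical class that only captures the assumption that handling the same event ID a second time is rejected.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model (not the SDK): an event ID may be handled only once. */
final class ToyEventTracker {
  private final Set<Long> handledEventIds = new HashSet<>();

  /** Handles an event; throws if an event with the same ID was already handled. */
  void handle(long eventId, String eventType) {
    // Set.add returns false when the ID is already present.
    if (!handledEventIds.add(eventId)) {
      throw new IllegalStateException(
          "Failure handling event " + eventId + " of type '" + eventType + "'");
    }
  }

  public static void main(String[] args) {
    ToyEventTracker tracker = new ToyEventTracker();
    // First poll response closes the state machine for event 9.
    tracker.handle(9, "EVENT_TYPE_WORKFLOW_TASK_COMPLETED");
    try {
      // A later response whose events start at 9 again hits the already-closed entry.
      tracker.handle(9, "EVENT_TYPE_WORKFLOW_TASK_COMPLETED");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```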