I’m currently working with the Temporal workflow framework and the Java Temporal SDK. My setup involves an AWS SQS listener that triggers a Temporal workflow. Each message received by the listener contains a sourceId, and messages with the same sourceId must be processed by Temporal workflows one at a time. If a workflow is already running for a given sourceId, I want to add the new message to a FIFO queue and wait for the previous workflow to complete.
I tried this approach: https://github.com/temporalio/samples-java/blob/main/core/src/main/java/io/temporal/samples/hello/HelloSignal.java
This example works well when processing two messages via signals. However, when I try to scale it and send three or more messages via AWS SQS, I start getting timeout exceptions during workflow execution.
My listener:

```java
@SqsListener(value = QueueName.PROCESSING_QUEUE,
        deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(ProcessingMessage message) {
    var workflowId = createWorkflowId(message.getSourceId());
    var options = WorkflowOptions.newBuilder()
            .setTaskQueue(STARTER_WORKFLOW_QUEUE)
            .setWorkflowId(workflowId)
            .setWorkflowTaskTimeout(Duration.of(1, ChronoUnit.MINUTES))
            .build();
    log.info("Starting workflow with id - {}", workflowId);
    var workflow = workflowClient.newUntypedWorkflowStub("RecordStarterWorkflow", options);
    // Starts the workflow if none is running for this ID, then delivers the signal to it
    workflow.signalWithStart("putNewEmployee", new Object[]{message}, new Object[]{});
}
```
My StarterWorkflow:

```java
private final LinkedBlockingQueue<ProcessingMessage> employeeMessages
        = new LinkedBlockingQueue<>();

@Override
public void startWorkflow() {
    do {
        var message = employeeMessages.poll();
        var childWorkflow = Workflow.newChildWorkflowStub(SomeWorkflow.class, options);
        try {
            childWorkflow.startWorkflow(message);
        } catch (Exception exception) {
            log.error("Workflow failed. Workflow id:{} | Message:{}", options.getWorkflowId(),
                    exception.getMessage());
        }
    } while (!employeeMessages.isEmpty());
    log.info("Processing finished");
}

@Override
@SneakyThrows
public void putNewEmployee(ProcessingMessage message) {
    log.debug("Received signal with message: {}", message);
    this.employeeMessages.add(message);
}
```
Don’t use BlockingQueue or any other blocking abstractions inside the workflow code. Only Temporal APIs should be used for blocking. You can use a queue created through Workflow.newWorkflowQueue instead.
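A minimal sketch of the starter workflow rewritten around Workflow.newWorkflowQueue, assuming the same signal-with-start listener as above. The nested ProcessingMessage, SomeWorkflow and SomeWorkflowImpl types are hypothetical stand-ins for the real ones; the queue capacity and the one-minute idle timeout are arbitrary assumptions; and startWorkflow returns the number of processed messages only so the behavior is easy to check.

```java
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.WorkflowQueue;
import java.time.Duration;
import org.slf4j.Logger;

public class StarterWorkflowSketch {

  /** Hypothetical stand-in for the question's message type (Jackson-friendly). */
  public static class ProcessingMessage {
    public Long sourceId;

    public ProcessingMessage() {}

    public ProcessingMessage(Long sourceId) {
      this.sourceId = sourceId;
    }

    @Override
    public String toString() {
      return "ProcessingMessage{sourceId=" + sourceId + "}";
    }
  }

  @WorkflowInterface
  public interface SomeWorkflow {
    @WorkflowMethod
    void startWorkflow(ProcessingMessage message);
  }

  @WorkflowInterface
  public interface RecordStarterWorkflow {
    /** Returns how many messages this run processed (added for illustration). */
    @WorkflowMethod
    int startWorkflow();

    @SignalMethod
    void putNewEmployee(ProcessingMessage message);
  }

  /** Hypothetical placeholder child workflow; replace with the real implementation. */
  public static class SomeWorkflowImpl implements SomeWorkflow {
    private static final Logger log = Workflow.getLogger(SomeWorkflowImpl.class);

    @Override
    public void startWorkflow(ProcessingMessage message) {
      log.info("Processing {}", message);
    }
  }

  public static class RecordStarterWorkflowImpl implements RecordStarterWorkflow {
    private static final Logger log = Workflow.getLogger(RecordStarterWorkflowImpl.class);
    // Assumed idle period: the run completes if no signal arrives within it.
    private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(1);

    // Workflow-safe FIFO queue; waiting on it goes through Temporal, not a Java lock.
    private final WorkflowQueue<ProcessingMessage> employeeMessages =
        Workflow.newWorkflowQueue(10_000);

    @Override
    public int startWorkflow() {
      int processed = 0;
      while (true) {
        // Blocks through Temporal until a message arrives; null once IDLE_TIMEOUT elapses.
        ProcessingMessage message = employeeMessages.poll(IDLE_TIMEOUT);
        if (message == null) {
          break;
        }
        SomeWorkflow child = Workflow.newChildWorkflowStub(
            SomeWorkflow.class, ChildWorkflowOptions.newBuilder().build());
        try {
          // Blocks until the child completes, so messages run one at a time in arrival order.
          child.startWorkflow(message);
        } catch (Exception e) {
          log.error("Child workflow failed for {}: {}", message, e.getMessage());
        }
        processed++;
      }
      log.info("Processing finished, {} message(s)", processed);
      return processed;
    }

    @Override
    public void putNewEmployee(ProcessingMessage message) {
      log.debug("Received signal with message: {}", message);
      // offer() never blocks; it returns false only if the queue is at capacity.
      if (!employeeMessages.offer(message)) {
        log.warn("Queue full, message not enqueued: {}", message);
      }
    }
  }
}
```

Because putNewEmployee only enqueues, a signal that arrives while a child workflow is running is buffered and picked up in order once that child completes. The idle wait keeps the run open briefly after the last message, so closely spaced messages for one sourceId are handled by the same run instead of depending on the queue being non-empty at the exact moment the loop checks it.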
Thank you for your response. The issue seems to be partially resolved by using Workflow.newWorkflowQueue in our local environment. However, when we deploy the code to our test environment, we are still encountering timeouts. These timeouts occur during the execution of our workflow activities.
The workflow follows a sequence of steps in which we signal the workflow and add items to the queue while certain workflow activities are still running. I suspect that this concurrent interaction might be contributing to the WorkflowTaskTimedOut errors.
Could signaling a workflow and adding items to its queue while activities are running lead to the WorkflowTaskTimedOut errors? Any suggestions on how to handle such situations more effectively would be greatly appreciated.
In the logs I can see this message:

```
Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId=Some-Workflow–Source-Id-123465879, RunId=13e1cd20-2372-41d1-a1b3-5effd417d68a, startedEventId=10","logger_name":"io.temporal.internal.worker.WorkflowWorker","thread_name":"Workflow Executor taskQueue="Starter-Workflow-v1-Queue", namespace="default": 11","level":"WARN","level_value":30000,"stack_trace":"io.grpc.StatusRuntimeException: NOT_FOUND: Workflow task not found.\n\tat
```
I'm testing with 2 to 10 messages sent 20 to 100 milliseconds apart. The timeout occurs on the second or third signal (for example, on the signal for the second message). It never happens when I send only one message.
No, I'm not using any custom DataConverters. I suspected my signal payload might be too big, so I simplified it to a single Long parameter, but the WorkflowTaskTimedOut exceptions still occur.
The other possibility is a very large history that takes too long to load on your setup. Otherwise, you need to profile your workflow implementation to determine why processing a workflow task takes so long.
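One way to check both possibilities is to download the history of an affected execution, look at its size, and time a local replay of it against the workflow code. This is a sketch assuming a Java SDK version that provides WorkflowClient.fetchHistory and a Temporal server on the default local address; the HistoryReplayCheck class and its command-line arguments are hypothetical.

```java
import io.temporal.client.WorkflowClient;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.WorkflowReplayer;

/** Hypothetical diagnostic: inspect history size and time a local replay. */
public class HistoryReplayCheck {

  /** Downloads the history for the given workflow ID and prints its size. */
  public static WorkflowExecutionHistory fetch(WorkflowClient client, String workflowId) {
    WorkflowExecutionHistory history = client.fetchHistory(workflowId);
    System.out.println("History events: " + history.getEvents().size());
    // Rough serialized size; large histories take longer to load and replay.
    System.out.println("History JSON size (chars): " + history.toJson(false).length());
    return history;
  }

  /** Replays the history against the workflow implementation; returns elapsed milliseconds. */
  public static long timeReplay(WorkflowExecutionHistory history, Class<?> workflowImpl)
      throws Exception {
    long start = System.nanoTime();
    // Re-executes the workflow code against the recorded events; throws on non-determinism.
    WorkflowReplayer.replayWorkflowExecution(history, workflowImpl);
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("usage: HistoryReplayCheck <workflowId> <workflowImplClassName>");
      return;
    }
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    try {
      WorkflowClient client = WorkflowClient.newInstance(service);
      WorkflowExecutionHistory history = fetch(client, args[0]);
      long millis = timeReplay(history, Class.forName(args[1]));
      System.out.println("Replay took " + millis + " ms");
    } finally {
      service.shutdown();
    }
  }
}
```

The measured time includes JVM and SDK warm-up, so compare several runs rather than trusting a single number. A replay that takes a noticeable fraction of the workflow task timeout (one minute in the listener above) would point at the workflow code or the history size rather than at signal timing.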