Is it possible that you use SignalWithStart to start the workflow?
If so, the signal is applied before the workflow method is called. (Applying signals before running workflow tasks is how we guarantee that signals are not lost.)
Whatever the cause, it looks like you are receiving a signal at the same time the workflow starts. Can you rewrite your signal handler so that it does not require the input to be set?
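A minimal sketch of that suggestion in plain Java, with no Temporal dependency. The class GreetingWorkflowSketch and its methods are hypothetical stand-ins for a workflow implementation, not the SDK's API. The signal handler only buffers its payload and never reads state that the workflow method initializes, so it is safe even when the signal is applied first, as with SignalWithStart. The workflow method sets its input and then drains whatever was buffered.

```java
import java.util.ArrayList;
import java.util.List;

public class SignalBeforeStartSketch {

  /** Hypothetical stand-in for a workflow implementation (plain Java, not the Temporal API). */
  static class GreetingWorkflowSketch {
    private String prefix; // set only by the workflow method
    private final List<String> pendingNames = new ArrayList<>(); // signals buffered until then

    /** Signal handler: records the payload and never reads prefix, so it is safe to run first. */
    void waitForName(String name) {
      pendingNames.add(name);
    }

    /** Workflow method: initializes its input, then processes any signals received so far. */
    List<String> run(String prefix) {
      this.prefix = prefix;
      List<String> greetings = new ArrayList<>();
      for (String name : pendingNames) {
        greetings.add(this.prefix + " " + name + "!");
      }
      pendingNames.clear();
      return greetings;
    }
  }

  public static void main(String[] args) {
    GreetingWorkflowSketch wf = new GreetingWorkflowSketch();
    wf.waitForName("World"); // signal delivered before the workflow method runs
    System.out.println(wf.run("Hello")); // prints [Hello World!]
  }
}
```

The design choice is simply to keep signal handlers as "record only" and move any logic that depends on workflow input into the workflow method, so the order in which the two arrive does not matter.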
When a workflow constantly throws a NullPointerException, its execution gets blocked, so it cannot process either queries or signals in that state. Fix the bug and it will work as expected.
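A toy model of why one bad signal handler keeps a workflow stuck. In the Java SDK, an unchecked exception such as this NullPointerException fails the workflow task rather than the workflow, and the task is retried; each retry rebuilds state from the same history and reaches the same exception. The sketch is plain Java with hypothetical Signal, State and attempt names (not SDK types) and treats one task attempt as a replay of all recorded signals: with the buggy handler every attempt fails and the waitForName signal after it is never applied, while the fixed handler lets replay reach the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PoisonSignalSketch {

  /** A recorded signal: handler name plus an optional argument (toy model, not an SDK event). */
  static final class Signal {
    final String name;
    final String arg;

    Signal(String name, String arg) {
      this.name = name;
      this.arg = arg;
    }
  }

  /** Toy workflow state, rebuilt from scratch on every attempt, the way a replay would. */
  static final class State {
    final List<String> names = new ArrayList<>();
  }

  static void apply(State state, Signal signal, boolean buggy) {
    switch (signal.name) {
      case "turnRouge":
        if (buggy) {
          throw new NullPointerException("went bonkers!");
        }
        break; // fixed handler: tolerate the signal instead of throwing
      case "waitForName":
        state.names.add("Hello " + signal.arg + "!");
        break;
      default:
        break;
    }
  }

  /** One simulated workflow task attempt; returns null when the attempt fails. */
  static State attempt(List<Signal> history, boolean buggy) {
    State state = new State();
    try {
      for (Signal signal : history) {
        apply(state, signal, buggy);
      }
      return state;
    } catch (RuntimeException e) {
      return null; // the whole task fails, so signals after the bad one are never applied
    }
  }

  public static void main(String[] args) {
    List<Signal> history =
        Arrays.asList(new Signal("turnRouge", null), new Signal("waitForName", "Universe"));
    for (int i = 1; i <= 3; i++) {
      // Each retry replays the same history and hits the same exception.
      boolean ok = attempt(history, true) != null;
      System.out.println("attempt " + i + ": " + (ok ? "ok" : "failed"));
    }
    System.out.println("after fix: " + attempt(history, false).names);
  }
}
```

Retrying more times cannot help here, because the failure is deterministic; only changing the handler changes the outcome.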
But what I notice is that other workflows are also affected: I am not able to query other workflows either. Also, any idea why the workflow task handler heartbeat stops?
I see workflow timeouts everywhere, possibly because the workflow task handler has gone down. But if I restart the process hosting the Java SDK, the other stuck workflows do get processed, and then get stuck again (until I terminate the rogue workflow that throws the NullPointerException).
Thanks for the pointer on threads, @maxim.
I am actually able to reproduce this with a modification of the hello example. I see the taskQueue threads are full of the same workflow.
I suspect it's the combination of Workflow.retry + signals going wrong.
Here is the sample code:
package io.temporal.samples.hello;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang.RandomStringUtils;

@SuppressWarnings("ALL")
public class HelloSignal {

  static final String TASK_QUEUE = "HelloSignal";

  /** Workflow interface must have a method annotated with @WorkflowMethod. */
  @WorkflowInterface
  public interface GreetingWorkflow {
    /**
     * List of greeting strings that were received through the waitForName signal. This method
     * blocks until the exit signal is received.
     */
    @WorkflowMethod
    List<String> getGreetings();

    /** Receives name through an external signal. */
    @SignalMethod
    void waitForName(String name);

    /** Simulates a buggy signal handler: always throws a NullPointerException. */
    @SignalMethod
    void turnRouge();

    /** Tells the workflow to return once all queued greetings are processed. */
    @SignalMethod
    void exit();
  }

  /** GreetingWorkflow implementation that returns a greeting. */
  public static class GreetingWorkflowImpl implements GreetingWorkflow {

    List<String> messageQueue = new ArrayList<>(10);
    boolean exit = false;

    private final GreetingWorkflow continueAsNew =
        Workflow.newContinueAsNewStub(GreetingWorkflow.class);

    @Override
    public List<String> getGreetings() {
      // Retries greetloop() with the default retry options and no expiration.
      return Workflow.retry(RetryOptions.getDefaultInstance(), Optional.empty(), () -> greetloop());
    }

    @Override
    public void waitForName(String name) {
      messageQueue.add("Hello " + name + "!");
    }

    private List<String> greetloop() {
      List<String> receivedMessages = new ArrayList<>(10);
      while (true) {
        Workflow.await(() -> !messageQueue.isEmpty() || exit);
        if (messageQueue.isEmpty() && exit) {
          return receivedMessages;
        }
        String message = messageQueue.remove(0);
        receivedMessages.add(message);
      }
    }

    @Override
    public void exit() {
      exit = true;
    }

    @Override
    public void turnRouge() {
      throw new NullPointerException("went bonkers!");
    }
  }

  public static void main(String[] args) throws Exception {
    // Start a worker that hosts the workflow implementation.
    WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    WorkflowClient client = WorkflowClient.newInstance(service);
    // In a real application use a business ID like customer ID or order ID
    String workflowId = RandomStringUtils.randomAlphabetic(10);
    // Start a workflow execution. Usually this is done from another program.
    // Get a workflow stub using the same task queue the worker uses.
    // The newly started workflow is going to have the workflowId generated above.
    WorkflowOptions workflowOptions =
        WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(workflowId).build();
    GreetingWorkflow workflow = client.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
    // Start workflow asynchronously to not use another thread to signal.
    WorkflowClient.start(workflow::getGreetings);
    // After start for getGreeting returns, the workflow is guaranteed to be started.
    // So we can send a signal to it using the workflow stub.
    // This workflow keeps receiving signals until exit is called
    // workflow.waitForName("World"); // sends waitForName signal
    workflow.turnRouge();
    // Submit a workflow and send it a signal before starting the worker.
    WorkerFactory factory = WorkerFactory.newInstance(client);
    Worker worker = factory.newWorker(TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
    factory.start();
    // Create a new stub using the workflowId.
    // This is to demonstrate that to send a signal only the workflowId is required.
    GreetingWorkflow workflowById = client.newWorkflowStub(GreetingWorkflow.class, workflowId);
    workflowById.waitForName("Universe"); // sends waitForName signal
    workflowById.exit(); // sends exit signal
    // Calling synchronous getGreeting after workflow has started reconnects to the existing
    // workflow and blocks until a result is available. Note that this behavior assumes that
    // WorkflowOptions are not configured with WorkflowIdReusePolicy.AllowDuplicate. In that case
    // the call would fail with WorkflowExecutionAlreadyStartedException.
    List<String> greetings = workflowById.getGreetings();
    System.out.println(greetings);
    System.exit(0);
  }
}
I see the thread count keeps increasing, and after a while my worker becomes unresponsive.
I tried continueAsNew instead of Workflow.retry(), but the results are the same.
In your reproduction all the workflows are failing, so it is expected that the worker cannot make any progress. In the earlier conversation you mentioned that a single workflow having an issue would block all the others. I'll try to play with your sample to make sure I'm not missing something.
Thanks @maxim. In this example it's a single workflow gobbling up all the resources, I guess. Eventually the taskQueue is at about 250 or 300, everything starts timing out, queries stop working, etc.
@nadilas, it was purely a Java SDK bug. We hit a race condition that resulted in a deadlock during workflow task execution, which made it impossible to acquire a lock for new task processing attempts. Each new attempt then blocked a new thread, and since we retry workflow tasks indefinitely, eventually all available threads in the thread pool were used up. We've addressed the root cause of the issue and also added a timeout for the lock acquisition to prevent similar issues from happening again. You may find more details in this PR if you are interested.
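The failure mode and the general fix can be illustrated with plain java.util.concurrent primitives. This is not the SDK's code: the class and method names are hypothetical, and a thread that holds a ReentrantLock without releasing it stands in for the deadlocked task. If each retry attempt called lock(), it would park its pool thread indefinitely, and a pool of poolSize threads would be exhausted after poolSize attempts. With tryLock(timeout, unit), each attempt gives up after the timeout and returns its thread to the pool, so attempts fail visibly instead of piling up blocked threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockTimeoutSketch {

  /**
   * Submits `attempts` simulated task attempts to a fixed pool while another thread holds the
   * lock without releasing it. Returns how many attempts timed out.
   */
  static int runAttempts(int poolSize, int attempts, long timeoutMs) {
    ReentrantLock lock = new ReentrantLock();
    CountDownLatch held = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    AtomicInteger timedOut = new AtomicInteger();

    // Stand-in for the deadlocked task: takes the lock and waits until told to stop.
    Thread stuck = new Thread(() -> {
      lock.lock();
      try {
        held.countDown();
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        lock.unlock();
      }
    });

    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      stuck.start();
      held.await();
      for (int i = 0; i < attempts; i++) {
        pool.execute(() -> {
          try {
            // Bounded wait: if the lock is not free in time, give the thread back.
            if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
              lock.unlock();
            } else {
              timedOut.incrementAndGet();
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
      release.countDown();
      stuck.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return timedOut.get();
  }

  public static void main(String[] args) {
    // 10 attempts on a 2-thread pool; every attempt times out and frees its thread.
    System.out.println(runAttempts(2, 10, 50)); // prints 10
  }
}
```

A timeout does not remove the underlying deadlock; it only keeps one stuck task from consuming every worker thread, which is why it complements, rather than replaces, the root-cause fix.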