Hi, I have code where the workflow needs to wait for at least 2 authorised users to send signals before proceeding. I am facing 2 issues with this. I have used signals for other things in my code before, but these issues seem new:
It looks like the Workflow restarts every time a signal is received. I’m not sure why or how that happens. Each signal is supposed to carry an Auth header that the workflow needs to verify. According to my service logs, when we receive Signal1 with Auth header1, the verification is triggered once, but when we receive Signal2 with Auth header2, the verification is triggered for Auth header1 and then, right after, for Auth header2. The weirdest part is that a log line placed before the waiting loop is printed again just before Signal2 triggers the verification twice, which I think means the workflow is restarting. This is how my wait loop looks:
Set<String> authenticatedKeyIds = null;
logger.info("Waiting for authentication to complete.");
// Poll once a minute until two users have authenticated
while (getAuthenticatedUsers().size() < 2) {
    Workflow.sleep(60000);
}
authenticatedKeyIds = getAuthenticatedUsers();
logger.info("Authentication complete");
Ideally, “Waiting for authentication to complete.” should be logged only once, regardless of how many signals are sent, but after sending 2 signals I see it logged twice (the second time suspiciously right after the 2nd signal is received).
This is how the signal is being handled:
authenticateUser(authorizationHeader);
public void authenticateUser(String authorizationHeader) {
    logger.info("Received authentication request with header {}", authorizationHeader);
    try {
        String keyId = authenticate(authorizationHeader);
        if (keyId == null) {
            logger.error("Unauthenticated user with header {}", authorizationHeader);
        } else {
            logger.info("Authentication successful for keyId {}", keyId);
            authenticatedUsers.add(keyId);
        }
    } catch (Exception e) {
        // Pass the exception so its cause shows up in the log
        logger.error("Error in verifying header", e);
    }
}
Every time the second signal arrives, even though the verification succeeds, I get this error in the UI:
io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 5 of 'EVENT_TYPE_TIMER_STARTED' type. IsReplaying=true, PreviousStartedEventId=3, workflowTaskStartedEventId=44, Currently Processing StartedEventId=3
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:202)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:309)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:275)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused By: java.lang.IllegalStateException: Event 5 of EVENT_TYPE_TIMER_STARTED does not match command COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK
io.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:263)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:199)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:202)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:309)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:275)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
I suspect the two issues are related, but I have no idea what the cause might be. Any help would be greatly appreciated.
Hi @maxim, authenticateUser is a SignalMethod. I have a few others defined for other signals, and they’ve worked before, so I’m not sure whether this one is any different.
Do you use Workflow.getLogger to initialize the logger? In most cases you should care about the correctness of externally visible side effects: Is the data that your query method returns correct? Are the activities that the workflow invokes in reaction to a signal correct?
Due to the nature of the Temporal execution model, printing something from the workflow code and expecting it to be executed only once is not a good way to understand and troubleshoot things.
But I also see the SignalMethod authenticateUser being triggered twice (once with the previous header and once with the new one) whenever a new signal is received, which does not look good for my code. On top of this, I get this error, which I think is caused by the workflow replaying many times, but I don’t understand the cause.
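For context on the logger question: in the Java SDK, a logger obtained with Workflow.getLogger(YourWorkflowImpl.class) (YourWorkflowImpl is a placeholder) is replay-aware and, by default, skips messages while workflow code is being replayed, whereas a plain SLF4J logger prints the same lines again on each replay. The sketch below is a toy model in plain Java, not the SDK's implementation; ReplayAwareLog and its setReplaying flag are hypothetical stand-ins for the SDK's internal replay state.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not the Temporal API): a logger that drops messages while the
// workflow code is being replayed, roughly what a replay-aware logger does.
class ReplayAwareLog {
    private final List<String> emitted = new ArrayList<>();
    private boolean replaying; // hypothetical stand-in for the SDK's replay flag

    void setReplaying(boolean replaying) {
        this.replaying = replaying;
    }

    void info(String message) {
        // During replay this line was already emitted on the original run, so skip it.
        if (!replaying) {
            emitted.add(message);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ReplayAwareLog log = new ReplayAwareLog();
        log.info("Waiting for authentication to complete."); // original execution
        log.setReplaying(true);
        log.info("Waiting for authentication to complete."); // replay after a new signal
        log.setReplaying(false);
        System.out.println(log.emitted()); // the message appears only once
    }
}
```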
I see the SignalMethod authenticateUser being triggered twice
Do you see any externally visible side effects that confirm that? Can you add a counter that is incremented on every call to this method, and a query method that returns it, to check?
Hi @maxim, I added more logs, and I think the workflow goes through a Workflow Replay every time a signal is sent. The docs say that as long as my code is deterministic, this shouldn’t change anything. Would that also explain the io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 5 of 'EVENT_TYPE_TIMER_STARTED' type. IsReplaying=true, PreviousStartedEventId=3, workflowTaskStartedEventId=44, Currently Processing StartedEventId=3 error, and is that something I should be concerned about?
Oh, I see. Is there a way to find out which code is non-deterministic? My code seems simple and deterministic to me, but oddly I get this error only when I send the signal a 2nd time; if I send it a 3rd time, I don’t get the error, even though my logs show the workflow replaying after the 3rd signal as well. Also, there isn’t much for the workflow to replay at this point in my code. This is how my WorkflowMethod looks:
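A toy model of this suggestion, in plain Java rather than the Temporal API: the counter lives in workflow state, which a replay rebuilds from scratch by re-applying the recorded signals. The handler may physically run three times over two workflow tasks (header1, then header1 again plus header2), yet a query that reads the rebuilt state reports 2. In real code the getter would be declared with @QueryMethod on the workflow interface; CounterWorkflow and replay() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not the Temporal API): count signal-handler calls in workflow
// state and read the count back through a "query".
class CounterWorkflow {
    private int signalCount; // workflow state, rebuilt from scratch on replay

    void authenticateUser(String header) { // plays the role of the signal method
        signalCount++;
    }

    int getSignalCount() { // plays the role of the query method
        return signalCount;
    }

    // Rebuilds a workflow instance by re-applying every recorded signal, the way
    // a worker replays history when it has no cached state for the workflow.
    static CounterWorkflow replay(List<String> history) {
        CounterWorkflow wf = new CounterWorkflow();
        for (String header : history) {
            wf.authenticateUser(header);
        }
        return wf;
    }

    public static void main(String[] args) {
        List<String> history = new ArrayList<>();
        history.add("header1");
        CounterWorkflow first = replay(history);  // task 1: handler runs for header1
        history.add("header2");
        CounterWorkflow second = replay(history); // task 2: header1 again, then header2
        System.out.println(first.getSignalCount() + " " + second.getSignalCount()); // 1 2
    }
}
```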
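The Caused By line in the trace (Event 5 of EVENT_TYPE_TIMER_STARTED does not match command COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK) points at the check that replay performs: the SDK re-runs the workflow code and compares each command it produces with the event recorded in history at that position. The toy model below (plain Java, not Temporal's implementation; Step, runWorkflow and replay are hypothetical) shows how a branch that depends on something outside the history can turn a recorded timer into an activity on replay and trigger the same kind of mismatch.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Toy model (not Temporal's implementation): replay re-runs workflow code and
// requires each emitted command to match the next recorded event.
class ReplayChecker {
    enum Step { TIMER, ACTIVITY }

    // Hypothetical workflow code: starts a timer, unless an external check says to
    // schedule an activity. If that answer differs on replay, so do the commands.
    static List<Step> runWorkflow(BooleanSupplier externalCheck) {
        return externalCheck.getAsBoolean()
                ? List.of(Step.ACTIVITY)
                : List.of(Step.TIMER);
    }

    // Replays the code against recorded history and fails on the first mismatch.
    static void replay(List<Step> history, BooleanSupplier externalCheck) {
        List<Step> commands = runWorkflow(externalCheck);
        for (int i = 0; i < history.size(); i++) {
            if (i >= commands.size() || commands.get(i) != history.get(i)) {
                String got = i < commands.size() ? commands.get(i).toString() : "none";
                throw new IllegalStateException("Event " + i + " of " + history.get(i)
                        + " does not match command " + got);
            }
        }
    }

    public static void main(String[] args) {
        List<Step> history = runWorkflow(() -> false); // original run recorded a TIMER
        replay(history, () -> false);                   // same answer: replay succeeds
        try {
            replay(history, () -> true);                // different answer on replay
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Event 0 of TIMER does not match command ACTIVITY
        }
    }
}
```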
public void startWorkflow(String workflowName, String workflowId, String id) throws Exception {
    Set<String> authenticatedKeyIds = null;
    if (useAuth) {
        logger.info("Waiting for authentication to complete.");
        // "At least 2 users" is >= 2; await returns false if the 2-hour timeout expires first
        boolean authenticated = Workflow.await(Duration.ofHours(2), () -> getAuthenticatedUsers().size() >= 2);
        if (!authenticated) {
            logger.warn("Timed out waiting for authentication");
            return;
        }
        authenticatedKeyIds = getAuthenticatedUsers();
        logger.info("Authentication complete for keys {}", authenticatedKeyIds);
    }
}
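A note on the await condition: "at least 2 authorised users" corresponds to size() >= 2, whereas size() > 2 only unblocks once a third user has authenticated. Since authenticatedUsers is a Set<String>, repeated signals carrying the same keyId count once. A minimal sketch of the check (QuorumCheck and REQUIRED_USERS are hypothetical names); inside the workflow it would be used as Workflow.await(Duration.ofHours(2), () -> QuorumCheck.quorumReached(getAuthenticatedUsers())).

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Minimal sketch of the unblock condition for "at least 2 authorised users".
class QuorumCheck {
    static final int REQUIRED_USERS = 2; // hypothetical constant

    // ">=" means "at least"; "> 2" would wait for a third user.
    static boolean quorumReached(Set<String> authenticatedUsers) {
        return authenticatedUsers.size() >= REQUIRED_USERS;
    }

    public static void main(String[] args) {
        Set<String> users = new LinkedHashSet<>();
        users.add("key-A");
        users.add("key-A");                       // same user signals twice: still one entry
        System.out.println(quorumReached(users)); // false
        users.add("key-B");
        System.out.println(quorumReached(users)); // true
    }
}
```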
And this is how my SignalMethod authenticateUser looks:
// Signal handlers are workflow code, so this body runs again on every replay
public void authenticateUser(String authorizationHeader) {
    logger.info("Received authentication request with header {}", authorizationHeader);
    try {
        String keyId = authenticate(authorizationHeader);
        if (keyId == null) {
            logger.error("Unauthenticated user with header {}", authorizationHeader);
        } else {
            logger.info("Authentication successful for keyId {}", keyId);
            authenticatedUsers.add(keyId);
        }
    } catch (Exception e) {
        logger.error("Error in verifying header", e);
    }
}

public Set<String> getAuthenticatedUsers() {
    return authenticatedUsers;
}
I think the only thing that changes in my code on each replay is the size of authenticatedKeyIds, but I’m not sure whether that counts as non-determinism, since that’s the whole point of waiting for the signal, right?
Here, AuthManager is a singleton class with the getInstance() method defined as:
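private static AuthManager instance;

// synchronized: several workflow threads in one worker may call this concurrently
public static synchronized AuthManager getInstance() throws Exception {
    if (instance == null) {
        instance = new AuthManager();
        try {
            buildMetadata(); // calls another service
        } catch (Exception e) {
            instance = null; // don't keep a half-initialized instance around
            throw new Exception("Error in creating metadata", e);
        }
    }
    return instance;
}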
The buildMetadata() method calls another service, but it returns the same data every time (because the data doesn’t depend on the header). Also, shouldn’t using a singleton make this more deterministic, since the instance is the same on every call?
I’m pretty sure that this is the cause of the non-determinism. Shared code that calls external services like this is not allowed inside the workflow. Even if a remote call returns the same value, it is still non-deterministic, because it can fail sometimes.
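To make the "it can fail sometimes" point concrete, here is a toy model in plain Java (not the Temporal API). A hypothetical flakyService stands in for the remote call behind AuthManager: it answers the first two requests and then fails by returning null. Calling it directly from the signal handler repeats the call on every replay, so a later failure silently drops a user who had already been authenticated. Recording each result the first time and reading it back on replay keeps the authenticated set stable. This is roughly what happens when the call is made from an activity, whose result is stored in the workflow history. RecordedCalls, replayDirect and replaySignals are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model (not the Temporal API): direct remote calls vs results recorded once.
class RecordedCalls {
    private final List<String> recorded = new ArrayList<>(); // stands in for event history
    private int cursor;                                      // next recorded result to reuse
    int remoteCalls;                                         // times the service was actually hit

    // Hypothetical flaky service: answers the first two requests, then fails (null).
    static Function<String, String> flakyService() {
        int[] requests = {0};
        return header -> ++requests[0] <= 2 ? "key-" + header : null;
    }

    // Direct call: the service is hit on the original run AND on every replay.
    static List<String> replayDirect(List<String> headers, Function<String, String> remote) {
        List<String> keys = new ArrayList<>();
        for (String header : headers) {
            String key = remote.apply(header);
            if (key != null) keys.add(key);
        }
        return keys;
    }

    // Recorded call: reuse the result from history if present, else call once and record it.
    String call(String header, Function<String, String> remote) {
        if (cursor < recorded.size()) {
            return recorded.get(cursor++);
        }
        remoteCalls++;
        String result = remote.apply(header);
        recorded.add(result);
        cursor++;
        return result;
    }

    // Rebuilds the authenticated keys by replaying all signals received so far.
    List<String> replaySignals(List<String> headers, Function<String, String> remote) {
        cursor = 0;
        List<String> keys = new ArrayList<>();
        for (String header : headers) {
            String key = call(header, remote);
            if (key != null) keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        Function<String, String> direct = flakyService();
        replayDirect(List.of("A"), direct);                          // task 1
        System.out.println(replayDirect(List.of("A", "B"), direct)); // task 2: [key-A], B is lost

        RecordedCalls calls = new RecordedCalls();
        Function<String, String> remote = flakyService();
        calls.replaySignals(List.of("A"), remote);                          // task 1
        System.out.println(calls.replaySignals(List.of("A", "B"), remote)); // [key-A, key-B]
    }
}
```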