Exceptions from parallel activities cause the workflow to restart infinitely

Hi,

We have a situation in which two activities running in parallel both throw exceptions, and this causes the workflow to execute from the beginning over and over, indefinitely.

  • One of the exceptions is retryable and the other is non-retryable.
  • Both exceptions are thrown simultaneously.
  • We have set a timeout on the workflow and set its retry policy to no retry.
  • We catch the exception in the workflow and perform a few housekeeping activities. We do not fail the workflow under any circumstances.
  • The workflow eventually ends with TIMEOUT.

We observe this behaviour consistently and can reproduce it with a certain input.

Please suggest how we can handle this better.

SDK: Java 1.0.9

What is the exception in the last WorkflowTaskFailed event recorded in the history?

I would also recommend upgrading to the latest version of the SDK.

Hi @maxim,
We don’t see any exception recorded.
I will try with the latest Java SDK and report back.
Please see below.

Hi @maxim,

I tried with the v1.7.0 Java SDK and the issue persists. However, I see the following additional log entry (3 times):

internal.worker.WorkflowWorker: Workflow task failure during replying to the server. startedEventId=19, WorkflowId=45F46FAD4EB8455AA8C7F0B3F25C17DC@AVABCgA-06437495323552-0, RunId=415789e6-c560-4d4f-ac46-4368a849618b. If seen continuously the workflow might be stuck. io.grpc.StatusRuntimeException: NOT_FOUND: workflow execution already completed
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.43.2.jar:1.43.2]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.43.2.jar:1.43.2]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.43.2.jar:1.43.2]
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondWorkflowTaskCompleted(WorkflowServiceGrpc.java:2671) ~[temporal-serviceclient-1.7.0.jar:?]
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.lambda$sendReply$0(WorkflowWorker.java:320) ~[temporal-sdk-1.7.0.jar:?]
	at io.temporal.internal.retryer.GrpcRetryer.lambda$retry$0(GrpcRetryer.java:44) ~[temporal-serviceclient-1.7.0.jar:?]
	at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:61) ~[temporal-serviceclient-1.7.0.jar:?]
	at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:51) ~[temporal-serviceclient-1.7.0.jar:?]
	at io.temporal.internal.retryer.GrpcRetryer.retry(GrpcRetryer.java:41) ~[temporal-serviceclient-1.7.0.jar:?]
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.sendReply(WorkflowWorker.java:313) ~[temporal-sdk-1.7.0.jar:?]
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:243) ~[temporal-sdk-1.7.0.jar:?]
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:188) ~[temporal-sdk-1.7.0.jar:?]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:83) ~[temporal-sdk-1.7.0.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

It looks like the workflow task is constantly timing out. What is the original exception in the worker log?

Yes @maxim,

Here is the sequence:

  1. The workflow starts.
  2. We invoke two activities in parallel, say A1 and A2.
  3. A1 throws a non-retryable exception (a custom one that we have set as doNotRetry in RetryOptions).
  4. The workflow catches the exception and performs housekeeping actions (such as updating a cache).
  5. A2 throws a retryable exception (java.util.concurrent.TimeoutException), i.e. the API call times out.
  6. Now the workflow goes crazy… and starts from the beginning (step 1).
  7. Step 6 repeats indefinitely until the workflow timeout.

Both activities are local activities. Is there any way to cancel a local activity?
Please suggest any workarounds.

Do you see any exceptions at step 5 or 6 logged by the workflow worker?

At step 6, I see the exception below:

internal.worker.WorkflowWorker: Workflow task failure during replying to the server. startedEventId=14, WorkflowId=5089E07B66E342388F0FEF2FE4735E18@AVABCgA-06437495323552-0, RunId=21f5ef39-bf54-458e-876a-2a57c98f4449. If seen continuously the workflow might be stuck. io.grpc.StatusRuntimeException: INVALID_ARGUMENT: encouter invalid commands sequence: RecordMarker, RecordMarker, RecordMarker, CompleteWorkflowExecution, RecordMarker
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.43.2.jar:1.43.2]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.43.2.jar:1.43.2]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.43.2.jar:1.43.2]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondWorkflowTaskCompleted(WorkflowServiceGrpc.java:2671) ~[temporal-serviceclient-1.7.0.jar:?]
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.lambda$sendReply$0(WorkflowWorker.java:320) ~[temporal-sdk-1.7.0.jar:?]
at io.temporal.internal.retryer.GrpcRetryer.lambda$retry$0(GrpcRetryer.java:44) ~[temporal-serviceclient-1.7.0.jar:?]
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:61) ~[temporal-serviceclient-1.7.0.jar:?]
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:51) ~[temporal-serviceclient-1.7.0.jar:?]
at io.temporal.internal.retryer.GrpcRetryer.retry(GrpcRetryer.java:41) ~[temporal-serviceclient-1.7.0.jar:?]
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.sendReply(WorkflowWorker.java:313) ~[temporal-sdk-1.7.0.jar:?]
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:243) ~[temporal-sdk-1.7.0.jar:?]
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:188) ~[temporal-sdk-1.7.0.jar:?]
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:83) ~[temporal-sdk-1.7.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]

This looks like a bug in the SDK. Are you able to reproduce it using the latest SDK version?

I am unable to reproduce this using 1.7.0 or 1.8.0. In both cases the local activity that throws the non-retryable error fails and produces a MarkerRecorded event with the failure. The second local activity retries and then fails with the retryable error.

What server version are you using?
Could you show the relevant code so I can make sure I’m doing the same thing?

I am unable to reproduce this using 1.7.0 or 1.8.0. In both cases the local activity that throws the non-retryable error fails and produces a MarkerRecorded event with the failure.

Actually, I take that back. It just does not happen every time. After running it 10 times I was able to get:

13:56:45.271 [Workflow Executor taskQueue="HelloParallelActivityTaskQueue", namespace="default": 1] WARN  i.t.internal.worker.WorkflowWorker - Workflow task failure during replying to the server. startedEventId=3, WorkflowId=HelloParallelActivityWorkflow, RunId=790382b4-27c5-4f0c-bbd4-cdc88e699036. If seen continuously the workflow might be stuck.
io.grpc.StatusRuntimeException: INVALID_ARGUMENT: encouter invalid commands sequence: CompleteWorkflowExecution, RecordMarker

followed by:

io.grpc.StatusRuntimeException: INVALID_ARGUMENT: encouter invalid commands sequence: CompleteWorkflowExecution, RecordMarker
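
Both rejected batches quoted in this thread contain a RecordMarker command after CompleteWorkflowExecution. As a rough illustration only, and assuming the server rejects any batch in which the completion command is not the last one (the thread does not confirm the exact rule), the check can be modelled in plain Java. The class and method names are hypothetical and are not part of the Temporal SDK or server:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the command-order rule; not Temporal SDK or server code. */
final class CommandSequenceCheck {

  /** Returns true when nothing follows CompleteWorkflowExecution in the batch. */
  static boolean completionIsLast(List<String> commands) {
    int index = commands.indexOf("CompleteWorkflowExecution");
    // No completion command at all, or the completion command closes the batch.
    return index == -1 || index == commands.size() - 1;
  }

  public static void main(String[] args) {
    // The two sequences reported in the logs of this thread.
    List<String> first = Arrays.asList(
        "RecordMarker", "RecordMarker", "RecordMarker",
        "CompleteWorkflowExecution", "RecordMarker");
    List<String> second = Arrays.asList("CompleteWorkflowExecution", "RecordMarker");
    System.out.println(completionIsLast(first));
    System.out.println(completionIsLast(second));
  }
}
```

Under that assumption, a marker that is produced after the workflow method has already returned would end up behind the completion command, which matches both logged sequences.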

@ravikiran are you by chance not waiting for the asynchronously started local activities to complete? I can see the mentioned issue if I start the async invocations but then don’t wait for them to complete. This could be a race condition between the workflow completing and the marker being recorded in history.

If I do for example:

      List<Promise<String>> promiseList = new ArrayList<>();
      promiseList.add(Async.function(localActivityOne::doSomething));
      promiseList.add(Async.function(localActivityTwo::doSomethingElse));

      // ...

      // wait for the local activities to complete to get results
      // I get the error if I don't have this code block in the workflow
      for (Promise<String> promise : promiseList) {
        // ...
        promise.get();
      }

With this in place I am unable to get the error; I tried running it over 30 times.

Hi @tihomir,

I have tried using v1.7.0 and the issue persists.

A few observations:

  1. If the order of the exceptions changes, we don’t see any issue.
  2. In the snippet below, we see the “Workflow completed” message in the log before the workflow goes into the infinite loop.

This is what I am doing:

    try {
      Promise<String> a1Promise = Async.function(localActivityOne::doSomething);
      Promise<String> a2Promise = Async.function(localActivityTwo::doSomethingElse);
      // running the promises in parallel
      String a1Result = a1Promise.get();
      String a2Result = a2Promise.get();
    } catch (Exception e) {
      // perform cache updates in case of error // or add wait or sleep to see the issue
    }
    System.out.println("Workflow completed");
    return;

@ravikiran could you provide a reproducible sample, please? If not, can you show your local activity and workflow options, and the local activity code where you throw the exceptions?

I ask because I ran a test using your latest shown code over 200 times and was not able to get the failure.
I can, however, reproduce it without the:

    String a1Result = a1Promise.get();
    String a2Result = a2Promise.get();

(that is, without waiting on local activity promise completion)

Hi @tihomir,

Are you sure you are throwing the non-retryable exception first, followed by the retryable one?

Anyway, thanks for your question. It is indeed a race condition. Now, when we get a “NonRetryable” exception, we simply wait for the other promise to complete and ignore its result or exception.

Basically, this is what we did to work around the issue:

    // declared outside the try block so that they are visible in the catch block
    Promise<String> a1Promise = Async.function(localActivityOne::doSomething);
    Promise<String> a2Promise = Async.function(localActivityTwo::doSomethingElse);
    try {
      // running the promises in parallel
      String a1Result = a1Promise.get();
      String a2Result = a2Promise.get();
    } catch (Exception e) {
      try {
        // wait for the second local activity to finish before the workflow completes
        a2Promise.get();
      } catch (Exception ex) {
        // ignore
      }
      // perform other operations
    }
    System.out.println("Workflow completed");
    return;
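
For more than two parallel calls, the same idea (wait for every branch, whatever the others did, before the workflow method returns) could be pulled into a small helper. This is only a minimal sketch: SettleAll, Outcome and drain are hypothetical names, not part of the Temporal SDK. It assumes each getter is something like promise::get, which blocks until that branch has finished and reports a failure as a RuntimeException. The helper uses no threads or timers of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper, not part of the Temporal SDK. */
final class SettleAll {

  /** Results of the branches that succeeded and failures of those that did not. */
  static final class Outcome<T> {
    final List<T> results = new ArrayList<>();
    final List<RuntimeException> failures = new ArrayList<>();

    boolean allSucceeded() {
      return failures.isEmpty();
    }
  }

  /**
   * Calls every getter in order and never stops at the first failure,
   * so no branch is left unfinished when this method returns.
   */
  static <T> Outcome<T> drain(List<Supplier<T>> getters) {
    Outcome<T> outcome = new Outcome<>();
    for (Supplier<T> getter : getters) {
      try {
        outcome.results.add(getter.get());
      } catch (RuntimeException e) {
        // Remember the failure, then keep waiting for the remaining branches.
        outcome.failures.add(e);
      }
    }
    return outcome;
  }

  public static void main(String[] args) {
    // Stand-ins for a1Promise::get and a2Promise::get (assumption).
    List<Supplier<String>> getters = new ArrayList<>();
    getters.add(() -> { throw new IllegalStateException("non-retryable failure from A1"); });
    getters.add(() -> "A2 result");

    Outcome<String> outcome = drain(getters);
    System.out.println("results=" + outcome.results + ", failures=" + outcome.failures.size());
  }
}
```

In a workflow, the list would presumably be built with getters.add(a1Promise::get) and getters.add(a2Promise::get), and the housekeeping would run after drain returns, once outcome.allSucceeded() has been checked.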