Trouble with workflow cancellation

Hi,
I have a Temporal implementation with the following structure. It's a user-interactive workflow that should be responsive to stop/cancel requests.
The workflows and activities are modelled as follows.

##parent workflow
*The parent workflow contains a sequence of steps to be executed one after another. Each step is modelled as a workflow embedded inside a cancellation scope, so that the step can be cancelled and the parent workflow stopped at that point.
*When the user wants to stop the workflow, the parent workflow receives a STOP signal through a signal method. At that point, the parent workflow calls cancel() on the currently executing cancellation scope (which is executing a child workflow) and returns, skipping the remaining steps (cancellation scopes).
	##cancellationscope(each step of a parent workflow)  		
	*Each cancellationscope will trigger a child workflow
	*Because the cancellation scope needs to be cancelled, it triggers the embedded child workflows asynchronously.
	*Inside the cancellation scope, Async.procedure() is used to trigger the child workflow asynchronously
	*After the child workflow is triggered, the parent workflow needs to wait until this step is complete, so it calls get() on the Promise returned by Async.procedure(). This get() is executed by the parent workflow.
		##childworkflow
		*The child workflow is embedded inside the cancellation scope. 
		*The child workflow will trigger a set of activities in parallel asynchronously using Async.function().
		*The child workflow calls get() on every promise returned by Async.function() to wait for all activity executions to complete, and then returns.
			##activity
			*The activity does some processing and handles DB persistence.
			*Each activity heartbeats and returns when it detects that the child workflow that triggered it has been cancelled.

Configuration
* Every activity and child workflow is configured with RetryOptions maximumAttempts set to 1, so that no retry happens.

When we signal the parent workflow to stop, the parent workflow in turn cancels the cancellation scope (the workflow step that is currently executing) that triggered the child workflow, and returns.

Expectation:
* The child workflow receives the cancellation exception.
* The child workflow gets cancelled.
* All the activities triggered from the child workflow (scheduled for execution) get cancelled.
* The parent workflow completes and the child workflow is cancelled; these completed/cancelled states are expected to be reflected in the Temporal web UI.

What actually happens:
* In the Temporal web UI we can see that both the parent and child workflows continue to run.
* The first exception we see is io.temporal.failure.ActivityFailure (caused by io.temporal.failure.CanceledFailure), logged at the point where the child workflow calls get() on the activity promises to wait for the activity results.
* The second exception is: WARN [temporalworker,] [Workflow Executor taskQueue="WorkflowQueue", namespace="default": 100] WARN i.t.i.sync.DeterministicRunnerImpl.close - Promise completed with exception and was never accessed. The ignored exception:
io.temporal.failure.ActivityFailure: scheduledEventId=32, startedEventId=0, activityType='GenerateAndPersist', activityId='461240c0-9005-3e25-9f39-424b920e77ba', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED.
This second exception keeps appearing in the logs and does not stop.
* In the Temporal web UI we can see that both the child and parent workflows stay in the running state indefinitely.

Full details of the exceptions are below:

 * First exception
06-05-2022 09:45:40.058 ERROR [temporalworker,,,] [workflow-method] ERROR i.t.i.s.POJOWorkflowImplementationFactory.logWorkflowExecutionException - Workflow execution failure WorkflowId=658f5ea3-f9a9-34f7-9c9c-c8fc61bb7961, RunId=9c34a875-d2f6-464f-b0d1-b2f2aaf10952, WorkflowType=ReachLinkEnumerationWorkflow io.temporal.failure.ActivityFailure: scheduledEventId=23, startedEventId=0, activityType='GenerateAndPersist', activityId='bce747c8-db73-38db-838f-b1a3ee2ffd5b', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED
at java.base/java.lang.Thread.getStackTrace(Unknown Source)
at io.temporal.internal.sync.CompletablePromiseImpl.throwFailure(CompletablePromiseImpl.java:136)
at io.temporal.internal.sync.CompletablePromiseImpl.getImpl(CompletablePromiseImpl.java:95)
at io.temporal.internal.sync.CompletablePromiseImpl.get(CompletablePromiseImpl.java:74)
at java.base/java.util.ArrayList.forEach(Unknown Source)
at XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX(application code line where its doing a get() on promise)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:324)
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:282)
at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:53)
at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:126)
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: io.temporal.failure.CanceledFailure: 
at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:128)
at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:72)
at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:86)
at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:72)
at io.temporal.internal.sync.SyncWorkflowContext$ActivityCallback.lambda$invoke$0(SyncWorkflowContext.java:198)
... 7 common frames omitted
  • Second exception (the one that keeps recurring)
    06-05-2022 09:45:40.063 WARN [temporalworker,] [Workflow Executor taskQueue="WorkflowQueue", namespace="default": 100] WARN i.t.i.sync.DeterministicRunnerImpl.close - Promise completed with exception and was never accessed. The ignored exception:
    io.temporal.failure.ActivityFailure: scheduledEventId=32, startedEventId=0, activityType='GenerateAndPersist', activityId='461240c0-9005-3e25-9f39-424b920e77ba', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED
    at java.base/java.lang.Thread.getStackTrace(Unknown Source)
    at io.temporal.internal.sync.CompletablePromiseImpl.throwFailure(CompletablePromiseImpl.java:136)
    at io.temporal.internal.sync.CompletablePromiseImpl.getImpl(CompletablePromiseImpl.java:95)
    at io.temporal.internal.sync.CompletablePromiseImpl.get(CompletablePromiseImpl.java:74)
    at io.temporal.internal.sync.DeterministicRunnerImpl.close(DeterministicRunnerImpl.java:387)
    at io.temporal.internal.sync.DeterministicRunnerImpl.runUntilAllBlocked(DeterministicRunnerImpl.java:317)
    at io.temporal.internal.sync.SyncWorkflow.eventLoop(SyncWorkflow.java:144)
    at io.temporal.internal.replay.ReplayWorkflowExecutor.eventLoop(ReplayWorkflowExecutor.java:74)
    at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler$EntityManagerListenerImpl.eventLoop(ReplayWorkflowRunTaskHandler.java:328)
    at io.temporal.internal.statemachines.WorkflowStateMachines.eventLoop(WorkflowStateMachines.java:388)
    at io.temporal.internal.statemachines.WorkflowStateMachines.lambda$scheduleActivityTask$ff6e4b43$1(WorkflowStateMachines.java:449)
    at io.temporal.internal.statemachines.ActivityStateMachine.notifyCanceled(ActivityStateMachine.java:318)
    at io.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:45)
    at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:137)
    at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)
    at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:63)
    at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:210)
    at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
    at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
    at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
    at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
    at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
    at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
    at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
    at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: io.temporal.failure.CanceledFailure:
    at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:128)
    at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:72)
    at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:86)
    at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:72)
    at io.temporal.internal.sync.SyncWorkflowContext$ActivityCallback.lambda$invoke$0(SyncWorkflowContext.java:198)
    at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
    at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    … 3 common frames omitted

With cancellation you can configure if you want to wait for cancellation to complete or not (for both your child workflow as well as activities invoked in the child workflows).

Here I assume that you want to be able to do some cleanup work in both your child workflow as well as activities upon cancellation. For this case you can set:

ChildWorkflowOptions: .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)

ActivityOptions: .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)

Here is a full sample that shows this, hope it helps.

If you run the Starter you can see that upon receiving the cancellation signal, the parent workflow will wait for child cancellation, and child workflow is going to allow activity to perform some “cleanup” as well. The parent then can catch ChildWorkflowFailure and return a “real” result.

Take a look at other options for ChildWorkflowCancellationType and ActivityCancellationType to fine-tune this for your specific requirements.

If you do not care about cleanup, you could set:

ChildWorkflowOptions:

.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
.setCancellationType(ChildWorkflowCancellationType.TRY_CANCEL)

ActivityOptions:
.setCancellationType(ActivityCancellationType.TRY_CANCEL)

In this case, your workflow code would catch a CanceledFailure right away, and the activity would get ActivityNotExistsException instead of ActivityCanceledException on its next heartbeat, since the child gets cancelled before the activity completes.
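
To make the two option sets above concrete, here is a rough sketch of how they might be built with the Java SDK builders. The class name CancellationOptions, the method names, and the timeout values are made up for illustration; only the builder calls and enum constants are the ones named above. It needs the Temporal Java SDK on the classpath.

```java
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowCancellationType;
import io.temporal.workflow.ChildWorkflowOptions;
import java.time.Duration;

// Hypothetical holder class, for illustration only.
final class CancellationOptions {

  // Variant 1: the parent waits until the child has finished handling the cancellation.
  static ChildWorkflowOptions childWaitForCleanup() {
    return ChildWorkflowOptions.newBuilder()
        .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
        .build();
  }

  // Variant 1: the child waits until the activity has reported its cancellation.
  static ActivityOptions activityWaitForCleanup() {
    return ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofMinutes(5)) // assumed value
        .setHeartbeatTimeout(Duration.ofSeconds(10)) // assumed value
        .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
        .build();
  }

  // Variant 2: no cleanup needed, request cancellation and move on.
  static ChildWorkflowOptions childTryCancel() {
    return ChildWorkflowOptions.newBuilder()
        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
        .setCancellationType(ChildWorkflowCancellationType.TRY_CANCEL)
        .build();
  }

  // Variant 2: the activity call fails with a cancellation right away.
  static ActivityOptions activityTryCancel() {
    return ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofMinutes(5)) // assumed value
        .setHeartbeatTimeout(Duration.ofSeconds(10)) // assumed value
        .setCancellationType(ActivityCancellationType.TRY_CANCEL)
        .build();
  }
}
```

The heartbeat timeout is included because an activity only learns about a cancellation request when it heartbeats.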

Hi Tihomir,
Thanks for your reply (sorry about the delayed response, I got held up with a few other things). I did try setting the different cancellation types, but it did not work. The problem is that cancellation does not stop the workflow, and the workflow keeps printing the following exception:

WARN [temporalworker,,,] [Workflow Executor taskQueue="WorkflowQueue", namespace="default": 100] WARN  i.t.i.sync.DeterministicRunnerImpl.close - Promise completed with exception and was never accessed. The ignored exception:
io.temporal.failure.ActivityFailure: scheduledEventId=47, startedEventId=0, activityType='GenerateAndPersist', activityId='5688d4f6-96ad-3024-99a5-e81a32af7c92', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED
...
...
Caused by: io.temporal.failure.CanceledFailure: 
	at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:128)
	at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:72)

So the activity actually receives the cancellation call, and we can also see the cancellation exception arriving. But we do not understand why this is printed endlessly and why control never leaves the workflow execution.

In the workflow that triggers this activity, the workflow asynchronously triggers a bunch of activities and waits for all of them to complete. The cancellation of the workflow happens while multiple activities are in progress.
For the same activity instance (an activity with the same activity ID), we see multiple exceptions (“Promise completed with exception and was never accessed. The…”)

.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
.setHeartbeatTimeout(startToCloseTimeout)
.setStartToCloseTimeout(startToCloseTimeout)
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).setDoNotRetry().build())

But in the logs I can still see that the retry state is unspecified.
We call get() on the promise returned for each activity. The snippet below shows how the workflow triggers multiple parallel activities and waits for them to complete.

for (var i = 0; i < numberOfBatches; i++) {
  var childActivity =
      Activities.newActivity(ChildActivity.class);
  activityPromises.add(Async.function(
      () -> {
        String result =
            childActivity.executeActivity();
        return handleActivityResult(result);
      }));
}
// Wait until all activities have completed.
activityPromises.forEach(Promise::get);

Do you see any issue with the way we invoke activities from inside the workflow? Why do we see the exceptions endlessly?

Hi @thiru could you share your code (maybe push it to a github repo and link) so I can take a look. It would be easier to debug that way. Thanks!

Hi @tihomir, unfortunately I don't think we can share the code. I will try to create a sample that resembles it, recreate the issue, and post that.
By the way, it might help if you could point out when we might hit the following exception.

[Workflow Executor taskQueue="WorkflowQueue", namespace="default": 100] WARN  i.t.i.sync.DeterministicRunnerImpl.close - Promise completed with exception and was never accessed. The ignored exception:
io.temporal.failure.ActivityFailure: scheduledEventId=47, startedEventId=0, activityType='GenerateAndPersist', activityId='5688d4f6-96ad-3024-99a5-e81a32af7c92', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED

Hi Tihomir,
I created a simplified sample that closely resembles our implementation, with simple activities and a parent and child workflow. I have updated the code on GitHub.
It contains the sample code and also an export of the workflows that run forever in Temporal. I took the dump of the parent and child workflows from the Temporal UI.
If you run the shared sample code, you will hit the same exception that I shared in the earlier posts (Promise completed with exception and was never accessed. The ignored exception), but the workflow exits and does not run forever as it does in the production code.
One more exception that I could see in the logs (the production logs, where the workflows run forever):

06-05-2022 09:45:52.259 ERROR [temporalworker,,,] [Workflow Executor taskQueue="WorkflowQueue", namespace="default": 102] ERROR i.t.i.r.ReplayWorkflowTaskHandler.failureToResult - Workflow task failure. startedEventId=296, WorkflowId=658f5ea3-f9a9-34f7-9c9c-c8fc61bb7961, RunId=9c34a875-d2f6-464f-b0d1-b2f2aaf10952. If see continuously the workflow might be stuck.
java.lang.Error: closed
	at io.temporal.internal.sync.DeterministicRunnerImpl.checkClosed(DeterministicRunnerImpl.java:433)
	at io.temporal.internal.sync.DeterministicRunnerImpl.executeInWorkflowThread(DeterministicRunnerImpl.java:479)
	at io.temporal.internal.sync.SyncWorkflowContext$ActivityCallback.invoke(SyncWorkflowContext.java:193)
	at io.temporal.internal.statemachines.WorkflowStateMachines.lambda$scheduleActivityTask$ff6e4b43$1(WorkflowStateMachines.java:447)
	at io.temporal.internal.statemachines.ActivityStateMachine.notifyCanceled(ActivityStateMachine.java:318)
	at io.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:45)
	at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:137)
	at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)

One more observation:
In the case where the workflow runs forever, I could see that the exception

i.t.i.sync.DeterministicRunnerImpl.close - Promise completed with exception and was never accessed. The ignored exception:
io.temporal.failure.ActivityFailure: scheduledEventId=27, startedEventId=0, activityType='GenerateAndPersist', activityId='077ba88a-48cf-3439-9481-d3db92c34496', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED
	at java.base/java.lang.Thread.getStackTrace(Unknown Source)
	at io.temporal.internal.sync.CompletablePromiseImpl.throwFailure(CompletablePromiseImpl.java:136)
	at io.temporal.internal.sync.CompletablePromiseImpl.getImpl(CompletablePromiseImpl.java:95)
	at io.temporal.internal.sync.CompletablePromiseImpl.get(CompletablePromiseImpl.java:74)

is logged repeatedly with the same activity ID. In the sample code the same exception appears, but only once for each activity ID, and then the workflow gets cancelled properly.
Please let me know if you have any leads to investigate this issue further.
We do have a slightly convoluted use case where we need the workflows to run a mandatory task even when they are cancelled. So we use signals and cancellation scopes to execute the steps of a workflow and then cancel the child workflows.

Made some updates to your code, see here. Give it a try and hope it gets you in the right direction.

Hi @tihomir,
Unfortunately we could not get cancellation working properly with the new example either. But after quite a bit of experimentation, we found something that worked.

Instead of

activityPromises.forEach(Promise::get)

we changed it to

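Exception activityExp = null;
for (Promise<String> tmpPromise : activityPromises) {
  try {
    // Call get() on every promise, even if an earlier one failed.
    tmpPromise.get();
  } catch (Exception e) {
    // Remember the failure and keep going.
    activityExp = e;
  }
}
if (activityExp != null) {
  throw activityExp;
}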

Although the two snippets look similar on the surface and both appear to wait for all of the activity promises to complete, the second one ensures that get() is called on every promise even when one of the get() calls throws an exception. In the first snippet, once one get() throws, the remaining get() calls are never made.
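
The difference between the two loops can be reproduced without Temporal. The sketch below is only an analogy: it uses java.util.concurrent.CompletableFuture as a stand-in for Temporal's Promise, and the class and method names (GetAllDemo, sampleFutures, joinWithForEach, joinEach) are invented for the example. It counts how many futures are actually accessed by each style when the first one has failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class GetAllDemo {

  // Three already-completed futures; the first one has failed.
  static List<CompletableFuture<String>> sampleFutures() {
    List<CompletableFuture<String>> futures = new ArrayList<>();
    futures.add(CompletableFuture.failedFuture(new IllegalStateException("cancelled")));
    futures.add(CompletableFuture.completedFuture("batch-1"));
    futures.add(CompletableFuture.completedFuture("batch-2"));
    return futures;
  }

  // Style 1: forEach aborts at the first exception, so later futures are never accessed.
  static int joinWithForEach(List<CompletableFuture<String>> futures) {
    AtomicInteger accessed = new AtomicInteger();
    try {
      futures.forEach(f -> {
        accessed.incrementAndGet();
        f.join();
      });
    } catch (CompletionException e) {
      // The first failure lands here; the remaining futures were skipped.
    }
    return accessed.get();
  }

  // Style 2: try/catch inside the loop, so every future is accessed.
  static int joinEach(List<CompletableFuture<String>> futures) {
    int accessed = 0;
    RuntimeException lastFailure = null;
    for (CompletableFuture<String> f : futures) {
      try {
        accessed++;
        f.join();
      } catch (RuntimeException e) {
        lastFailure = e; // real code would rethrow this after the loop
      }
    }
    return accessed;
  }

  public static void main(String[] args) {
    System.out.println(joinWithForEach(sampleFutures())); // prints 1
    System.out.println(joinEach(sampleFutures()));        // prints 3
  }
}
```

This only demonstrates the Java control flow (which promises get touched); it says nothing about how the Temporal SDK reacts to promises that are never accessed.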

Based on the exceptions, our hypothesis is that when get() is not called on the activity promises (which are executed through the Async utility), the workflow is probably not terminated properly, whereas when get() is called on them at least once, it is. We have not tested this assumption extensively; it is just based on the observations we made.

Thanks for the update. Believe you can still call
Promise.allOf(activityPromises).get();
and then loop through the promises to do checks, for example:

for (Promise<String> promise : activityPromises) {
      if(promise.isCompleted()) { ... }
      if(promise.getFailure() == null) { ... }
     // ...
}

Hope this helps.