Cancel of a CancellationScope not working as expected

I create a cancellation scope that starts a child workflow asynchronously. The child workflow executes an activity. When I cancel the scope, the child workflow gets terminated, but the activity being executed inside the child workflow doesn't seem to get a cancellation failure. Instead, its heartbeat gets “io.grpc.StatusRuntimeException: NOT_FOUND: workflow execution already completed”, which I assume is because the child workflow was cancelled but the cancellation was not propagated to the activity inside it.
Is this behavior expected? How do we properly cancel both the child workflow and the activity inside it?
After getting this exception, the activity doesn't seem to exit cleanly with an exception; it keeps waiting, and shutting down the worker doesn't seem to have any effect. The JVM stays up.
Can someone please give some pointers on the right approach to handle this?

After the CancellationScope is cancelled, I can see from the thread dump that a workflow thread is waiting at the point where the activity is invoked.
“workflow-method”
java.lang.Thread.State: WAITING
at java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
at java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at java.base@11.0.9/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
at app//io.temporal.internal.sync.WorkflowThreadContext.yield(WorkflowThreadContext.java:88)
at app//io.temporal.internal.sync.WorkflowThreadImpl.yield(WorkflowThreadImpl.java:416)
at app//io.temporal.internal.sync.WorkflowThread.await(WorkflowThread.java:45)
at app//io.temporal.internal.sync.CompletablePromiseImpl.getImpl(CompletablePromiseImpl.java:84)
at app//io.temporal.internal.sync.CompletablePromiseImpl.get(CompletablePromiseImpl.java:74)
at app//io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:44)
at app//io.temporal.internal.sync.ActivityInvocationHandler.lambda$getActivityFunc$0(ActivityInvocationHandler.java:77)
at app//io.temporal.internal.sync.ActivityInvocationHandler$$Lambda$147/0x00000001003df440.apply(Unknown Source)
at app//io.temporal.internal.sync.ActivityInvocationHandlerBase.invoke(ActivityInvocationHandlerBase.java:70)
at app//com.sun.proxy.$Proxy13.ActivityGreeting(Unknown Source)
at app//io.temporal.samples.hello.TNPSParentChildCancel$ChildWorkflowImpl.ChildGreeting(TNPSParentChildCancel.java:285)
at java.base@11.0.9/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Why might the workflow thread still be waiting when the cancellation scope has already been cancelled?

When an activity is cancelled, the workflow reacts differently depending on the value of the ActivityOptions.cancellationType property. The default is TRY_CANCEL, which sends a cancellation request to the activity but immediately throws CanceledFailure from the activity invocation without waiting for the activity to actually cancel. In your case, it looks like the child workflow therefore completes immediately. Try setting ActivityOptions.cancellationType to WAIT_CANCELLATION_COMPLETED for the child workflow's activity so that the child workflow waits for the activity cancellation to complete.
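To make the difference between the two cancellation types concrete, here is a plain-JDK analogy. It is not Temporal code: ModelCancellationType and CancellationTypeModel are hypothetical names, a background task stands in for the heartbeating activity, and a latch stands in for the cancellation request. The returned log shows who finishes first. Under TRY_CANCEL the caller moves on before the activity's cleanup; under WAIT_CANCELLATION_COMPLETED it does not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the two ActivityCancellationType values (JDK-only model).
enum ModelCancellationType {
  TRY_CANCEL,
  WAIT_CANCELLATION_COMPLETED
}

class CancellationTypeModel {

  // Blocks on a latch, rethrowing an interrupt as an unchecked exception.
  private static void block(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  // Cancels a running "activity" and returns the order in which the caller
  // and the activity finished reacting to the cancellation.
  static List<String> cancel(ModelCancellationType type) {
    List<String> log = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch cancelRequested = new CountDownLatch(1);
    CountDownLatch callerMovedOn = new CountDownLatch(1);

    CompletableFuture<Void> activity =
        CompletableFuture.runAsync(
            () -> {
              block(cancelRequested); // stands in for a heartbeat noticing the cancel
              if (type == ModelCancellationType.TRY_CANCEL) {
                // Only makes the demo deterministic: under TRY_CANCEL the caller
                // does not wait for this cleanup at all.
                block(callerMovedOn);
              }
              log.add("activity cleanup done");
            });

    cancelRequested.countDown(); // deliver the cancellation request
    if (type == ModelCancellationType.WAIT_CANCELLATION_COMPLETED) {
      activity.join(); // wait until the activity has finished its cleanup
    }
    log.add("caller gets CanceledFailure");
    callerMovedOn.countDown();
    activity.join(); // let the background task finish before reading the log
    return new ArrayList<>(log);
  }

  public static void main(String[] args) {
    System.out.println(cancel(ModelCancellationType.TRY_CANCEL));
    System.out.println(cancel(ModelCancellationType.WAIT_CANCELLATION_COMPLETED));
  }
}
```

The first line printed lists the caller before the cleanup; the second lists the cleanup first.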

Hi Maxim,
Thanks a lot for your quick reply, but setting the cancellation type parameter doesn't seem to have solved the issue. Here is a quick summary of what I am trying to achieve:

  1. The parent workflow triggers activities and child workflows.
  2. The parent workflow needs to execute a cancellable part and a non-cancellable part, so whatever is cancellable is encapsulated inside a cancellation scope and run asynchronously from INSIDE the parent workflow.
  3. Whenever the parent workflow needs to be cancelled, a signal is sent to it. The parent workflow waits for the cancel signal and then cancels the cancellation scope.
  4. The assumption here is that once the cancellation scope is cancelled, the child workflow started from inside the scope will be terminated, and any activity started from inside the child workflow will also get an exception when it heartbeats.
  5. What actually seems to happen:
    5.1 The activity gets an ActivityCompletionException.
    5.2 The workflow thread executing the child workflow seems to remain alive, and the JVM doesn't shut down even after the Temporal worker is shut down, because the workflow worker thread is still alive.
    I am attaching the code, which is mostly based on the Temporal samples, for your reference. Any pointers would be of great help! Thanks for your time!
package io.temporal.samples.hello;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Async;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.ChildWorkflowCancellationType;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class NewParentChildCancel {

  private static final String workflowID = "WorkflowID";
  private static final String qName = "Q";

  public static void main(String[] args) {
    WorkflowServiceStubs workflowstubs = WorkflowServiceStubs.newInstance();
    WorkflowClient wfClient = WorkflowClient.newInstance(workflowstubs);
    WorkerFactory workerfactory = WorkerFactory.newInstance(wfClient);

    Worker worker = workerfactory.newWorker(qName);
    worker.registerActivitiesImplementations(new GreetingActivityImpl());
    worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class);
    worker.registerWorkflowImplementationTypes(ChildWorkflowImpl.class);
    workerfactory.start();

    // call the Parent workflow
    ParentWorkflowIntf parentStub =
        wfClient.newWorkflowStub(
            ParentWorkflowIntf.class,
            WorkflowOptions.newBuilder()
                .setWorkflowId(workflowID)
                .setTaskQueue(qName)
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .build());

    // started the workflow
    WorkflowClient.execute(parentStub::ParentGreeting);

    // wait for 5secs
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("WAIT DONE BEFORE CANCEL");
    // signal the parent workflow to stop execution and return.
    // the parent workflow will cancel the childworkflow that is currently executing
    // inside a cancellationscope and return
    parentStub.singal("CANCEL");
    System.out.println("CANCEL DONE FROM PARENT WORKFLOW");

    // wait for 30 secs for the cancellation to complete
    try {
      Thread.sleep(30000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("START WORKER SHUTDOWN");
    workerfactory.shutdown();
    System.out.println("WORKER SHUTDOWN DONE");
  }

  @WorkflowInterface
  public interface ParentWorkflowIntf {
    @WorkflowMethod
    void ParentGreeting();

    @SignalMethod
    void singal(String cancelString);
  }

  public static class ParentWorkflowImpl implements ParentWorkflowIntf {

    private String cancelChildflag = " ";
    private String completeflag = null;

    @Override
    public void ParentGreeting() {
      try {
        // Run the child Workflow using Async from cancellation scope
        ChildWorkFlowScope workflowstep = new ChildWorkFlowScope();
        CancellationScope scope = workflowstep.getChildWorkflowCancelScope();
        scope.run();
        // End of the scope run

        // wait till parent workflow gets a signal to cancel execution.
        // once parent workflow gets a cancel signal, cancel the CancellationScope and
        // return
        System.out.println("SCOPE WAIT LOOP. scope complete flag: " + completeflag);
        while (!workflowstep.getResult().isCompleted()) {
          System.out.println("WAITING FOR SCOPE COMPLETION");
          Workflow.sleep(500);
          if ("CANCEL".equalsIgnoreCase(cancelChildflag)) {
            System.out.println("SCOPE CANCEL CALLED");
            scope.cancel();
            System.out.println("SCOPE CANCEL DONE");
            break;
          }
        }
        System.out.println(" END OF PARENT WORKFLOW ");
      } catch (Exception e) {
        System.out.println(" START - EXCEPTION HANDLING INSIDE PARENT WORKFLOW");
        e.printStackTrace();
        System.out.println(" END - EXCEPTION HANDLING INSIDE PARENT WORKFLOW");
      }
    }

    @Override
    public void singal(String cancelString) {
      System.out.println(" START- Inside signal method: " + cancelString);
      this.cancelChildflag = cancelString;
      System.out.println("END - Inside signal method: CANCEL CALLED");
    }
  }

  // encapsulates one step of parent workflow where child workflow is run in
  // async manner. returns a cancellationscope to execute and returns the result
  static class ChildWorkFlowScope {
    private ChildWorkflowIntfc childworkflowStub =
        Workflow.newChildWorkflowStub(
            ChildWorkflowIntfc.class,
            ChildWorkflowOptions.newBuilder()
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    private Promise result = null;

    CancellationScope getChildWorkflowCancelScope() {
      return Workflow.newCancellationScope(
          () -> {
            result = Async.function(childworkflowStub::ChildGreeting, 1);
          });
    }

    public Promise getResult() {
      return result;
    }
  }

  @WorkflowInterface
  public interface ChildWorkflowIntfc {
    @WorkflowMethod
    String ChildGreeting(int i);
  }

  public static class ChildWorkflowImpl implements ChildWorkflowIntfc {
    private final GreetingActivity activity1 =
        Workflow.newActivityStub(
            GreetingActivity.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(qName)
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    @Override
    public String ChildGreeting(int i) {
      String resString = "NOT DONE";
      try {
        System.out.println(" START Child workflow value I " + i);
        final List<Promise<String>> result = new ArrayList<>();
        activity1.ActivityGreeting(1);
        //        activity2.ActivityGreeting(2);
        System.out.println(" END Child workflow value I " + i);
      } catch (Exception e) {
        System.out.println(" START - INSiDE CHILD WORKFLOW EXCEPTION BLOCK" + i);
        e.printStackTrace();
        System.out.println(" END -INSiDE CHILD WORKFLOW EXCEPTION BLOCK" + i);
      }
      return resString;
    }
  }

  @ActivityInterface
  public interface GreetingActivity {
    @ActivityMethod
    String ActivityGreeting(int i);
  }

  // greeting activity runs for 7.5 secs and heartbeats.
  // returns a simple string as greeting
  public static class GreetingActivityImpl implements GreetingActivity {
    @Override
    public String ActivityGreeting(int i) {
      System.out.println("STARTING ACTIVITY...." + i);
      ActivityExecutionContext context = Activity.getExecutionContext();
      for (int k = 0; k < 15; k++) {
        sleep();
        System.out.println(String.format(" ACTIVITY IN PROGRESS : %s, I: %s", k, i));
        try {
          System.out.println(" HEART BEATING : " + k);
          context.heartbeat(1);
        } catch (ActivityCompletionException e) {
          System.out.println(" INSIDE ACTIVITY CLEANUP K: " + k);
          //          throw e;
          return "DUMMY";
        }
      }
      System.out.println("END ACTIVITY I:" + i);
      return "GREET " + i;
    }

    private void sleep() {
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

I see a few issues with your implementation:

  • You are using a busy-wait loop inside the workflow. This is an anti-pattern because it grows the workflow history with each iteration. Use Workflow.await to wait for a condition inside a workflow.
  • Your parent workflow does not wait for the child to complete. CancellationScope.cancel does not wait for the cancellation to complete (which can be a very long-running operation); it delivers the cancellation request and returns immediately. So the parent workflow completes immediately if it doesn't wait for the child's result.
  • You are using a signal to cancel the parent workflow. Have you considered cancelling it directly through the provided API?
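The first two points can be illustrated with a plain-JDK analogy. It is not Temporal code; ParentWaitModel and its methods are hypothetical names. The parent blocks on a condition that is re-checked only when the signal or the child's completion wakes it, instead of polling on a timer (in a workflow, each Workflow.sleep in such a loop adds a timer to the history). The cancellation request returns immediately, so the child is not done right after it, and the parent still has to join the child to observe the outcome.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;

// JDK-only analogy of the parent workflow's wait / cancel / get sequence (not Temporal code).
class ParentWaitModel {
  private final Object monitor = new Object();
  private boolean cancelSignal; // guarded by monitor

  // Like the @SignalMethod handler: record the signal and wake the waiting parent.
  void signal(String value) {
    synchronized (monitor) {
      cancelSignal = "CANCEL".equalsIgnoreCase(value);
      monitor.notifyAll();
    }
  }

  // Wakes the parent when other state it waits on (child completion) changes.
  void wake() {
    synchronized (monitor) {
      monitor.notifyAll();
    }
  }

  // Like Workflow.await(condition): re-checks only when woken up, never on a timer.
  void await(BooleanSupplier condition) {
    synchronized (monitor) {
      while (!condition.getAsBoolean()) {
        try {
          monitor.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        }
      }
    }
  }

  private static void block(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  // Returns "<child done right after cancel>/<child result>".
  static String run() {
    ParentWaitModel parent = new ParentWaitModel();
    CountDownLatch cancelRequested = new CountDownLatch(1);
    CountDownLatch cleanupAllowed = new CountDownLatch(1);

    // The "child": runs until cancellation is requested, then needs time to clean up.
    CompletableFuture<String> child =
        CompletableFuture.supplyAsync(
            () -> {
              block(cancelRequested);
              block(cleanupAllowed);
              return "child cleaned up";
            });
    child.whenComplete((r, e) -> parent.wake());

    new Thread(() -> parent.signal("CANCEL")).start(); // the client sends the signal
    parent.await(() -> parent.cancelSignal || child.isDone());

    cancelRequested.countDown(); // like scope.cancel(): only a request, returns at once
    boolean doneRightAfterCancel = child.isDone(); // still false: cleanup not finished

    cleanupAllowed.countDown(); // let the child finish its cleanup
    String result = child.join(); // like getResult().get(): really wait for the child
    return doneRightAfterCancel + "/" + result;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

run() returns "false/child cleaned up": the child had not finished when the cancel request returned, and only the final join observes its result.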

The following code works for me:

package io.temporal.samples.hello;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Async;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.ChildWorkflowCancellationType;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class NewParentChildCancel {

  private static final String workflowID = "WorkflowID";
  private static final String qName = "Q";

  public static void main(String[] args) {
    WorkflowServiceStubs workflowstubs = WorkflowServiceStubs.newInstance();
    WorkflowClient wfClient = WorkflowClient.newInstance(workflowstubs);
    WorkerFactory workerfactory = WorkerFactory.newInstance(wfClient);

    Worker worker = workerfactory.newWorker(qName);
    worker.registerActivitiesImplementations(new GreetingActivityImpl());
    worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class);
    worker.registerWorkflowImplementationTypes(ChildWorkflowImpl.class);
    workerfactory.start();

    // call the Parent workflow
    ParentWorkflowIntf parentStub =
        wfClient.newWorkflowStub(
            ParentWorkflowIntf.class,
            WorkflowOptions.newBuilder()
                .setWorkflowId(workflowID)
                .setTaskQueue(qName)
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .build());

    // started the workflow
    WorkflowClient.execute(parentStub::ParentGreeting);

    // wait for 5secs
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("WAIT DONE BEFORE CANCEL");
    // signal the parent workflow to stop execution and return.
    // the parent workflow will cancel the childworkflow that is currently executing
    // inside a cancellationscope and return
    parentStub.singal("CANCEL");
    System.out.println("CANCEL DONE FROM PARENT WORKFLOW");

    // wait for 30 secs for the cancellation to complete
    try {
      Thread.sleep(30000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("START WORKER SHUTDOWN");
    workerfactory.shutdown();
    workerfactory.awaitTermination(1, TimeUnit.DAYS);
    System.out.println("WORKER SHUTDOWN DONE");
  }

  @WorkflowInterface
  public interface ParentWorkflowIntf {
    @WorkflowMethod
    void ParentGreeting();

    @SignalMethod
    void singal(String cancelString);
  }

  public static class ParentWorkflowImpl implements ParentWorkflowIntf {

    private String cancelChildflag = " ";
    private String completeflag = null;

    @Override
    public void ParentGreeting() {
      try {
        // Run the child Workflow using Async from cancellation scope
        ChildWorkFlowScope workflowstep = new ChildWorkFlowScope();
        CancellationScope scope = workflowstep.getChildWorkflowCancelScope();
        scope.run();
        // End of the scope run

        // wait till parent workflow gets a signal to cancel execution.
        // once parent workflow gets a cancel signal, cancel the CancellationScope and
        // return
        System.out.println("WAITING FOR SCOPE COMPLETION: " + completeflag);
        Workflow.await(
            () ->
                "CANCEL".equalsIgnoreCase(cancelChildflag)
                    || workflowstep.getResult().isCompleted());
        if ("CANCEL".equalsIgnoreCase(cancelChildflag)) {
          System.out.println("SCOPE CANCEL CALLED");
          scope.cancel();
          System.out.println("SCOPE CANCEL DONE");
        }
        workflowstep.getResult().get();
        System.out.println(" END OF PARENT WORKFLOW ");
      } catch (Exception e) {
        System.out.println(" START - EXCEPTION HANDLING INSIDE PARENT WORKFLOW");
        e.printStackTrace();
        System.out.println(" END - EXCEPTION HANDLING INSIDE PARENT WORKFLOW");
        throw e;
      }
    }

    @Override
    public void singal(String cancelString) {
      System.out.println(" START- Inside signal method: " + cancelString);
      this.cancelChildflag = cancelString;
      System.out.println("END - Inside signal method: CANCEL CALLED");
    }
  }

  // encapsulates one step of parent workflow where child workflow is run in
  // async manner. returns a cancellationscope to execute and returns the result
  static class ChildWorkFlowScope {
    private ChildWorkflowIntfc childworkflowStub =
        Workflow.newChildWorkflowStub(
            ChildWorkflowIntfc.class,
            ChildWorkflowOptions.newBuilder()
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    private Promise result = null;

    CancellationScope getChildWorkflowCancelScope() {
      return Workflow.newCancellationScope(
          () -> {
            result = Async.function(childworkflowStub::ChildGreeting, 1);
          });
    }

    public Promise getResult() {
      return result;
    }
  }

  @WorkflowInterface
  public interface ChildWorkflowIntfc {
    @WorkflowMethod
    String ChildGreeting(int i);
  }

  public static class ChildWorkflowImpl implements ChildWorkflowIntfc {
    private final GreetingActivity activity1 =
        Workflow.newActivityStub(
            GreetingActivity.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(qName)
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    @Override
    public String ChildGreeting(int i) {
      String resString = "NOT DONE";
      try {
        System.out.println(" START Child workflow value I " + i);
        final List<Promise<String>> result = new ArrayList<>();
        activity1.ActivityGreeting(1);
        //        activity2.ActivityGreeting(2);
        System.out.println(" END Child workflow value I " + i);
      } catch (Exception e) {
        System.out.println(" START - INSiDE CHILD WORKFLOW EXCEPTION BLOCK" + i);
        e.printStackTrace();
        System.out.println(" END -INSiDE CHILD WORKFLOW EXCEPTION BLOCK" + i);
      }
      return resString;
    }
  }

  @ActivityInterface
  public interface GreetingActivity {
    @ActivityMethod
    String ActivityGreeting(int i);
  }

  // greeting activity runs for 7.5 secs and heartbeats.
  // returns a simple string as greeting
  public static class GreetingActivityImpl implements GreetingActivity {
    @Override
    public String ActivityGreeting(int i) {
      System.out.println("STARTING ACTIVITY...." + i);
      ActivityExecutionContext context = Activity.getExecutionContext();
      for (int k = 0; k < 15; k++) {
        sleep();
        System.out.println(String.format(" ACTIVITY IN PROGRESS : %s, I: %s", k, i));
        try {
          System.out.println(" HEART BEATING : " + k);
          context.heartbeat(1);
        } catch (ActivityCompletionException e) {
          System.out.println(" INSIDE ACTIVITY CLEANUP K: " + k);
          //          throw e;
          return "DUMMY";
        }
      }
      System.out.println("END ACTIVITY I:" + i);
      return "GREET " + i;
    }

    private void sleep() {
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}


Hi Maxim,
Thanks a lot for the reply. This worked and helped us, but we have a few more issues. It would be great if you could help us out (sorry about the long description! We suspect there might be a bug; I hope I don't drag this on and waste your time 🙂).
Let me explain our requirements, so that if there is a standard pattern for this that we are not using, you can point us in the right direction.
Our requirement:
The workflows are user-triggered and long-running, but users need the ability to pause and restart them at their convenience. So we made the workflow idempotent: the user can cancel a workflow, and a restart will use the same input.
Our implementation:
To achieve this, the workflow needs to persist its input in the DB (so that a re-run can take that input from the DB). We run that as the first activity in the workflow. This activity should ALWAYS run, even if the user triggers a cancel immediately after starting the workflow. This is why we cannot use the cancel() API: we want one activity to run for sure even if the workflow is cancelled.
Next, the steps of the parent workflow (each one a child workflow or activity wrapped in a cancellation scope) are run in sequence. If the parent workflow receives a cancel, it cancels the child activity/workflow that is currently running and returns without running the subsequent steps. But the first activity, which persists the input data, must always complete.
So, in the code that you sent, I added an “AlwaysWorkActivity”, an activity that should always complete. It doesn't heartbeat and is not wrapped in a cancellation scope, so once the parent workflow starts, I assume it will always complete.
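The ordering requirement above can be modelled without Temporal as follows. StepSequenceModel is a hypothetical name, a BooleanSupplier stands in for the cancel signal, and the model only decides which steps run, not how a running step is interrupted. In the Temporal Java SDK, Workflow.newDetachedCancellationScope runs work that is not cancelled together with its enclosing scope; whether wrapping the persistence activity in such a scope would let you use the regular cancel() API is a suggestion to evaluate, not something confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// JDK-only model of "one step always runs, later steps stop on cancel" (not Temporal code).
class StepSequenceModel {

  // Runs the mandatory step, then each cancellable step until cancellation is requested.
  static List<String> run(BooleanSupplier cancelRequested, List<String> cancellableSteps) {
    List<String> executed = new ArrayList<>();
    // Mandatory step: runs even if a cancel arrived before the workflow got this far.
    executed.add("persist input");
    for (String step : cancellableSteps) {
      if (cancelRequested.getAsBoolean()) {
        break; // skip this and all subsequent steps
      }
      executed.add(step);
    }
    return executed;
  }

  public static void main(String[] args) {
    List<String> steps = List.of("child workflow A", "activity B", "child workflow C");
    System.out.println(run(() -> true, steps)); // cancelled immediately
    System.out.println(run(() -> false, steps)); // never cancelled
  }
}
```

Even with a cancel requested up front, "persist input" is the one step that always appears in the result.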
The next step in the parent workflow is a cancellation scope that wraps a child workflow.
The rest of the code remains the same.
If I run this code, it doesn't complete and exit.
From the thread dumps I can see that the parent workflow thread is waiting at line 141 (workflowstep.getResult().get()), where we try to get the result of the child workflow.
After debugging, I believe this might be the reason:
The cancellation scope that starts the child workflow is run, but the child workflow has not yet been scheduled in Temporal when cancel() is called. Since the scope is already cancelled, the child workflow is never scheduled at all (I can see in the Temporal UI that no child workflow is scheduled), so the get() call waiting for its result blocks indefinitely.
If I introduce a Workflow.sleep() between scope.run() and scope.cancel() (uncomment line 126) to give the child workflow time to get scheduled as part of scope.run(), then the behavior is fine: the child workflow gets scheduled and cancelled properly.
Even though the cancellation then works, control doesn't reach the exception-handling block of the child workflow. I expected a CanceledFailure exception to be thrown there.
If this assumption is correct, is this a bug?
Thanks again for reading this far 🙂
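The race described above can be modelled without Temporal to show why get() can block forever: if a step is cancelled before it is ever scheduled, something must still resolve its result, or nobody will. CancelBeforeStartModel and its schedule/cancel/childFinished hooks are hypothetical names, and the model says nothing about how the Temporal SDK handles this internally; whether it does is exactly the question being asked. As a possibly more deterministic alternative to a fixed Workflow.sleep(1000), the Java SDK's Workflow.getWorkflowExecution(childStub) returns a Promise that becomes ready once the child has started; waiting on it before scope.cancel() is an assumption worth testing, not a confirmed fix.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// JDK-only model of cancelling a step before it has been scheduled (not Temporal code).
class CancelBeforeStartModel {
  private final boolean resolveIfNeverStarted;
  private final CompletableFuture<String> result = new CompletableFuture<>();
  private boolean started;
  private boolean cancelled;

  CancelBeforeStartModel(boolean resolveIfNeverStarted) {
    this.resolveIfNeverStarted = resolveIfNeverStarted;
  }

  // Hypothetical hook: the moment the child would actually be scheduled.
  synchronized void schedule() {
    if (!cancelled) {
      started = true;
    }
  }

  // Cancellation request; a started child resolves the result itself after cleanup.
  synchronized void cancel() {
    cancelled = true;
    if (!started && resolveIfNeverStarted) {
      // Without this, nothing ever completes the result and get() blocks forever.
      result.completeExceptionally(new CancellationException("cancelled before start"));
    }
  }

  // Hypothetical hook: a started child reports its outcome (after cleanup if cancelled).
  synchronized void childFinished(String value) {
    if (started) {
      result.complete(value);
    }
  }

  CompletableFuture<String> result() {
    return result;
  }

  public static void main(String[] args) {
    CancelBeforeStartModel hangs = new CancelBeforeStartModel(false);
    hangs.cancel(); // cancel() reaches the scope before the child is scheduled
    hangs.schedule(); // too late: the child is never started
    System.out.println("unresolved model done? " + hangs.result().isDone());

    CancelBeforeStartModel resolves = new CancelBeforeStartModel(true);
    resolves.cancel();
    resolves.schedule();
    System.out.println("resolving model done? " + resolves.result().isDone());
  }
}
```

When schedule() happens before cancel() (the situation the Workflow.sleep() workaround produces), the started child resolves the result itself, so get() returns even without the early-cancel handling.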

package io.temporal.samples.hello;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Async;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.ChildWorkflowCancellationType;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MaximCodeModified {

  private static final String workflowID = "WorkflowID";
  private static final String qName = "Q";

  public static void main(String[] args) {
    WorkflowServiceStubs workflowstubs = WorkflowServiceStubs.newInstance();
    WorkflowClient wfClient = WorkflowClient.newInstance(workflowstubs);
    WorkerFactory workerfactory = WorkerFactory.newInstance(wfClient);

    Worker worker = workerfactory.newWorker(qName);
    worker.registerActivitiesImplementations(new GreetingActivityImpl());
    worker.registerActivitiesImplementations(new AlwaysWorkActivityImpl());
    worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class);
    worker.registerWorkflowImplementationTypes(ChildWorkflowImpl.class);
    workerfactory.start();

    // call the Parent workflow
    ParentWorkflowIntf parentStub =
        wfClient.newWorkflowStub(
            ParentWorkflowIntf.class,
            WorkflowOptions.newBuilder()
                .setWorkflowId(workflowID)
                .setTaskQueue(qName)
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .build());

    // start the parent workflow
    WorkflowClient.execute(parentStub::ParentGreeting);

    // wait for 5secs before cancelling
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("WAIT DONE BEFORE CANCEL");
    // signal the parent workflow to stop execution and return.
    // the parent workflow will cancel the childworkflow that is currently executing
    // inside a cancellationscope and return
    parentStub.singal("CANCEL");
    System.out.println("CANCEL DONE FROM PARENT WORKFLOW");

    // wait for 30 secs for the cancellation to complete
    try {
      Thread.sleep(30000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("START WORKER SHUTDOWN");
    workerfactory.shutdown();
    workerfactory.awaitTermination(1, TimeUnit.DAYS);
    System.out.println("WORKER SHUTDOWN DONE");
  }

  @WorkflowInterface
  public interface ParentWorkflowIntf {
    @WorkflowMethod
    void ParentGreeting();

    @SignalMethod
    void singal(String cancelString);
  }

  public static class ParentWorkflowImpl implements ParentWorkflowIntf {

    private String cancelChildflag = " ";
    private String completeflag = null;

    @Override
    public void ParentGreeting() {
      try {

        // run always work activity
        final AlwaysWorkActivity alwaysWorkActivity =
            Workflow.newActivityStub(
                AlwaysWorkActivity.class,
                ActivityOptions.newBuilder()
                    .setTaskQueue(qName)
                    .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                    .setStartToCloseTimeout(Duration.ofSeconds(30))
                    .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                    .build());
        System.out.println("START ALWAYS ACTIVITY");
        alwaysWorkActivity.alwaysActivityGreeting(10);
        System.out.println("END ALWAYS ACTIVITY");
        // end always work activity

        // Run the child Workflow using Async from cancellation scope
        ChildWorkFlowScope workflowstep = new ChildWorkFlowScope();
        CancellationScope scope = workflowstep.getChildWorkflowCancelScope();
        scope.run();
        // End of the scope run

        // this sleep might ensure the scope gets triggered and the async child workflow
        // started as part of the scope gets scheduled.
        //Workflow.sleep(1000);

        // wait till parent workflow gets a signal to cancel execution.
        // once parent workflow gets a cancel signal, cancel the CancellationScope and
        // return
        System.out.println("WAITING FOR SCOPE COMPLETION: " + completeflag);
        Workflow.await(
            () ->
                "CANCEL".equalsIgnoreCase(cancelChildflag)
                    || workflowstep.getResult().isCompleted());
        if ("CANCEL".equalsIgnoreCase(cancelChildflag)) {
          System.out.println("SCOPE CANCEL CALLED");
          scope.cancel();
          System.out.println("SCOPE CANCEL DONE");
        }
        workflowstep.getResult().get();
        System.out.println(" END OF PARENT WORKFLOW ");
      } catch (Exception e) {
        System.out.println(" START - EXCEPTION HANDLING INSIDE PARENT WORKFLOW");
        e.printStackTrace();
        System.out.println(" END - EXCEPTION HANDLING INSIDE PARENT WORKFLOW");
        throw e;
      }
    }

    @Override
    public void singal(String cancelString) {
      System.out.println(" START- Inside signal method: " + cancelString);
      this.cancelChildflag = cancelString;
      System.out.println("END - Inside signal method: CANCEL CALLED");
    }
  }

  // encapsulates one step of parent workflow where child workflow is run in
  // async manner. returns a cancellationscope to execute and returns the result
  static class ChildWorkFlowScope {
    private ChildWorkflowIntfc childworkflowStub =
        Workflow.newChildWorkflowStub(
            ChildWorkflowIntfc.class,
            ChildWorkflowOptions.newBuilder()
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    private Promise result = null;

    CancellationScope getChildWorkflowCancelScope() {
      return Workflow.newCancellationScope(
          () -> {
            result = Async.function(childworkflowStub::ChildGreeting, 1);
          });
    }

    public Promise getResult() {
      return result;
    }
  }

  @WorkflowInterface
  public interface ChildWorkflowIntfc {
    @WorkflowMethod
    String ChildGreeting(int i);
  }

  public static class ChildWorkflowImpl implements ChildWorkflowIntfc {
    private final GreetingActivity activity1 =
        Workflow.newActivityStub(
            GreetingActivity.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(qName)
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    @Override
    public String ChildGreeting(int i) {
      String resString = "NOT DONE";
      try {
        System.out.println(" START Child workflow value I " + i);
        activity1.ActivityGreeting(1);
        //        activity2.ActivityGreeting(2);
        System.out.println(" END Child workflow value I " + i);
      } catch (Exception e) {
        System.out.println(" START - INSiDE CHILD WORKFLOW EXCEPTION BLOCK" + i);
        e.printStackTrace();
        System.out.println(" END -INSiDE CHILD WORKFLOW EXCEPTION BLOCK" + i);
      }
      return resString;
    }
  }

  @ActivityInterface
  public interface GreetingActivity {
    @ActivityMethod
    String ActivityGreeting(int i);
  }

  // greeting activity runs for 7.5 secs and heartbeats.
  // returns a simple string as greeting
  public static class GreetingActivityImpl implements GreetingActivity {
    @Override
    public String ActivityGreeting(int i) {
      System.out.println("STARTING ACTIVITY...." + i);
      ActivityExecutionContext context = Activity.getExecutionContext();
      for (int k = 0; k < 15; k++) {
        sleep();
        System.out.println(String.format(" ACTIVITY IN PROGRESS : %s, I: %s", k, i));
        try {
          System.out.println(" HEART BEATING : " + k);
          context.heartbeat(1);
        } catch (ActivityCompletionException e) {
          System.out.println(" INSIDE ACTIVITY CLEANUP K: " + k);
          //          throw e;
          return "DUMMY";
        }
      }
      System.out.println("END ACTIVITY I:" + i);
      return "GREET " + i;
    }

    private void sleep() {
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        // preserve the interrupt status instead of swallowing it
        Thread.currentThread().interrupt();
      }
    }
  }

  @ActivityInterface
  public interface AlwaysWorkActivity {
    @ActivityMethod
    void alwaysActivityGreeting(int i);
  }

  public static class AlwaysWorkActivityImpl implements TNPSParentChildCancel.AlwaysWorkActivity {
    @Override
    public void alwaysActivityGreeting(int i) {
      System.out.println("START: Always work ACTIVITY");
      for (int k = 0; k < 10; k++) {
        try {
          Thread.sleep(1000);
          System.out.println("ALWAYS WORK ACTIVITY :" + k);
        } catch (InterruptedException e) {
          // restore the interrupt status and stop instead of swallowing it
          Thread.currentThread().interrupt();
          return;
        }
      }
      System.out.println("END: Always work ACTIVITY");
    }
  }
}

In your cancellation scope you can wait for the child workflow to start, for example (I also added an exceptionally clause to show that you can catch the child workflow cancellation exception there as well):

// requires: import io.temporal.api.common.v1.WorkflowExecution;
CancellationScope getChildWorkflowCancelScope() {
  return Workflow.newCancellationScope(
      () -> {
        result =
            Async.function(childworkflowStub::ChildGreeting, 1)
                .exceptionally(
                    e -> {
                      // e is going to be a ChildWorkflowFailure
                      // with its cause being a CanceledFailure, as per your cancellation request.
                      // Returning a value here means result.get() returns it instead of throwing.
                      return "CHILD CANCELED";
                    });
        Promise<WorkflowExecution> childExecution =
            Workflow.getWorkflowExecution(childworkflowStub);
        // This call to .get() waits for the child workflow to start execution
        childExecution.get();
      });
}

If I read the issue correctly, this should help.
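The point about the exceptionally clause can be illustrated with a plain-JDK analogy. This is a sketch, not Temporal SDK code: it assumes `CompletableFuture.exceptionally` is a fair stand-in for Temporal's `Promise.exceptionally`, and `withFallback` and the `"CHILD CANCELED"` value are hypothetical. It shows that once a fallback is attached, the derived result completes normally, so a later `get()` on it returns the fallback rather than throwing.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy of Promise.exceptionally; NOT Temporal SDK code.
public class ExceptionallyFallback {
  // Hypothetical helper: turns any failure of the child result into a fallback value.
  static CompletableFuture<String> withFallback(CompletableFuture<String> childResult) {
    return childResult.exceptionally(
        e -> {
          // In the Temporal snippet, e would be a ChildWorkflowFailure whose
          // cause is a CanceledFailure; here it is whatever failed childResult.
          return "CHILD CANCELED";
        });
  }

  public static void main(String[] args) {
    CompletableFuture<String> childResult = new CompletableFuture<>();
    CompletableFuture<String> result = withFallback(childResult);
    // Simulate the child being cancelled.
    childResult.completeExceptionally(new CancellationException("scope canceled"));
    // The derived future completes normally, so join() returns the fallback.
    System.out.println(result.join()); // prints CHILD CANCELED
  }
}
```

A consequence for the parent workflow above: with a fallback attached, `workflowstep.getResult().get()` returns the fallback value, so the parent's catch block no longer sees the cancellation. Rethrowing from the handler would keep the failure visible instead.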


Hi Tihomir,
That worked perfectly for our case. Thanks a lot for your help!

After debugging, I believe this might be the reason:
The CancellationScope that triggers the child workflow runs, but the child workflow has not yet been scheduled in Temporal when cancel() is called. Because the scope is already cancelled, the child workflow is never scheduled at all; the Temporal UI shows no child workflow. Nothing ever completes the result, so the get() call waits indefinitely.
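The race described above can be modeled deterministically in plain Java. This is an analogy under stated assumptions, not Temporal SDK code: `ChildStartRace`, `startChildAsync`, `yieldToServer`, and `cancelScope` are hypothetical stand-ins for `Async.function(...)`, the workflow thread blocking (roughly, the point where buffered commands are sent to the server), and `scope.cancel()`. It shows why cancelling before the buffered start is sent leaves the result pending, and why waiting for the start first, as in the workaround, lets the cancellation complete it.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Plain-JDK model of the reported race; NOT Temporal SDK code. All names are hypothetical.
public class ChildStartRace {
  private boolean startBuffered = false; // Async.function(...) was called
  private boolean childStarted = false;  // the start actually reached the server
  final CompletableFuture<String> childResult = new CompletableFuture<>();

  // ~ Async.function(childStub::ChildGreeting, 1): only buffers the start.
  void startChildAsync() {
    startBuffered = true;
  }

  // ~ the workflow thread blocking (e.g. childExecution.get()), which lets
  // buffered commands be sent so the child actually starts.
  void yieldToServer() {
    if (startBuffered) {
      startBuffered = false;
      childStarted = true;
    }
  }

  // ~ scope.cancel()
  void cancelScope() {
    if (childStarted) {
      // A running child can be cancelled, and its result then fails.
      childResult.completeExceptionally(new CancellationException("child canceled"));
    } else {
      // Behavior reported in this thread: the buffered start is dropped and
      // nothing ever completes childResult, so a blocking get() never returns.
      startBuffered = false;
    }
  }

  public static void main(String[] args) {
    ChildStartRace racy = new ChildStartRace();
    racy.startChildAsync();
    racy.cancelScope(); // cancel before the start was sent
    racy.yieldToServer();
    System.out.println("racy done: " + racy.childResult.isDone()); // prints racy done: false

    ChildStartRace waited = new ChildStartRace();
    waited.startChildAsync();
    waited.yieldToServer(); // wait for the child to start first
    waited.cancelScope();
    System.out.println("waited done: " + waited.childResult.isDone()); // prints waited done: true
  }
}
```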

Thank you for your report and debugging. It indeed looks like a bug.
