In a fork/join, if one activity errors -- how to cancel the rest?

Greetings. I’m trying to figure something out. Imagine I launch 100 parallel activities in a workflow and use Promise.allOf(…) to join them. Each activity could be long-running (seconds to minutes). If any activity fails, the allOf(…) promise fails with that exception. But what is the state of the other activities? I assume they are still running. What if I want to stop them? One solution might be to cancel the workflow, which would also cancel its activities. That would force me to wrap the fork/join/cancel logic in a child workflow. Is there any other way to cancel the activities without cancelling the workflow? Thanks!


I’ll tag maxim in here. If you configure your activity to heartbeat and call heartbeat from your activity code at appropriate points, you can cancel it from a cancellation scope in the workflow. Note that this means it’s up to you to add heartbeats to your activity code and to handle cleanup when cancellation arrives.
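To make the heartbeat point concrete, here is a plain-Java model; it does not use the Temporal SDK. `HeartbeatContext`, `cancelAfter` and `CancelledException` are hypothetical stand-ins for `ActivityExecutionContext.heartbeat()` and the exception it raises once cancellation has been requested. The model only shows two things: cancellation is observed at heartbeat calls, and cleanup is the activity's own job.

```java
import java.util.List;

/**
 * Plain-Java model (NOT the Temporal SDK) of a heartbeating activity loop.
 * HeartbeatContext is a hypothetical stand-in for ActivityExecutionContext.
 */
final class HeartbeatingActivityModel {

  /** Stand-in for the exception the SDK raises from heartbeat() after cancellation. */
  static final class CancelledException extends RuntimeException {
    CancelledException() {
      super("activity cancelled");
    }
  }

  /** Hypothetical heartbeat channel; a real worker would report progress here. */
  interface HeartbeatContext {
    void heartbeat(Object progress);
  }

  /** Fake context that reports cancellation starting with the given heartbeat number. */
  static HeartbeatContext cancelAfter(int heartbeats) {
    int[] count = {0};
    return progress -> {
      if (++count[0] >= heartbeats) {
        throw new CancelledException();
      }
    };
  }

  /** Does `steps` units of work, heartbeating after each one; `log` records what happened. */
  static String compose(HeartbeatContext ctx, int steps, List<String> log) {
    try {
      for (int i = 0; i < steps; i++) {
        log.add("step " + i); // one unit of (possibly slow) work
        ctx.heartbeat(i); // the only place where cancellation can be noticed
      }
      return "done";
    } finally {
      log.add("cleanup"); // runs on success, failure and cancellation alike
    }
  }
}
```

With `cancelAfter(3)` and 10 steps, the loop stops at the third heartbeat and the `finally` block still runs. Work between two heartbeats is never interrupted, so how often an activity heartbeats bounds how quickly it reacts to cancellation.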

Give me a few minutes. I decided to write a sample to demonstrate this.

It looks like I won’t be able to finish the sample today, but here is the code. It works, but because of a problem with exception mapping in the SDK it prints an unexpected exception stack trace to standard error. I’m looking into fixing that SDK issue.

```java
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.CanceledFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Async;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class HelloCancellationScope {

  static final String TASK_QUEUE = "HelloCancellationScope";

  @WorkflowInterface
  public interface GreetingWorkflow {
    @WorkflowMethod
    String getGreeting(String name);
  }

  @ActivityInterface
  public interface GreetingActivities {
    String composeGreeting(String greeting, String name);
  }

  public static class GreetingWorkflowImpl implements GreetingWorkflow {

    private static final String[] greetings =
        new String[] {"Hello", "Bye", "Hola", "Привет", "Oi", "Hallo"};

    private final GreetingActivities activities =
        Workflow.newActivityStub(
            GreetingActivities.class,
            ActivityOptions.newBuilder()
                .setScheduleToCloseTimeout(Duration.ofSeconds(100))
                // Activities only learn about cancellation when they heartbeat. The Java SDK
                // throttles heartbeats based on this timeout, so a short value lets
                // cancellation be noticed sooner.
                .setHeartbeatTimeout(Duration.ofSeconds(5))
                // Cancelled activities get to finish cleanup before their promises complete.
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    @Override
    public String getGreeting(String name) {
      List<Promise<String>> results = new ArrayList<>();
      CancellationScope scope =
          Workflow.newCancellationScope(
              () -> {
                for (String greeting : greetings) {
                  results.add(Async.function(activities::composeGreeting, greeting, name));
                }
              });
      // The code inside the scope only starts activities and does not block,
      // so run() returns immediately.
      scope.run();
      // Wait for the first activity to complete and take its result.
      String result = Promise.anyOf(results).get();
      // Cancel all other activities started inside the scope.
      scope.cancel();
      // Wait for the cancelled activities to finish cleanup. A cancelled activity's promise
      // fails with an ActivityFailure caused by CanceledFailure, which is expected here.
      for (Promise<String> promise : results) {
        try {
          promise.get();
        } catch (ActivityFailure e) {
          if (!(e.getCause() instanceof CanceledFailure)) {
            throw e;
          }
        }
      }
      return result;
    }
  }

  static class GreetingActivitiesImpl implements GreetingActivities {

    @Override
    public String composeGreeting(String greeting, String name) {
      ActivityExecutionContext context = Activity.getExecutionContext();
      Random random = new Random();
      int count = random.nextInt(30);
      System.out.println("Activity for " + greeting + " going to take " + count + " seconds");
      for (int i = 0; i < count; i++) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and fail the activity instead of silently continuing.
          Thread.currentThread().interrupt();
          throw Activity.wrap(e);
        }
        try {
          // Reports progress; throws an ActivityCompletionException once the
          // workflow has requested cancellation.
          context.heartbeat(i);
        } catch (ActivityCompletionException e) {
          System.out.println("Activity for " + greeting + " was cancelled");
          throw e;
        }
      }
      System.out.println("Activity for " + greeting + " completed");
      return greeting + " " + name + "!";
    }
  }

  public static void main(String[] args) {
    // gRPC stubs wrapper that talks to the local Docker instance of the Temporal service.
    WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    // Client that can be used to start and signal workflows.
    WorkflowClient client = WorkflowClient.newInstance(service);

    // Worker factory that can be used to create workers for specific task queues.
    WorkerFactory factory = WorkerFactory.newInstance(client);
    // Worker that listens on a task queue and hosts both workflow and activity implementations.
    Worker worker = factory.newWorker(TASK_QUEUE);
    // Workflows are stateful, so the worker needs a type to create instances from.
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
    // Activities are stateless and thread-safe, so a single shared instance is registered.
    worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
    // Start polling the task queue for workflow and activity tasks.
    factory.start();

    // Start a workflow execution. Usually this is done from another program.
    // The task queue is passed explicitly through WorkflowOptions.
    GreetingWorkflow workflow =
        client.newWorkflowStub(
            GreetingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
    // Execute the workflow and block until it completes.
    String greeting = workflow.getGreeting("World");
    System.out.println(greeting);
    System.exit(0);
  }
}
```
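The sample above returns the first successful greeting and cancels the rest. The original question is the mirror image: cancel the rest as soon as one activity fails. Inside the workflow, one way to get that (an untested sketch) would be to keep the same CancellationScope, wrap `Promise.allOf(results).get()` in a `try`/`catch (ActivityFailure e)`, call `scope.cancel()` in the `catch`, and then rethrow. Because workflow code must not start its own threads, the runnable block below is only a plain-Java model of that control flow, not Temporal code: executor threads stand in for activities, the hypothetical `Scope` class for `CancellationScope`, and `Scope.heartbeat()` for `ActivityExecutionContext.heartbeat()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Plain-Java model (NOT Temporal workflow code) of "fork N tasks; if one fails, cancel the rest".
 * Threads stand in for activities and Scope is a hypothetical stand-in for CancellationScope.
 */
final class FailFastForkJoin {

  /** Raised by heartbeat() once the scope has been cancelled. */
  static final class ScopeCancelledException extends RuntimeException {
    ScopeCancelledException() {
      super("scope cancelled");
    }
  }

  /** Shared cancellation flag that every forked task polls at its heartbeat points. */
  static final class Scope {
    private final AtomicBoolean cancelled = new AtomicBoolean();

    void cancel() {
      cancelled.set(true);
    }

    void heartbeat() {
      if (cancelled.get()) {
        throw new ScopeCancelledException();
      }
    }
  }

  /** A unit of forked work; it should call scope.heartbeat() regularly. */
  interface Task<T> {
    T run(Scope scope) throws Exception;
  }

  /**
   * Runs all tasks concurrently. If one fails, cancels the others, waits until every task
   * has stopped, then rethrows the first real failure. Otherwise returns results in order.
   */
  static <T> List<T> runAll(List<Task<T>> tasks) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    CompletionService<T> completed = new ExecutorCompletionService<>(pool);
    Scope scope = new Scope();
    List<Future<T>> futures = new ArrayList<>();
    try {
      for (Task<T> task : tasks) {
        futures.add(completed.submit(() -> task.run(scope)));
      }
      Exception firstFailure = null;
      for (int i = 0; i < tasks.size(); i++) {
        try {
          completed.take().get(); // next task to finish, in completion order
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof ScopeCancelledException) {
            continue; // expected: this task stopped because we cancelled the scope
          }
          if (firstFailure == null) {
            firstFailure = cause instanceof Exception ? (Exception) cause : e;
            scope.cancel(); // fail fast: every other task stops at its next heartbeat
          }
        }
      }
      if (firstFailure != null) {
        throw firstFailure;
      }
      List<T> results = new ArrayList<>();
      for (Future<T> future : futures) {
        results.add(future.get()); // all succeeded, so get() returns immediately
      }
      return results;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Like `WAIT_CANCELLATION_COMPLETED`, `runAll` waits until every task has actually stopped before rethrowing the first real failure. A task that never calls `heartbeat()` keeps running until it finishes on its own, which is the same limitation the heartbeat answer points out.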


Thank you, this is an awesome example! The CancellationScope totally makes sense. I’ve been experimenting this afternoon and it behaves well.

Sean
