Custom cleanup on activity cancelation

Is there any way to run custom cleanup when an activity is canceled?
My use case is:
A JDBC query is executed in an activity. The query should be canceled to release database resources when the workflow and activity are canceled.
In my implementation, the query is stored as a member object of the activity. What I need is a proper hook where I can do custom cleanup when the workflow and activity are canceled.
Any suggestions? @maxim

In the sample code (HelloCancelationScope), I see that we can use heartbeats and exception handling to detect cancellation, but that means heartbeat calls would be spread all over the business logic. What is more, there may exist external calls like a JDBC query, which last a long time but into which no activity heartbeat can be inserted.
From my point of view, heartbeats and the life cycle of activities should both be managed by Temporal. Only some hooks, such as event handlers, should be exposed to the SDK user.

there may exist external calls like a JDBC query, which last a long time but into which no activity heartbeat can be inserted

Activity code is your code, so you could use Java concurrency to deal with this situation.
You could also create a separate component that does the heartbeat via Activity.getExecutionContext(), with a Timer for example.
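
As a rough illustration of the "separate component" idea, here is a minimal sketch that uses only java.util.concurrent. HeartbeatGuard, its constructor parameters, and cleanupRan() are hypothetical names, not part of the Temporal SDK. The sketch assumes that a failed heartbeat call is how the background thread learns that the activity was canceled or no longer exists. In an activity, the heartbeat argument would wrap a heartbeat call on a context obtained in the activity thread, and the cleanup hook could wrap something like Statement.cancel() for the JDBC query.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical helper (not part of the Temporal SDK): sends heartbeats from a
 * background thread and runs a cleanup hook once when a heartbeat fails.
 */
final class HeartbeatGuard implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeat");
        t.setDaemon(true); // do not keep the JVM alive because of heartbeats
        return t;
      });
  private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

  HeartbeatGuard(Runnable heartbeat, Runnable onCancel, long intervalMillis) {
    scheduler.scheduleAtFixedRate(() -> {
      try {
        heartbeat.run(); // assumption: throws when the activity is canceled or gone
      } catch (RuntimeException e) {
        try {
          // Run the cleanup hook at most once, e.g. cancel the running query.
          if (cleanedUp.compareAndSet(false, true)) {
            onCancel.run();
          }
        } finally {
          scheduler.shutdown(); // no further heartbeats after a failure
        }
      }
    }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  /** True once the cleanup hook has been triggered. */
  boolean cleanupRan() {
    return cleanedUp.get();
  }

  /** Stops heartbeating, for example when the long-running call returns. */
  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

Wrapping the long-running call in try-with-resources on such a guard would also stop the heartbeats as soon as the call returns.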

Only some hooks, such as event handlers, should be exposed to the SDK user.

Feel free to share your usability improvements; we are always looking for user feedback for all SDKs.

Hi tihomir,
I gave it a try but got an error:
java.lang.IllegalStateException: ActivityExecutionContext can be used only inside of activity implementation methods and in the same thread that invoked an activity
Does this exception mean we cannot use concurrency for activity heartbeats?

Could you share your impl and the SDK version you are using please?

@whitecrow

You would have to get the ActivityExecutionContext in the main activity thread and pass it to the heartbeating one. Another option is to use async activity completion to heartbeat, see sample here (use completionClient.heartbeat(token, details)).
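
To illustrate why the lookup has to happen in the activity thread, here is a small stand-alone sketch. It is only an analogy: it assumes the context is bound to the invoking thread in a way similar to a ThreadLocal, which is what the exception message suggests, and does not show the SDK's actual internals. ThreadBoundContextDemo and its members are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustration only: a thread-bound value standing in for the activity context. */
final class ThreadBoundContextDemo {
  // Hypothetical stand-in for whatever is bound to the activity thread.
  private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  /** Mimics a static lookup that only works on the thread that set the value. */
  static String current() {
    String ctx = CURRENT.get();
    if (ctx == null) {
      throw new IllegalStateException("context is only available on the invoking thread");
    }
    return ctx;
  }

  /** Returns what a background thread observes: {static lookup result, captured value}. */
  static String[] run() {
    CURRENT.set("activity-context");
    String captured = current(); // looked up on the invoking thread, then passed along
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<String[]> seen = pool.submit(() -> {
        String lookup;
        try {
          lookup = current(); // fails: nothing is bound to this thread
        } catch (IllegalStateException e) {
          lookup = "unavailable";
        }
        return new String[] {lookup, captured}; // the captured reference still works
      });
      return seen.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
      CURRENT.remove();
    }
  }
}
```

The background thread cannot perform the static lookup itself, but it can use a reference that was captured on the invoking thread and handed to it.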

I tried these options, both of them failed with an exception:
io.temporal.client.ActivityNotExistsException
Here is my implementation, with SDK version 1.5.0:
Basically, I try to set up a timer task for the activity heartbeat.

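import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionClient;
import java.util.Timer;
import java.util.TimerTask;

public abstract class ActivityHeartbeat {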
    Timer heartbeatTimer = new Timer("HeartbeatTimer");

    /**
     * Keep heartbeat and do custom cleanup if workflow cancellation is found
     *
     * @param interval
     *   Heartbeat interval, in seconds
     */
    public void heartbeatWithActivityContext(ActivityExecutionContext context, int interval) {
        TimerTask task = new TimerTask() {
            public void run() {
                try {
                    System.out.println("Activity heart beats");
                    context.heartbeat(this.scheduledExecutionTime());
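                } catch(Exception e) {
                    // Stop this task. Do not rethrow: an exception escaping run()
                    // kills the Timer thread and breaks later scheduling on this Timer.
                    cancel();
                    System.out.println("Exception received, " + e.getClass().getName());
                }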
            }
        };
        heartbeatTimer.scheduleAtFixedRate(task, interval * 1000, interval * 1000);
    }

    public void heartbeatWithCompletionClient(ActivityCompletionClient completionClient, byte[] token, int interval) {
        TimerTask task = new TimerTask() {
            public void run() {
                try {
                    System.out.println("Activity heart beats");
                    completionClient.heartbeat(token, this.scheduledExecutionTime());
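                } catch(Exception e) {
                    // Stop this task. Do not rethrow: an exception escaping run()
                    // kills the Timer thread and breaks later scheduling on this Timer.
                    cancel();
                    System.out.println("Exception received, " + e.getClass().getName());
                }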
            }
        };
        heartbeatTimer.scheduleAtFixedRate(task, interval * 1000, interval * 1000);
    }
}

My experiment with the Temporal sample HelloActivity:

  static class GreetingActivitiesImpl extends ActivityHeartbeat implements GreetingActivities {
    @Override
    public String composeGreeting(String greeting, String name) {
      System.out.println("Activity is triggered");
      heartbeatWithActivityContext(Activity.getExecutionContext(), 1);
      return longTimeActivity(greeting, name);
    }

    private String longTimeActivity(String greeting, String name) {
      System.out.println("Activity begins");
      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return "";
      }
      System.out.println("Activity Completes");
      return greeting + " " + name + "!";
    }
  }

My experiment with the Temporal sample HelloAsyncActivityCompletion:

static class GreetingActivitiesImpl extends ActivityHeartbeat implements GreetingActivities {

    private final ActivityCompletionClient completionClient;

    GreetingActivitiesImpl(ActivityCompletionClient completionClient) {
      this.completionClient = completionClient;
    }

    @Override
    public String composeGreeting(String greeting, String name) {

      // Get the activity execution context
      ActivityExecutionContext context = Activity.getExecutionContext();

      // Set a correlation token that can be used to complete the activity asynchronously
      byte[] taskToken = context.getTaskToken();

      ForkJoinPool.commonPool().execute(() -> composeGreetingAsync(taskToken, greeting, name));
      context.doNotCompleteOnReturn();

      heartbeatWithCompletionClient(completionClient, taskToken, 1);

      return "ignored";
    }


    private void composeGreetingAsync(byte[] taskToken, String greeting, String name) {
      System.out.println("Activity begins");
      String result = greeting + " " + name + "!";
      try {
        Thread.sleep(10000);
      } catch (Exception e) {
        return;
      }
      System.out.println("Activity completes");
      // Complete our workflow activity using ActivityCompletionClient
      completionClient.complete(taskToken, result);
    }
  }

I tried your solution that uses Timer and it seems to work OK. Just make sure to catch the ActivityNotExistsException that will be received on a heartbeat sent after your activity completes, for example:

TimerTask task =
    new TimerTask() {
      public void run() {
        try {
          System.out.println("Activity heart beats");
          context.heartbeat(this.scheduledExecutionTime());
        } catch (ActivityNotExistsException e) {
          // ...
          // cancel the timer task
          this.cancel();
        }
      }
    };
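
Another way to avoid heartbeating after completion is to scope the timer to a single activity invocation and cancel it in a finally block. The following is a minimal sketch under that assumption, using only java.util; ScopedHeartbeat and callWithHeartbeat are hypothetical names, and the heartbeat argument stands in for a call such as the context.heartbeat(...) above.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;

/** Hypothetical helper: heartbeats only while the wrapped work is running. */
final class ScopedHeartbeat {
  static <T> T callWithHeartbeat(Runnable heartbeat, long intervalMillis, Callable<T> work) {
    // One daemon timer per invocation, so canceling it cannot affect other activities.
    Timer timer = new Timer("HeartbeatTimer", true);
    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        try {
          heartbeat.run();
        } catch (RuntimeException e) {
          cancel(); // stop this task; rethrowing would kill the timer thread
        }
      }
    }, intervalMillis, intervalMillis);
    try {
      return work.call();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      // No new heartbeat is scheduled after this point; one already in
      // progress is allowed to finish.
      timer.cancel();
    }
  }
}
```

With this shape, the activity method body becomes the work argument, and the timer never outlives the method call.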

Do you set the heartbeat timeout in the activity options in your workflow code? It should be greater than the interval you are heartbeating at.

I set the heartbeat timeout to 3s, and the actual heartbeat interval is 1s.
The activity should take 10s. What is weird is that my first activity heartbeat immediately got an ActivityNotExistsException.
My trace log looks like:

Activity is triggered
Activity begins
Activity heart beats
Exception received, io.temporal.client.ActivityNotExistsException
Activity Completes

Here is my implementation, with SDK version 1.5.0

SDK version 1.5.0 is pretty old; I would suggest upgrading to the latest version if possible. My test was with 1.12.0.

Sure, I will give it a try. Thanks a lot~