I tried both of these options, and each of them failed with the following exception:
io.temporal.client.ActivityNotExistsException
Here is my implementation, using SDK version 1.5.0:
Basically, I am trying to set up a timer task that sends the activity heartbeat.
/**
 * Base class for activity implementations that want a background heartbeat
 * while their work is in progress.
 *
 * <p>Heartbeats are sent from a single timer thread shared by every task
 * scheduled through the same instance. A task stops itself the first time a
 * heartbeat call throws (for example when the server reports that the
 * activity no longer exists); the failure is printed but deliberately not
 * rethrown, because an exception escaping {@link TimerTask#run()} terminates
 * the timer thread and would make every later scheduling attempt on this
 * instance fail with {@link IllegalStateException}.
 */
public abstract class ActivityHeartbeat {
    /**
     * Timer shared by all heartbeat tasks of this instance. Created as a
     * daemon so that an idle heartbeat thread never keeps the JVM alive.
     */
    Timer heartbeatTimer = new Timer("HeartbeatTimer", true);

    /**
     * Starts a periodic heartbeat that is reported through the activity
     * execution context. The scheduled execution time of each tick is sent
     * as the heartbeat details.
     *
     * @param context
     *            execution context of the activity being heartbeated
     * @param interval
     *            heartbeat interval, in seconds; must be positive
     * @throws IllegalArgumentException
     *             if {@code interval} is zero or negative
     */
    public void heartbeatWithActivityContext(ActivityExecutionContext context, int interval) {
        scheduleHeartbeat(new HeartbeatTask() {
            @Override
            void sendHeartbeat(long scheduledTime) {
                context.heartbeat(scheduledTime);
            }
        }, interval);
    }

    /**
     * Starts a periodic heartbeat that is reported through an
     * {@code ActivityCompletionClient}, for activities that complete
     * asynchronously. The scheduled execution time of each tick is sent as
     * the heartbeat details.
     *
     * @param completionClient
     *            client used to report the heartbeat
     * @param token
     *            task token identifying the activity
     * @param interval
     *            heartbeat interval, in seconds; must be positive
     * @throws IllegalArgumentException
     *             if {@code interval} is zero or negative
     */
    public void heartbeatWithCompletionClient(ActivityCompletionClient completionClient, byte[] token, int interval) {
        scheduleHeartbeat(new HeartbeatTask() {
            @Override
            void sendHeartbeat(long scheduledTime) {
                completionClient.heartbeat(token, scheduledTime);
            }
        }, interval);
    }

    /** Schedules the task at a fixed rate, with the first tick one interval from now. */
    private void scheduleHeartbeat(TimerTask task, int interval) {
        // Multiply as long so large second counts cannot overflow int.
        long periodMillis = interval * 1000L;
        heartbeatTimer.scheduleAtFixedRate(task, periodMillis, periodMillis);
    }

    /** Timer task that sends one heartbeat per tick and cancels itself on the first failure. */
    private abstract static class HeartbeatTask extends TimerTask {
        /** Sends a single heartbeat carrying the tick's scheduled execution time. */
        abstract void sendHeartbeat(long scheduledTime);

        @Override
        public void run() {
            try {
                System.out.println("Activity heart beats");
                sendHeartbeat(scheduledExecutionTime());
            } catch (Exception e) {
                // Stop only this task. Rethrowing here would kill the shared
                // timer thread and break every other heartbeat on it.
                cancel();
                System.out.println("Exception received, " + e.getClass().getName());
            }
        }
    }
}
My experiment with the Temporal sample HelloActivity:
/**
 * Greeting activity that runs synchronously and heartbeats in the background
 * while the simulated long-running work is in progress.
 */
static class GreetingActivitiesImpl extends ActivityHeartbeat implements GreetingActivities {
    /**
     * Starts a one-second heartbeat for the current activity and then
     * performs the (simulated) long-running work on the calling thread.
     *
     * @return the composed greeting, or an empty string if the work was interrupted
     */
    @Override
    public String composeGreeting(String greeting, String name) {
        System.out.println("Activity is triggered");
        heartbeatWithActivityContext(Activity.getExecutionContext(), 1);
        return longTimeActivity(greeting, name);
    }

    /**
     * Simulates ten seconds of work before building the greeting.
     *
     * @return {@code greeting + " " + name + "!"}, or an empty string if the
     *         sleep was interrupted
     */
    private String longTimeActivity(String greeting, String name) {
        System.out.println("Activity begins");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe
            // that this thread was asked to stop.
            Thread.currentThread().interrupt();
            return "";
        }
        System.out.println("Activity Completes");
        return greeting + " " + name + "!";
    }
}
My experiment with temporal sample HelloAsyncActivityCompletion:
/**
 * Greeting activity that completes asynchronously: the activity method
 * returns immediately and the result is reported later through an
 * {@code ActivityCompletionClient}.
 */
static class GreetingActivitiesImpl extends ActivityHeartbeat implements GreetingActivities {
    private final ActivityCompletionClient completionClient;

    GreetingActivitiesImpl(ActivityCompletionClient completionClient) {
        this.completionClient = completionClient;
    }

    /**
     * Hands the real work to the common fork-join pool, marks the activity
     * as not completing on return, and starts a one-second heartbeat keyed
     * by the task token.
     *
     * @return a placeholder value; the code marks the activity with
     *         {@code doNotCompleteOnReturn()} and reports the real result
     *         from {@link #composeGreetingAsync}
     */
    @Override
    public String composeGreeting(String greeting, String name) {
        // Get the activity execution context
        ActivityExecutionContext context = Activity.getExecutionContext();
        // Set a correlation token that can be used to complete the activity asynchronously
        byte[] taskToken = context.getTaskToken();
        ForkJoinPool.commonPool().execute(() -> composeGreetingAsync(taskToken, greeting, name));
        context.doNotCompleteOnReturn();
        heartbeatWithCompletionClient(completionClient, taskToken, 1);
        return "ignored";
    }

    /**
     * Simulates ten seconds of work and then completes the activity with the
     * greeting. If the work is interrupted, the activity is reported as
     * failed rather than left without an outcome.
     */
    private void composeGreetingAsync(byte[] taskToken, String greeting, String name) {
        System.out.println("Activity begins");
        String result = greeting + " " + name + "!";
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure; returning
            // silently would leave the activity with no outcome reported.
            Thread.currentThread().interrupt();
            completionClient.completeExceptionally(taskToken, e);
            return;
        }
        System.out.println("Activity completes");
        // Complete our workflow activity using ActivityCompletionClient
        completionClient.complete(taskToken, result);
    }
}