Best practices for long-running activities

I have an activity that can take up to 3 hours to complete.

The activity is multi-threaded, so I cannot interleave calls to Activity.getExecutionContext().heartbeat() into the activity logic.

What is the best way to implement the activity heartbeat? Should I spin off the compute logic into a separate thread and use the original activity thread to heartbeat and wait on the compute thread?

E.g.

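ActivityExecutionContext ctx = Activity.getExecutionContext();
// Run the compute logic on a separate thread; heartbeat from the activity thread.
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> { /* compute method */ });
while (!task.isDone()) {
  ctx.heartbeat(null);
  Thread.sleep(1000);
}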

What happens when the activity heartbeat fails (e.g. the Temporal server is down, or there is a network error)? Does the activity fail? Does it log an error and press on?

What is the relationship between scheduleToStart, startToClose, and the heartbeat interval?
How do I ensure that a task is not served to more than one activity worker at the same time, while also ensuring that a task running on a defunct activity worker is picked up by a new worker in a reasonable timeframe?

Is there anything else I should be aware of in this scenario?

What is the best way to implement the activity heartbeat?

If you plan to have many such activities, I would recommend creating a separate component that heartbeats through the execution context using a Timer, to avoid tying up a thread for each heartbeating activity.

What happens when the activity heartbeat fails?
A subclass of ActivityCompletionException will be thrown from the heartbeat call. It is up to the activity implementation to ignore it or exit. Note that from the service and workflow point of view, the activity is going to be considered timed out and will probably be retried.

What is the relationship between scheduleToStart, startToClose, and the heartbeat interval?
ScheduleToStart is the maximum time an activity can stay in a task queue before being picked up. We recommend not setting this value unless you perform host-specific task routing.

StartToClose is the maximum time a single activity attempt can take. It looks like it should be 3 hours in your case.

Heartbeat timeout is the maximum time between heartbeats. It is expected to be relatively small. Let’s say one minute in your case.

It is not really possible to provide a hard guarantee that you don’t end up with two activity attempts running at the same time. You can get pretty close by shutting down an activity when an exception is thrown from the heartbeat call and by setting an initial retry interval larger than the heartbeat interval.

Is there anything else I should be aware of in this scenario?
Activities can include data in a heartbeat call. When an activity is retried, the value of the last heartbeat is accessible through the ActivityExecutionContext.getHeartbeatDetails call. This can be useful if an activity can resume execution from some previously recorded point.
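To illustrate resuming from a recorded point, here is a minimal sketch that uses only the standard library. The class `ResumableWork` and its parameters are hypothetical: `lastCheckpoint` stands in for whatever `getHeartbeatDetails` returned for the previous attempt, and `heartbeat` stands in for a call such as `ctx.heartbeat(index)`. It assumes the work can be split into an ordered list of items.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

/** Hypothetical helper showing checkpointed processing; not part of the Temporal SDK. */
public class ResumableWork {
    /**
     * Processes items, skipping those already completed by a previous attempt.
     *
     * @param lastCheckpoint stand-in for the last heartbeat details: index of the last
     *                       item a previous attempt finished, empty on the first attempt
     * @param heartbeat      stand-in for ctx.heartbeat(index)
     * @return the number of items processed by this attempt
     */
    public static <T> int process(List<T> items, Optional<Integer> lastCheckpoint,
                                  Consumer<T> handler, IntConsumer heartbeat) {
        // Resume right after the last completed index, or from the beginning.
        int start = lastCheckpoint.map(i -> i + 1).orElse(0);
        int processed = 0;
        for (int i = start; i < items.size(); i++) {
            handler.accept(items.get(i));
            heartbeat.accept(i); // record progress so a retry can skip items 0..i
            processed++;
        }
        return processed;
    }
}
```

Because the heartbeat is recorded after an item completes, an attempt that dies mid-item repeats that item on retry, so the per-item handler should be idempotent.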

Are you referring to java.util.Timer or a Temporal class?

Won’t java.util.Timer run the TimerTask on a separate thread, and thus fail in the call to Activity.getExecutionContext()?

Or do you mean to get the execution context from Activity in the main thread, then pass the execution context instance to another component? I.e., getting the execution context must be done from the main thread, but once that is done, any thread can act on it?

I’m referring to java.util.Timer. Yes, make the call to getExecutionContext in the activity thread and pass it down to the heartbeating component. This is one of the main reasons for having a separate context object.
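A minimal sketch of such a heartbeating component, using only the standard library. The class name `BackgroundHeartbeater` and both callbacks are hypothetical: `heartbeat` stands in for something like `() -> ctx.heartbeat(null)`, where `ctx` was obtained via `Activity.getExecutionContext()` on the activity thread, and `onFailure` is whatever stops the compute threads. It assumes a heartbeat failure surfaces as a runtime exception from the callback.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: heartbeats on a shared timer thread while the activity computes. */
public class BackgroundHeartbeater implements AutoCloseable {
    // One daemon Timer thread shared by every heartbeater in the process.
    private static final Timer TIMER = new Timer("activity-heartbeats", true);

    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();
    private final TimerTask task;

    /**
     * @param heartbeat    stand-in for ctx.heartbeat(details)
     * @param onFailure    invoked once if the heartbeat throws; it must not throw itself
     * @param periodMillis delay between heartbeats; keep it well below the heartbeat timeout
     */
    public BackgroundHeartbeater(Runnable heartbeat, Runnable onFailure, long periodMillis) {
        this.task = new TimerTask() {
            @Override
            public void run() {
                try {
                    heartbeat.run();
                } catch (RuntimeException e) {
                    // Keep the shared Timer thread alive: record the error, stop only this task.
                    failure.set(e);
                    cancel();
                    onFailure.run();
                }
            }
        };
        TIMER.schedule(task, 0, periodMillis);
    }

    /** Returns the exception thrown by the heartbeat callback, or null if none so far. */
    public RuntimeException failure() {
        return failure.get();
    }

    /** Stops heartbeating; call this when the activity finishes. */
    @Override
    public void close() {
        task.cancel();
    }
}
```

In an activity, this could be wrapped in a try-with-resources block around the compute call, with `onFailure` interrupting or cancelling the compute threads so the attempt exits before a retry starts elsewhere.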

@maxim The Go SDK doesn’t return any error from the heartbeat call. Does the context get cancelled if the heartbeat doesn’t go through so that I can shutdown the current activity before a retry activity starts?

Does the context get cancelled if the heartbeat doesn’t go through so that I can shutdown the current activity before a retry activity starts?

Correct. The activity ctx.Done() channel is closed on activity cancellation. Here is the code from the cancelactivity sample.

func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
	logger := activity.GetLogger(ctx)
	for {
		select {
		case <-time.After(1 * time.Second):
			logger.Info("heartbeating...")
			activity.RecordHeartbeat(ctx, "")
		case <-ctx.Done():
			logger.Info("context is cancelled")
			return "I am canceled by Done", nil
		}
	}
}

Sorry, I should have phrased my question differently. I understand that the context is canceled when the worker and the Temporal server are able to communicate with each other. Let’s say the worker is not able to deliver the heartbeat to the Temporal server, but both the worker and the server are running fine. In this case, the Temporal server will assume that the worker is not running and trigger a retry of the activity (possibly on a different node). But the first attempt of the activity is still running and is not canceled until the connection between the server and the worker is restored. Is my understanding correct?

My problem is that I want to cancel the first attempt of the activity if RecordHeartbeat() fails due to a connection error, because I know that the Temporal server will treat the first attempt as timed out and retry anyway. I don’t want two attempts of the activity to run simultaneously. But RecordHeartbeat() doesn’t return any errors. It simply logs them.

Looking at the code, it appears that the context is canceled if the workflow is closed or the service reports that the activity was canceled or timed out.

But it is not going to cancel the context if the service is not reachable. It is only going to log an error. I filed an issue to get this fixed.

Thank you so much @maxim.

For the Go SDK:

Is there an example of sending heartbeats from a long-running task that runs a shell command via exec.Command?

Thank you