Right way to cancel long running activity

Hello,

Not sure if this has been discussed before.

We have a long-running activity that submits ML workload to a compute cluster and then keeps polling for the workload completion. It also emits hearbeats to temporal.

When workflow gets manually cancelled in temporal UI we want to react to it and cancel the workload in the cluster.

We followed the polling example and implemented the workload cancellation on ctx.Done, when there is a ctx.Err(). Is that the right way to implement it. Here is how the code looks like. Note that this needs to be idempotent as well.

ExecuteTask
   // prepare payload to be submitted to compute cluster
   var task Task

   // check for previous heartbeats, if we had submitted this payload before to compute cluster
   if activity.HasHeartbeatDetails ...
       ....
   // else submit a new payload
   else
       ....
   
   // enter watch sleep loop
   for {
       // get task status from compute cluster 
       // send heartbeat
       // react to status from compute cluster 
       
       select {
       case <- ctx.Done():
          if ctx.Err() != nil {
              **// cancel task in compute cluster if its still running**
         }
       case <- time.After(a.WatchBoltTaskPollInterval):  
       }    
    }

I think you want to break out of the loop on ctx.Done(). The sample just says “cancel task in compute cluster”.

Note that this needs to be idempotent as well.

This is not possible unless the compute cluster API is idempotent.

Thanks for your reply Maxim,

Yes, we are breaking out of that, it’s just that pseudocode I posted does not have it.

The real one looks like

case <- ctx.Done():
          if ctx.Err() != nil {
              **// cancel task in compute cluster if its still running**
         }
         return nil, ctx.Err()

However we are observing something that we are not able to simulate in tests.

Let me explain.

In the first if block where we are getting heartbeat information from temporal to decide if we should submit a new task. We are seeing unmarshalling failures when we are using the same activity code to run a sequential workflow (submit compute task 1 → submit compute task 2).

We can use a stateful store to keep that state (task id from a submission) instead of the heartbeat. But wanted to understand the root cause of this issue. We are not able to replicate this issue in integration or unit tests, though.

In a real scenario, these workloads in the compute cluster run for days (this is when we are seeing the issue)

Please let us know if there is anything we should do differently.

I wouldn’t rely on heartbeat to decide if you want to submit a new task. Heartbeat is not guaranteed to be recorded as it is throttled for performance.

I recommend breaking your code into two separate activities. The first submits the job to the cluster, the second polls for the results.

So we did that first.

But then we have this requirement that while polling for status, if it fails, we want to retry it n times. Due to this requirement, we changed the implementation to a single idempotent activity, relying on the heartbeat information to do some state management. Let us know if you would still recommend going with two activities. And if so, how would you recommend implementing retries?

But I understand your point about not storing info in heartbeats. We are now changing the implementation to store state in a stateful store instead. We will query the stateful store on activity failures to rebuild the state.

Please let us know what you think.

I don’t see a problem with storing data in the heartbeat if it is OK to return the stale data, as not every heartbeat call is recorded.

I would retry the sequence of activities from the workflow. It can be as simple as a loop. See the fileprocessing sample.