We have a long-running activity that submits an ML workload to a compute cluster and then keeps polling for the workload to complete. It also emits heartbeats to Temporal.
When the workflow gets manually cancelled in the Temporal UI, we want to react to that and cancel the workload in the cluster.
We followed the polling example and implemented the workload cancellation on ctx.Done, when there is a ctx.Err(). Is that the right way to implement it? Here is what the code looks like. Note that this needs to be idempotent as well.
ExecuteTask
    // prepare the payload to be submitted to the compute cluster
    var task Task

    // check previous heartbeats to see if we had submitted this payload to the compute cluster before
    if activity.HasHeartbeatDetails(ctx) {
        ....
    } else {
        // submit a new payload
        ....
    }

    // enter the watch/sleep loop
    for {
        // get task status from the compute cluster
        // send heartbeat
        // react to the status from the compute cluster
        select {
        case <-ctx.Done():
            if ctx.Err() != nil {
                // cancel task in compute cluster if it's still running
            }
        case <-time.After(a.WatchBoltTaskPollInterval):
        }
    }
Yes, we are breaking out of that; it's just that the pseudocode I posted does not have it.
The real code looks like:
        case <-ctx.Done():
            if ctx.Err() != nil {
                // cancel task in compute cluster if it's still running
            }
            return nil, ctx.Err()
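As an aside on the idempotency requirement, the "cancel task in compute cluster if it's still running" step might look roughly like the sketch below; the Cluster client, its methods, and the error/status values are assumed names, and a fresh context is used because the activity context is already cancelled by the time this branch runs:

    import (
        "context"
        "errors"
        "time"
    )

    // cancelIfRunning is a sketch of an idempotent cluster-side cancel: it only
    // cancels while the task is still active and treats "already gone" as
    // success, so it is safe to call more than once. a.Cluster and the
    // ErrTaskNotFound sentinel are assumed names, not real APIs.
    func (a *Activities) cancelIfRunning(taskID string) error {
        // The activity context is already cancelled when this runs from the
        // ctx.Done() branch, so use a short independent context for the cleanup calls.
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        status, err := a.Cluster.GetTaskStatus(ctx, taskID)
        if errors.Is(err, ErrTaskNotFound) {
            return nil // nothing to cancel
        }
        if err != nil {
            return err
        }
        if status.IsTerminal() {
            return nil // already finished or already cancelled
        }
        return a.Cluster.CancelTask(ctx, taskID)
    }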
However, we are observing something that we have not been able to simulate in tests.
Let me explain.
In the first if block, where we read heartbeat information from Temporal to decide whether to submit a new task, we are seeing unmarshalling failures when we use the same activity code in a sequential workflow (submit compute task 1 → submit compute task 2).
We could use a stateful store to keep that state (the task ID from a submission) instead of the heartbeat, but we wanted to understand the root cause of the issue. We have not been able to replicate it in integration or unit tests, though.
In a real scenario, these workloads run in the compute cluster for days (this is when we see the issue).
Please let us know if there is anything we should do differently.
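For reference, the heartbeat-based resume logic described above follows roughly this shape; taskState, TaskInput, TaskResult, and the Cluster client are assumed names, while the activity.* calls are the Go SDK helpers in question:

    import (
        "context"

        "go.temporal.io/sdk/activity"
    )

    // taskState is the payload we record as heartbeat details (assumed shape).
    type taskState struct {
        TaskID string
    }

    func (a *Activities) ExecuteTask(ctx context.Context, input TaskInput) (*TaskResult, error) {
        var state taskState
        if activity.HasHeartbeatDetails(ctx) {
            // A previous attempt already submitted the task: recover its ID.
            // This is where an unmarshalling failure surfaces if the recorded
            // details cannot be decoded into taskState.
            if err := activity.GetHeartbeatDetails(ctx, &state); err != nil {
                return nil, err
            }
        } else {
            // First attempt: submit a new task to the compute cluster (assumed client).
            id, err := a.Cluster.SubmitTask(ctx, input)
            if err != nil {
                return nil, err
            }
            state = taskState{TaskID: id}
        }

        // Record the task ID so a retried attempt resumes instead of resubmitting.
        activity.RecordHeartbeat(ctx, state)

        // ... watch/sleep loop as in the earlier snippet ...
        return nil, nil
    }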
I wouldn't rely on heartbeats to decide whether to submit a new task. Heartbeats are not guaranteed to be recorded, as they are throttled for performance.
I recommend breaking your code into two separate activities: the first submits the job to the cluster, the second polls for the result.
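A minimal sketch of that split, with assumed activity names (SubmitTask, WaitForTask), assumed input/result types, and illustrative timeouts: the submit activity returns the cluster task ID and the poll activity takes it as an argument, so a retried poll attempt does not depend on heartbeat details to know which task to watch.

    import (
        "time"

        "go.temporal.io/sdk/temporal"
        "go.temporal.io/sdk/workflow"
    )

    func RunComputeTask(ctx workflow.Context, input TaskInput) (*TaskResult, error) {
        // Submit the job; retry a few times on transient submission failures.
        submitCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
            StartToCloseTimeout: 10 * time.Minute,
            RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 5},
        })
        var taskID string
        if err := workflow.ExecuteActivity(submitCtx, "SubmitTask", input).Get(ctx, &taskID); err != nil {
            return nil, err
        }

        // Poll for completion. The "retry n times" requirement maps onto this
        // activity's RetryPolicy, and every retried attempt receives the same
        // taskID argument.
        pollCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
            StartToCloseTimeout: 72 * time.Hour,  // assumed: workloads can run for days
            HeartbeatTimeout:    5 * time.Minute, // cancellation reaches the activity via heartbeats
            WaitForCancellation: true,            // let the activity cancel the cluster job before the workflow moves on
            RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 10},
        })
        var result TaskResult
        if err := workflow.ExecuteActivity(pollCtx, "WaitForTask", taskID).Get(ctx, &result); err != nil {
            return nil, err
        }
        return &result, nil
    }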
But then we have a requirement that, while polling for status, if the poll fails we want to retry it n times. Because of that requirement, we changed the implementation to a single idempotent activity that relies on heartbeat information for state management. Let us know if you would still recommend going with two activities, and if so, how you would recommend implementing the retries.
That said, we take your point about not storing info in heartbeats. We are now changing the implementation to keep state in a stateful store instead, and we will query the store on activity failures to rebuild the state.
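For what it's worth, a rough sketch of that store-backed rebuild, keyed on the workflow and activity identity so that retried attempts find the earlier submission (the store, its methods, and the key scheme are all assumptions):

    import (
        "context"

        "go.temporal.io/sdk/activity"
    )

    // resolveTaskID is a sketch of store-backed idempotent submission: look up an
    // earlier submission for this activity before creating a new one. a.Store,
    // a.Cluster, and their methods are assumed names.
    func (a *Activities) resolveTaskID(ctx context.Context, input TaskInput) (string, error) {
        info := activity.GetInfo(ctx)
        // Assumed key: workflow ID plus activity ID, which stays the same across
        // retry attempts of the same scheduled activity.
        key := info.WorkflowExecution.ID + "/" + info.ActivityID

        id, found, err := a.Store.GetTaskID(ctx, key)
        if err != nil {
            return "", err
        }
        if found {
            return id, nil // a previous attempt already submitted this task
        }

        id, err = a.Cluster.SubmitTask(ctx, input)
        if err != nil {
            return "", err
        }
        return id, a.Store.PutTaskID(ctx, key, id)
    }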