Hi. Whenever we try to complete an activity with a cancellation error, we run into this error: Potential deadlock detected: workflow goroutine "7" didn't yield for over a second.
We have so far worked around it by adding a check so that, if temporal.IsCanceledError(err) is true, we don’t send the error in the activity completion request, but we are trying to understand what we might be doing wrong…
A bit of background:
We have a workflow receiving signals from a parent workflow in another namespace. Whenever a signal is received, our workflow starts a child workflow, which in turn runs a number of activities in parallel and finally returns the results to our workflow, which then sends an activity completion request back to the parent.
Another requirement we have is for a special cancel signal. When that is received, we are supposed to cancel any running activities, and then start a compensation workflow to do cleanups for us…
Workflow code snippets:
s := workflow.NewSelector(ctx)
childCtx, childCancel := workflow.WithCancel(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
    var stageChangeRequest models.StateChangeSignal
    c.Receive(ctx, &stageChangeRequest)
    // Cancellation stage
    if stageChangeRequest.Stage == "CANCELATION" {
        childCancel()
        // Side question: How to wait for child workflow cancellation cleanly?
        // Currently we heartbeat from all activities and hope that, after this
        // sleep, all activities have seen the cancelled context...
        workflow.Sleep(ctx, 2*HeartBeatTimeout)
        workflow.Go(ctx, func(ctx workflow.Context) {
            // Run compensation task
            // ...
            var temporalErr error
            if err != nil {
                temporalErr = temporal.NewApplicationErrorWithCause(err.Error(), string(stageRequest.Stage), err, migration)
            }
            temporalClient.CompleteActivity(context.Background(), stageRequest.TaskToken, result, temporalErr)
        })
    } else {
        workflow.Go(childCtx, func(childCtx workflow.Context) {
            // Run regular child workflow until completion
            // ...
            var temporalErr error
            if err != nil { // If we change this to "err != nil && !temporal.IsCanceledError(err)" then things go fine
                temporalErr = temporal.NewApplicationErrorWithCause(err.Error(), string(stageRequest.Stage), err, migration)
            }
            temporalClient.CompleteActivity(context.Background(), stageRequest.TaskToken, result, temporalErr)
        })
    }
})
for {
    s.Select(ctx)
    if completed {
        return "", nil
    }
}
At each stage, the child workflow will fire up a number of activities like this:
...
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
var activityErr error
for _, s := range activities {
    f, settable := workflow.NewFuture(childCtx)
    workflow.Go(childCtx, func(ctx workflow.Context) {
        err := // Run activity
        settable.Set(nil, err)
    })
    selector.AddFuture(f, func(f workflow.Future) {
        err := f.Get(ctx, nil)
        if err != nil {
            // cancel all pending activities
            cancelHandler()
            activityErr = err
        }
    })
}
// Wait for every activity to report back, one Select per future
for i := 0; i < len(activities); i++ {
    selector.Select(ctx)
}
return activityErr
Why can’t we call CompleteActivity with a cancellation error in the above workflow? Besides that, do you have any suggestions to improve the flow? Is there a way to make sure the last child workflows have finished before we try to compensate, without sleeping?