Complete activity with cancellation error

Hi. Whenever we try to complete an activity with a cancellation error, we run into: Potential deadlock detected: workflow goroutine "7" didn't yield for over a second. So far we have worked around it by adding a check: if temporal.IsCanceledError(err) is true, we don’t send the error in the activity completion request. But we are trying to understand what we might be doing wrong…

A bit of background:

We have a workflow receiving signals from a parent workflow in another namespace. Whenever a signal is received, our workflow will start a child workflow which in turn will run a number of activities in parallel, and at the end, returns the results to our workflow which will send back a complete activity request to the parent.

Another requirement we have is for a special cancel signal. When that is received, we are supposed to cancel any running activities, and then start a compensation workflow to do cleanups for us…

Workflow code snippets:

s := workflow.NewSelector(ctx)
childCtx, childCancel := workflow.WithCancel(ctx)

s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
    var stageChangeRequest models.StateChangeSignal
    c.Receive(ctx, &stageChangeRequest)
    // Cancellation stage
    if stageChangeRequest.Stage == "CANCELATION" { 
        childCancel()
        workflow.Sleep(ctx, 2*HeartBeatTimeout) // Side question: How to wait for child workflow cancellation cleanly? Currently we heartbeat from all activities and hope that by this point all of them have seen the cancelled context...
        workflow.Go(ctx, func(ctx workflow.Context) {
            // Run compensation task
            // ...
            var temporalErr error
            if err != nil {
                temporalErr = temporal.NewApplicationErrorWithCause(err.Error(), string(stageRequest.Stage), err, migration)
            }
            temporalClient.CompleteActivity(context.Background(), stageRequest.TaskToken, result, temporalErr)
        })
    } else {
        workflow.Go(childCtx, func(childCtx workflow.Context) {
            // Run regular child workflow until completion
            // ...
            var temporalErr error
            if err != nil { // If we change this to "err != nil && !temporal.IsCanceledError(err)" then things go fine
               temporalErr = temporal.NewApplicationErrorWithCause(err.Error(), string(stageRequest.Stage), err, migration)
            }
            temporalClient.CompleteActivity(context.Background(), stageRequest.TaskToken, result, temporalErr)
        })
    }
})
for {
    s.Select(ctx)
    if completed {
        return "", nil
    }
}

At each stage, the child workflow will fire up a number of activities like this:

...
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
var activityErr error
for _, s := range activities {
	f, settable := workflow.NewFuture(childCtx)
	workflow.Go(childCtx, func(ctx workflow.Context) {
		err := // Run activity
		settable.Set(nil, err)
	})	
	selector.AddFuture(f, func(f workflow.Future) {
		err := f.Get(ctx, nil)
		if err != nil {
			// cancel all pending activities
			cancelHandler()
			activityErr = err
		}
	})
}

for i := 0; i < len(activities); i++ {
	selector.Select(ctx)
}
return activityErr

Why can’t we call CompleteActivity with a cancellation error in the above workflow? Besides that, any suggestions to improve the flow? Any way to make sure last child workflows have finished before we try to compensate without sleeping?

The most obvious problem with your workflow is that it uses WorkflowClient directly. As outlined in the documentation:

  • Workflow code can not affect changes in external systems directly.

To cancel an activity, you have to cancel the workflow.Context passed to the ExecuteActivity method. Do not use TemporalClient for that. It should be used only in an asynchronous activity implementation and never by workflow code.

Make sure that each activity that is expected to be cancelled is heartbeating and that its heartbeat timeout is set to some reasonable value. Currently, cancellation is not supported for the activities that don’t heartbeat.

See the cancellation sample.

Thanks for the quick response! The heartbeating and activity cancellation seem to work already (we basically heartbeat on a timer and make sure to check the context and pass it along inside the activity).
For acknowledging completion, are you saying that it makes sense to have the child workflow wait for all parallel activities to finish (either successfully or through cancellation/error), and then start another activity that just calls CompleteActivity on the client?

I don’t understand why you are trying to call CompleteActivity at all.

Are you implementing activities asynchronously by returning ErrResultPending? This is the only valid usage for CompleteActivity.

We are not, but there is an outside workflow that invokes our workflow from inside an activity and passes us a token, so that we can acknowledge when we are done with each signal.

I see. I would recommend using signals for workflow-to-workflow communication. Or, in the majority of situations, just invoking the other workflow as a child workflow and waiting on its result.

Unfortunately I’m not sure I have much control over the external workflow. Instead of calling CompleteActivity from inside my workflow (thanks so much for pointing out that it breaks the workflow contract, I should have thought about it before), I will try to have the workflow trigger another activity which will just make the CompleteActivity call.

Thank you so much for the help