Selector with cancellation

Hello, I am having some issues designing a selector that waits on one Future and one Receiver from a signal channel.

I want to block until one of the two events occurs and then cancel the other pending operation.

var status string

s := workflow.NewSelector(ctx)

getStatusCtx, cancelGetStatus := workflow.WithCancel(ctx)

// Get status activity options.
getStatusActivityOptions := workflow.ActivityOptions{
	ScheduleToStartTimeout: time.Minute,
	StartToCloseTimeout:    time.Minute * 15,
	RetryPolicy: &temporal.RetryPolicy{
		InitialInterval:    time.Second * 30,
		BackoffCoefficient: 1.0, // Same interval.
	},
	// WaitForCancellation: true, // I don't understand how this flag works.
}

getStatusCtx = workflow.WithActivityOptions(getStatusCtx, getStatusActivityOptions)

getStatusFuture := workflow.ExecuteActivity(getStatusCtx, a.GetStatus, statusID)

s.AddFuture(getStatusFuture, func(f workflow.Future) {
	localStatus := ""

	if err := f.Get(ctx, &localStatus); err == nil {
		status = localStatus
	}
})

hookChannel := workflow.GetSignalChannel(ctx, HookSignal)

s.AddReceive(hookChannel, func(c workflow.ReceiveChannel, more bool) {
	c.Receive(ctx, &status)
})

// Select blocks until one of the listeners finishes.
s.Select(ctx)
cancelGetStatus()


// I want to have the status here.

If I do it without the cancelGetStatus() call, it works fine, but I am afraid of a race condition if getStatusFuture completes and tries to update status while I am reading it.

So I want to cancel everything that is still pending, but I get the following log and error.

Logs:

{"PanicError": "unknown decision DecisionType: Activity, ID: 12, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition", "PanicStack": "process event for payment-service-worker [panic]:\ngo.temporal.io/temporal/internal.panicIllegalState(...)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_decision_state_machine.go:353\ngo.temporal.io/temporal/internal.(*decisionsHelper).getDecision(0xc0002c6680, 0x0, 0xc000044bd8, 0x2, 0xc000000000, 0xc0005baafa)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_decision_state_machine.go:749 +0x267\ngo.temporal.io/temporal/internal.(*decisionsHelper).handleActivityTaskClosed(0xc0002c6680, 0xc000044bd8, 0x2, 0x2, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_decision_state_machine.go:787 +0x47\ngo.temporal.io/temporal/internal.(*workflowExecutionEventHandlerImpl).handleActivityTaskCompleted(0xc000133120, 0xc000531640, 0xc0005baafa, 0x2)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_event_handlers.go:971 +0x6b\ngo.temporal.io/temporal/internal.(*workflowExecutionEventHandlerImpl).ProcessEvent(0xc000133120, 0xc000531640, 0x0, 0x0, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_event_handlers.go:811 +0x5a9\ngo.temporal.io/temporal/internal.(*workflowExecutionContextImpl).ProcessWorkflowTask(0xc000486630, 0xc0005cbd70, 0x29b7f80, 0xc00003d080, 0xc000486630, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_handlers.go:871 +0xd82\ngo.temporal.io/temporal/internal.(*workflowTaskHandlerImpl).ProcessWorkflowTask(0xc0002a7ae0, 0xc0005cbd70, 0xc0004bd740, 0x0, 0x0, 0x0, 
0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_handlers.go:770 +0x4b0\ngo.temporal.io/temporal/internal.(*workflowTaskPoller).processWorkflowTask(0xc0003263c0, 0xc0005cbd70, 0x0, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_pollers.go:284 +0x430\ngo.temporal.io/temporal/internal.(*workflowTaskPoller).ProcessTask(0xc0003263c0, 0x1c72d60, 0xc0005cbd70, 0x20a68a0, 0xc00062ea80)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_pollers.go:257 +0x7f\ngo.temporal.io/temporal/internal.(*baseWorker).processTask(0xc000614380, 0x1c72960, 0xc0005dc940)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_worker_base.go:320 +0xe0\ncreated by go.temporal.io/temporal/internal.(*baseWorker).runTaskDispatcher\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_worker_base.go:267 +0xff"}

Workflow error: context deadline exceeded.

Is the retry mechanism of the GetStatus activity not stopping on cancellation?
Is this the correct way of approaching this problem?

Thanks for the help.


I don’t think a race condition is possible here. The Temporal SDK relies on cooperative multithreading, so two workflow goroutines cannot execute at the same time: one runs only while the others are blocked. Also, in the case of a selector, the callbacks are invoked only from inside Selector.Select, and each call to Select runs a single callback. So the callback for the activity result cannot execute after the signal callback has executed unless s.Select(ctx) is called again.

I’m concerned about the panic you received. Are you 100% sure that it didn’t happen with a workflow that was started before some code change? Here is an example with similar logic which works fine.

Some nits:

The following block doesn’t need the if condition, because a pointer passed to Get is not assigned when Future.Get returns an error.

s.AddFuture(getStatusFuture, func(f workflow.Future) {
	localStatus := ""

	if err := f.Get(ctx, &localStatus); err == nil {
		status = localStatus
	}
})

At the same time, it looks like you are ignoring the error returned by the activity, which is not really a good idea.


@maxim, thanks for the fast response!

The following block doesn’t need the if condition, because a pointer passed to Get is not assigned when Future.Get returns an error.

Good to know, but I was worried about the other case, in which Get doesn’t return an error. It’s clear now.

About the omission of the error:
The GetStatus activity actually uses the retry mechanism to poll the status from an external endpoint: if the status is not available yet, I want the activity to try again. That’s why I ended up with this setup of waiting for a nil error. Maybe I could have an infinite loop inside the activity that also listens on the ctx.Done() channel, but that looked like an anti-pattern to me.

What would be the best approach to a polling activity?

About my logs:
I debugged a bit more. When the signal ends the select, execution continues, but RequestCancelActivity panics multiple times with: unknown decision DecisionType: Activity, ID: 12, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition. It looks as if the retry of the GetStatus activity has something to do with it.
I will keep testing to be sure that I don’t have any other bugs, but this mainly appears when I wrap the context with WithCancel, as in my example, and call cancelGetStatus() after s.Select(ctx).

Is there a reason why I would need to cancel the pending activities of the selector? If not, I will not call cancelGetStatus.

Thanks again for the response and the suggestions.

Hello, I’m also interested in this use case.
What’s the best practice for waiting on either a signal or a polling activity?

Should you have one activity that contains something like:

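ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() // release the ticker when the activity returns

for {
	select {
	case <-ticker.C:
		// do some polling; return the status once it is available
	case <-ctx.Done():
		// context cancelled or deadline exceeded: stop polling
		// (assumes the enclosing activity returns an error)
		return ctx.Err()
	}
}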

Or should you have the for loop in the workflow and the activity is just a single polling action?

In the end, either a successful poll or a signal should be able to unblock the selector’s Select(ctx) call. How can this be achieved?

Would you ask about polling for status in a separate topic? It is a very frequently asked question and deserves a separate answer.

We have a known issue with activity cancellation. I’m not sure whether it is related. If you could provide a standalone reproduction, it would really help us ensure that this is resolved before the V1 release.

Would you ask about this in a separate topic? This is a very important pattern and deserves its own discussion.

Sorry, I missed this message somehow.

I’ve created a new topic: What is the best practice for a polling activity?