Hello, I am having some issues designing one selector that has one Future and one Receiver from a signal channel.
I want to block until one of the two events occur and then cancel the other pendings.
var status string
s := workflow.NewSelector(ctx)
getStatusCtx, cancelGetStatus := workflow.WithCancel(ctx)
// Get status activity options.
getStatusActivityOptions := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute * 15,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second * 30,
BackoffCoefficient: 1.0, // Same interval.
},
// WaitForCancellation: true, // I don't understand how this flag works.
}
getStatusCtx = workflow.WithActivityOptions(getStatusCtx, getStatusActivityOptions)
getStatusFuture := workflow.ExecuteActivity(getStatusCtx, a.GetStatus, statusID)
s.AddFuture(getStatusFuture, func(f workflow.Future) {
localStatus := ""
if err := f.Get(ctx, &localStatus); err == nil {
status = localStatus
}
})
hookChannel := workflow.GetSignalChannel(ctx, HookSignal)
s.AddReceive(hookChannel, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &status)
})
// Select blocks until one of the listeners finishes.
s.Select(ctx)
cancelGetStatus()
// I want to have the status here.
If I do it without the cancelGetStatus()
it works fine but I am afraid of a race condition if the getStatusFuture ends and tries to update the status when I am reading it.
So i want to cancel all the pendings but I get the following log and error.
Logs:
{"PanicError": "unknown decision DecisionType: Activity, ID: 12, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition", "PanicStack": "process event for payment-service-worker [panic]:\ngo.temporal.io/temporal/internal.panicIllegalState(...)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_decision_state_machine.go:353\ngo.temporal.io/temporal/internal.(*decisionsHelper).getDecision(0xc0002c6680, 0x0, 0xc000044bd8, 0x2, 0xc000000000, 0xc0005baafa)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_decision_state_machine.go:749 +0x267\ngo.temporal.io/temporal/internal.(*decisionsHelper).handleActivityTaskClosed(0xc0002c6680, 0xc000044bd8, 0x2, 0x2, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_decision_state_machine.go:787 +0x47\ngo.temporal.io/temporal/internal.(*workflowExecutionEventHandlerImpl).handleActivityTaskCompleted(0xc000133120, 0xc000531640, 0xc0005baafa, 0x2)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_event_handlers.go:971 +0x6b\ngo.temporal.io/temporal/internal.(*workflowExecutionEventHandlerImpl).ProcessEvent(0xc000133120, 0xc000531640, 0x0, 0x0, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_event_handlers.go:811 +0x5a9\ngo.temporal.io/temporal/internal.(*workflowExecutionContextImpl).ProcessWorkflowTask(0xc000486630, 0xc0005cbd70, 0x29b7f80, 0xc00003d080, 0xc000486630, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_handlers.go:871 +0xd82\ngo.temporal.io/temporal/internal.(*workflowTaskHandlerImpl).ProcessWorkflowTask(0xc0002a7ae0, 0xc0005cbd70, 0xc0004bd740, 0x0, 0x0, 0x0, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_handlers.go:770 +0x4b0\ngo.temporal.io/temporal/internal.(*workflowTaskPoller).processWorkflowTask(0xc0003263c0, 0xc0005cbd70, 0x0, 0x0)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_pollers.go:284 +0x430\ngo.temporal.io/temporal/internal.(*workflowTaskPoller).ProcessTask(0xc0003263c0, 0x1c72d60, 0xc0005cbd70, 0x20a68a0, 0xc00062ea80)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_task_pollers.go:257 +0x7f\ngo.temporal.io/temporal/internal.(*baseWorker).processTask(0xc000614380, 0x1c72960, 0xc0005dc940)\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_worker_base.go:320 +0xe0\ncreated by go.temporal.io/temporal/internal.(*baseWorker).runTaskDispatcher\n\t/Users/foo/.gvm/pkgsets/go1.14/global/pkg/mod/go.temporal.io/temporal@v0.26.0/internal/internal_worker_base.go:267 +0xff"}
Workflow error: context deadline exceeded
.
Is the retry mechanism of the getStatusActivity
not stopping on cancellation ?
Is this the correct way of approaching this problem ?
Thanks for the help.