I’m a bit confused as to what the documentation means by "make sure to do an asynchronous drain on the Signal channel". Is the following an async drain? If not, what should I be doing?
func SimpleWorkflow(ctx workflow.Context) error {
	// Background goroutine that blocks on the signal channel and logs each signal.
	workflow.Go(ctx, func(ctx workflow.Context) {
		var signal string
		signalChan := workflow.GetSignalChannel(ctx, "simple")
		selector := workflow.NewNamedSelector(ctx, "simple selector")
		selector.AddReceive(signalChan, func(channel workflow.ReceiveChannel, more bool) {
			channel.Receive(ctx, &signal)
		})
		for {
			selector.Select(ctx)
			// The logger takes a message followed by key/value pairs.
			workflow.GetLogger(ctx).Info("received signal", "signal", signal)
		}
	})
	// Main workflow goroutine: sleep for 10 x 30 seconds, then continue as new.
	for i := 0; i < 10; i++ {
		if err := workflow.Sleep(ctx, 30*time.Second); err != nil {
			return err
		}
	}
	return workflow.NewContinueAsNewError(ctx, SimpleWorkflow)
}
Oh… I see. So if I had several signal receivers in various workflow.Go functions, I would need to stop them and then, outside of any workflow.Go function, do a ReceiveAsync on each receiver before exiting?
Interesting. I must have missed that in the documentation.
Sorry for the tangent… let’s say that there are two workflow.Go routines, can they run in parallel or does Temporal limit the workflow to only one active task at one time?
Workflow code must be deterministic. The only way to run code that contains multiple goroutines deterministically is to use cooperative multi-threading. This means that only one goroutine per workflow runs at a time. When it blocks on some Temporal API like Sleep or Future.Get, control is passed back to the Temporal SDK and the next goroutine is unblocked.
does Temporal limit the workflow to only one active task at one time?
I don’t know what you mean by task in this context. If you have two goroutines scheduling two activities then both activities will run in parallel.
I missed one edge case. If a goroutine is blocked on Channel.Receive, a new message can be allocated to that call (even if its goroutine never wakes up) and is then never delivered to a Channel.ReceiveAsync call made from a different goroutine. So, to avoid message loss, the code needs to ensure that no other goroutine is still consuming from the same channel before draining it.
This looks like a design bug in the way the Channel is implemented. I filed an issue to get this looked at.