Async Drain Signals?

Hi,

I’m a bit confused about what the documentation means when it says to make sure to do an asynchronous drain on the Signal channel. Is the following an async drain? If not, what should I be doing?

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

func SimpleWorkflow(ctx workflow.Context) error {
	workflow.Go(ctx, func(ctx workflow.Context) {
		var signal string
		signalChan := workflow.GetSignalChannel(ctx, "simple")
		selector := workflow.NewNamedSelector(ctx, "simple selector")
		selector.AddReceive(signalChan, func(channel workflow.ReceiveChannel, more bool) {
			channel.Receive(ctx, &signal)
		})
		for {
			selector.Select(ctx)
			// The logger takes key/value pairs after the message.
			workflow.GetLogger(ctx).Info("received signal", "signal", signal)
		}
	})

	for i := 0; i < 10; i++ {
		if err := workflow.Sleep(ctx, 30*time.Second); err != nil {
			return err
		}
	}
	return workflow.NewContinueAsNewError(ctx, SimpleWorkflow)
}

Take a look at the post here for a sample implementation (it uses ReceiveAsync).
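For readers without the linked post at hand, here is a minimal sketch of what an asynchronous drain could look like. It is not the code from that post. The asyncReceiver interface and the fakeChannel type are hypothetical names introduced only so the example runs without a Temporal server; the one SDK detail it relies on is that workflow.ReceiveChannel has a ReceiveAsync(valuePtr interface{}) (ok bool) method, so a real signal channel should satisfy the same interface.

```go
package main

import "fmt"

// asyncReceiver is a hypothetical one-method interface. It mirrors the
// ReceiveAsync method of the Temporal Go SDK's workflow.ReceiveChannel.
type asyncReceiver interface {
	ReceiveAsync(valuePtr interface{}) (ok bool)
}

// drainSignals reads every signal that is already buffered, without
// blocking, and returns them in arrival order.
func drainSignals(ch asyncReceiver) []string {
	var pending []string
	for {
		var s string
		if !ch.ReceiveAsync(&s) {
			return pending // nothing left in the buffer
		}
		pending = append(pending, s)
	}
}

// fakeChannel is a stand-in (an assumption for this sketch) used only to
// exercise drainSignals outside of a workflow.
type fakeChannel struct{ buffered []string }

func (f *fakeChannel) ReceiveAsync(valuePtr interface{}) bool {
	if len(f.buffered) == 0 {
		return false
	}
	*(valuePtr.(*string)) = f.buffered[0]
	f.buffered = f.buffered[1:]
	return true
}

func main() {
	ch := &fakeChannel{buffered: []string{"a", "b"}}
	fmt.Println(drainSignals(ch)) // prints [a b]
}
```

In a workflow, the idea would be to call something like drainSignals(workflow.GetSignalChannel(ctx, "simple")) from the main workflow function right before returning workflow.NewContinueAsNewError, and then handle the returned signals or pass them along as input to the next run.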

Oh… I see. So if I had several signal receivers in various workflow.Go functions, then I would need to stop them and then do a ReceiveAsync on each receiver not in a workflow.Go function before exiting?

There is no need to even stop them, as they are guaranteed not to run while the main workflow goroutine is executing.

Interesting. I must have missed that in the documentation.

Sorry for the tangent… let’s say that there are two workflow.Go routines, can they run in parallel or does Temporal limit the workflow to only one active task at one time?

Workflow code must be deterministic. The only way to run code that contains multiple goroutines is to use cooperative multi-threading. This means that only one goroutine per workflow runs at a time. When it blocks on some Temporal API like Sleep or Future.Get, control is passed back to the Temporal SDK and the next goroutine is unblocked.
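To make the hand-off concrete, here is a toy model of cooperative scheduling in plain Go. It is only an illustration of the idea and not how the Temporal SDK is implemented; coroutine, spawn, runAll and trace are all hypothetical names. Each worker runs only while the scheduler has handed it control, and block() stands in for a blocking call such as Sleep or Future.Get.

```go
package main

import "fmt"

// coroutine is a hypothetical stand-in for a workflow goroutine: it runs
// only while the scheduler has handed it control.
type coroutine struct {
	resume chan struct{} // scheduler -> coroutine: "you may run"
	yield  chan bool     // coroutine -> scheduler: true when finished
}

// spawn starts body in a real goroutine that stays parked until resumed.
// body calls block() wherever workflow code would call a blocking API.
func spawn(body func(block func())) *coroutine {
	c := &coroutine{resume: make(chan struct{}), yield: make(chan bool)}
	go func() {
		<-c.resume
		body(func() {
			c.yield <- false // hand control back to the scheduler
			<-c.resume       // wait until scheduled again
		})
		c.yield <- true
	}()
	return c
}

// runAll resumes coroutines round-robin, one at a time, until all finish.
func runAll(cs ...*coroutine) {
	for len(cs) > 0 {
		var alive []*coroutine
		for _, c := range cs {
			c.resume <- struct{}{}
			if done := <-c.yield; !done {
				alive = append(alive, c)
			}
		}
		cs = alive
	}
}

// trace records the interleaving produced by two coroutines.
func trace() []string {
	var log []string // no mutex needed: only one coroutine runs at a time
	worker := func(name string) func(func()) {
		return func(block func()) {
			log = append(log, name+":1")
			block()
			log = append(log, name+":2")
		}
	}
	runAll(spawn(worker("a")), spawn(worker("b")))
	return log
}

func main() {
	fmt.Println(trace()) // prints [a:1 b:1 a:2 b:2]
}
```

The interleaving is the same on every run because control only changes hands at block(), which is the property that lets workflow code stay deterministic even with several goroutines.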

does Temporal limit the workflow to only one active task at one time?

I don’t know what you mean by task in this context. If you have two goroutines scheduling two activities then both activities will run in parallel.

Thank you! This was super enlightening.

I missed one edge case. If a goroutine is blocked on Channel.Receive, a new message can be allocated to that call (even if its goroutine never wakes up) and is then never delivered to Channel.ReceiveAsync called from a different goroutine. So, to avoid message loss, the code needs to ensure that no other goroutine is still consuming from the same channel before draining it.

This looks like a design bug in the way the Channel is implemented. I filed an issue to get this looked at.
