What happens to signals sent to a closing workflow?

Hello folks! I was hoping you could help me with this issue/question:

For this particular use case, I need to process “bursts” of messages, typically ranging from a single message to low thousands of messages. Two interesting characteristics of these bursts are:

  • They arrive at unpredictable times. The messages in a burst don’t arrive all at once, so we never know when the last one has come.
  • Due to downstream API rate limiting, we can only process one message at a time, and maybe even with some delay in between.

I decided to use a Temporal workflow to process these messages. My current idea is:

  • Use a “long-running” “singleton” Temporal workflow to process messages.
  • Use signals to send messages to the workflow.
  • Have a sufficiently long timer to identify when the workflow stops receiving messages and finish it successfully when that happens.
  • Use the SignalWithStartWorkflow API to send messages to an open/running workflow or start a new one otherwise. I’d need a “fixed” workflow ID to make the API work properly and set the workflow ID reuse policy to allow duplicates.

Here’s some example code to illustrate my approach:

package example

import (
	"context"
	"time"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/workflow"
)

// Workflow & activity implementation

func MessageProcessingWorkflow(ctx workflow.Context) error {
	signalChannel := workflow.GetSignalChannel(ctx, "message_signal")
	var err error
	inactive := false

	for {
		selector := workflow.NewSelector(ctx)
		timerCtx, cancelTimerCtx := workflow.WithCancel(ctx)

		// Restart the inactivity timer on every iteration of the loop.
		inactivityTimer := workflow.NewTimer(timerCtx, time.Minute)
		selector.AddFuture(inactivityTimer, func(future workflow.Future) {
			inactive = true
		})

		selector.AddReceive(signalChannel, func(channel workflow.ReceiveChannel, more bool) {
			// A message arrived, so the pending inactivity timer is obsolete.
			cancelTimerCtx()

			var msg Message
			channel.Receive(ctx, &msg)

			err = workflow.ExecuteActivity(ctx, ProcessMessage, msg).Get(ctx, nil)
		})

		selector.Select(ctx)

		// If the workflow was inactive for 1 minute, stop it.
		if inactive {
			return err
		}
	}
}

func ProcessMessage(ctx context.Context, msg Message) error {
	// ...
	return nil
}

type Message struct {
	// ...
}

// Example "starter"

func sendSignalToWorkflowOptionallyStartingIt(c client.Client) error {
	// Signals the open workflow run with the fixed ID, or starts a new run
	// first if none is open.
	_, err := c.SignalWithStartWorkflow(
		context.TODO(),
		"singleton_messaging_processing_workflow", // fixed workflow ID
		"message_signal",
		nil, // signal payload (a Message value in practice)
		client.StartWorkflowOptions{
			// ...
			WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
		},
		MessageProcessingWorkflow,
	)
	return err
}

Now, assuming this approach has some merit (please, let me know otherwise!), here’s my question:

Is it possible to miss signals during a (possibly) short time window where a workflow execution is finishing up, but the Temporal cluster isn’t yet made aware of that?

The sequence would be:

  • The inactivity timer fires.
  • The workflow’s selector.AddFuture callback starts to execute.
  • Incoming message signal! At this point, the Temporal cluster doesn’t know the workflow will end before consuming the signal channel again, so I assume it will simply queue the message for this workflow run?
  • The workflow finishes executing the callback and eventually completes the whole workflow cleanly.

Did we lose the message? Is there a way to prevent this? Or is this approach fundamentally flawed?

Your implementation is going to lose messages, as the order of Select branch execution is not defined. Consider a situation where the signalChannel has a buffered message and the timer future is ready at the same time. The timer branch can be called first, causing the loop to exit without processing all the buffered signals.

The correct version:

func MessageProcessingWorkflow(ctx workflow.Context) error {
	signalChannel := workflow.GetSignalChannel(ctx, "message_signal")
	var err error

	for {
		selector := workflow.NewSelector(ctx)
		timerCtx, cancelTimerCtx := workflow.WithCancel(ctx)

		timerFired := false
		inactivityTimer := workflow.NewTimer(timerCtx, time.Minute)
		selector.AddFuture(inactivityTimer, func(future workflow.Future) {
			timerFired = true
		})

		selector.AddReceive(signalChannel, func(channel workflow.ReceiveChannel, more bool) {
			cancelTimerCtx()

			var msg Message
			channel.Receive(ctx, &msg)

			err = workflow.ExecuteActivity(ctx, ProcessMessage, msg).Get(ctx, nil)
		})

		selector.Select(ctx)

		// Exit only if the inactivity timer fired and no signals are still
		// buffered on the channel.
		if timerFired && !selector.HasPending() {
			return err
		}
	}
}

I think you can simplify your code by using `Channel.ReceiveWithTimeout`:

	for {
		var msg Message
		ok, _ := signalChannel.ReceiveWithTimeout(ctx, time.Minute, &msg)
		if !ok {
			// Timed out: no message arrived for a full minute.
			return err
		}
		err = workflow.ExecuteActivity(ctx, ProcessMessage, msg).Get(ctx, nil)
	}

Note that a single Temporal workflow execution (instance) has limited throughput and might not perform well under a very spiky load. So test it before putting it in production.

If a signal is received while the workflow is executing the last line of the workflow function, Temporal will detect this, roll back the workflow to the last blocked state, and give it a chance to handle the signal. So as a programmer, you don’t need to do anything special beyond ensuring that the signalChannel is fully drained before exiting the workflow function; that handles the race condition you correctly identified.
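
For example, a final drain before returning could look like this (a sketch; ReceiveAsync is the Go SDK’s non-blocking receive, which returns false once nothing is buffered):

	// Drain any signals that arrived while the workflow was deciding to exit.
	for {
		var msg Message
		if !signalChannel.ReceiveAsync(&msg) {
			break
		}
		err = workflow.ExecuteActivity(ctx, ProcessMessage, msg).Get(ctx, nil)
	}
	return err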

A production implementation should also not wait indefinitely: it should call continue-as-new as soon as it has processed a certain number of signals, to avoid exceeding the history size limit.
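
As a rough sketch of that idea (not from the thread; the cap of 500 messages per run is an arbitrary placeholder), the ReceiveWithTimeout loop could be bounded like this:

func MessageProcessingWorkflow(ctx workflow.Context) error {
	signalChannel := workflow.GetSignalChannel(ctx, "message_signal")
	var err error

	// Process at most 500 messages per run to keep the history small.
	for processed := 0; processed < 500; processed++ {
		var msg Message
		ok, _ := signalChannel.ReceiveWithTimeout(ctx, time.Minute, &msg)
		if !ok {
			// Inactive for a full minute and nothing buffered: finish cleanly.
			return err
		}
		err = workflow.ExecuteActivity(ctx, ProcessMessage, msg).Get(ctx, nil)
	}

	// Hand off to a fresh run with an empty history. Signals buffered at
	// this point should be delivered to the new run, but verify that
	// against your server version.
	return workflow.NewContinueAsNewError(ctx, MessageProcessingWorkflow)
}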


Hello Maxim!

I see, perfect! And your simplified solution looks even better.

I will test the workflow with production load and also experiment with continue-as-new.

Thanks a lot! Really helpful!

Hello again, @maxim!

Quick question about the ReceiveWithTimeout solution you proposed: I noticed it doesn’t cancel the timer when the channel successfully receives a message in time; I can still see the timers firing in the UI. There are no immediate repercussions, as the code still works, but is this by design? Do you see any “non-functional” issues (e.g., reduced efficiency) with it?

It is by omission, and we should fix it. I filed an issue. I think it is OK to use in the meantime.
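
If the stray timer firings bother you in the meantime, a hand-rolled variant built from the selector pattern earlier in the thread can cancel the timer explicitly. A sketch; receiveWithCancelableTimeout is a hypothetical helper, not an SDK API:

// receiveWithCancelableTimeout behaves like ReceiveWithTimeout but cancels
// its timer as soon as a message is received. Hypothetical helper.
func receiveWithCancelableTimeout(ctx workflow.Context, ch workflow.ReceiveChannel, timeout time.Duration, valuePtr interface{}) bool {
	timerCtx, cancelTimer := workflow.WithCancel(ctx)
	defer cancelTimer() // cancels the pending timer once a message wins

	ok := false
	selector := workflow.NewSelector(ctx)
	selector.AddFuture(workflow.NewTimer(timerCtx, timeout), func(f workflow.Future) {})
	selector.AddReceive(ch, func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, valuePtr)
		ok = true
	})
	selector.Select(ctx)

	if !ok {
		// The timer branch can win even when a message is already buffered
		// (branch order is not defined), so do a final non-blocking receive.
		ok = ch.ReceiveAsync(valuePtr)
	}
	return ok
}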