ContinueAsNew + signals

I am trying to write a long-running workflow which receives signals, and I am finding that when I ContinueAsNew it might drop some on the floor. Here is some example workflow code:

func SignalWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	requestLockCh := workflow.GetSignalChannel(ctx, "test-signals")
	for iterations := 0; iterations < 10; iterations++ {
		var n int
		for requestLockCh.ReceiveAsync(&n) {
			logger.Info("Got signal", "N", n)
		}
		workflow.Sleep(ctx, time.Second)
	}
	return workflow.NewContinueAsNewError(ctx, SignalWorkflow)
}

And here is some code that signals it:

c, err := client.NewClient(client.Options{})
if err != nil {
	log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()
for n := 0; n < 100; n++ {
	err = c.SignalWorkflow(context.Background(), "test-workflow", "", "test-signals", n)
	if err != nil {
		log.Fatalf("error signalling workflow: %s", err)
	}
	time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}

Here is a relevant excerpt from the logs:

2020/11/16 14:57:15 INFO  Got signal Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 79cc26c1-5fa3-420e-b2dd-af18ac6b2cd9 N 25
2020/11/16 14:57:15 INFO  Got signal Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 79cc26c1-5fa3-420e-b2dd-af18ac6b2cd9 N 26
2020/11/16 14:57:15 INFO  Got signal Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 79cc26c1-5fa3-420e-b2dd-af18ac6b2cd9 N 27
2020/11/16 14:57:16 INFO  Workflow has unhandled signals Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 79cc26c1-5fa3-420e-b2dd-af18ac6b2cd9 SignalNames [test-signals]
2020/11/16 14:57:17 INFO  Got signal Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 84987203-fb7e-48a8-98d5-a0a4ece96ded N 32
2020/11/16 14:57:17 INFO  Got signal Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 84987203-fb7e-48a8-98d5-a0a4ece96ded N 33

Am I using the API wrong, or are signals not intended to guarantee delivery?

func SignalWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	requestLockCh := workflow.GetSignalChannel(ctx, "test-signals")
	for iterations := 0; iterations < 10; iterations++ { // <- here: the loop gives up after 10 rounds of batch-receiving signals
		var n int
		for requestLockCh.ReceiveAsync(&n) {
			logger.Info("Got signal", "N", n)
		}
		workflow.Sleep(ctx, time.Second)
	}
	return workflow.NewContinueAsNewError(ctx, SignalWorkflow)
}

You may want to add an additional round of signal handling just before returning:

func SignalWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	requestLockCh := workflow.GetSignalChannel(ctx, "test-signals")
	for iterations := 0; iterations < 10; iterations++ {
		var n int
		for requestLockCh.ReceiveAsync(&n) {
			logger.Info("Got signal", "N", n)
		}
		workflow.Sleep(ctx, time.Second)
	}
	
	var n int                             // <- additional drain before returning
	for requestLockCh.ReceiveAsync(&n) {
		logger.Info("Got signal", "N", n)
	}
	return workflow.NewContinueAsNewError(ctx, SignalWorkflow)
}

The SDK logs also show the dropped signals:

2020/11/16 14:57:16 INFO  Workflow has unhandled signals Namespace default TaskQueue test-queue WorkerID 42150@Chriss-MacBook-Pro.local@ WorkflowType SignalWorkflow WorkflowID test-workflow RunID 79cc26c1-5fa3-420e-b2dd-af18ac6b2cd9 SignalNames [test-signals]

Thanks, @Wenquan_Xing! Yes, I see that I could avoid sleeping right before returning to make this problem less likely, but I would like to know if there is some pattern I can follow to guarantee that no signals will be missed.

To guarantee delivery you need to drain the signal channel using ReceiveAsync right before calling continue-as-new.

Note: This applies to Go SDK only! All other SDKs don’t use channels to deliver signals and already provide such a guarantee without any explicit coding needed.

@maxim Thanks!

Suppose there are multiple signal channels. How do I know that channel A didn’t get a signal while I was draining channel B?

Unless you block on an external event (like the timer in the case of workflow.Sleep) there is a hard guarantee that no new signal will be added to channel A while draining B.
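So as long as nothing in between blocks (no Sleep, no blocking Receive, no activity call), the channels can simply be drained one after the other before returning. A sketch, with the channel and handler names made up for illustration:

```go
// Drain channel A, then channel B. ReceiveAsync never blocks, so the
// SDK's event loop cannot run in between, and no new signals can be
// buffered into either channel while this code executes.
var a int
for chA.ReceiveAsync(&a) {
	handleA(a) // hypothetical handler
}
var b int
for chB.ReceiveAsync(&b) {
	handleB(b) // hypothetical handler
}
return workflow.NewContinueAsNewError(ctx, SignalWorkflow)
```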

Got it, thanks @maxim! From your answers and from very quickly glancing at the Go implementation, I’m thinking that there is some sort of event loop that, among other things, processes incoming signals and puts them into the channel, and this event loop only runs when a workflow “wakes back up” after sleeping, running an activity, etc. So as long as I don’t do anything that makes my workflow “sleep” after draining all the channels, there will be no chance for that loop to run.

Furthermore, I don’t need to worry about what happens if I crash before I finish draining the channel, because these incoming signals are history events and they will just be replayed to refill the channel on the next execution of the workflow.

Is this all about right? Is there somewhere that the execution model is explained more fully so that I don’t have to bother you with these types of questions? :slight_smile:

These are awesome questions. And your understanding is correct. One missing link is that if a signal is received while a workflow task that decides to close the workflow (or calls continue-as-new) is executing, then the workflow task result is ignored and the task is retried to give the workflow a chance to process the signal. It works like transactional memory that is rolled back to the state before the signal was received.

Great, thanks @maxim! And regarding your further point about restarting to give it a chance to process the signal, when you say a “signal is received while workflow task…is executing” you mean “received by Temporal” but not yet by a worker, right? That is, it did not ever process the signal in an event loop.

While looking at the implementation code I noticed that it does not seem very goroutine-safe. So would it be accurate to say that you can never start new goroutines in a workflow, even if you find some way to make its output deterministic, because the runtime might break if you do that?

Thanks again for your time!

And regarding your further point about restarting to give it a chance to process the signal, when you say a “signal is received while workflow task…is executing” you mean “received by Temporal” but not yet by a worker, right? That is, it did not ever process the signal in an event loop.

Right, signal accepted by the service, but not delivered to the worker.

While looking at the implementation code I noticed that it does not seem very goroutine-safe. So would it be accurate to say that you can never start new goroutines in a workflow, even if you find some way to make its output deterministic, because the runtime might break if you do that?

You should never create goroutines directly. Instead, use workflow.Go to create them. This way the code is executed by the SDK using cooperative multithreading, which is safe without any additional synchronization.
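For example, a sketch of spawning a coroutine with workflow.Go and communicating over a workflow.Channel (workflow and channel names are illustrative):

```go
func FanOutWorkflow(ctx workflow.Context) error {
	ch := workflow.NewChannel(ctx)
	workflow.Go(ctx, func(ctx workflow.Context) {
		// Runs as an SDK-scheduled coroutine, not a native goroutine;
		// only one coroutine in the workflow executes at a time.
		ch.Send(ctx, "done")
	})
	var msg string
	ch.Receive(ctx, &msg) // yields to the scheduler until Send happens
	return nil
}
```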

Read the Workflow Implementation documentation on what is allowed and what is not in the workflow code.

Thanks again, @maxim.

I have read that documentation, but I thought that “deterministic” and “idempotent” were the real requirements and that the other bullet points were tips for making sure you achieved those goals. I see now that everything listed on that page should be viewed as a hard requirement for workflows.

Deterministic execution is a real requirement, and all the bullet points are how you achieve it. For example, using native goroutines breaks determinism, while goroutines created through workflow.Go are executed deterministically.


I think maybe I do not understand what “determinism” means in this context. I thought “deterministic” meant “always produces the same output from the same input” which is certainly possible with goroutines. It can’t mean “always performs the same operations in the same order” because that is not guaranteed even within a single goroutine.

What does the documentation mean when it says “deterministic”?

Deterministic in this context means that the workflow executes all of its operations in exactly the same order on every execution; it is not only about external operations but also about the internal state of a workflow.

It is guaranteed if access to the shared state is synchronized, as the link you posted states. And when the Temporal SDK executes workflow code, it is always fully synchronized by the framework.

I’m not sure that is a very precise definition. For example, here is a trivial program that has deterministic behavior but is not guaranteed to execute all operations in exactly the same order; specifically it may assign values to a and b in either order, or simultaneously. I’m sure that behavior does not make the code ineligible to be used in a workflow.
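(The program itself was behind a link; a hypothetical program in the same spirit might look like this, with both goroutines writing independent variables so the output is fixed even though the write order is not:)

```go
package main

import (
	"fmt"
	"sync"
)

// assign writes a and b from two goroutines. The scheduler may run
// them in either order (or in parallel), yet the observable result
// is always the same: deterministic output without a fixed order
// of operations.
func assign() (a, b int) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); a = 1 }()
	go func() { defer wg.Done(); b = 2 }()
	wg.Wait()
	return a, b
}

func main() {
	a, b := assign()
	fmt.Println(a, b) // always prints "1 2"
}
```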

Would it be accurate to say that the requirement (in addition to input/output determinism) is, “all the Temporal SDK calls can be ordered by the happens-before relation”? So, for example, goroutines are fine as long as you use synchronization events to ensure that the order of execution of Temporal SDK calls is predetermined.

I agree that the compiler is free to choose the order of execution provided that the observable behaviour is the same.

So, for example, goroutines are fine as long as you use synchronization events to ensure that the order of execution of Temporal SDK calls is predetermined.

Goroutines are not fine, as any custom synchronization has the potential to block goroutines, which breaks the cooperative multithreading the Temporal SDK relies on.

Ah, okay, I think I am starting to understand. It sounds like the execution model of workflows is quite different from what I am used to in Go, seeming more like JavaScript, where you can have futures, callbacks, etc., but they never actually run concurrently. So I am thinking the restrictions can be considered to be mostly about not doing anything that breaks out of that model or is incompatible with it. For example, the docs don’t mention package sync, but I am guessing it is also forbidden in workflows.
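For example, instead of sync.WaitGroup, the Go SDK appears to provide workflow.NewWaitGroup (in reasonably recent SDK versions), which cooperates with the SDK scheduler instead of blocking the thread; a sketch:

```go
wg := workflow.NewWaitGroup(ctx)
wg.Add(2)
for i := 0; i < 2; i++ {
	workflow.Go(ctx, func(ctx workflow.Context) {
		defer wg.Done()
		// ... deterministic work ...
	})
}
wg.Wait(ctx) // yields cooperatively to the SDK scheduler while waiting
```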

Thanks again for all your helpful answers and your time!


I still don’t understand why sleeping causes events to be missed in the new workflow?

I still don’t understand why sleeping causes events to be missed in the new workflow?

My understanding is that it goes something like this:

  1. (time = T) Worker calls workflow.Sleep and pauses execution of the workflow code.
  2. Workflow runtime receives a signal from the Temporal service. The runtime puts the signal on the internal signal channel. The Temporal service records in the workflow’s history that the signal was received by the workflow.
  3. (time = T+1) Workflow code is resumed and returns without checking the signal channel.

So now, from the Temporal service’s perspective, the workflow received the signal and returned, so it must have handled it; there is no need to deliver it again to the next run of the workflow.

This is what confused me as well: I thought when you read a signal from the channel that is when the “signal received” history event happens. But in fact it already happened and you need to handle the signal that was already received (by draining the channel) or it will be lost.
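A plain-Go analogy (ordinary buffered channels, not Temporal code) of step 3: the signals were already delivered into the channel, and returning without draining simply abandons whatever is still buffered:

```go
package main

import "fmt"

// run simulates one workflow run: the "service" has already buffered
// signals into the channel, but the run exits after at most `reads`
// non-blocking receive attempts, leaving the rest behind.
func run(signals chan int, reads int) []int {
	var handled []int
	for i := 0; i < reads; i++ {
		select {
		case n := <-signals:
			handled = append(handled, n)
		default:
		}
	}
	return handled
}

func main() {
	signals := make(chan int, 10)
	for n := 0; n < 5; n++ {
		signals <- n // delivered and recorded before the run exits
	}
	handled := run(signals, 3)
	fmt.Println("handled:", handled)           // handled: [0 1 2]
	fmt.Println("lost on exit:", len(signals)) // lost on exit: 2
}
```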


@hundt is correct. Sleeping does not itself cause the events to be missed, as they are still delivered to the signal channel.

But a workflow that doesn’t drain the signal channel with ReceiveAsync before exiting just ignores the buffered signals.