ContinueAsNew + signals

I see some behaviour difference compared to this mention: while one signal is being processed, another signal is getting added (we are using v0.26.0). Sometimes messages are received even while the creation code itself is still in progress.


What does in your context “one signal is in process” entail?

Let’s say I have a “Big” workflow with 5 signal handlers, multiplexed with a select statement. Each signal handler creates one child workflow (or some set of activities within the workflow, or both). Say my “Big” workflow is sent 2 signals for two different handlers concurrently: what I observe is that while one is being processed, the other is being received, even though it is not making progress. How do we make sure draining works with this type of multiplexing?

The draining should be fully asynchronous. It looks like you are blocking while processing signals (invoking activities), so it is expected that other signals are delivered.

To avoid losing signals, make sure that you drain all of them before completing the workflow.


@maxim, to be clear, would it be accurate to say that it is not possible to “drain” using a Selector? i.e. even if you are not blocking while processing signals, you still need to drain every channel with ReceiveAsync before returning after Select. So if you are using a selector like @yugandhar, then you need to do something like this:

type channelConfig struct {
  channel workflow.ReceiveChannel
  val     interface{}
  handler func() // do something with val
}

channels := []*channelConfig{
  {
    channel: workflow.GetSignalChannel(...),
    val:     "",
    handler: func() {
      // do something with val
    },
  },
  // etc
}

s := workflow.NewSelector(ctx)
for _, cc := range channels {
  cc := cc // give each closure its own copy (needed before Go 1.22)
  s.AddReceive(cc.channel, func(c workflow.ReceiveChannel, _ bool) {
    c.Receive(ctx, &cc.val)
    cc.handler()
  })
}

for keepGoing {
  s.Select(ctx)
  // not safe to return here
}

// Now I want to return, but first I need to make sure that no signals are pending
canReturn := false
for !canReturn {
  canReturn = true
  for _, cc := range channels {
    if ok := cc.channel.ReceiveAsync(&cc.val); ok {
      canReturn = false // a handler may block, so re-check every channel afterwards
      cc.handler()
    }
  }
}

This should work both with and without blocking calls in the handlers.

Is that right? It feels kind of awkward but I can’t think of a simpler way to express it.

Okay, synchronous processing is a requirement for us. We want to process events one by one to avoid synchronisation problems; I want to process each signal separately from the others to maintain consistency and debuggability.

@hundt I agree that it is not an ideal experience. I filed an issue to simplify it.



You absolutely can do synchronous processing and maintain consistency and availability. But your code should be careful when closing the workflow.

I’m trying this approach but getting an infinite loop. Am I using ReceiveChannel in the right way here?

future := workflow.NewTimer(ctx, time.Minute)
stopChan := workflow.GetSignalChannel(ctx, "stop")
selector := workflow.NewSelector(ctx)
var selectVal string

selector.AddReceive(stopChan, func(c workflow.ReceiveChannel, more bool) {
	logger.Info("stop received!")
	selectVal = "stop"
})

selector.AddFuture(future, func(f workflow.Future) {
	logger.Info("timer received!")
	selectVal = "timer"
})

for selectVal == "" || selector.HasPending() {
	selector.Select(ctx) // <-- I have infinite loop here sending "stop received!" into console every time
}

I think you are not actually receiving the message, so it remains pending forever. See the docs for Selector.

If I don’t receive the message, then I expect selector.Select(ctx) to block until I receive it. But I see that the loop passes through selector.Select(ctx) many times because selector.HasPending() remains true. I only sent one stop signal to this workflow.

This doesn’t look right. HasPending was added pretty recently, so it might have an issue. Let us investigate.


Oh interesting. The docs just say that the handler you give to AddReceive has to receive the message; they don’t say what happens if it doesn’t. I guess blocking makes sense? Repeatedly calling the handler also made sense to me but I might be misunderstanding something.

@hundt good catch!

Then selector.HasPending performs according to its specification. As the message wasn’t consumed it is still pending. So change your code to actually consume the message in the callback:

selector.AddReceive(stopChan, func(c workflow.ReceiveChannel, more bool) {
	logger.Info("stop received!")
	c.Receive(ctx, nil)
	selectVal = "stop"
})

🤦‍♂️ …so simple. Thank you for the quick answer.

I think this problem can be generalised into the pub/sub pattern: on one end, I keep sending signals; on the other end, I consume a signal, process it, and then return continue_as_new to process the next one.

I don’t think workflow.Signal is well suited for this problem. As @maxim explained in another Q&A: Temporal is a control plane, not a data plane.

To work around this issue, we used an external queue and created a special “consumeMessage” activity that can wait indefinitely, consumes only one message, and returns.

IMHO, this is not a bad pattern: we fully utilised Temporal’s capability to control the execution, and the data flows through the activities without being restricted by history size.

One problem is that if this continue_as_new workflow repeats too many times, we need to garbage collect the old runs often.

I had trouble figuring out how to continue as new after a certain number of activities. The documentation just says “make sure to do an asynchronous drain on the Signal channel” without saying how or mentioning HasPending. Eventually, I came up with something that worked, so I thought I’d share it here for anyone else who needs help:

const LifecycleMaxEvents int = 10000

func LongrunningWorkflow(ctx workflow.Context, params *Parameters) error {
	s := workflow.NewSelector(ctx)

	var signalVal string

	signalHandlers := []struct {
		signalName  string
		handlerFunc func(ctx workflow.Context, params *Parameters, _ *string) func(workflow.ReceiveChannel, bool)
	}{
		{"signalName", handler},
		// ...
	}
	for _, handler := range signalHandlers {
		sigChan := workflow.GetSignalChannel(ctx, handler.signalName)
		s.AddReceive(sigChan, handler.handlerFunc(ctx, params, &signalVal))
	}

	for i := 0; i < LifecycleMaxEvents || s.HasPending(); i++ { // this `or` allows an arbitrary limit to be set and to flush the channel buffer once that limit is reached before continuing as new
		s.Select(ctx)
		if isDone(ctx, params.Name) { // something here to determine if we're done
			return nil
		}
	}
	return workflow.NewContinueAsNewError(ctx, LongrunningWorkflow, params)
}

func handler(ctx workflow.Context, params *Parameters, _ *string) func(workflow.ReceiveChannel, bool) {
	return func(c workflow.ReceiveChannel, more bool) {
		var signalVal string
		c.Receive(ctx, &signalVal)
		// do stuff
	}
}