User subscription workflow design

Hi everyone,

I’m looking for your inputs on a design for a user subscription workflow. As far as the recommendation goes it is said, that long-lived user oriented workflows could/should be started. So I came up with an initial design like this:

type (
	Details struct {}

	UnsubscribeMessage struct {}

	SubscribeMessage struct {
	    MessageType string
    }
)

func SubscriptionWorkflow(ctx workflow.Context, details Details) error {
	logger := workflow.GetLogger(ctx)
	var subscription *activity.UserSubscription

	childCtx, cancelHandler := workflow.WithCancel(ctx)
	var subscriptions []string

	// TODO Register business event listener
	eventsCh := make(chan interface{})
	sink := ext.NewChanSink(eventsCh)
	registerBusinessEventHandler(sink)

	// setup query handlers
	err := workflow.SetQueryHandler(ctx, "subscriptions", func(input []byte) ([]string, error) {
		return subscriptions, nil
	})
	if err != nil {
		logger.Info("SetQueryHandler failed: " + err.Error())
		return err
	}

	shutdown := false
	for {
		selector := workflow.NewSelector(childCtx)

		// exit condition
		selector.AddReceive(childCtx.Done(), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, nil)
			shutdown = true
		})
		// Loop worker searcher
		selector.AddFuture(workflow.NewTimer(childCtx, time.Millisecond * 10), func(f workflow.Future) {
			logger.Info("no signal yet")
		})

		// Business events
		selector.AddReceive(eventsCh, func(c workflow.ReceiveChannel, more bool) {
			var bizEvent *activity.BizEvent
			c.Receive(childCtx, &bizEvent)

			workflow.Go(ctx, func(ctx workflow.Context) {
				workflow.ExecuteActivity(ctx, subscription.Notify, bizEvent)
			})
		})

		// Signal Actions
		selector.AddReceive(workflow.GetSignalChannel(childCtx, UnsubscribeSignal), func(c workflow.ReceiveChannel, more bool) {
			var unsubMsg *UnsubscribeMessage
			c.Receive(childCtx, &unsubMsg)
			logger.Info("Unsubscribe signal received: %v", unsubMsg)
			if len(subscriptions) == 0 {
				// cleanup workflow, no subscriptions are left
				cancelHandler()
			}
		})
		selector.AddReceive(workflow.GetSignalChannel(childCtx, SubscribeSignal), func(c workflow.ReceiveChannel, more bool) {
			var subMsg *SubscribeMessage
			c.Receive(childCtx, &subMsg)
			logger.Info("Subscribe signal received: %v", subMsg)
			subscriptions = append(subscriptions, subMsg.MessageType)
		})

		selector.Select(childCtx)
		if shutdown {
			break
		}
	}

	logger.Info("User subscription has been terminated.")
	return nil
}

So here are my questions:

  • When registering activities like this, does it mean that all invocation go through the same object? Do I need to worry about scaling this out as more and more workflows reaching into this one object?

    userSub := &activity.UserSubscription{configProvider, emailService}
    w.RegisterActivity(userSub)
    
  • The workflow code is non-complete, but one of the major points is that I want to hook into my rabbitmq client and listen to my business events outside of the temporal framework. I would need a regular channel for that. The rabbitmq client uses regular channels and go routines itself. How do I setup my hooks inside the workflow to receive events from the rabbitmq client? I’m talking about these lines:

      // TODO Register business event listener
      eventsCh := make(chan interface{})
      sink := ext.NewChanSink(eventsCh)
      registerBusinessEventHandler(sink)
    
  • Does the workflow in general look “OK” or do I need to go a completely different way if I want to achieve a per User (probably workflowId = e-mail) long-running workflow?

Thanks!

1 Like

I’ve added Max to give a more comprehensive answer but from what I can tell you’re trying to receive non-Temporal signals in the workflow definition. There are only two ways to get data into a workflow

  1. Poll from activity on remote endpoint (not recommended)
  2. Use Temporal signal functionality to receive business level event.

Just to be clear, using arbitrary eventing (within a workflow) is non-deterministic and therefore not a valid approach. Also for infinitely running workflows like this, it may be wise to design with ContinueAsNew in mind.

@ryland I just changed the draft today to renew every 30 days to reset the workflow history as recommended elsewhere. :slight_smile: Thanks for the tip.

Regarding the other points, I kind of came to the same conclusion myself, but I really didn’t want to duplicate the biz event logic for the workflow, I was hoping I could somehow tap into outside world. It makes however complete sense not to allow this.

To summarize, what you mean is something along the lines of:

  • use at least one signal channel to receive business events in the subscription workflow (to reduce complexity and not receive on a per event type)
  • create a business event consumer layer which signals all subscription type of workflows, who can decide internally whether the event is relevant for them

Did I get it right?

1 Like

Never use native Go channels in the workflow code. Use the Channel interface with instances created through workflow.NewChannel or workflow.GetSignalChannel.

@ryland is correct that the best way to deliver business events to a workflow is through signals. The standard approach is to have a queue consumer that receives messages, filters them, trims them to include only information relevant to workflows (to reduce payload sizes), determines appropriate workflowIds, and signals those workflows.

Thank you both @ryland, @maxim!

Could you please answer the last open point?

From the Temporal point of view, an activity implementation is just a function that is called from multiple goroutines. Note that you can specify a limit of the maximum number of parallel activity invocations per worker. And the SDK is going to guarantee that no more than that limit is called in parallel. If a single worker process cannot keep up with the load you are expected to scale out by running more such processes.

Understood thank you. That means I can rely on the standard cpu/Memory metrics of the container and scale horizontally when needed.

A metric to watch is schedule to start latency which represents how long an activity task stayed in a task queue. If workers are able to keep up with the load this latency should be close to zero. If worker cannot keep up the time is going to increase.

About the above reset, was this referring to calling ContinueAsNew every 30 days?

Looking again at the example here: https://docs.temporal.io/docs/workflows, should this be implemented with ContinueAsNew? If we’re executing roughly the same amount of activities (3-4) every 30 days, at what point would this hit max history constraints?

I think the limit of history size is 100k events. So 3-4 activities per month wouldn’t require the continue as new for many years unless activity inputs and outputs are very large.

One reason to call continue as new more frequently is versioning. If you call it on a timer once a month then there is a hard guarantee that no workflow code is using a version that is more than a month old. This would allow eliminating old versions based on this knowledge.

Would another potential option be separating it into a ChildWorkflow?

I’m not sure how a child workflow helps with a perpetually running workflow versioning.

Hmm - like, instead of running the 3-4 activities in the same long running workflow (that run once a month), it’d run in a child workflow. This child workflow would be spawned each month. This would allow new code there, but you’re right that you’d still have this parent workflow that is very long-lived.

That might be okay though, in the case of a basic subscriptions program, which just is running some code every 30 days except when terminated.

I see. This way the parent workflow indeed would be very simple and require much less maintenance.

Just to follow up, I actually used a restart frequency factor parameter. Which defines after how many events/signals/loops should the history be reset. The current implementation takes a parameters and multiples by a fixed baseline.

Example:

const frequencyBase = 10000

func SubscriptionWorkflow(ctx workflow.Context, details Details) error {
    ...
    limit := details.FrequencyFactor * frequencyBase
    ...
    For i := 0; i < limit; i++ { ...workflow body... }
    ...
    return workflow.NewContinueAsNewError(ctx, SubscriptionWorkflow, details)
}

The idea was to quickly adjust the restart limit on new workflows by changing the factor without redeploying the application.

The restart frequency approach is fine. I personally prefer the time based one as it gives better guarantees around which versions are in production.

That makes perfect sense, I will have to take a deeper look at versioning.
I wanted to go into production with a proof of concept, hence the sloppy solution to remind me that it needs to be reset at one point. Although I’m resetting way too often currently :slight_smile:, roughly around every 150 events, your mentioned 100k is a much nicer limit.

Resetting every 150 events is not a problem. Temporal own CRON workflow implementation calls continue as new after each iteration, for example.

The optimal frequency should be chosen carefully when you run a very large number of such workflows in parallel. For example, let’s say that there are have 300 million parallel workflows that reset once a day. Then just reset traffic is going to generate around 3.5 thousand workflow starts per second. Change this to once per month and this background rate drops to 115 per second.

1 Like