User subscription workflow design

Hi everyone,

I’m looking for your input on a design for a user subscription workflow. As far as I understand the recommendation, long-lived, user-oriented workflows can/should be started. So I came up with an initial design like this:

type (
	Details struct {}

	UnsubscribeMessage struct {}

	SubscribeMessage struct {
		MessageType string
	}
)

func SubscriptionWorkflow(ctx workflow.Context, details Details) error {
	logger := workflow.GetLogger(ctx)
	var subscription *activity.UserSubscription

	childCtx, cancelHandler := workflow.WithCancel(ctx)
	var subscriptions []string

	// TODO Register business event listener
	eventsCh := make(chan interface{})
	sink := ext.NewChanSink(eventsCh)
	registerBusinessEventHandler(sink)

	// setup query handlers
	err := workflow.SetQueryHandler(ctx, "subscriptions", func(input []byte) ([]string, error) {
		return subscriptions, nil
	})
	if err != nil {
		logger.Info("SetQueryHandler failed: " + err.Error())
		return err
	}

	shutdown := false
	for {
		selector := workflow.NewSelector(childCtx)

		// exit condition
		selector.AddReceive(childCtx.Done(), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(childCtx, nil)
			shutdown = true
		})
		// Loop worker searcher
		selector.AddFuture(workflow.NewTimer(childCtx, time.Millisecond * 10), func(f workflow.Future) {
			logger.Info("no signal yet")
		})

		// Business events
		selector.AddReceive(eventsCh, func(c workflow.ReceiveChannel, more bool) {
			var bizEvent *activity.BizEvent
			c.Receive(childCtx, &bizEvent)

			workflow.Go(ctx, func(ctx workflow.Context) {
				workflow.ExecuteActivity(ctx, subscription.Notify, bizEvent)
			})
		})

		// Signal Actions
		selector.AddReceive(workflow.GetSignalChannel(childCtx, UnsubscribeSignal), func(c workflow.ReceiveChannel, more bool) {
			var unsubMsg *UnsubscribeMessage
			c.Receive(childCtx, &unsubMsg)
			logger.Info("Unsubscribe signal received", "message", unsubMsg)
			if len(subscriptions) == 0 {
				// cleanup workflow, no subscriptions are left
				cancelHandler()
			}
		})
		selector.AddReceive(workflow.GetSignalChannel(childCtx, SubscribeSignal), func(c workflow.ReceiveChannel, more bool) {
			var subMsg *SubscribeMessage
			c.Receive(childCtx, &subMsg)
			logger.Info("Subscribe signal received", "message", subMsg)
			subscriptions = append(subscriptions, subMsg.MessageType)
		})

		selector.Select(childCtx)
		if shutdown {
			break
		}
	}

	logger.Info("User subscription has been terminated.")
	return nil
}

So here are my questions:

  • When registering activities like this, does it mean that all invocations go through the same object? Do I need to worry about scaling this out as more and more workflows reach into this one object?

    userSub := &activity.UserSubscription{configProvider, emailService}
    w.RegisterActivity(userSub)
    
  • The workflow code is incomplete, but one of the major points is that I want to hook into my RabbitMQ client and listen to my business events outside of the Temporal framework. I would need a regular channel for that. The RabbitMQ client itself uses regular channels and goroutines. How do I set up my hooks inside the workflow to receive events from the RabbitMQ client? I’m talking about these lines:

      // TODO Register business event listener
      eventsCh := make(chan interface{})
      sink := ext.NewChanSink(eventsCh)
      registerBusinessEventHandler(sink)
    
  • Does the workflow in general look “OK”, or do I need to go a completely different way to achieve a long-running, per-user workflow (probably workflowId = e-mail)?
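
For context, this is roughly how I imagine addressing the per-user workflow from the outside. Just a sketch; the task queue name, the "subscribe" signal name, and the subscribe helper are made up, and I’m assuming SignalWithStartWorkflow so the workflow is started on first use:

```go
package main

import (
	"context"

	"go.temporal.io/sdk/client"
)

// SubscribeMessage mirrors the type from the draft above.
type SubscribeMessage struct {
	MessageType string
}

// subscribe signals the user's workflow, starting it first if it is not
// running yet. The workflowId is the e-mail address; "subscriptions" and
// "subscribe" are made-up queue and signal names.
func subscribe(ctx context.Context, c client.Client, email, msgType string) error {
	opts := client.StartWorkflowOptions{
		ID:        email, // workflowId = e-mail
		TaskQueue: "subscriptions",
	}
	_, err := c.SignalWithStartWorkflow(ctx, email, "subscribe",
		SubscribeMessage{MessageType: msgType}, opts,
		"SubscriptionWorkflow") // workflow referenced by registered name
	return err
}
```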

Thanks!


I’ve added Max to give a more comprehensive answer, but from what I can tell you’re trying to receive non-Temporal signals in the workflow definition. There are only two ways to get data into a workflow:

  1. Poll a remote endpoint from an activity (not recommended).
  2. Use Temporal signal functionality to receive business-level events.

Just to be clear, using arbitrary eventing (within a workflow) is non-deterministic and therefore not a valid approach. Also, for infinitely running workflows like this, it may be wise to design with ContinueAsNew in mind.

@ryland I just changed the draft today to renew every 30 days to reset the workflow history, as recommended elsewhere. Thanks for the tip.

Regarding the other points, I kind of came to the same conclusion myself, but I really didn’t want to duplicate the biz-event logic for the workflow; I was hoping I could somehow tap into the outside world. It makes complete sense not to allow this, though.

To summarize, what you mean is something along the lines of:

  • use a single signal channel to receive business events in the subscription workflow (to reduce complexity, rather than one channel per event type)
  • create a business-event consumer layer which signals all subscription-type workflows, and let each workflow decide internally whether the event is relevant for it

Did I get it right?


Never use native Go channels in the workflow code. Use the Channel interface with instances created through workflow.NewChannel or workflow.GetSignalChannel.

@ryland is correct that the best way to deliver business events to a workflow is through signals. The standard approach is to have a queue consumer that receives messages, filters them, trims them to include only information relevant to workflows (to reduce payload sizes), determines appropriate workflowIds, and signals those workflows.

Thank you both @ryland, @maxim!

Could you please answer the last open point?

From the Temporal point of view, an activity implementation is just a function that is called from multiple goroutines. Note that you can specify the maximum number of parallel activity invocations per worker, and the SDK will guarantee that no more than that limit runs in parallel. If a single worker process cannot keep up with the load, you are expected to scale out by running more such processes.
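
For reference, that per-worker limit is set through the worker options, along these lines (the task queue name and the value 50 are arbitrary):

```go
package main

import (
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

// newWorker builds a worker that runs at most 50 activity invocations in
// parallel; scale out by starting more such processes.
func newWorker(c client.Client, userSub interface{}) worker.Worker {
	w := worker.New(c, "subscriptions", worker.Options{
		MaxConcurrentActivityExecutionSize: 50,
	})
	w.RegisterActivity(userSub)
	return w
}
```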

Understood, thank you. That means I can rely on the standard CPU/memory metrics of the container and scale horizontally when needed.

A metric to watch is schedule-to-start latency, which represents how long an activity task stayed in a task queue. If workers are able to keep up with the load, this latency should be close to zero. If workers cannot keep up, it will increase.