Hi everyone,
I'm looking for your input on a design for a user subscription workflow. As far as I understand the recommendations, long-lived, user-oriented workflows can (and probably should) be used for this kind of thing. So I came up with an initial design like this:
type (
	Details            struct{}
	UnsubscribeMessage struct{}
	SubscribeMessage   struct {
		MessageType string
	}
)
func SubscriptionWorkflow(ctx workflow.Context, details Details) error {
	logger := workflow.GetLogger(ctx)
	var subscription *activity.UserSubscription
	childCtx, cancelHandler := workflow.WithCancel(ctx)
	var subscriptions []string

	// TODO Register business event listener
	eventsCh := make(chan interface{})
	sink := ext.NewChanSink(eventsCh)
	registerBusinessEventHandler(sink)

	// setup query handlers
	err := workflow.SetQueryHandler(ctx, "subscriptions", func(input []byte) ([]string, error) {
		return subscriptions, nil
	})
	if err != nil {
		logger.Info("SetQueryHandler failed: " + err.Error())
		return err
	}

	shutdown := false
	for {
		selector := workflow.NewSelector(childCtx)

		// exit condition
		selector.AddReceive(childCtx.Done(), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, nil)
			shutdown = true
		})

		// Loop worker searcher
		selector.AddFuture(workflow.NewTimer(childCtx, time.Millisecond*10), func(f workflow.Future) {
			logger.Info("no signal yet")
		})

		// Business events
		selector.AddReceive(eventsCh, func(c workflow.ReceiveChannel, more bool) {
			var bizEvent *activity.BizEvent
			c.Receive(childCtx, &bizEvent)
			workflow.Go(ctx, func(ctx workflow.Context) {
				workflow.ExecuteActivity(ctx, subscription.Notify, bizEvent)
			})
		})

		// Signal Actions
		selector.AddReceive(workflow.GetSignalChannel(childCtx, UnsubscribeSignal), func(c workflow.ReceiveChannel, more bool) {
			var unsubMsg *UnsubscribeMessage
			c.Receive(childCtx, &unsubMsg)
			logger.Info("Unsubscribe signal received", "message", unsubMsg)
			if len(subscriptions) == 0 {
				// cleanup workflow, no subscriptions are left
				cancelHandler()
			}
		})
		selector.AddReceive(workflow.GetSignalChannel(childCtx, SubscribeSignal), func(c workflow.ReceiveChannel, more bool) {
			var subMsg *SubscribeMessage
			c.Receive(childCtx, &subMsg)
			logger.Info("Subscribe signal received", "message", subMsg)
			subscriptions = append(subscriptions, subMsg.MessageType)
		})

		selector.Select(childCtx)
		if shutdown {
			break
		}
	}

	logger.Info("User subscription has been terminated.")
	return nil
}
So here are my questions:
- When registering activities like this, does it mean that all invocations go through the same object? Do I need to worry about scaling this out as more and more workflows reach into this one object?
    userSub := &activity.UserSubscription{configProvider, emailService}
    w.RegisterActivity(userSub)
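To illustrate what I mean by "the same object", here is a stdlib-only sketch with no Temporal involved. The `UserSubscription` type, the `calls` counter and `invokeConcurrently` are stand-ins I made up for this post. My assumption, which I have not confirmed, is that the worker behaves similarly and calls methods on the one registered pointer from many goroutines:

```go
package main

import (
	"fmt"
	"sync"
)

// UserSubscription stands in for my activity struct; the real one holds
// configProvider and emailService instead of a counter.
type UserSubscription struct {
	mu    sync.Mutex
	calls int
}

// Notify mimics an activity method. It mutates shared state, so it has to
// be guarded when many callers share one receiver.
func (s *UserSubscription) Notify(event string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.calls++
}

// Calls returns how many times Notify ran on this instance.
func (s *UserSubscription) Calls() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.calls
}

// invokeConcurrently simulates n callers hitting the same object at once.
func invokeConcurrently(s *UserSubscription, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Notify(fmt.Sprintf("event-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	userSub := &UserSubscription{}
	invokeConcurrently(userSub, 100)
	fmt.Println("calls on the single instance:", userSub.Calls())
}
```

If that assumption holds, my real struct would only need to be safe for concurrent use, which it should be as long as configProvider and emailService are.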
- The workflow code is incomplete, but one of the major points is that I want to hook into my RabbitMQ client and listen to my business events outside of the Temporal framework. I would need a regular channel for that, because the RabbitMQ client itself uses regular channels and goroutines. How do I set up my hooks inside the workflow to receive events from the RabbitMQ client? I'm talking about these lines:
    // TODO Register business event listener
    eventsCh := make(chan interface{})
    sink := ext.NewChanSink(eventsCh)
    registerBusinessEventHandler(sink)
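One alternative I have been considering, in case a plain channel inside the workflow is not workable, is to keep the RabbitMQ consumer outside the workflow and forward each event as a signal. The sketch below is only that idea in isolation. `Signaler` is a hypothetical interface that I modelled on the client's `SignalWorkflow` method, and `BizEvent`, `BusinessEventSignal`, `forwardEvents` and `recordingSignaler` are names I invented for illustration:

```go
package main

import (
	"context"
	"fmt"
)

// Signaler is a hypothetical one-method interface, used here as a stand-in
// for whatever actually delivers signals to a workflow.
type Signaler interface {
	SignalWorkflow(ctx context.Context, workflowID, runID, signalName string, arg interface{}) error
}

// BizEvent is a placeholder for my real business event type.
type BizEvent struct {
	UserEmail string
	Type      string
}

// BusinessEventSignal is an assumed signal name, not part of the workflow above.
const BusinessEventSignal = "business-event"

// forwardEvents drains the plain Go channel (fed by the RabbitMQ consumer)
// and signals the workflow of the affected user. It returns when the channel
// is closed or the context is cancelled.
func forwardEvents(ctx context.Context, s Signaler, events <-chan BizEvent) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-events:
			if !ok {
				return nil
			}
			// Assumes workflowID = user e-mail and an empty run ID.
			if err := s.SignalWorkflow(ctx, ev.UserEmail, "", BusinessEventSignal, ev); err != nil {
				return fmt.Errorf("forwarding event for %s: %w", ev.UserEmail, err)
			}
		}
	}
}

// recordingSignaler is a fake that lets me try the forwarder without a server.
type recordingSignaler struct{ sent []string }

func (r *recordingSignaler) SignalWorkflow(_ context.Context, workflowID, _, signalName string, _ interface{}) error {
	r.sent = append(r.sent, workflowID+"/"+signalName)
	return nil
}

// runDemo pushes two events through the forwarder and returns what was sent.
func runDemo() []string {
	events := make(chan BizEvent, 2)
	events <- BizEvent{UserEmail: "a@example.com", Type: "invoice"}
	events <- BizEvent{UserEmail: "b@example.com", Type: "invoice"}
	close(events)

	rec := &recordingSignaler{}
	if err := forwardEvents(context.Background(), rec, events); err != nil {
		fmt.Println("error:", err)
	}
	return rec.sent
}

func main() {
	fmt.Println(runDemo())
}
```

With this shape the workflow would receive business events through one more signal channel in the selector instead of `eventsCh`, but I am not sure whether that is the intended way to do it.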
- Does the workflow look OK in general, or do I need to take a completely different approach to achieve a per-user (probably workflowId = e-mail) long-running workflow?
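For the workflow ID, I am thinking of something along these lines, so that the same user always maps to the same ID. The `subscription-` prefix and the trimming and lowercasing are my own assumptions, and `subscriptionWorkflowID` is just a helper name I picked:

```go
package main

import (
	"fmt"
	"strings"
)

// subscriptionWorkflowID derives a per-user workflow ID from an e-mail.
// The prefix and the normalization are assumptions, not requirements.
func subscriptionWorkflowID(email string) string {
	return "subscription-" + strings.ToLower(strings.TrimSpace(email))
}

func main() {
	fmt.Println(subscriptionWorkflowID("  Jane.Doe@Example.com "))
	// prints "subscription-jane.doe@example.com"
}
```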
Thanks!