Use case for Pub/Sub Workflow integration

I’m considering using Temporal to bring visibility to our pub/sub service.
This pub/sub service subscribes to a topic and, once a message is received, reacts by making a request to another service.

I did a basic PoC workflow like this:

package myworkflow

import (
	"fmt"
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)

// MyWorkflow bridges a Pub/Sub subscription into a workflow execution.
//
// It starts the long-running "my-pubsub-listener" activity and, in a separate
// workflow goroutine, runs "my-activity" once for every signal received on
// the "my-pubsub-signals" channel. When the SDK reports that continue-as-new
// is suggested, the workflow cancels the listener, waits for it to stop,
// processes the signals that are still buffered, and then continues as new.
//
// It returns a ContinueAsNewError on restart, a wrapped error if the listener
// activity fails, and nil if the listener activity completes successfully.
func MyWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)

	logger.Info("starting workflow")

	restartCh := workflow.NewChannel(ctx)
	signalsCh := workflow.GetSignalChannel(ctx, "my-pubsub-signals")

	// Options for the per-message activity. Declared once so that handling a
	// signal never has to rebuild them.
	messageOpts := workflow.ActivityOptions{
		TaskQueue:              "my-task-queue",
		ScheduleToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:        1 * time.Second,
			BackoffCoefficient:     2.0,
			MaximumInterval:        0,
			MaximumAttempts:        10,
			NonRetryableErrorTypes: []string{},
		},
	}

	// handleSignal runs "my-activity" for a single message. The activity
	// context is derived into a local variable so the caller's context is
	// left untouched instead of being re-wrapped on every signal.
	handleSignal := func(ctx workflow.Context, signal MySignalType) {
		actCtx := workflow.WithActivityOptions(ctx, messageOpts)

		// This activity is executed every time a message is published to the Pub/Sub topic.
		if err := workflow.ExecuteActivity(actCtx, "my-activity", signal).Get(actCtx, nil); err != nil {
			// Best effort: a failed message is logged and does not stop the workflow.
			logger.Error("my activity", "error", err)
		}
	}

	ctxWebhook, cancelWebhook := workflow.WithCancel(ctx)
	defer cancelWebhook()

	workflow.Go(ctxWebhook, func(ctx workflow.Context) {
		var done bool

		selector := workflow.NewSelector(ctx)

		selector.AddReceive(ctx.Done(), func(ch workflow.ReceiveChannel, more bool) {
			done = true
		})

		selector.AddReceive(signalsCh, func(ch workflow.ReceiveChannel, more bool) {
			var signal MySignalType
			ch.Receive(ctx, &signal)

			handleSignal(ctx, signal)

			if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
				restartCh.Send(ctx, true)
				// Stop consuming after the restart request: the main routine
				// receives from restartCh only once, so a second Send would
				// block forever. Remaining signals are drained by the main
				// routine before it continues as new.
				done = true
			}
		})

		for !done {
			selector.Select(ctx)
		}
	})

	pubsubCtx, cancelPubSub := workflow.WithCancel(ctx)
	// Release the cancellable context on every return path, not only on restart.
	defer cancelPubSub()

	pubsubCtx = workflow.WithActivityOptions(pubsubCtx, workflow.ActivityOptions{
		TaskQueue:              "my-task-queue",
		HeartbeatTimeout:       10 * time.Second,
		ScheduleToCloseTimeout: 10 * 365 * 24 * time.Hour, // run this activity forever
		WaitForCancellation:    true,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:        1 * time.Second,
			BackoffCoefficient:     2.0,
			MaximumInterval:        0,
			MaximumAttempts:        0,
			NonRetryableErrorTypes: []string{},
		},
	})

	// This activity will listen to the Pub/Sub topic and send signals to the workflow
	future := workflow.ExecuteActivity(pubsubCtx, "my-pubsub-listener")

	selector := workflow.NewSelector(ctx)

	var err error
	var restart bool

	selector.AddFuture(future, func(f workflow.Future) {
		err = f.Get(ctx, nil)
	})

	selector.AddReceive(restartCh, func(_ workflow.ReceiveChannel, _ bool) {
		restart = true
		cancelPubSub()
	})

	selector.Select(ctx)

	if restart {
		// The listener was started with WaitForCancellation, so wait for it to
		// acknowledge the cancellation before handing over to the next run.
		// A cancellation error is the expected outcome here.
		if stopErr := future.Get(ctx, nil); stopErr != nil && !temporal.IsCanceledError(stopErr) {
			logger.Error("pubsub listener activity", "error", stopErr)
		}

		// Drain signals that are already buffered so they are handled by this
		// run rather than left behind when it closes.
		for {
			var signal MySignalType
			if !signalsCh.ReceiveAsync(&signal) {
				break
			}
			handleSignal(ctx, signal)
		}

		err := workflow.Await(ctx, func() bool {
			return workflow.AllHandlersFinished(ctx)
		})
		if err != nil {
			return fmt.Errorf("wait for all handlers to finish: %w", err)
		}

		// NewContinueAsNewError always returns a non-nil error, and the SDK
		// expects the workflow to return it as-is to trigger the restart.
		return workflow.NewContinueAsNewError(ctx, MyWorkflow)
	}

	if err != nil {
		return fmt.Errorf("pubsub listener activity: %w", err)
	}

	return nil
}

What I want to know is whether this is a good use case for Temporal. Although this pub/sub service is really simple, it reacts to millions of events per hour. Is Temporal suitable for that? Is this workflow scalable? Do you see any issues with this architecture design?

Thanks

A single workflow execution (instance) has very limited throughput: fewer than 100 requests per second. Temporal scales out with the number of parallel workflow executions, so a design that pushes all the messages through a single workflow is not scalable.

1 Like