I’m considering using Temporal to bring visibility to our pub/sub service.
This pub/sub service is subscribing to a topic and once a message is received it reacts doing a request to another service.
I did a basic PoC workflow like this:
package myworkflow
import (
"fmt"
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
func MyWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
logger.Info("starting workflow")
restartCh := workflow.NewChannel(ctx)
signalsCh := workflow.GetSignalChannel(ctx, "my-pubsub-signals")
ctxWebhook, cancelWebhook := workflow.WithCancel(ctx)
defer cancelWebhook()
workflow.Go(ctxWebhook, func(ctx workflow.Context) {
var done bool
selector := workflow.NewSelector(ctx)
selector.AddReceive(ctx.Done(), func(ch workflow.ReceiveChannel, more bool) {
done = true
})
selector.AddReceive(signalsCh, func(ch workflow.ReceiveChannel, more bool) {
var signal MySignalType
ch.Receive(ctx, &signal)
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskQueue: "my-task-queue",
ScheduleToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 0,
MaximumAttempts: 10,
NonRetryableErrorTypes: []string{},
},
})
// This activity will be executed everytime a message is published to the Pub/Sub topic
err := workflow.ExecuteActivity(ctx, "my-activity", signal).Get(ctx, nil)
if err != nil {
logger.Error("my activity", "error", err)
}
if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
restartCh.Send(ctx, true)
}
})
for !done {
selector.Select(ctx)
}
})
pubsubCtx, cancelPubSub := workflow.WithCancel(ctx)
pubsubCtx = workflow.WithActivityOptions(pubsubCtx, workflow.ActivityOptions{
TaskQueue: "my-task-queue",
HeartbeatTimeout: 10 * time.Second,
ScheduleToCloseTimeout: 10 * 365 * 24 * time.Hour, // run this activity forever
WaitForCancellation: true,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 0,
MaximumAttempts: 0,
NonRetryableErrorTypes: []string{},
},
})
// This activity will listen to the Pub/Sub topic and send signals to the workflow
future := workflow.ExecuteActivity(pubsubCtx, "my-pubsub-listener")
selector := workflow.NewSelector(ctx)
var err error
var restart bool
selector.AddFuture(future, func(f workflow.Future) {
err = f.Get(ctx, nil)
})
selector.AddReceive(restartCh, func(_ workflow.ReceiveChannel, _ bool) {
restart = true
cancelPubSub()
})
selector.Select(ctx)
if restart {
err := workflow.Await(ctx, func() bool {
return workflow.AllHandlersFinished(ctx)
})
if err != nil {
return fmt.Errorf("wait for all handlers to finish: %w", err)
}
err = workflow.NewContinueAsNewError(ctx, MyWorkflow)
if err != nil {
return fmt.Errorf("continue as new: %w", err)
}
return nil
}
if err != nil {
return fmt.Errorf("pubsub listener activity: %w", err)
}
return nil
}
What I want to know is if Temporal is a good use case for that. Although this pubsub service is really simple, it reacts to millions of events per hour. Is Temporal suitable for that? Is this workflow scalable? do you see any issue with this architecture design?
Thanks