Hi everyone,
I’m currently working on a Temporal workflow that needs to handle signals continuously and run indefinitely. I’ve implemented a pattern to manage this, but I’m unsure if it’s correct or optimal. Specifically, I’m looking for feedback on how I’m using signals and handling the infinite nature of the workflow. Here’s a simplified version of my code:
const continueAsNewFrequency = 1000

type MyExampleWorkflowRequest struct {
	Field          string
	PendingSignals []string
}

type MyExampleWorkflowResponse struct{}

func MyExampleWorkflow(ctx workflow.Context, req *MyExampleWorkflowRequest) (*MyExampleWorkflowResponse, error) {
	ctx = workflow.WithActivityOptions(ctx,
		workflow.ActivityOptions{
			TaskQueue:              activitytaskqueue.ID,
			ScheduleToCloseTimeout: 10 * time.Minute,
			StartToCloseTimeout:    time.Minute,
			ScheduleToStartTimeout: time.Minute,
		},
	)

	signalReceiveCount := 0

	// First handle the signals carried over from the previous run.
	for _, signal := range req.PendingSignals {
		var resp *activities.MyExampleActivityResponse
		err := workflow.
			ExecuteActivity(ctx, a.MyExampleActivity, activities.MyExampleActivityRequest{Field: signal}).
			Get(ctx, &resp)
		if err != nil {
			return nil, err
		}
		req.Field = signal
		signalReceiveCount++
	}

	signalsCh := workflow.GetSignalChannel(ctx, "UPDATE")
	var pendingSignals []string

	// Handle new signals one at a time until the limit is reached.
	for {
		var signal string
		_ = signalsCh.Receive(ctx, &signal)
		signalReceiveCount++
		if signalReceiveCount >= continueAsNewFrequency {
			// Limit reached: queue this signal for the next run instead of handling it.
			pendingSignals = append(pendingSignals, signal)
			break
		}
		var resp *activities.MyExampleActivityResponse
		err := workflow.
			ExecuteActivity(ctx, a.MyExampleActivity, activities.MyExampleActivityRequest{Field: signal}).
			Get(ctx, &resp)
		if err != nil {
			return nil, err
		}
		req.Field = signal
	}

	// drain the signal channel
	for {
		var signal string
		// ReceiveAsync does not block; ok is false when nothing is buffered.
		ok := signalsCh.ReceiveAsync(&signal)
		if !ok {
			break
		}
		pendingSignals = append(pendingSignals, signal)
	}

	// NewContinueAsNewError takes the workflow function first, then its arguments.
	return &MyExampleWorkflowResponse{},
		workflow.NewContinueAsNewError(ctx,
			MyExampleWorkflow,
			&MyExampleWorkflowRequest{
				Field:          req.Field,
				PendingSignals: pendingSignals,
			},
		)
}

After I reach the signal count limit, I interrupt handling the signals and drain the signal channel. I don’t process pending signals because I want to avoid receiving signals while I’m processing others. This is to prevent issues when processing signals is slow or when signals are sent very fast. Instead, I gather the pending signals and pass them to the next workflow run.
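To make the hand-off easier to reason about, here is a minimal plain-Go model of a single run. It does not use the Temporal SDK: `runOnce`, `drain`, and the buffered `inbox` channel are hypothetical stand-ins for the workflow function, the drain loop, and the signal channel, and `limit` plays the role of `continueAsNewFrequency`. Unlike a real workflow, the model returns when the inbox is empty instead of blocking.

```go
package main

import "fmt"

// drain collects everything currently buffered in inbox without blocking.
func drain(inbox <-chan string) []string {
	var out []string
	for {
		select {
		case s := <-inbox:
			out = append(out, s)
		default:
			return out
		}
	}
}

// runOnce models one workflow run. carried holds signals handed over by the
// previous run. It returns the signals handled in this run and the signals
// queued for the next run.
func runOnce(carried []string, inbox <-chan string, limit int) (handled, next []string) {
	// Carried-over signals are handled first and count toward the limit.
	handled = append(handled, carried...)
	count := len(carried)
	for {
		select {
		case s := <-inbox:
			count++
			if count >= limit {
				// Limit reached: queue this signal and everything still buffered.
				next = append(next, s)
				return handled, append(next, drain(inbox)...)
			}
			handled = append(handled, s)
		default:
			// Model only: a real workflow would block here waiting for a signal.
			return handled, next
		}
	}
}

func main() {
	inbox := make(chan string, 5)
	for _, s := range []string{"s1", "s2", "s3", "s4", "s5"} {
		inbox <- s
	}

	handled, next := runOnce(nil, inbox, 3)
	fmt.Println(handled, next) // [s1 s2] [s3 s4 s5]

	handled, next = runOnce(next, inbox, 3)
	fmt.Println(handled, next) // [s3 s4 s5] []
}
```

In this model, as in the workflow code above, the carried-over batch is always handled in full, so its size is bounded only by how many signals were buffered when the previous run drained the channel.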
My question is: does the way I’m managing the infinite loop, using signals plus ContinueAsNewError, make sense in this context? Are there better patterns or practices I should consider for maintaining an infinite workflow?
From my research in community posts, I often see examples that check if the signal channel has pending data and then process them. However, this might result in a never-ending workflow if signals keep coming in rapidly.
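The concern can be illustrated with a small plain-Go model, again without the Temporal SDK. `drainThenStop` and its `producer` hook are hypothetical names: the hook runs after each handled signal and may enqueue another one, and `maxSteps` exists only so that the demonstration itself terminates.

```go
package main

import "fmt"

// drainThenStop models the "handle everything that is pending, then continue
// as new" pattern. It reports how many signals were handled and whether the
// inbox was ever observed empty.
func drainThenStop(inbox chan string, producer func(), maxSteps int) (steps int, emptied bool) {
	for steps < maxSteps {
		select {
		case <-inbox:
			steps++
			producer() // a sender may add more signals while we are busy
		default:
			return steps, true // inbox empty: a safe point to continue as new
		}
	}
	return steps, false // the inbox never ran dry within maxSteps
}

func main() {
	// A quiet sender: the inbox empties after two signals.
	quiet := make(chan string, 4)
	quiet <- "a"
	quiet <- "b"
	fmt.Println(drainThenStop(quiet, func() {}, 100)) // 2 true

	// A busy sender: one new signal arrives for every signal handled.
	busy := make(chan string, 4)
	busy <- "a"
	fmt.Println(drainThenStop(busy, func() { busy <- "x" }, 100)) // 100 false
}
```

With the busy sender, the loop only stops because of the artificial `maxSteps` guard, which is the situation a hard limit on handled signals is meant to avoid.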