Validating Signal Handling Pattern in Infinite Workflow

Hi everyone,

I’m currently working on a Temporal workflow that needs to handle signals continuously and run indefinitely. I’ve implemented a pattern to manage this, but I’m unsure if it’s correct or optimal. Specifically, I’m looking for feedback on how I’m using signals and handling the infinite nature of the workflow. Here’s a simplified version of my code:

const continueAsNewFrequency = 1000

type MyExampleWorkflowRequest struct {
	Field          string
	PendingSignals []string
}

type MyExampleWorkflowResponse struct{}

func MyExampleWorkflow(ctx workflow.Context, req *MyExampleWorkflowRequest) (*MyExampleWorkflowResponse, error) {
	ctx = workflow.WithActivityOptions(ctx,
		workflow.ActivityOptions{
			TaskQueue:              activitytaskqueue.ID,
			ScheduleToCloseTimeout: 10 * time.Minute,
			StartToCloseTimeout:    time.Minute,
			ScheduleToStartTimeout: time.Minute,
		},
	)

	signalReceiveCount := 0

	for _, signal := range req.PendingSignals {
		var resp *activities.MyExampleActivityResponse
		err := workflow.
			ExecuteActivity(ctx, a.MyExampleActivity, activities.MyExampleActivityRequest{Field: signal}).
			Get(ctx, &resp)
		if err != nil {
			return nil, err
		}

		req.Field = signal
		signalReceiveCount++
	}

	signalsCh := workflow.GetSignalChannel(ctx, "UPDATE")
	var pendingSignals []string
	for {
		var signal string
		_ = signalsCh.Receive(ctx, &signal)

		signalReceiveCount++
		if signalReceiveCount >= continueAsNewFrequency {
			pendingSignals = append(pendingSignals, signal)
			break
		}

		var resp *activities.MyExampleActivityResponse
		err := workflow.
			ExecuteActivity(ctx, a.MyExampleActivity, activities.MyExampleActivityRequest{Field: signal}).
			Get(ctx, &resp)
		if err != nil {
			return nil, err
		}

		req.Field = signal
	}

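	// Drain signals that are already buffered, without blocking,
	// so they can be carried over to the next run.
	for {
		var signal string
		// ReceiveAsync returns false when no signal is currently buffered.
		ok := signalsCh.ReceiveAsync(&signal)
		if !ok {
			break
		}

		pendingSignals = append(pendingSignals, signal)
	}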

	return &MyExampleWorkflowResponse{},
		workflow.NewContinueAsNewError(ctx,
			&MyExampleWorkflowRequest{
				Field:          req.Field,
				PendingSignals: pendingSignals,
			},
		)
}

After I reach the signal count limit, I interrupt handling the signals and drain the signal channel. I don’t process pending signals because I want to avoid receiving signals while I’m processing others. This is to prevent issues when processing signals is slow or when signals are sent very fast. Instead, I gather the pending signals and pass them to the next workflow run.
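
The drain step described above relies on a non-blocking receive. As a rough illustration of that idea only, here is a minimal plain-Go sketch that uses an ordinary buffered channel as a stand-in for the Temporal signal channel. It does not use the Temporal SDK, and `drainPending` is a hypothetical helper name, not part of the workflow above.

```go
package main

import "fmt"

// drainPending collects every value already buffered in ch without blocking.
// The select/default pair is a plain-Go stand-in for a non-blocking receive;
// it is an illustration, not the Temporal API.
func drainPending(ch <-chan string) []string {
	var pending []string
	for {
		select {
		case s, ok := <-ch:
			if !ok {
				// Channel closed: nothing more can arrive.
				return pending
			}
			pending = append(pending, s)
		default:
			// Nothing buffered right now: stop instead of waiting.
			return pending
		}
	}
}

func main() {
	ch := make(chan string, 8)
	for _, s := range []string{"a", "b", "c"} {
		ch <- s
	}

	// These values would be handed to the next run as pending work.
	pending := drainPending(ch)
	fmt.Println(pending, len(ch)) // prints: [a b c] 0
}
```

The point of the sketch is that the drain loop ends as soon as the buffer is empty, so its length is bounded by what has already arrived rather than by how fast new values keep coming.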

My question is: does the way I’m managing the infinite loop, using signals plus ContinueAsNewError, make sense in this context? Are there better patterns or practices I should consider for maintaining an infinite workflow?

From my research in community posts, I often see examples that check if the signal channel has pending data and then process them. However, this might result in a never-ending workflow if signals keep coming in rapidly.

Your code looks fine. I personally would simplify the loop to:

for {
	var signal string
	_ = signalsCh.Receive(ctx, &signal)

	signalReceiveCount++

	var resp *activities.MyExampleActivityResponse
	err := workflow.
		ExecuteActivity(ctx, a.MyExampleActivity, activities.MyExampleActivityRequest{Field: signal}).
		Get(ctx, &resp)
	if err != nil {
		return nil, err
	}

	req.Field = signal
	if signalReceiveCount >= continueAsNewFrequency {
		break
	}
}

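In your implementation, you always pass at least one pending signal to the next run, because the signal that reaches the limit is appended to pendingSignals without being processed.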
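
To make the difference concrete, here is a small plain-Go model of the two loop orderings. It is only a sketch under simplifying assumptions: a slice stands in for signals that have already arrived, appending to `processed` stands in for running the activity, and the function names `originalRun` and `simplifiedRun` are made up for this illustration. A real workflow would block waiting for more signals instead of returning when the slice runs out.

```go
package main

import "fmt"

// originalRun models the loop from the question: the limit is checked
// before processing, so the signal that reaches the limit is carried over.
func originalRun(signals []string, limit int) (processed, carried []string) {
	count := 0
	for i, s := range signals {
		count++
		if count >= limit {
			// s and everything queued behind it go to the next run.
			return processed, append(carried, signals[i:]...)
		}
		processed = append(processed, s)
	}
	return processed, carried
}

// simplifiedRun models the suggested loop: process first, then check
// the limit, so only signals queued behind s are carried over.
func simplifiedRun(signals []string, limit int) (processed, carried []string) {
	count := 0
	for i, s := range signals {
		count++
		processed = append(processed, s)
		if count >= limit {
			return processed, append(carried, signals[i+1:]...)
		}
	}
	return processed, carried
}

func main() {
	signals := []string{"s1", "s2", "s3", "s4", "s5"}

	p, c := originalRun(signals, 3)
	fmt.Println(p, c) // prints: [s1 s2] [s3 s4 s5]

	p, c = simplifiedRun(signals, 3)
	fmt.Println(p, c) // prints: [s1 s2 s3] [s4 s5]
}
```

With the check after processing, a run that receives exactly `limit` signals hands nothing over, whereas the original ordering always hands over at least the last one.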

Hi Maxim,

Thank you for your suggestion and for validating the pattern. I hope this can help other Temporal users as well.