Seeking Guidance in flow for Recurring execution with continue as new pattern

Recurring Mandate Requirements

Our system needed to handle recurring payment mandates with several specific requirements:

  • Frequency Support: Process payments at various intervals (daily, weekly, monthly, quarterly, annually)
  • Dynamic Updates: Allow changing payment parameters mid-cycle
  • Long-term Operation: Reliably manage mandates that may run for years
  • Pause/Resume Control: Enable temporarily pausing and resuming mandates.
    We implemented a Temporal workflow solution that handles recurring mandates through a state-machine approach:
    workflow

Think mandate as subscription.
we will have 1 worflow for each subscription.

this is the data we are passing to each workflow


type MandateDetails struct {
	ID string `json:"ID"`
	MerchantID string `json:"MerchantID"`
	MandateTxnID string `json:"MandateTxnID"`
	AutoExecution bool `json:"AutoExecution,omitempty"`
	EndDate time.Time `json:"EndDate,omitempty"`
	FirstCollectionDate time.Time `json:"FirstCollectionDate,omitempty"`
	FrequencyCode string `json:"FrequencyCode,omitempty"`
	FrequencyType string `json:"FrequencyType,omitempty"`
	StartDate time.Time `json:"StartDate,omitempty"`
	Status string `json:"Status"`
	NextExecution time.Time
}

func (w *MandateWorkflow) executeRecurringMandate(ctx workflow.Context) error {
    updateChannel := workflow.GetSignalChannel(ctx, signalChannel)

    for {
        // Calculate next execution time based on frequency
        nextExec := w.calculateNextExecution(ctx)
        if nextExec.IsZero() {
            return nil
        }

        // Wait until next execution while handling update signals
        if err := w.waitForNextExecution(ctx, updateChannel, nextExec); err != nil {
            return w.handleError(ctx, "Failed to wait for next execution", err)
        }

        // Skip execution if paused
        if w.isPaused {
            continue
        }

        // Execute required activities
        if err := w.executeScheduledAction(ctx); err != nil {
            if !w.shouldRetry(err) {
                return w.handleError(ctx, "Execution failed", err)
            }
            w.lastError = err.Error()
            continue
        }

        // Exit if end date reached
        if !w.shouldContinue(ctx) {
            return nil
        }

        // Continue as new workflow to avoid history growth
        if err := w.continueAsNew(ctx); err != nil {
            return w.handleError(ctx, "Failed to continue as new", err)
        }
    }
}

Frequency-Based Scheduling

func (w *MandateWorkflow) calculateNextExecution(ctx workflow.Context) time.Time {
    switch w.mandate.FrequencyCode {
    case models.FrequencyCodeDAILY:
        return workflow.Now(ctx).Add(24 * time.Hour)
    case models.FrequencyCodeWEEKLY:
        return workflow.Now(ctx).AddDate(0, 0, 7) 
    case models.FrequencyCodeMONTHLY:
        return calculateNextMonthExecution(current, 1)
    case models.FrequencyCodeBIMONTHLY:
        return calculateNextMonthExecution(current, 2)
    case models.FrequencyCodeQUARTERLY:
        return calculateNextMonthExecution(current, 3)
    case models.FrequencyCodeANNUALLY:
        // Handle leap year edge case
        if current.Month() == 2 && current.Day() == 29 {
            return time.Date(current.Year()+1, 2, 28, 
                current.Hour(), current.Minute(), current.Second(),
                0, current.Location())
        }
        return current.AddDate(1, 0, 0)
    }
}
func (w *MandateWorkflow) waitForNextExecution(ctx workflow.Context, updateChannel workflow.ReceiveChannel, nextExec time.Time) error {
    // For manual execution, send reminder before actual execution
    if !w.mandate.AutoExecution {
        reminderTime := nextExec.Add(reminderOffset) // e.g., 2 minutes before
        waitTime = w.getWaitDuration(ctx, reminderTime)
    } else {
        waitTime = w.getWaitDuration(ctx, nextExec)
    }
    
    // Create selector that responds to either timer completion or signals
    selector := workflow.NewSelector(ctx)
    timer := workflow.NewTimer(ctx, waitTime)
    
    selector.AddFuture(timer, func(f workflow.Future) {
        _ = f.Get(ctx, nil)  // Timer completed
    })
    
    selector.AddReceive(updateChannel, func(c workflow.ReceiveChannel, more bool) {
        var update MandateUpdate
        c.Receive(ctx, &update)
        w.handleUpdate(ctx, update)  // Process updates during wait
    })
    
    selector.Select(ctx)  // Block until either timer or signal
    return nil
}
func (w *MandateWorkflow) continueAsNew(ctx workflow.Context) error {
    // Calculate next execution before continuing
    w.mandate.NextExecution = w.calculateNextExecution(ctx)
    
    // Create a new workflow instance with updated state
    return workflow.NewContinueAsNewError(ctx, MandateExecutionWorkflow, w.mandate)
}

is this the best approach? will we see any issues? or can we do better with some other appraoch?

@maxim would love to know if the approach we are using is the best or not.

I don’t see any issues with your design.

NIT: I would add the timer cancellation in the channel AddReceive handler.

Also, consider using workflow.GetInfo(ctx).GetContinueAsNewSuggested() to trigger continue as new.

what do you mean by timer cancellation here? can you please elaborate?

Cancel the context passed to the timer to cancel in case a signal was received.