Recurring Mandate Requirements
Our system needed to handle recurring payment mandates with several specific requirements:
- Frequency Support: Process payments at various intervals (daily, weekly, monthly, quarterly, annually)
- Dynamic Updates: Allow changing payment parameters mid-cycle
- Long-term Operation: Reliably manage mandates that may run for years
- Pause/Resume Control: Enable temporarily pausing and resuming mandates.
We implemented a Temporal workflow solution that handles recurring mandates through a state-machine approach:
workflow
Think mandate as subscription.
we will have 1 worflow for each subscription.
this is the data we are passing to each workflow
type MandateDetails struct {
ID string `json:"ID"`
MerchantID string `json:"MerchantID"`
MandateTxnID string `json:"MandateTxnID"`
AutoExecution bool `json:"AutoExecution,omitempty"`
EndDate time.Time `json:"EndDate,omitempty"`
FirstCollectionDate time.Time `json:"FirstCollectionDate,omitempty"`
FrequencyCode string `json:"FrequencyCode,omitempty"`
FrequencyType string `json:"FrequencyType,omitempty"`
StartDate time.Time `json:"StartDate,omitempty"`
Status string `json:"Status"`
NextExecution time.Time
}
func (w *MandateWorkflow) executeRecurringMandate(ctx workflow.Context) error {
updateChannel := workflow.GetSignalChannel(ctx, signalChannel)
for {
// Calculate next execution time based on frequency
nextExec := w.calculateNextExecution(ctx)
if nextExec.IsZero() {
return nil
}
// Wait until next execution while handling update signals
if err := w.waitForNextExecution(ctx, updateChannel, nextExec); err != nil {
return w.handleError(ctx, "Failed to wait for next execution", err)
}
// Skip execution if paused
if w.isPaused {
continue
}
// Execute required activities
if err := w.executeScheduledAction(ctx); err != nil {
if !w.shouldRetry(err) {
return w.handleError(ctx, "Execution failed", err)
}
w.lastError = err.Error()
continue
}
// Exit if end date reached
if !w.shouldContinue(ctx) {
return nil
}
// Continue as new workflow to avoid history growth
if err := w.continueAsNew(ctx); err != nil {
return w.handleError(ctx, "Failed to continue as new", err)
}
}
}
Frequency-Based Scheduling
func (w *MandateWorkflow) calculateNextExecution(ctx workflow.Context) time.Time {
switch w.mandate.FrequencyCode {
case models.FrequencyCodeDAILY:
return workflow.Now(ctx).Add(24 * time.Hour)
case models.FrequencyCodeWEEKLY:
return workflow.Now(ctx).AddDate(0, 0, 7)
case models.FrequencyCodeMONTHLY:
return calculateNextMonthExecution(current, 1)
case models.FrequencyCodeBIMONTHLY:
return calculateNextMonthExecution(current, 2)
case models.FrequencyCodeQUARTERLY:
return calculateNextMonthExecution(current, 3)
case models.FrequencyCodeANNUALLY:
// Handle leap year edge case
if current.Month() == 2 && current.Day() == 29 {
return time.Date(current.Year()+1, 2, 28,
current.Hour(), current.Minute(), current.Second(),
0, current.Location())
}
return current.AddDate(1, 0, 0)
}
}
func (w *MandateWorkflow) waitForNextExecution(ctx workflow.Context, updateChannel workflow.ReceiveChannel, nextExec time.Time) error {
// For manual execution, send reminder before actual execution
if !w.mandate.AutoExecution {
reminderTime := nextExec.Add(reminderOffset) // e.g., 2 minutes before
waitTime = w.getWaitDuration(ctx, reminderTime)
} else {
waitTime = w.getWaitDuration(ctx, nextExec)
}
// Create selector that responds to either timer completion or signals
selector := workflow.NewSelector(ctx)
timer := workflow.NewTimer(ctx, waitTime)
selector.AddFuture(timer, func(f workflow.Future) {
_ = f.Get(ctx, nil) // Timer completed
})
selector.AddReceive(updateChannel, func(c workflow.ReceiveChannel, more bool) {
var update MandateUpdate
c.Receive(ctx, &update)
w.handleUpdate(ctx, update) // Process updates during wait
})
selector.Select(ctx) // Block until either timer or signal
return nil
}
func (w *MandateWorkflow) continueAsNew(ctx workflow.Context) error {
// Calculate next execution before continuing
w.mandate.NextExecution = w.calculateNextExecution(ctx)
// Create a new workflow instance with updated state
return workflow.NewContinueAsNewError(ctx, MandateExecutionWorkflow, w.mandate)
}
is this the best approach? will we see any issues? or can we do better with some other appraoch?