Hi,
I have been looking into the timer example in go-samples and have run into an issue when trying to extend it slightly. In the example there is a timer that if triggered executes SendEmailActivity if the order processing is not completed within 3 seconds. Now imagine that the SendEmailActivity is unstable and we want to use retry policies. If the OrderProcessingActivity completes while the SendEmailActivity is retrying there is no way for the workflow to proceed. Unfortunately I’ve not been able to reproduce this blocking behaviour with a test. Is there any way to model it such that we cover this scenario?
Here is the code to reproduce the issue.
// SampleTimerWorkflow workflow definition
func SampleTimerWorkflow(ctx workflow.Context, processingTimeThreshold time.Duration) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
// In this sample case, we want to demo a use case where the workflow starts a long running order processing operation
// and in the case that the processing takes too long, we want to send out a notification email to user about the delay,
// but we won't cancel the operation. If the operation finishes before the timer fires, then we want to cancel the timer.
var processingDone bool
f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
selector.AddFuture(f, func(f workflow.Future) {
processingDone = true
// cancel timerFuture
cancelHandler()
})
// use timer future to send notification email if processing takes too long
timerFuture := workflow.NewTimer(childCtx, processingTimeThreshold)
selector.AddFuture(timerFuture, func(f workflow.Future) {
if !processingDone {
ao := workflow.ActivityOptions{
StartToCloseTimeout: (1 * time.Second),
ScheduleToCloseTimeout: (1 * time.Minute),
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 5 * time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 100 * time.Second,
MaximumAttempts: 0,
},
}
childCtx := workflow.WithActivityOptions(ctx, ao)
// processing is not done yet when timer fires, send notification email
if err := workflow.ExecuteActivity(childCtx, SendEmailActivity).Get(childCtx, nil); err != nil {
workflow.GetLogger(childCtx).Error("ExecuteActivity SendEmailActivity")
}
}
})
// wait the timer or the order processing to finish
selector.Select(ctx)
// now either the order processing is finished, or timer is fired.
if !processingDone {
// processing not done yet, so the handler for timer will send out notification email.
// we still want the order processing to finish, so wait on it.
selector.Select(ctx)
}
workflow.GetLogger(ctx).Info("Workflow completed.")
return nil
}
func OrderProcessingActivity(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("OrderProcessingActivity processing started.")
timeNeededToProcess := time.Second * 10
time.Sleep(timeNeededToProcess)
logger.Info("OrderProcessingActivity done.", "duration", timeNeededToProcess)
return nil
}
func SendEmailActivity(ctx context.Context) error {
return fmt.Errorf("some error")
}```