I couldn’t figure out how to do the Selector way but was able to get it working based on the example in the docs by moving my timer (expire) inside the activity.
func MyWorkflow(ctx workflow.Context, request MyWorkflowRequest) error {
// Register signal channels
// ...
// Related workflows
relatedWorkflowIds := []string{
//...
}
// Handle cancellation from this point forward, anywhere in the workflow
completionHandlerOptions := workflow.ActivityOptions{
StartToCloseTimeout: expireTime.Sub(currentTime),
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true,
}
defer func() {
// This is executed at the very end of the workflow
logger.Info("Beginning cleanup")
cancelled := false
newCtx := ctx
if errors.Is(ctx.Err(), workflow.ErrCanceled) {
cancelled = true
// get a new disconnected context
newCtx, _ = workflow.NewDisconnectedContext(ctx)
logger.Debug("Created a new disconnected context")
}
// Send signal
err := workflow.SignalExternalWorkflow(newCtx, SomeWorkflowId, "", SomeWorkflowSignal, data).Get(ctx, nil)
if err != nil {
logger.Error("Failed to trigger signal", "error", err)
}
// if the workflow was not actually cancelled, we don't need to do anything else
if !cancelled {
logger.Debug("Workflow was not cancelled")
return
}
if len(relatedWorkflowIds) == 0 {
logger.Debug("No related workflows to cancel")
return
}
// Cascade the cancellation to any related workflows
logger.Debug("Cascading cancellation to related workflows", "relatedWorkflowIds", relatedWorkflowIds)
for _, relatedWorkflowId := range relatedWorkflowIds {
err := workflow.RequestCancelExternalWorkflow(newCtx, relatedWorkflowId, "").Get(ctx, nil)
if err != nil {
logger.Error("Failed to cancel related workflow", "relatedWorkflowId", relatedWorkflowId, "error", err)
}
logger.Info("Related workflow cancelled", "relatedWorkflowId", relatedWorkflowId)
}
}()
// Completion handler activity
ctx = workflow.WithActivityOptions(ctx, completionHandlerOptions)
var result string
err = workflow.ExecuteActivity(ctx, CompletionHanlder, currentTime, expireTime).Get(ctx, &result)
if err != nil {
logger.Error("MyWorkflow Completion Activity failed", "Error", err)
return err
}
logger.Info("MyWorkflow Completion Activity completed", "result", result)
return nil
}
Then I created a new activity which just does this:
func CompletionHanlder(ctx context.Context, startTime time.Time, expireTime time.Time) (string, error) {
for {
select {
case <-time.After(1 * time.Second):
activity.RecordHeartbeat(ctx, "")
case <-time.After(expireTime.Sub(startTime)):
log.Println("This Activity is completed!")
return "I expired", nil
case <-ctx.Done():
log.Println("This Activity is canceled!")
return "I was canceled", nil
}
}
}