Hi there! I’m new to Temporal and trying to get my head around the Go SDK and best practices. I’m trying to create something like this:
Mainly a workflow that has some timers that run in parallel and a signal to stop the whole thing if something outside this workflow indicates it has to happen but still go forward with one important activity.
I initially had all the “outreaches” in a separate child workflow but since the last outreach happens after the parent wrapped up it didn’t seem viable. Is this the best path forward based on my diagrams or am I missing something with how these are set up? Also I’m a bit unsure how to cancel the timers, is it standard practice to merely rely on a boolean to prevent the activity?
package app
import (
"context"
"time"
"go.temporal.io/sdk/workflow"
)
func SomeWorkflow(ctx workflow.Context, name string) (string, error) {
// USER ENTERS FLOW
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Minute * 5,
})
selector := workflow.NewSelector(ctx)
contactUser := true
downgradedUser := false
logger := workflow.GetLogger(ctx)
logger.Info("WORKFLOW started")
// USER GETS FIRST OUTREACH XP
logger.Info("SENDING: outreach-0")
workflow.ExecuteActivity(ctx, SomeOutreachActivity, &SomeActivityParams{msg: "#outreach-0"}, SomeActivityParams{msg: "outreach-0"}).Get(ctx, nil)
// QUEUE UP 2nd OUTREACH XP
selector.AddFuture(workflow.NewTimer(ctx, time.Second*3), func(f workflow.Future) {
if contactUser {
logger.Info("SENDING: outreach-1")
workflow.ExecuteActivity(ctx, SomeOutreachActivity, &SomeActivityParams{msg: "#outreach-1"}, SomeActivityParams{msg: "outreach-1"}).Get(ctx, nil)
}
})
// QUEUE UP 3nd OUTREACH XP
selector.AddFuture(workflow.NewTimer(ctx, time.Second*5), func(f workflow.Future) {
if contactUser {
logger.Info("SENDING: outreach-2")
workflow.ExecuteActivity(ctx, SomeOutreachActivity, &SomeActivityParams{msg: "#outreach-2"}, SomeActivityParams{msg: "outreach-2"}).Get(ctx, nil)
}
})
// QUEUE UP LAST OUTREACH XP (occurs after timer-based future downgrade)
selector.AddFuture(workflow.NewTimer(ctx, time.Second*9), func(f workflow.Future) {
if contactUser {
logger.Info("SENDING: outreach-3")
workflow.ExecuteActivity(ctx, SomeOutreachActivity, &SomeActivityParams{msg: "#outreach-3"}, SomeActivityParams{msg: "outreach-3"}).Get(ctx, nil)
}
})
// QUEUE UP USER DOWNGRADE
selector.AddFuture(workflow.NewTimer(ctx, time.Second*6), func(f workflow.Future) {
logger.Info("TIMER: targeting user for downgrade")
if !downgradedUser {
workflow.ExecuteActivity(ctx, SomeDowngradeActivity, &SomeActivityParams{msg: "#user"}, SomeActivityParams{msg: "user"}).Get(ctx, nil)
downgradedUser = true
}
})
// SIGNAL TO END WORKFLOW: downgrade user and prevent future outreaches
var stopSignal = "stop"
stopSignalChan := workflow.GetSignalChannel(ctx, stopSignal)
selector.AddReceive(
stopSignalChan,
func(ch workflow.ReceiveChannel, more bool) {
ch.Receive(ctx, &stopSignal)
contactUser = false
logger.Info("SIGNAL: targeting user for downgrade / no more outreach")
if !downgradedUser {
workflow.ExecuteActivity(ctx, SomeDowngradeActivity, &SomeActivityParams{msg: "#user"}, SomeActivityParams{msg: "user"}).Get(ctx, nil)
}
downgradedUser = true
},
)
// right number of selects?
selector.Select(ctx)
selector.Select(ctx)
selector.Select(ctx)
selector.Select(ctx)
return "done", nil
}
type SomeActivityParams struct {
msg string
}
func SomeOutreachActivity(ctx context.Context, params SomeActivityParams) (string, error) {
return params.msg, nil
}
func SomeDowngradeActivity(ctx context.Context, params SomeActivityParams) (string, error) {
return params.msg, nil
}