I had trouble figuring out how to continue as new after a certain number of activities. The documentation just says “make sure to do an asynchronous drain on the Signal channel” without explaining how to do that or mentioning `HasPending`. Eventually, I came up with something that worked, so I thought I’d share it here for anyone else who needs help:
// LifecycleMaxEvents is the number of loop passes LongrunningWorkflow makes
// before it stops waiting for new signals, finishes whatever the selector
// still reports as pending, and continues as new.
const LifecycleMaxEvents int = 10_000
// LongrunningWorkflow handles signals in a loop and continues as new once
// LifecycleMaxEvents loop passes have been made.
//
// A receive callback is registered on one selector for every entry in the
// signal table. Each loop pass calls Select once and then asks isDone whether
// the workflow is finished; if so, nil is returned. When the pass limit has
// been reached, the loop keeps running only while the selector reports pending
// work, so buffered signals are flushed before the continue-as-new error is
// returned with the same params.
func LongrunningWorkflow(ctx workflow.Context, params *Parameters) error {
	// signalBinding pairs a signal name with the factory that builds its
	// receive callback.
	type signalBinding struct {
		name  string
		build func(ctx workflow.Context, params *Parameters, _ *string) func(workflow.ReceiveChannel, bool)
	}

	selector := workflow.NewSelector(ctx)
	var signalVal string

	bindings := []signalBinding{
		{"signalName", handler},
		// ...
	}
	for _, binding := range bindings {
		ch := workflow.GetSignalChannel(ctx, binding.name)
		callback := binding.build(ctx, params, &signalVal)
		selector.AddReceive(ch, callback)
	}

	handled := 0
	for {
		// Stop only when the arbitrary pass limit has been reached AND the
		// selector has nothing left pending; HasPending is consulted only
		// after the limit, which flushes the channel buffer before
		// continuing as new.
		if handled >= LifecycleMaxEvents && !selector.HasPending() {
			break
		}

		selector.Select(ctx)
		if isDone(ctx, params.Name) { // something here to determine if we're done
			return nil
		}
		handled++
	}

	return workflow.NewContinueAsNewError(ctx, LongrunningWorkflow, params)
}
// handler builds the receive callback for one signal channel.
//
// The returned callback reads a single string payload from the channel it is
// invoked with, using the workflow context captured here. The params argument
// and the *string argument are accepted to satisfy the signal-table signature;
// the *string is ignored and the payload is read into a callback-local
// variable instead.
func handler(ctx workflow.Context, params *Parameters, _ *string) func(workflow.ReceiveChannel, bool) {
	onSignal := func(ch workflow.ReceiveChannel, _ bool) {
		var payload string
		ch.Receive(ctx, &payload)
		// do stuff
	}
	return onSignal
}