Hi team, I have a use case where I plan to send Kafka events as signals to a workflow that runs per account. A large number of events (10k events per account) could arrive from the Kafka stream. I want to ensure we do not breach the max signal limit, so I plan to do ContinueAsNew. However, using Receive together with ReceiveAsync (to drain) caused lots of duplicate signals to be received while draining (tested in my local environment).
Does the approach below look OK? I don't see any duplicates while running this code. We could run the workflow for 60 min and then do ContinueAsNew. I would really appreciate some thoughts here.
func sampleSignalCounterWorkflowVersion3(ctx workflow.Context, counter int) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("Started SignalCounterWorkflow")
	ch := workflow.GetSignalChannel(ctx, "channelA")

	// Poll for signals without blocking; sleep briefly when none is buffered.
	for i := 0; i < 10; i++ {
		var val int
		if ok := ch.ReceiveAsync(&val); !ok {
			// Sleep returns an error if the context is canceled; propagate it.
			if err := workflow.Sleep(ctx, 10*time.Millisecond); err != nil {
				return err
			}
			continue
		}
		logger.Info("Received signal!", zap.Int("value", val))
	}

	// Drain: consume every signal that is already buffered before continuing as new.
	for {
		var val int
		if ok := ch.ReceiveAsync(&val); !ok {
			break
		}
		logger.Info("Received signal!", zap.Int("value", val))
	}

	return workflow.NewContinueAsNewError(ctx, sampleSignalCounterWorkflowVersion3, 0)
}
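
To reason about the drain step outside the SDK, here is a minimal plain-Go sketch of the same shape. It is only a model under my own assumptions: a buffered Go channel stands in for the signal channel, a non-blocking `select` stands in for ReceiveAsync, and the names `drainAsync` and `runOnce` are hypothetical helpers, not SDK functions. It says nothing about how the real SDK replays or delivers signals.

```go
package main

import "fmt"

// drainAsync models a ReceiveAsync-style drain on a plain buffered channel:
// it takes every value that is already buffered and returns, without
// blocking, as soon as the channel is empty (or closed).
func drainAsync(ch <-chan int) []int {
	var got []int
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return got // channel closed, nothing more to read
			}
			got = append(got, v)
		default:
			return got // nothing buffered right now
		}
	}
}

// runOnce models one workflow run: make at most maxPolls non-blocking
// receive attempts, then drain whatever is left before "continue-as-new".
func runOnce(ch <-chan int, maxPolls int) (handled, drained []int) {
	for i := 0; i < maxPolls; i++ {
		select {
		case v, ok := <-ch:
			if ok {
				handled = append(handled, v)
			}
		default:
			// Nothing buffered; the real workflow would sleep here.
		}
	}
	return handled, drainAsync(ch)
}

func main() {
	ch := make(chan int, 16)
	for i := 1; i <= 13; i++ {
		ch <- i
	}
	handled, drained := runOnce(ch, 10)
	// Each buffered value is consumed exactly once across both phases.
	fmt.Println(len(handled), len(drained), len(ch))
}
```

In this model every buffered value is read exactly once, either in the polling loop or in the drain, and the channel is empty when the run ends. Whether the real signal channel behaves the same way when Receive and ReceiveAsync are mixed is exactly what I am asking about.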