Hi, we have a workflow that listens on a signal forever. To avoid the history size issue, we'd like to use the Continue-As-New feature to start the same workflow again and keep listening to the same signal.
Please check the code below. The issue is that the senders have no way of knowing when the workflow needs to ContinueAsNew, so they keep sending signals continuously. Meanwhile, the workflow must not lose any signal that has been sent, so it uses selector.HasPending() to check whether every delivered signal has been read. However, selector.HasPending() seems to always return true.
I'm not sure whether my workflow code is wrong or whether Temporal simply does not support this at the moment.
The workflow is written in Go and served by a Go worker. It looks something like this:
import "go.temporal.io/sdk/workflow"

func RunWorkflow(ctx workflow.Context) error {
	maximumSignals := 100
	numberSignals := 0
	channel := workflow.GetSignalChannel(ctx, "abc")
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
		numberSignals += 1
		var signalVal interface{}
		c.Receive(ctx, &signalVal)
		// Start a child workflow for the received signal.
	})
	// Intended: keep selecting until the per-run limit is reached
	// and no signal is still pending.
	for numberSignals < maximumSignals || selector.HasPending() {
		// Problem: this loop never exits. Even once numberSignals
		// reaches maximumSignals, selector.HasPending() is always true.
		selector.Select(ctx)
	}
	return workflow.NewContinueAsNewError(ctx, RunWorkflow)
}
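To make the intent clearer, here is a minimal sketch of the behavior we are after, modeled with a plain buffered Go channel instead of the Temporal SDK. The names run, maxSignals and handle are made up for this illustration. The idea is to block until the per-run limit is reached, then drain whatever is already buffered with a non-blocking receive, and only then restart. I assume the SDK's ReceiveAsync on the signal channel would play the role of the non-blocking receive, but I have not confirmed that it solves the problem in a real workflow.

```go
package main

import "fmt"

// run models a single workflow run. It handles signals until maxSignals is
// reached, then drains anything already buffered without blocking.
// It returns how many signals were handled in this run.
func run(signals <-chan int, maxSignals int, handle func(int)) int {
	handled := 0
	// Blocking phase: wait for signals until the per-run limit is reached.
	for handled < maxSignals {
		v, ok := <-signals
		if !ok {
			return handled // channel closed, nothing more will arrive
		}
		handle(v)
		handled++
	}
	// Drain phase: a non-blocking receive, so the loop ends as soon as
	// the buffer is empty instead of waiting for the next signal.
	for {
		select {
		case v, ok := <-signals:
			if !ok {
				return handled
			}
			handle(v)
			handled++
		default:
			return handled // buffer empty: the point where we would continue as new
		}
	}
}

func main() {
	signals := make(chan int, 16)
	for i := 1; i <= 5; i++ {
		signals <- i // five signals are already buffered before the run starts
	}
	var seen []int
	n := run(signals, 3, func(v int) { seen = append(seen, v) })
	fmt.Println(n, seen) // prints "5 [1 2 3 4 5]"
}
```

In this model the limit is 3 but all 5 buffered signals are handled before the run ends, which is the "lose nothing before restarting" guarantee we want from the real workflow.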
We also have several clients (written in Java) that send signals to the channel continuously. For simplicity, here is equivalent pseudocode in Go:
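import (
	"context"
	"log"
	"time"

	"go.temporal.io/sdk/client"
)

func SignalWorkflow() {
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer c.Close()
	val := 0
	for {
		val += 1
		// An empty run ID targets the currently running execution of "test-workflow".
		err = c.SignalWorkflow(context.Background(), "test-workflow", "", "abc", val)
		if err != nil {
			log.Fatalf("error signalling workflow: %s", err)
		}
		// Send a new signal roughly every 3 ms, without ever pausing.
		time.Sleep(3 * time.Millisecond)
	}
}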