Hello folks! I was hoping you could help me with this issue/question:
For this particular use case, I need to process “bursts” of messages, typically ranging from a single message to low thousands of messages. Two interesting characteristics of these bursts are:
- Their arrival is unpredictable. The messages of a burst don’t arrive all at once, so we can’t tell when the last one has come in.
- Due to downstream API rate limiting, we can only process one message at a time, and maybe even with some delay in between.
I decided to use a Temporal workflow to process these messages. My current idea is:
- Use a “long-running” “singleton” Temporal workflow to process messages.
- Use signals to send messages to the workflow.
- Have a sufficiently long timer to identify when the workflow stops receiving messages and finish it successfully when that happens.
- Use the SignalWithStartWorkflow API to send messages to an open/running workflow or start a new one otherwise. I’d need a “fixed” workflow ID to make the API work properly and set the workflow ID reuse policy to allow duplicates.
Here’s some example code to illustrate my approach:
package example

import (
	"context"
	"time"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/workflow"
)

// Workflow & activity implementation

func MessageProcessingWorkflow(ctx workflow.Context) error {
	// ExecuteActivity needs a timeout to be set; one minute is a placeholder value.
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})
	signalChannel := workflow.GetSignalChannel(ctx, "message_signal")
	var err error
	inactive := false
	for {
		selector := workflow.NewSelector(ctx)
		// A fresh inactivity timer per iteration, cancelled as soon as a message arrives.
		timerCtx, cancelTimerCtx := workflow.WithCancel(ctx)
		inactivityTimer := workflow.NewTimer(timerCtx, time.Minute)
		selector.AddFuture(inactivityTimer, func(future workflow.Future) {
			inactive = true
		})
		selector.AddReceive(signalChannel, func(channel workflow.ReceiveChannel, more bool) {
			cancelTimerCtx()
			var msg Message
			channel.Receive(ctx, &msg)
			// One message at a time; err only keeps the result of the latest activity.
			err = workflow.ExecuteActivity(ctx, ProcessMessage, msg).Get(ctx, nil)
		})
		selector.Select(ctx)
		// If the workflow is inactive for 1 minute, stop the workflow.
		if inactive {
			return err
		}
	}
}

func ProcessMessage(ctx context.Context, msg Message) error {
	// ...
	return nil
}

type Message struct {
	// ...
}

// Example "starter"

func sendSignalToWorkflowOptionallyStartingIt(c client.Client, msg Message) error {
	// Signals the running workflow, or starts a new run and then signals it.
	_, err := c.SignalWithStartWorkflow(
		context.TODO(),
		"singleton_messaging_processing_workflow",
		"message_signal",
		msg,
		client.StartWorkflowOptions{
			// ...
			WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
		},
		MessageProcessingWorkflow,
	)
	return err
}
Now, assuming this approach has some merit (please, let me know otherwise!), here’s my question:
Is it possible to miss signals during a (possibly) short time window where a workflow execution is finishing up, but the Temporal cluster isn’t yet made aware of that?
The sequence would be:
- The timer fires.
- The workflow’s selector.AddFuture callback starts to execute.
- Incoming message signal! At this point, the Temporal cluster doesn’t know the workflow will end before consuming the signal channel again, so I assume it will simply queue the message to this workflow run?
- Workflow finishes executing the callback, eventually finishing the whole workflow cleanly.
Did we lose the message? Is there a way to prevent this? Or is this approach fundamentally flawed?
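In case it helps frame an answer, one mitigation I'm considering is to check the signal channel once more, without blocking, right before returning. As far as I can tell, the Go SDK's workflow.ReceiveChannel offers ReceiveAsync for that kind of non-blocking receive. Below is a minimal sketch of the idea in plain Go, not Temporal code: a buffered channel stands in for the signal channel, and drainPending and the Message.ID field are hypothetical names I made up for illustration.

```go
package main

import "fmt"

// Message is a stand-in for the real payload type (the ID field is hypothetical).
type Message struct {
	ID int
}

// drainPending receives everything already buffered in ch without blocking
// and returns it in arrival order. The select/default here plays the role
// that a non-blocking receive would play on a workflow signal channel.
func drainPending(ch <-chan Message) []Message {
	var pending []Message
	for {
		select {
		case msg := <-ch:
			pending = append(pending, msg)
		default:
			// Nothing left in the buffer: safe to stop draining.
			return pending
		}
	}
}

func main() {
	signals := make(chan Message, 8)

	// Model the race: two messages land after the inactivity timer fired
	// but before the "workflow" has actually exited.
	signals <- Message{ID: 1}
	signals <- Message{ID: 2}

	late := drainPending(signals)
	fmt.Println(len(late), "late message(s) found before exit")
}
```

In the workflow, I imagine the equivalent would be to process whatever such a drain returns (or go back into the loop) instead of returning right after the timer callback. Whether that fully closes the window is exactly what I'm unsure about.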