I have a case where several separate workflows all need to do a similar activity (interacting with an external API). This API supports bulk requests, and I would like to collect the requests from several workflows and then send a single request to the API. What is the best way to do this? I’ve tried using a ChildWorkflow, but that does not seem quite right.
Here is a drawing that hopefully makes it more clear.
This is just one example of a use case we need to support, where we are batching work together into groups. The bulk request might go to an API (which happens quickly, in seconds or less), or it might wait on human action (which takes minutes or hours).
Ideally the collector would wait for N requests or for an amount of time X, whichever comes first, before triggering the bulk request, and it would be able to start collecting the next batch while the current bulk request is in flight.
For such a low rate, the best design is a separate batching workflow. It receives requests from the other workflows in the form of signals, then, based on a timer or the batch size, executes an activity with the batched update. It then notifies all of the original senders about the batch completion through signals.
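A minimal sketch of that design in Go, assuming the Temporal Go SDK. The signal names, the BatchRequest type, the ProcessBatch activity, and maxBatchSize are illustrative assumptions, not a prescribed API:

package batch

import (
	"context"
	"time"

	"go.temporal.io/sdk/workflow"
)

const maxBatchSize = 10 // assumed flush threshold

// BatchRequest is an assumed payload; each sender includes its own workflow
// ID so the batcher can signal it back on completion.
type BatchRequest struct {
	SenderWorkflowID string
	Payload          string
}

func BatchWorkflow(ctx workflow.Context) error {
	var requests []BatchRequest
	signalChan := workflow.GetSignalChannel(ctx, "batch-request")

	timerCtx, cancelTimer := workflow.WithCancel(ctx)
	timer := workflow.NewTimer(timerCtx, time.Minute)
	flush := false

	selector := workflow.NewSelector(ctx)
	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
		var req BatchRequest
		c.Receive(ctx, &req)
		requests = append(requests, req)
	})
	selector.AddFuture(timer, func(f workflow.Future) { flush = true })

	// Collect until the timer fires or the batch is full, whichever is first.
	for !flush && len(requests) < maxBatchSize {
		selector.Select(ctx)
	}
	cancelTimer()

	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})
	if err := workflow.ExecuteActivity(ctx, ProcessBatch, requests).Get(ctx, nil); err != nil {
		return err
	}

	// Notify each original sender that its request was processed. Errors are
	// ignored here for brevity (a sender may have already completed).
	for _, req := range requests {
		_ = workflow.SignalExternalWorkflow(
			ctx, req.SenderWorkflowID, "", "batch-done", nil,
		).Get(ctx, nil)
	}
	return nil
}

// ProcessBatch is the assumed activity that performs the bulk API call.
func ProcessBatch(ctx context.Context, requests []BatchRequest) error {
	// ... call the external bulk API ...
	return nil
}

In a real batcher this body would run in a loop, or continue as new (see below), so the workflow can start collecting the next batch while earlier senders are being notified.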
This design wouldn’t work with N = 100k, for example :).
It should always be alive. I would start it when your worker starts and give it a unique ID like “batch” to ensure that only one instance is running. It should also call continue-as-new periodically so that its history is reset to zero size.
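A sketch of both pieces, again assuming the Go SDK; the task queue name and the batchesPerRun threshold are made up for illustration:

// Worker startup: launch the batcher with the fixed ID "batch". If an
// instance is already running, ExecuteWorkflow returns an "already started"
// error, which is what enforces the single instance.
_, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
	ID:        "batch",
	TaskQueue: "batch-task-queue",
}, BatchWorkflow)

// Inside BatchWorkflow: after some number of batches, reset the history by
// continuing as new instead of looping forever within one run.
if batchesProcessed >= batchesPerRun {
	return workflow.NewContinueAsNewError(ctx, BatchWorkflow)
}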
Let’s make it a bit more complicated. Let’s say I want to batch based on some criterion (such as a company ID), so that each batch is for a specific company ID. Would a batch workflow per company ID be the best way to go?
Ok next challenge. I’m working on implementing a POC of this, but I’ve run into an interesting problem.
I don’t necessarily know the company ID ahead of time. Since there is no workflow.SignalWithStart API, I thought I might be able to trigger the batch workflow using workflow.ExecuteChildWorkflow, but this has two problems: 1) the batch workflow becomes a child of whichever workflow gets there first (not ideal), and 2) I’ve discovered that if I send a signal to the workflow before it has started (without specifying the run ID), the signal appears to be lost.
I’m not sure if that 2nd problem is a bug or if it is expected behavior.
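For what it’s worth, signal-with-start does exist on the client API (client.SignalWithStartWorkflow), just not inside workflow code. One possible workaround, not confirmed in this thread, is a small activity that holds a client and performs the signal-with-start; that would sidestep both problems, since the batch workflow gets no parent and the signal cannot arrive before the start. All names below are illustrative:

// SendToBatch signals the per-company batcher, starting it first if needed.
func (a *Activities) SendToBatch(ctx context.Context, companyID string, req BatchRequest) error {
	_, err := a.temporalClient.SignalWithStartWorkflow(
		ctx,
		"batch-"+companyID, // one batcher per company ID
		"batch-request",    // signal name
		req,                // signal payload
		client.StartWorkflowOptions{TaskQueue: "batch-task-queue"},
		BatchWorkflow,
	)
	return err
}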
Small nit: we recommend using structs to implement activities. This allows dependencies to be passed to activities when the struct is initialized, before it is registered with a worker.
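A sketch of the struct-based style; the injected dependency and the client variable c are illustrative:

// Activities as methods on a struct; dependencies are injected when the
// struct is constructed, before it is registered with the worker.
type Activities struct {
	httpClient *http.Client // example dependency for the bulk API call
}

func (a *Activities) ProcessBatch(ctx context.Context, requests []BatchRequest) error {
	// use a.httpClient here
	return nil
}

// Worker setup: registering the struct pointer registers all of its methods.
w := worker.New(c, "batch-task-queue", worker.Options{})
w.RegisterActivity(&Activities{httpClient: http.DefaultClient})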
I have a similar use case, slightly different in that the workflow needs to wait for (1) a given time or (2) N signals, whichever comes first. I assume batching of signals is the best way to implement this, but I have also tried workflow.AwaitWithTimeout, following the Java signal-batch samples.
The issue is that I lose signals and keep getting “Workflow has unhandled signals”. The code is in essence the same as the version modified by @Jeremy_Convoy.
The workflow ID is unique; only one instance is running.
I have also tried draining the channel, but hit the same issue; draining in a deferred activity on return didn’t help either.
Any pointers much appreciated.
//....
signalChan := workflow.GetSignalChannel(ctx, BatchSignalName)
channelSelector := workflow.NewSelector(ctx)
channelSelector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
	// Only consume the signal while the current batch is still being collected.
	if !doneWaiting && len(events) < param.BatchMaxSize {
		var event BatchProcessEvent
		c.Receive(ctx, &event)
		logger.Info(fmt.Sprint("Received signal for event ", event))
		events = append(events, event)
	} else {
		logger.Info("Ignoring signal")
	}
})
timerFuture := workflow.NewTimer(ctx, time.Second*30)
channelSelector.AddFuture(timerFuture, func(f workflow.Future) {
	doneWaiting = true
})
// Collect until the timer fires or the batch fills up, whichever comes first.
// (The original || condition only exits once BOTH conditions hold.)
for !doneWaiting && len(events) < param.BatchMaxSize {
	channelSelector.Select(ctx)
}
// Drain any signals still buffered in the channel so none are lost.
var event BatchProcessEvent
for {
	// ReceiveAsyncWithMoreFlag returns (ok, more): ok reports whether a value
	// was received; more reports whether the channel is still open. The
	// original code had the two return values swapped.
	ok, more := signalChan.ReceiveAsyncWithMoreFlag(&event)
	if ok {
		logger.Info(fmt.Sprint("Drained buffered event ", event))
		events = append(events, event)
	}
	if !ok || !more {
		break
	}
}
if len(events) > 0 {
	logger.Info(fmt.Sprint("processing batch of length ", len(events)))
//...
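For reference, the workflow.AwaitWithTimeout variant I mentioned looks roughly like this (a sketch; the signal-draining goroutine and the reused names are my assumptions about the sample’s shape):

// A workflow goroutine receives signals into events; AwaitWithTimeout then
// blocks until the batch is full or 30s pass, whichever comes first.
workflow.Go(ctx, func(gCtx workflow.Context) {
	for {
		var ev BatchProcessEvent
		signalChan.Receive(gCtx, &ev)
		events = append(events, ev)
	}
})
// full is true when the condition was met before the timeout elapsed.
full, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
	return len(events) >= param.BatchMaxSize
})
if err != nil {
	return err
}
if !full {
	logger.Info("timed out waiting for a full batch")
}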
Your code looks OK. How do you validate that you lose signals? The “Workflow has unhandled signals” message is expected and benign. It appears when the server and the SDK handle a race condition: a signal arrives while the workflow is calling continue-as-new.
This is exactly what I am observing. It might be useful if some mechanism were provided to pause the signal channel. From my observations and the docs, the workflow code is periodically and continuously re-evaluated; it is just not clear how to structure a for { select {} } loop so that new messages are only consumed while inside the select, as in plain Go. It seems the signals are buffered.
I validate by placing a unique value in each message, a timestamp in my case.
The batch activity triggers after N signals, and subsequently I see the “unhandled signals” message in the console log when the workflow “restarts”. In the UI the signals appear after the activity. The continue-as-new instance then only consumes the newer messages, not the missed ones.
If you post a complete reproduction, I can take a look. My guess is that you are not draining the channel correctly before calling continue-as-new. I’m pretty sure the signals are delivered to that channel in the situation you described.
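A sketch of that draining pattern (the BatchProcessWorkflow name and the convention of passing leftover events to the next run are illustrative; the workflow function would need to accept them as an argument):

// Before continuing as new, drain everything already buffered in the signal
// channel and hand it to the next run, so no signal is dropped.
var ev BatchProcessEvent
for signalChan.ReceiveAsync(&ev) {
	events = append(events, ev)
}
return workflow.NewContinueAsNewError(ctx, BatchProcessWorkflow, events)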