Collecting results for bulk operations

I have a case where I have several separate workflows that all need to do a similar activity (interacting with an external API). This API supports bulk requests, and I would like to collect up the requests for several workflows and then send a single request to the API. What is the best way to do this? I’ve tried using a ChildWorkflow, but that does not seem quite right.

Here is a drawing that hopefully makes it more clear.

What is the request rate? The number of requests per batch and number of batches per second?

This is just an example of a use case that we need to support, where we are batching work together into groups. The bulk request might be to an API (happens quickly, seconds or less), or it might be waiting for human action (takes minutes or hours).

Ideally the collector would wait for N requests, or X amount of time before triggering the bulk request, and it would be able to collect a batch for another bulk request while waiting.

The design depends on the rate and batch size. So N/X doesn’t really help :).

haha fair enough. Can you help me understand why? Or several options that I should consider?

Let’s say N = 10 and X = 30 minutes, and let’s assume it is human work, so the bulk request takes 1 hour to complete.

For such a low rate, the best design is a separate batching workflow. It receives requests from the other workflows as signals and, once a timer fires or the batch reaches its size limit, executes an activity that performs the batch update. It then notifies all of the original senders, again through signals, that the batch has completed.

This design wouldn’t work with 100k N for example :).

Ok that makes sense.

What is the best way to trigger the batch workflow? ExecuteChildWorkflow?

It should always be alive. I would start it when your worker starts and give it a fixed, unique ID like “batch” to ensure that only one instance is running. It should also call continue-as-new periodically so that its history is reset to size 0.

That makes sense.

Let’s make it a bit more complicated. Let’s say I want to batch based on some criterion (such as a company ID) so that each batch is for a specific company ID. Would a batch workflow for each company ID be the best way to go?

Yes, absolutely. Use the company ID as the batch workflow ID in this case.

Ok next challenge. I’m working on implementing a POC of this, but I’ve run into an interesting problem.

I don’t necessarily know the company ID ahead of time. Since there is no API for workflow.SignalWithStart, I thought I might be able to trigger the batch workflow using workflow.ExecuteChildWorkflow, but this has 2 problems: 1) the batch workflow is now a child of whichever workflow got there first (not ideal), and 2) I’ve discovered that if I send a signal to the workflow before it has started (without specifying the runID), the signal appears to be lost.

I’m not sure if that 2nd problem is a bug or if it is expected behavior.

Here is a sample of the POC I have created, based on samples-go/expense: POC for batching work together. by jmoseley · Pull Request #1 · jmoseley/samples-go · GitHub

Is it possible to use SignalWithStart from within a workflow?

The workaround is to use SignalWithStart from an activity.
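
To make the semantics concrete, here is a plain-Go model of what "signal with start" provides: deliver the signal to the workflow with a given ID, creating it first if it is not already running, as one atomic step. This is only an in-memory illustration. `registry`, `signalWithStart`, and the `batch-` ID prefix are hypothetical names, and a real implementation would call the Temporal client's signal-with-start method from inside an activity, as suggested above.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is an in-memory stand-in for the set of running batch workflows,
// keyed by workflow ID. It is a model, not Temporal code.
type registry struct {
	mu      sync.Mutex
	pending map[string][]string // workflow ID -> signals received so far
}

func newRegistry() *registry {
	return &registry{pending: make(map[string][]string)}
}

// signalWithStart delivers a signal to the batch for companyID, creating the
// batch first if none exists. It reports whether a new batch was started.
func (r *registry) signalWithStart(companyID, signal string) (started bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := "batch-" + companyID // hypothetical ID scheme: one batch per company
	_, running := r.pending[id]
	r.pending[id] = append(r.pending[id], signal)
	return !running
}

func main() {
	r := newRegistry()
	fmt.Println("started:", r.signalWithStart("acme", "req-1"))
	fmt.Println("started:", r.signalWithStart("acme", "req-2"))
	fmt.Println("started:", r.signalWithStart("globex", "req-3"))
}
```

Because creation and delivery happen under one lock, no caller has to know whether it is the first one for a company, and no signal can arrive before its batch exists.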

Oh that makes sense, I’ll give that a try.

That worked great, thanks for the help.

For anyone coming here later, I updated that branch with the working implementation.

Small nit: we recommend using structs to implement activities. This allows dependencies to be passed to the activities when the struct is initialized, before it is registered with a worker.

See greetings example.

I have a similar use case, slightly different in that the workflow needs to wait for either 1. a given time or 2. N signals to be collected. I assume batching signals is the best way to implement this, but I also tried workflow.AwaitWithTimeout, as used in the Java Signal Batch samples.

The issue is that I lose signals and keep getting “Workflow has unhandled signals”. The code is in essence the same as the version modified by @Jeremy_Convoy.
The workflow ID is unique, and only one instance is running.
I have also tried to drain the channel, with the same result, and tried draining in a deferred activity on return, also with no luck.
Any pointers are much appreciated.

//....
	signalChan := workflow.GetSignalChannel(ctx, BatchSignalName)
	channelSelector := workflow.NewSelector(ctx)
	channelSelector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
		if !doneWaiting || (len(events) < param.BatchMaxSize) {
			var event BatchProcessEvent
			c.Receive(ctx, &event)
			logger.Info(fmt.Sprint("Received signal for event ", event))
			events = append(events, event)
		} else {
			logger.Info(fmt.Sprint("Ignoring signal"))
		}
	})

	timerFuture := workflow.NewTimer(ctx, time.Second*30)
	channelSelector.AddFuture(timerFuture, func(f workflow.Future) {
		doneWaiting = true
	})

	for !doneWaiting || (len(events) < param.BatchMaxSize) {
		channelSelector.Select(ctx)
	}

	// Drain any signals still buffered.
	// ReceiveAsyncWithMoreFlag returns (ok, more), in that order.
	for {
		var event BatchProcessEvent
		ok, more := signalChan.ReceiveAsyncWithMoreFlag(&event)
		if ok {
			workflow.GetLogger(ctx).Info("event received.")
			events = append(events, event)
		}
		if !ok || !more {
			break
		}
	}

	if len(events) > 0 {
		logger.Info(fmt.Sprint("processing batch of length ", len(events), "
//...

Your code looks OK. How do you validate that you lose signals? The “Workflow has unhandled signals” message is expected and benign. It appears when the server and the SDK deal with a race condition in which a signal is received while the workflow is calling continue-as-new.

Thank you @maxim for your support.

“It appears when the server and the SDK deal with a race condition of receiving a signal while a workflow calls continue as new.”

This is exactly what I am observing. It might be useful if some mechanism were provided to pause the signal channel. From my observation and the docs, it appears the workflow code is periodically and continuously evaluated; it is just not clear how to structure a for { select {} } loop so that new messages are only consumed while inside the select, as in Go. It seems the signals are buffered.

I validate by placing a unique value in each message, a timestamp in my case.
The batch activity triggers after N signals, and subsequently I see the unhandled signals message in the console log when the workflow “restarts”. In the UI, the signals appear after the activity. The continue-as-new instance then only consumes newer messages, not the missed ones.

If you post the complete reproduction I can look at it. My guess is that you are not draining the channel correctly before calling continue as new. I’m pretty sure that the signals are delivered to that channel in the situation you described.
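
The draining step mentioned above can be pictured with a plain-Go model: before one run hands over to the next, take everything still buffered without blocking and carry it into the new run as input. `drain` and the string payloads are illustrative assumptions. In a workflow, the non-blocking receive would be the signal channel's `ReceiveAsync`, and the leftovers would be passed as arguments to the continued run.

```go
package main

import "fmt"

// drain removes every value currently buffered in ch without blocking.
func drain(ch <-chan string) []string {
	var leftover []string
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return leftover // channel closed: nothing more will arrive
			}
			leftover = append(leftover, v)
		default:
			return leftover // nothing buffered right now
		}
	}
}

func main() {
	signals := make(chan string, 8)
	signals <- "late-1" // arrived after the batch was already being processed
	signals <- "late-2"
	carried := drain(signals)
	fmt.Println("carried over to next run:", carried)
}
```

The key point of the model is that the drain must be the last thing done before handing over, so nothing can slip in between the drain and the restart unnoticed.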

Here is a test case for reproducing the results. Test logs are included.

https://github.com/windhooked/samples-go/tree/main/signal-batching

Another observation: I notice that an additional empty signal is received.