Our workflow takes a list of sandbox ids as input. I want to process them in parallel, but with no more than 10 in progress at a time. Ideally, as soon as an activity finishes processing one sandbox, processing of the next one starts, essentially taking from a backlog until everything is processed.
I would like to know the suggested way to implement this in a Temporal workflow, since the documentation clearly says that we should not use any Java synchronization primitives, not even volatile variables.
Here is an example of how it could be implemented (Kotlin), but I'm not sure whether my counters can be incremented and decremented safely:
fun updateSandboxes(
    input: MyInput,
) {
    val backlog = Workflow.newWorkflowQueue<String>(input.sandboxes.size)
    val ongoingWork = Workflow.newWorkflowQueue<String>(10)
    input.sandboxes.forEach {
        backlog.put(it)
    }

    var inProgress = 0
    var processed = 0
    val total = input.sandboxes.size

    // Consumer: starts an activity for every id handed over via ongoingWork.
    val consumer = Async.function {
        while (true) {
            if (processed >= total) break
            val sandboxId = ongoingWork.poll()
            if (sandboxId != null) {
                Async.procedure {
                    activity.process(sandboxId)
                    inProgress -= 1
                    processed += 1
                }
            } else {
                Workflow.sleep(Duration.ofSeconds(10))
            }
        }
    }

    // Producer: moves ids out of the backlog while fewer than 10 are in progress.
    while (backlog.peek() != null) {
        if (inProgress < 10) {
            // Poll only when there is capacity, so no id is dropped while waiting.
            val sandboxId = backlog.poll()
            inProgress += 1
            ongoingWork.put(sandboxId)
        } else {
            Workflow.sleep(Duration.ofSeconds(10))
        }
    }
    consumer.get()
}
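
For comparison, here is a shorter variant of the same backlog idea that I am considering. It is only a minimal sketch and rests on an assumption I would like confirmed: that Temporal never runs two workflow threads of the same workflow at the same moment, so a plain `var` counter is safe without any synchronization. `SandboxActivities`, `processWithLimit` and `maxParallel` are hypothetical names for illustration. The function has to be called from workflow code with an activity stub created via `Workflow.newActivityStub`.

```kotlin
import io.temporal.workflow.Async
import io.temporal.workflow.Promise
import io.temporal.workflow.Workflow

// Hypothetical activity interface; stands in for the `activity` stub used above.
interface SandboxActivities {
    fun process(sandboxId: String)
}

// Sketch only: assumes workflow threads are scheduled one at a time,
// so the plain counter below needs no synchronization.
fun processWithLimit(
    sandboxes: List<String>,
    activity: SandboxActivities,
    maxParallel: Int = 10,
) {
    var inProgress = 0
    val started = mutableListOf<Promise<Void>>()

    for (sandboxId in sandboxes) {
        // Block the workflow until a slot is free (replaces the sleep-and-retry loops).
        Workflow.await { inProgress < maxParallel }
        inProgress += 1
        started += Async.procedure {
            try {
                activity.process(sandboxId)
            } finally {
                // Free the slot even if the activity fails.
                inProgress -= 1
            }
        }
    }

    // Wait for the activities that are still running.
    Promise.allOf(started).get()
}
```

Compared with the two-queue version, this drops both `WorkflowQueue`s and the 10-second polling, since `Workflow.await` unblocks as soon as a slot frees up. Whether the counter updates are really safe is still the part I am unsure about.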