Producer/Consumer in a Workflow

We have a task whose input is a list of sandbox ids. I want to process them in parallel, with no more than 10 in progress at a time. Ideally, as soon as an activity finishes processing one sandbox, the next one should start, essentially taking from a backlog pile until everything is processed.

I would like to know the suggested way to implement this in a Temporal workflow, since the documentation clearly says that we should not use any Java synchronization primitives, not even volatile variables.

Here is an example of how it could be implemented (Kotlin), but I’m not sure whether my counters can be incremented and decremented safely:

    fun updateSandboxes(
        input: MyInput,
    ) {
        val backlog = Workflow.newWorkflowQueue<String>(input.sandboxes.size)
        val ongoingWork = Workflow.newWorkflowQueue<String>(10)
        input.sandboxes.forEach {
            backlog.put(it)
        }
        var inProgress = 0
        var processed = 0 
        val total = input.sandboxes.size
        val consumer = Async.function {
            while (true) {
                if (processed >= total) break
                val sandboxId = ongoingWork.poll()
                if (sandboxId != null) {
                    Async.procedure {
                        activity.process(sandboxId)
                        inProgress -= 1
                        processed += 1
                    }
                } else {
                    Workflow.sleep(Duration.ofSeconds(10))
                }
            }
        }
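        while (backlog.peek() != null) {
            if (inProgress < 10) {
                // Poll only when there is capacity; polling first would drop
                // the sandbox id whenever 10 activities are already running.
                val sandboxId = backlog.poll()
                inProgress += 1
                ongoingWork.put(sandboxId)
            } else {
                Workflow.sleep(Duration.ofSeconds(10))
            }
        }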
        consumer.get()
    }

I would use WorkflowQueue.take, which blocks until an item is available, instead of sleeping for 10 seconds whenever the queue is empty.

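It is safe to update the counters the way you do. Workflow threads are scheduled cooperatively, with only one running at a time, so the increments and decrements cannot race with each other.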
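For illustration, here is a minimal sketch of one way to cap parallelism at 10 without any sleeping or counters: start up to 10 worker loops with Async.procedure and let each one pull ids from a shared WorkflowQueue until it is empty. Because all ids are known up front, the non-blocking poll is enough here; take would be the better fit if ids could still arrive while the workflow runs (for example via a signal). The SandboxActivities interface, the MAX_PARALLEL constant, the List<String> parameter, and the 5-minute timeout are assumptions made for the example, not part of the original code, and the function is meant to be called from inside a workflow method.

```kotlin
import io.temporal.activity.ActivityInterface
import io.temporal.activity.ActivityOptions
import io.temporal.workflow.Async
import io.temporal.workflow.Promise
import io.temporal.workflow.Workflow
import java.time.Duration

// Hypothetical activity interface standing in for the `activity` used in the question.
@ActivityInterface
interface SandboxActivities {
    fun process(sandboxId: String)
}

// Assumed limit on concurrently running activities.
const val MAX_PARALLEL = 10

// Must be called from workflow code (e.g. from the workflow method).
fun updateSandboxes(sandboxes: List<String>) {
    if (sandboxes.isEmpty()) return

    val activity = Workflow.newActivityStub(
        SandboxActivities::class.java,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofMinutes(5)) // assumed timeout
            .build(),
    )

    // Fill the backlog once; the capacity equals the number of ids, so put never blocks.
    val backlog = Workflow.newWorkflowQueue<String>(sandboxes.size)
    sandboxes.forEach { backlog.put(it) }

    // Start at most MAX_PARALLEL worker loops. Each loop runs one activity at a
    // time, so the number of loops is the upper bound on parallel activities.
    val workers: List<Promise<Void>> = List(minOf(MAX_PARALLEL, sandboxes.size)) {
        Async.procedure {
            while (true) {
                // poll() returns null once the backlog is drained, which ends this worker.
                val sandboxId = backlog.poll() ?: break
                activity.process(sandboxId)
            }
        }
    }

    // Block the workflow until every worker loop has finished.
    Promise.allOf(workers).get()
}
```

With this shape a new sandbox starts as soon as any worker finishes its current activity, and nothing needs to be tracked in shared counters.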