Cross contamination between batched work triggered using Async.function()

We have a workflow that is built to accept a large group of tasks, divide them into batches of a specific size and then process each batch in parallel. We are currently achieving this with something like:

// Workflow Method
override fun processTasks(tasks: List<Task>): List<Result> {
    val results = tasks
        .chunked(batchSize)
        .map { batch -> Async.function(::processBatch, batch) }
        .flatMap { promise -> promise.get() }
    return results
}

private fun processBatch(tasks: List<Task>): List<Result> {
    // Translate tasks to job inputs
    val jobInputs = tasks.map { task -> activities.getJobInput(task) }
    
    // Run job in another system
    val job = activities.triggerJob(jobInputs)

    // Wait for job completion
    val results = activities.waitForJobCompletion(job)

    return results
}

This works perfectly 99% of the time. However, we occasionally see what looks like cross contamination between the batches. Our logging indicates that the job inputs generated by one batch are sometimes passed to the external job trigger by another batch. When the job then completes, the workflow errors out because the tasks the batch expected were not processed in the job it was tracking; they were processed in a job that a different batch was tracking.

Important points:

  • These activity functions are well tested in other workflows and have never shown any issues except in this specific workflow, where they run in parallel.
  • There always seems to be a replay of the workflow during processing of the batches when we encounter the failure. Maybe related, maybe not. We know this because one of our loggers involved in the process isn’t using a workflow logger and is repeating some log messages it had printed previously.

My questions are:

  1. Should we be calling anything but an activity method inside Async.function()? Are we abusing that API by calling a standard Kotlin function that in turn invokes multiple activities?
  2. Would this be better off running each batch in a small child workflow?

Are functions like chunked, map, and flatMap deterministic?

Well I’m glad you asked me that and made me think it through again. I originally thought they were and my above example is in fact deterministic. However, that example isn’t 100% accurate to our production code, the difference being that the tasks are passed to the workflow method as a set instead of a list. And the chunked function is not deterministic in that case since a set doesn’t guarantee iteration order.
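For anyone hitting the same thing, here is a minimal sketch of one way to make the batching independent of set iteration order: sort by a stable key before chunking. The `Task` class and its `id` field are hypothetical stand-ins (our real task type isn't shown here), and this assumes each task has some unique, comparable key.

```kotlin
// Hypothetical stand-in for the real Task type; only a stable, unique key matters.
data class Task(val id: String)

// Sort by the stable key before chunking, so the same set of tasks always
// yields the same batches in the same order, whatever the set's iteration order.
fun deterministicBatches(tasks: Set<Task>, batchSize: Int): List<List<Task>> =
    tasks.sortedBy { it.id }.chunked(batchSize)

fun main() {
    // Same elements, inserted in different orders.
    val a = linkedSetOf(Task("c"), Task("a"), Task("b"))
    val b = linkedSetOf(Task("b"), Task("c"), Task("a"))

    println(deterministicBatches(a, 2))
    // prints [[Task(id=a), Task(id=b)], [Task(id=c)]]
    println(deterministicBatches(a, 2) == deterministicBatches(b, 2))
    // prints true
}
```

The other option is to change the workflow signature to take a List, so the order is fixed by the caller and recorded in the workflow input.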

So in the case where the chunked function may produce different batches on a replay, how does that interact with the Async.function() call? Doesn’t it preserve the parameters it was originally invoked with when the workflow is replayed?

Async.function is deterministic.

No, Async.function is itself deterministic, but it doesn’t take a non-deterministic input and turn it into a deterministic output.
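To illustrate why that matters, here is a toy model of replay in plain Kotlin. It is not Temporal's implementation, and `ToyHistory` and `runWorkflow` are made-up names; the only assumption it encodes is that recorded activity results are handed back by call order during replay, without comparing the arguments the replayed code passes in.

```kotlin
// Toy model only: records "activity" results by call order and, on replay,
// returns the recorded result for that position regardless of the input.
class ToyHistory {
    private val recorded = mutableListOf<String>()
    private var cursor = 0

    fun activity(input: String, body: (String) -> String): String {
        // Replay path: a result already exists for this position, so reuse it.
        if (cursor < recorded.size) return recorded[cursor++]
        // First execution: actually run the body and record its result.
        val result = body(input)
        recorded += result
        cursor++
        return result
    }

    // Rewind so the same workflow code can be run again against the history.
    fun startReplay() { cursor = 0 }
}

// Stand-in for the workflow: one activity call per batch, in iteration order.
fun runWorkflow(history: ToyHistory, batches: List<String>): Map<String, String> =
    batches.associateWith { batch -> history.activity(batch) { "jobInput($it)" } }

fun main() {
    val history = ToyHistory()
    val first = runWorkflow(history, listOf("batch-A", "batch-B"))
    history.startReplay()
    // On replay the batches come out in a different order, as with a Set.
    val replay = runWorkflow(history, listOf("batch-B", "batch-A"))

    println(first)   // prints {batch-A=jobInput(batch-A), batch-B=jobInput(batch-B)}
    println(replay)  // prints {batch-B=jobInput(batch-A), batch-A=jobInput(batch-B)}
}
```

In the replayed run, batch-B ends up holding the job input that was generated for batch-A, which is the same kind of mix-up described in the original post.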

Yeah, I had a fundamental misunderstanding of what Async.function was doing. I thought it preserved inputs/outputs in the workflow history, kind of like an activity would. Thank you both for your help. We reviewed our workflow and think the problem is caused by supplying a Set as input to the workflow.