We have a workflow that accepts a large group of tasks, divides them into batches of a fixed size, and processes each batch in parallel. We currently achieve this with something like:
```kotlin
// Workflow method
override fun processTasks(tasks: List<Task>): List<Result> {
    return tasks
        .chunked(batchSize)
        .map { batch -> Async.function(::processBatch, batch) }
        .flatMap { promise -> promise.get() }
}

private fun processBatch(tasks: List<Task>): List<Result> {
    // Translate tasks to job inputs
    val jobInputs = tasks.map { activities.getJobInput(it) }
    // Run the job in another system
    val job = activities.triggerJob(jobInputs)
    // Wait for job completion and return the results
    return activities.waitForJobCompletion(job)
}
```
This works perfectly 99% of the time. Occasionally, though, we see what looks like cross-contamination between batches. Our logging indicates that job inputs generated by one batch are sometimes passed to the external job trigger by another batch. When that job completes, the workflow errors out because the tasks the batch was tracking weren't processed by its own job; they were processed by a job that a different batch was tracking.
Important points:
- These activity functions are well tested in other workflows and have never shown any issues, except in this specific workflow where they run in parallel.
- There always seems to be a replay of the workflow during batch processing when we hit the failure. Maybe related, maybe not. We know this because one of the loggers involved isn't a workflow-aware logger and repeats log messages it had already printed.
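On the logging point: since the duplicated messages are our main replay evidence, one option is to switch that logger to the SDK's replay-aware one, which is silenced during replay. A minimal sketch, assuming the logger lives in workflow code (`MyTaskWorkflowImpl` is a placeholder name):

```kotlin
// Workflow.getLogger returns an SLF4J Logger that suppresses output
// while the workflow is replaying, so messages are only emitted once.
private val logger = Workflow.getLogger(MyTaskWorkflowImpl::class.java)

private fun processBatch(tasks: List<Task>): List<Result> {
    logger.info("Processing batch of {} tasks", tasks.size)
    // ... activity calls as above ...
}
```

With this in place, repeated log lines would stop being a false signal, and any remaining duplicates would more clearly indicate non-deterministic behavior rather than replay.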
My questions are:
- Should we be calling anything but an activity method inside Async.function()? Are we abusing that API by calling a standard Kotlin function that in turn invokes multiple activities?
- Would this be better off running each batch in a small child workflow?
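For reference, the child-workflow variant we're considering would look roughly like the following; `BatchWorkflow` and its method names are hypothetical, not code we have:

```kotlin
@WorkflowInterface
interface BatchWorkflow {
    @WorkflowMethod
    fun processBatch(tasks: List<Task>): List<Result>
}

// Parent workflow method: fan out one child workflow per batch, then join.
override fun processTasks(tasks: List<Task>): List<Result> {
    return tasks
        .chunked(batchSize)
        .map { batch ->
            val child = Workflow.newChildWorkflowStub(BatchWorkflow::class.java)
            Async.function(child::processBatch, batch)
        }
        .flatMap { promise -> promise.get() }
}
```

The appeal here is isolation: each child gets its own event history and its own activity invocations, so any accidental state sharing between batches in the parent would become structurally impossible, at the cost of extra workflow executions to operate and monitor.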