Hello everyone, I'm trying to write a batcher for Temporal workflows in Go.
I have a huge CSV to process, which involves validating each row's data fields and then going to our database to check the validity of the data in the CSV.
Processing row by row is very slow on a 10k-row CSV, so I'm implementing a batcher that wraps workflow.Go.
For the most part it's working correctly, but I frequently get "context closed" / "goroutine blocked" errors.
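For context, the parent workflow invokes the batcher roughly like this (Row, ProcessCSVWorkflow, WithBatchSize, and validateRow are simplified placeholder names, not the real ones; validateRow is sketched near the end of this post):

// Row is a placeholder for the struct each CSV row is parsed into.
type Row struct {
	ID    string
	Email string
}

// ProcessCSVWorkflow is a simplified sketch of the calling workflow.
func ProcessCSVWorkflow(ctx workflow.Context, rows []Row) ([]Row, error) {
	// validateRow is the BatchFunc that validates a single row
	return Batch(ctx, rows, validateRow, WithBatchSize(25))
}

The batcher itself: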
func Batch[D any](ctx workflow.Context, data []D, batchFunc BatchFunc[D], opts ...BatchOption) ([]D, error) {
	// new options with default values
	options := &batchOptions{
		batchSize: 10, // default batch size
	}
	// apply the options
	for _, opt := range opts {
		opt(options)
	}
	// timestamp to track the total time the batcher takes
	startTime := time.Now().UTC()
	// total number of batches
	totalBatches := (len(data) + options.batchSize - 1) / options.batchSize
	currentBatch := 1 // current batch, starting from 1
	workflow.GetLogger(ctx).Info(
		"starting batcher",
		"batchSize", options.batchSize,
		"totalBatches", totalBatches,
	)
	// define a new data slice
	var mutatedData []D
	// iterate through the data one batch at a time
	for start, end := 0, 0; start <= len(data)-1; start = end {
		// calculate the end index of the batch
		end = start + options.batchSize
		// clamp the end index to the length of the data slice
		if end > len(data) {
			end = len(data)
		}
		// get the data slice for the current batch
		batchData := data[start:end]
		// new workflow waitgroup
		wg := workflow.NewWaitGroup(ctx)
		// record the start time of the current batch
		batchStartTime := time.Now().UTC()
		// log the current batch out of the total batches
		workflow.GetLogger(ctx).Info(
			"starting batch",
			"batch", currentBatch,
			"totalBatches", totalBatches,
			"items", len(batchData),
		)
		// increment the current batch
		currentBatch++
		// iterate through the batch data
		for _, bd := range batchData {
			d := bd // copied outside the closure so it is captured correctly
			// add to the waitgroup
			wg.Add(1)
			// start a goroutine in a workflow-safe way
			workflow.Go(ctx, func(gCtx workflow.Context) {
				// defer the waitgroup done
				defer wg.Done()
				// execute the batch function
				mutated, err := batchFunc(gCtx, d)
				if err != nil {
					workflow.GetLogger(gCtx).Error("error in batch function", "error", err)
					return // TODO: handle error
				}
				// append the mutated data to the new data slice
				mutatedData = append(mutatedData, mutated)
			})
		}
		// wait for the waitgroup to finish
		wg.Wait(ctx)
		// log how long the current batch took
		workflow.GetLogger(ctx).Debug(
			"batch finished", "duration", time.Since(batchStartTime),
		)
	}
	workflow.GetLogger(ctx).Debug(
		"batcher finished successfully", "duration", time.Since(startTime),
	)
	// return the mutated data and a nil error
	return mutatedData, nil
}
This is the batcher in full, minus the option functions.
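The supporting types can be inferred from the signature; they look roughly like this (WithBatchSize is just an illustration of what one of the omitted option functions would be):

// BatchFunc processes a single item and returns the mutated item.
type BatchFunc[D any] func(ctx workflow.Context, item D) (D, error)

// batchOptions holds the configurable settings for the batcher.
type batchOptions struct {
	batchSize int
}

// BatchOption mutates the batch options.
type BatchOption func(*batchOptions)

// WithBatchSize overrides the default batch size of 10.
func WithBatchSize(size int) BatchOption {
	return func(o *batchOptions) {
		o.batchSize = size
	}
}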
Now, I'm not sure if it's related, but the goroutine error always happens on a database-heavy activity, and usually the same activity too (the expensive one that goes to many places and validates the CSV fields against database info).
So my question is: what's wrong with it? (I know it's not catching errors properly yet, etc.; this is still very new and I'm adding those things once this is ironed out.)
Or at the very least, how would one go about debugging an issue like this? It's never the same chunk of data that errors; it just usually happens to be the same activity.
If you need to see the batchFunc let me know, but basically inside it I just use the workflow.Context passed in by workflow.Go (gCtx).
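To give an idea of its shape without pasting the whole thing, it looks roughly like this (ValidateRowFields and CheckRowAgainstDB are stand-ins for the real activity names, and the timeout is made up):

// validateRow is the BatchFunc passed to Batch; gCtx is the context handed in by workflow.Go.
func validateRow(gCtx workflow.Context, r Row) (Row, error) {
	// apply activity options to the context from workflow.Go
	gCtx = workflow.WithActivityOptions(gCtx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})
	// cheap field-level validation
	if err := workflow.ExecuteActivity(gCtx, ValidateRowFields, r).Get(gCtx, &r); err != nil {
		return r, err
	}
	// the expensive activity that checks the row against database info
	if err := workflow.ExecuteActivity(gCtx, CheckRowAgainstDB, r).Get(gCtx, &r); err != nil {
		return r, err
	}
	return r, nil
}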
NOTE: I am using Temporal's testsuite package right now; I haven't actually run this against a live Temporal environment, but as far as I'm aware the test suite is a true-to-life experience, so I don't think that's the issue.
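In case it helps, the test harness is the standard testsuite pattern, roughly (testRows is whatever sample data I feed in; activity names are the same placeholders as above):

func TestProcessCSVWorkflow(t *testing.T) {
	var ts testsuite.WorkflowTestSuite
	env := ts.NewTestWorkflowEnvironment()
	// register the real activities so they execute against test data
	env.RegisterActivity(ValidateRowFields)
	env.RegisterActivity(CheckRowAgainstDB)
	// run the workflow to completion in the test environment
	env.ExecuteWorkflow(ProcessCSVWorkflow, testRows)
	require.True(t, env.IsWorkflowCompleted())
	require.NoError(t, env.GetWorkflowError())
}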