I’m trying to implement a multi-step batch data processing job as a Temporal workflow. The initial activity needs to read entries from a database and make the records available to other activities for parallel processing. Ideally I’d like to stream these records (through an equivalent of a Go channel) and have multiple consumer activities read from it.
I haven’t been able to figure out how to achieve this in Temporal yet. The closest I’ve gotten is:
func (a *PolicyApplier) StreamInventory(ctx context.Context, realm types.Realm, respChan workflow.SendChannel) error {
    itemChan := make(chan *types.Inventory, 100)
    go func() {
        a.store.GetInventory(ctx, realm.StoreID(), itemChan)
        close(itemChan)
    }()
    count := 0
    for item := range itemChan {
        respChan.Send(ctx, item)
        count++
        if count%10 == 0 {
            activity.RecordHeartbeat(ctx, count)
        }
    }
    return nil
}
But this doesn’t work: the activity doesn’t have access to the workflow context and can’t send to the workflow channel variant. I understand that the workflow cannot pass along anything that can’t be serialized to the activity, since the activity runs remotely. My assumption was that workflow.Channel is implemented as a (somehow) serializable equivalent of a regular Go channel, but perhaps I’m wrong.
If the records are small, the first activity could return the records to the workflow, and the workflow could call the other activities with the records.
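A minimal sketch of that fan-out, assuming a small record type and hypothetical FetchRecords / ProcessRecord activities (none of these names come from the original post):

import (
    "context"
    "time"

    "go.temporal.io/sdk/workflow"
)

// Record is a hypothetical small payload returned by the fetch activity.
type Record struct {
    ID   string
    Data string
}

// FetchRecords is a hypothetical activity that reads all entries from the database.
func FetchRecords(ctx context.Context) ([]Record, error) {
    // ... query the database and collect the rows ...
    return []Record{{ID: "1", Data: "a"}, {ID: "2", Data: "b"}}, nil
}

// ProcessRecord is a hypothetical activity that handles one record.
func ProcessRecord(ctx context.Context, r Record) error {
    // ... per-record processing ...
    return nil
}

// BatchWorkflow fetches the records once and fans them out to parallel activities.
func BatchWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Minute}
    ctx = workflow.WithActivityOptions(ctx, ao)

    var records []Record
    if err := workflow.ExecuteActivity(ctx, FetchRecords).Get(ctx, &records); err != nil {
        return err
    }

    // Start one activity per record, then wait for all of them.
    futures := make([]workflow.Future, 0, len(records))
    for _, r := range records {
        futures = append(futures, workflow.ExecuteActivity(ctx, ProcessRecord, r))
    }
    for _, f := range futures {
        if err := f.Get(ctx, nil); err != nil {
            return err
        }
    }
    return nil
}

For larger batches you would typically process the records in chunks (one activity per chunk) rather than one activity per record, to keep the workflow’s event history manageable.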
If the records are too large to comfortably pass through the Temporal server, the first activity could save the record somewhere (e.g. to disk or to S3, etc.) and then pass a reference to the saved data back to the workflow (the disk filename, or S3 bucket and key, etc). The workflow could then call the other activities with the location of data.
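The reference-passing variant might look roughly like this, assuming the first activity writes to a local temp file and hands back only the path (an S3 upload returning a bucket and key would be analogous). Note that a local path is only meaningful to activities running on the same host, which leads into the next point:

import (
    "context"
    "os"
)

// FetchRecordsToFile is a hypothetical activity that writes the records to disk
// and returns only a reference (the file path), so the data itself never goes
// through the Temporal server.
func FetchRecordsToFile(ctx context.Context) (string, error) {
    f, err := os.CreateTemp("", "records-*.json")
    if err != nil {
        return "", err
    }
    defer f.Close()
    // ... stream rows from the database and encode them into f ...
    return f.Name(), nil
}

// ProcessRecordsFromFile is a hypothetical activity that loads the data by reference.
func ProcessRecordsFromFile(ctx context.Context, path string) error {
    data, err := os.ReadFile(path)
    if err != nil {
        return err
    }
    _ = data // ... decode and process the records ...
    return nil
}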
If you want to pass the records in memory from the first activity to the other activities so that you don’t need to save the data, you need to ensure that all the activities involved run in the same worker so that they share the same memory space. (See Best way to streaming data between activities in Temporal - #7 by antonio.perez) A disadvantage is lower availability: if your single worker crashes or you restart it for a code update, your system will need to wait until it has restarted to continue processing.
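In the Go SDK, sessions are one way to get that same-worker routing; a rough sketch, with illustrative timeouts and hypothetical ProduceInMemory / ConsumeInMemory activities that share worker-local state:

import (
    "time"

    "go.temporal.io/sdk/workflow"
)

// SameWorkerWorkflow pins its activities to one worker using a session, so they
// can share in-memory state (the worker must be started with EnableSessionWorker: true).
func SameWorkerWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Minute}
    ctx = workflow.WithActivityOptions(ctx, ao)

    so := &workflow.SessionOptions{
        CreationTimeout:  time.Minute,
        ExecutionTimeout: time.Hour,
    }
    sessionCtx, err := workflow.CreateSession(ctx, so)
    if err != nil {
        return err
    }
    defer workflow.CompleteSession(sessionCtx)

    // ProduceInMemory and ConsumeInMemory are hypothetical activities that hand
    // data to each other through state held by the worker process.
    if err := workflow.ExecuteActivity(sessionCtx, "ProduceInMemory").Get(sessionCtx, nil); err != nil {
        return err
    }
    return workflow.ExecuteActivity(sessionCtx, "ConsumeInMemory").Get(sessionCtx, nil)
}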
And finally, if your activities are simply transforming data (as opposed to taking actions such as making API calls), and the processing you want to do might fit naturally into a MapReduce or other data processing pipeline system, you might want to move the data processing into such a system and then use workflows to control it through its API.
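In that setup the workflow usually just drives the external system through activities, e.g. submit a job and poll until it reaches a terminal state. A hypothetical sketch, where SubmitJob and GetJobStatus stand in for whatever API the pipeline system exposes:

import (
    "fmt"
    "time"

    "go.temporal.io/sdk/workflow"
)

// PipelineControlWorkflow submits a job to an external data processing system
// and polls it to completion. SubmitJob and GetJobStatus are hypothetical
// activities wrapping that system's API.
func PipelineControlWorkflow(ctx workflow.Context, input string) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute}
    ctx = workflow.WithActivityOptions(ctx, ao)

    var jobID string
    if err := workflow.ExecuteActivity(ctx, "SubmitJob", input).Get(ctx, &jobID); err != nil {
        return err
    }

    // Poll until the external job reaches a terminal state.
    for {
        var status string
        if err := workflow.ExecuteActivity(ctx, "GetJobStatus", jobID).Get(ctx, &status); err != nil {
            return err
        }
        switch status {
        case "COMPLETED":
            return nil
        case "FAILED":
            return fmt.Errorf("job %s failed", jobID)
        }
        if err := workflow.Sleep(ctx, 30*time.Second); err != nil {
            return err
        }
    }
}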
Thanks for your response. I managed to resolve the issue using a cache and task queue in Redis. In case this is useful to anyone else searching for a similar solution: I already had a Redis cache running alongside and accessible to my workers. I updated the initial activity to populate a task queue in Redis and made the other activities consume that queue. I established a sentinel key to mark the end of the queue and added it to the task queue once the producer activity was done. Each consumer activity, on reading the end-of-queue entry, re-queues it and completes successfully.
Finally, I added a cleanup activity that removes the whole queue once all consumers are finished.
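For anyone wanting a concrete picture, here is a rough sketch of that Redis-backed producer/consumer pattern using go-redis; the key name, sentinel value, and activity shapes are my own illustration, not my actual code:

import (
    "context"
    "errors"
    "time"

    "github.com/redis/go-redis/v9"
)

const (
    queueKey    = "inventory:task-queue" // hypothetical Redis list used as the task queue
    endSentinel = "__END_OF_QUEUE__"     // marks that the producer is finished
)

// QueueActivities holds the Redis client, mirroring how the worker already has
// the cache available to it.
type QueueActivities struct {
    rdb *redis.Client
}

// Produce is the producer activity: it pushes each record onto the Redis list
// and appends the sentinel once the database read is complete. In the real
// activity the records would come from the database stream rather than an argument.
func (q *QueueActivities) Produce(ctx context.Context, records []string) error {
    for _, r := range records {
        if err := q.rdb.LPush(ctx, queueKey, r).Err(); err != nil {
            return err
        }
    }
    return q.rdb.LPush(ctx, queueKey, endSentinel).Err()
}

// Consume is one consumer activity: it pops records until it sees the sentinel,
// re-queues the sentinel for the other consumers, and completes.
func (q *QueueActivities) Consume(ctx context.Context) error {
    for {
        res, err := q.rdb.BRPop(ctx, 30*time.Second, queueKey).Result()
        if errors.Is(err, redis.Nil) {
            continue // queue empty for now; keep waiting for the producer
        }
        if err != nil {
            return err
        }
        item := res[1] // BRPop returns [key, value]
        if item == endSentinel {
            return q.rdb.LPush(ctx, queueKey, endSentinel).Err()
        }
        // ... process the record ...
    }
}

// Cleanup removes the queue once all consumers have finished.
func (q *QueueActivities) Cleanup(ctx context.Context) error {
    return q.rdb.Del(ctx, queueKey).Err()
}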