Best practice for streaming large file (~2GB) where each line triggers a new activity

Hi all,

I’m using the Go SDK and need advice on handling a large file (~2GB) where each line should trigger a separate activity.

My main challenge:
I don’t want to read the entire file into memory or load it all before launching the activities. I want to stream the file line-by-line, and for each line, launch a new activity.

What’s the best Temporal-native way to do this?

Is it possible to stream the file from within an activity and push data to the workflow as it reads? Or should the file reader live outside Temporal and communicate with the workflow as it streams?

I’d appreciate any guidance on the right architecture for this.

Thanks.

Hi,
you could use a payload codec to store the payload in an external data store (such as S3) and replace the payload in Temporal with a reference (such as a key or URL). When decoding, the codec retrieves the data from the external store using the reference. See the project linked in the documentation here: Codec Server - Temporal Platform feature guide | Temporal Platform Documentation (you might need to disable the deadlock detector; see the workflow package: go.temporal.io/sdk/workflow - Go Packages)
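As a sketch of that idea (not the actual sample linked above): a `converter.PayloadCodec` whose `Encode` stores the real payload externally and passes only a reference through Temporal. Here the external store is an in-memory map standing in for S3, and the key scheme and the "blob/ref" encoding name are made up:

```go
package codec

import (
	"fmt"

	commonpb "go.temporal.io/api/common/v1"
	"go.temporal.io/sdk/converter"
)

// BlobCodec offloads payloads to an external store and replaces them with a
// reference. The map stands in for S3; a real codec would Put/Get objects there.
type BlobCodec struct {
	store map[string]*commonpb.Payload // stand-in for S3
	next  int
}

// Compile-time check that BlobCodec satisfies converter.PayloadCodec.
var _ converter.PayloadCodec = (*BlobCodec)(nil)

func (c *BlobCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
	out := make([]*commonpb.Payload, len(payloads))
	for i, p := range payloads {
		key := fmt.Sprintf("blob-%d", c.next) // a real codec might use a UUID or S3 URL
		c.next++
		c.store[key] = p // upload to the external store in a real codec
		out[i] = &commonpb.Payload{
			Metadata: map[string][]byte{"encoding": []byte("blob/ref")},
			Data:     []byte(key), // only the reference travels through Temporal
		}
	}
	return out, nil
}

func (c *BlobCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
	out := make([]*commonpb.Payload, len(payloads))
	for i, p := range payloads {
		stored, ok := c.store[string(p.Data)]
		if !ok {
			return nil, fmt.Errorf("blob %q not found", p.Data)
		}
		out[i] = stored
	}
	return out, nil
}
```

The codec is plugged into the client's data converter, so workflow and activity code never sees the references, only the decoded payloads.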

Another way is to use the "worker-specific task queues" approach, documented here: samples-go/fileprocessing at main · temporalio/samples-go · GitHub

Let me know if this helps

Antonio

Perhaps you could have a long-running activity which reads the file and heartbeats the activity’s progress (the file position of the data processed so far). The activity can then crash, or be stopped and restarted, and use the heartbeat details passed to it to find out where it can resume.

Note that while one activity can’t directly start another activity, the activity can use the Temporal client to do anything that the client can: start a workflow, send a signal to a workflow, etc. When a workflow receives a line to be processed, it could start an activity to process that line.

This is pseudocode:

file_position = activity_info.heartbeat_details.progress OR 0

seek_to_file_position(file_position)

loop until end_of_file
   line = read_line_from_file()
   temporal_client.signal_with_start_workflow(line)
   heartbeat({ progress: get_current_file_position() })
This gives you an “at least once” execution: the activity could crash between sending the signal and heartbeating the progress, and it would then send the same signal again when it restarted… but you wouldn’t lose any lines because the activity crashed, had a network glitch, etc.

Keep in mind that while Temporal can run millions of workflows in parallel, the number of events that any one workflow can process per second is limited. Thus you might, for example, want to split sending signals across multiple workflow instances.

If individual lines could be large, you might prefer to send the file position through the workflow to the activity instead of the line itself. (The activity would need access to the file to seek to that position and read the line for itself).