Need help with batch file processing

The current steps we are following are as follows:

  1. Download a file (F1) from S3 (assume the file contains a list of items and can contain ~200K entries)

    for example:
    i1
    i2
    i3

  2. For each line in the file, we publish a message to a Kafka topic (T1)

  3. There is a Kafka consumer (C1) that consumes from topic (T1) and checks whether the item exists in the DB table or not
    a) if the item is present in the DB table, it publishes a message to another Kafka topic (T2) saying the item is present in the DB
    for example: {"item": "i1", "status": "present_in_db_table"}

    b) if the item is not present in the DB table, it publishes a message to the same Kafka topic (T2) saying the item is not present in the DB
    for example: {"item": "i2", "status": "not_present_in_db_table"}

  4. There is another Kafka consumer (C2) that consumes messages from Kafka topic (T2) and writes to a DB table (Table 1) like below

    | item | status           | data |
    |------|------------------|------|
    | i1   | exists_in_db     |      |
    | i2   | not_exists_in_db |      |
    | i3   | exists_in_db     |      |

  5. There is another scheduled job (Job 1, assume it runs every 10 mins) that does the following (a rough sketch of this job follows the list)
    a) Checks the number of rows in the DB table (Table 1)
    b) If the number of rows is equal to the number of entries in the file (downloaded from S3), it does the following:
    i) creates a file (F2) containing the items whose status equals "exists_in_db" and uploads it to S3
    ii) sends a POST request to a REST service (internal to our organisation)
    for example, https://internalservice.com/request_submit
    data: {
    "path": "S3_path"
    }

  6. The internal REST service receives the request and does the following
    a) Downloads the file F2
    b) For each item in F2, reads the data from its own DB table (Table 2) and publishes a message to another Kafka topic (T3)

  7. There is another Kafka consumer (C3) that consumes messages from Kafka topic (T3) and updates the data field of Table 1 mentioned in step 4

    | item | status           | data |
    |------|------------------|------|
    | i1   | exists_in_db     | d1   |
    | i2   | not_exists_in_db |      |
    | i3   | exists_in_db     | d3   |

  8. There is another scheduled job (Job 2, assume it runs every 10 mins) that does the following
    a) If the data field is populated for all the items whose status is "exists_in_db", it creates an output file (F3) with the data field values.
    The sample data in the output file (F3) is:
    i1, d1
    i3, d3
    b) Copies the output file (F3) to Amazon S3
    c) Sends a notification to the team that file (F1) processing is completed, with the output file (F3) path in S3
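
For context, here is a simplified Java sketch of roughly what the scheduled Job 1 (step 5) does today. The table and column names ("table1", "item", "status"), the bucket, and the S3 key are placeholders, and error handling is omitted.

```java
// Rough sketch of Job 1: check completeness, build F2, upload it, notify the internal service.
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.StringJoiner;

public class CompletionCheckJob {

  void runOnce(Connection db, S3Client s3, long expectedCount) throws Exception {
    // a) count the rows written into Table 1 so far
    long rows;
    try (Statement st = db.createStatement();
         ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM table1")) {
      rs.next();
      rows = rs.getLong(1);
    }
    // b) not every record has been processed yet; try again on the next run
    if (rows != expectedCount) {
      return;
    }

    // i) build F2 from the items that exist in the DB and upload it to S3
    StringJoiner f2 = new StringJoiner("\n");
    try (Statement st = db.createStatement();
         ResultSet rs = st.executeQuery(
             "SELECT item FROM table1 WHERE status = 'exists_in_db'")) {
      while (rs.next()) {
        f2.add(rs.getString(1));
      }
    }
    String s3Key = "batches/f2.txt"; // placeholder key
    s3.putObject(
        PutObjectRequest.builder().bucket("my-bucket").key(s3Key).build(),
        RequestBody.fromString(f2.toString()));

    // ii) tell the internal REST service where to pick the file up
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("https://internalservice.com/request_submit"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{\"path\": \"" + s3Key + "\"}"))
        .build();
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
  }
}
```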

We want to leverage Temporal for the above process. Can you please help me understand how we can implement this process in Temporal?

It is hard to understand the use case from what you are describing. The sequence of steps describes a specific implementation. Would you describe your requirements at a high level, independently of the DB and Kafka details?

Hi maxim,

At a high level, the steps are as follows:

  1. Download a file from S3
  2. For each record in the file, publish a message to a Kafka topic (T1)
  3. There is a Kafka consumer that consumes the messages from the above topic (T1), applies transformations, and publishes a message to another Kafka topic (T2)
  4. Now I want to consume messages from Kafka topic (T2), and whenever I have received all the messages belonging to my downloaded file, mark the process as complete

Is having a Kafka topic a hard requirement? It is possible to use it with Temporal, but it adds complexity which is not needed. Is your use case about transforming a file to another file one record at a time?

Yes, using Kafka is a hard requirement. The Kafka topic belongs to another team, so we can’t control that.

Yes, our use case is transforming a file to another file one record at a time, and when the transformation of all the records in the file is completed, we need to send a notification to the user that the process is complete.

Is only sending a notification to another team through Kafka a hard requirement, or is passing each record through Kafka a hard requirement as well?

Passing each record through Kafka is a hard requirement. The other team acts on the record, performs some transformations on it (like calling third-party services, etc.), and publishes the transformed record to another Kafka topic (T1). So our team consumes from Kafka topic (T1), and when I receive responses for all my records, I need to send a notification to the user.

I don’t think Temporal is a good fit for this use case. Temporal scales out very well for a large number of entities. But each entity should have limited throughput. In your case, you need a single entity with a large throughput. It is possible to implement it with Temporal, but it is not going to have clear benefits over your existing architecture.

Here, does entity mean file? If yes, what if my file contains fewer records, like 10K? If no, can you tell me what entity means here?

Entity means workflow execution (instance). A single workflow instance is not designed for high throughput. Temporal scales with the number of workflow executions.

Above, you mentioned that my use case can be implemented with Temporal. Can you tell me how we can implement it with Temporal so that we can give it a try (at least for experimentation purposes)? Throughput is okay for me here, as I can wait for a day for the whole process to complete.

How long does the processing of each item take? What is the maximum rate (items/second) of item processing that you expect?

I usually advise against using Temporal for use cases that are not a very good fit. Because I’ve seen cases when someone implemented a use case that is not a good fit and later used it as a way to “prove” that Temporal is not a good fit for an organization.

Temporal is not intended as a replacement for big data or streaming platforms. It is used extensively as a control plane for such platforms, but it is not a replacement for the data plane.

Each item can take ~1-2 minutes. We are expecting ~10 items/sec, though even less is fine. My file can contain ~200K items, and we expect it to finish in 4-5 days.

In this case, Temporal is a reasonable fit. How do you verify that every record in the file was processed?
Do you need to store the id of every record before sending it to Kafka for processing?

Currently we are storing every record in the DB before sending it to Kafka to verify later that every record got processed.

I see. Thanks a lot for the additional context.

Let’s see how this workflow could be designed if the file were small (around 1k records); a rough Java sketch of this design follows the list:

  1. The first activity downloads the file from S3.
  2. The next activities should be routed to the same host as the downloaded file using a host-specific task queue (like in the fileprocessing sample).
  3. The second activity would parse the file to extract the IDs of all the records and return them to the workflow. The workflow stores the IDs in a set (in the Java SDK this set would be a workflow object field).
  4. The third activity would parse the file and send a message to Kafka for each record.
  5. The same activity would heartbeat after each record, including the last processed record offset in the heartbeat details. This offset would be available if the activity times out and is retried.
  6. A Kafka listener receives replies from the external service. For each reply, it sends a signal to the workflow.
  7. Upon receiving a signal, the workflow removes the corresponding record ID from the set of IDs. Once the set is empty, an activity that reports the completion of the processing is executed. Then the workflow completes.
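
Here is a minimal Java SDK sketch of this design. All names (FileProcessingWorkflow, FileActivities, and so on) are made up, and host-specific task queue routing, retry options, and error handling are omitted:

```java
// Minimal sketch of the single-workflow design; the numbered comments match the list above.
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@ActivityInterface
interface FileActivities {
  String downloadFromS3(String s3Path);          // 1: returns a local path on the worker host
  List<String> extractIds(String localPath);     // 3: all record IDs from the file
  void publishRecordsToKafka(String localPath);  // 4-5: sends each record, heartbeating the offset
  void reportCompletion(String s3Path);          // final notification
}

@WorkflowInterface
public interface FileProcessingWorkflow {
  @WorkflowMethod
  void processFile(String s3Path);

  // 6: the Kafka listener sends this signal for every reply it receives
  @SignalMethod
  void recordProcessed(String recordId);
}

class FileProcessingWorkflowImpl implements FileProcessingWorkflow {
  private final FileActivities activities = Workflow.newActivityStub(
      FileActivities.class,
      ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofHours(1)).build());

  private final Set<String> pendingIds = new HashSet<>();

  @Override
  public void processFile(String s3Path) {
    String localPath = activities.downloadFromS3(s3Path);
    pendingIds.addAll(activities.extractIds(localPath));
    activities.publishRecordsToKafka(localPath);
    // 7: block until a signal has been received for every record
    Workflow.await(() -> pendingIds.isEmpty());
    activities.reportCompletion(s3Path);
  }

  @Override
  public void recordProcessed(String recordId) {
    pendingIds.remove(recordId);
  }
}
```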

The above design will not work with 200k records as a single workflow cannot process that many signals. There are various ways to resolve this.

The first option is not to send all 200k records to the external service at once. It depends on how the external service processes them. If that is possible, then the original design changes to the following (a rough sketch of the child workflow follows the list):

  1. The first activity downloads the file from S3.
  2. Execute a child workflow to process the file.
  3. The child workflow activities should be routed to the same host as the downloaded file using a host-specific task queue (like in the fileprocessing sample).
  4. The first child workflow activity would parse the file to extract a page of IDs (1k records) and return them to the workflow. The workflow stores the IDs in a set.
  5. The second child workflow activity would parse the file and send a message to Kafka for each record in the page.
  6. The same activity would heartbeat after each record, including the last processed record offset in the heartbeat details. This offset would be available if the activity times out and is retried.
  7. A Kafka listener receives replies from the external service. For each reply, it sends a signal to the child workflow.
  8. Upon receiving a signal, the workflow removes the corresponding record ID from the set of IDs.
  9. If the set is empty and there are more IDs in the file, the child workflow calls continue-as-new, passing the offset of the last processed record to the next workflow run, which repeats the process from (4).
  10. If the whole file is processed, then the child workflow completes.
  11. An activity that reports the completion of the processing is executed. Then the parent workflow completes.
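
A rough sketch of the child workflow for this paged variant, again with made-up names and a 1k page size. It uses Workflow.continueAsNew to carry the file offset to the next run:

```java
// Sketch of the paged child workflow; the numbered comments match the list above.
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@ActivityInterface
interface PagedFileActivities {
  // Returns up to pageSize record IDs starting at offset; a short page means end of file.
  List<String> extractIdPage(String localPath, long offset, int pageSize);
  // Publishes the same page to Kafka, heartbeating the last processed offset.
  void publishPageToKafka(String localPath, long offset, int pageSize);
}

@WorkflowInterface
public interface FilePageWorkflow {
  @WorkflowMethod
  void processFromOffset(String localPath, long offset);

  @SignalMethod
  void recordProcessed(String recordId);
}

class FilePageWorkflowImpl implements FilePageWorkflow {
  private static final int PAGE_SIZE = 1000;

  private final PagedFileActivities activities = Workflow.newActivityStub(
      PagedFileActivities.class,
      ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofHours(1)).build());

  private final Set<String> pendingIds = new HashSet<>();

  @Override
  public void processFromOffset(String localPath, long offset) {
    List<String> page = activities.extractIdPage(localPath, offset, PAGE_SIZE); // (4)
    pendingIds.addAll(page);
    activities.publishPageToKafka(localPath, offset, PAGE_SIZE);                // (5)-(6)
    Workflow.await(() -> pendingIds.isEmpty());                                 // (7)-(8)
    if (page.size() == PAGE_SIZE) {
      // (9) more records may remain: start a fresh run from the next offset
      Workflow.continueAsNew(localPath, offset + PAGE_SIZE);
    }
    // (10) otherwise the whole file is processed and the child completes
  }

  @Override
  public void recordProcessed(String recordId) {
    pendingIds.remove(recordId);
  }
}
```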

If it is not possible to send records in batches, because all the records should be sent for processing before any response is sent back, then the following design should help (a rough sketch of the parent, its children, and the Kafka listener follows the list):

  1. The first activity downloads the file from S3.
  2. The following activities should be routed to the same host as the downloaded file using a host-specific task queue (like in the fileprocessing sample).
  3. The next activity counts the number of records in the file.
  4. Then it creates a child workflow for each 1k records. Each child gets a range of records assigned to it.
  5. Then each child runs activities that load the set of IDs for its range from the file and then send a message to Kafka for each record in the range.
  6. The child workflow waits for signals for each of its records. Note that the response should contain enough information to construct the ID of the child workflow to send the signal to.
  7. When all signals are received, a child completes.
  8. When all the children complete, the parent workflow executes the notification activity and completes.
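
A rough sketch of this variant: the parent fans out range-based children with deterministic workflow IDs, and a plain Kafka listener uses the same ID scheme to route each reply as a signal. The `<fileId>-range-<start>` scheme and all names are illustrative only:

```java
// Sketch of the range-per-child design; the numbered comments match the list above.
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Async;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import java.util.ArrayList;
import java.util.List;

@WorkflowInterface
public interface RangeChildWorkflow {
  @WorkflowMethod
  void processRange(String localPath, long rangeStart, int rangeSize); // (5)-(7)

  @SignalMethod
  void recordProcessed(String recordId);
}

class FanOutParent {
  private static final int RANGE_SIZE = 1000;

  // Runs inside the parent workflow: one child per 1k-record range (4).
  void fanOut(String fileId, String localPath, long recordCount) {
    List<Promise<?>> children = new ArrayList<>();
    for (long start = 0; start < recordCount; start += RANGE_SIZE) {
      RangeChildWorkflow child = Workflow.newChildWorkflowStub(
          RangeChildWorkflow.class,
          ChildWorkflowOptions.newBuilder()
              .setWorkflowId(fileId + "-range-" + start) // ID the listener can reconstruct
              .build());
      children.add(Async.procedure(child::processRange, localPath, start, RANGE_SIZE));
    }
    Promise.allOf(children).get(); // (8) wait for all children; notify afterwards
  }
}

// Runs outside of any workflow: the Kafka listener that turns each reply into a signal (6).
class ReplyListener {
  private final WorkflowClient client =
      WorkflowClient.newInstance(WorkflowServiceStubs.newLocalServiceStubs());

  void onReply(String fileId, long rangeStart, String recordId) {
    RangeChildWorkflow child = client.newWorkflowStub(
        RangeChildWorkflow.class, fileId + "-range-" + rangeStart);
    child.recordProcessed(recordId); // delivered as a signal to the running child
  }
}
```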

The extreme version of the last design is to create a workflow for each record. In this case, as the parent cannot have 200k children, it has to create a tree of children. For example, a parent with 1,000 children, each of them having 1,000 children, gives 1 million child workflows. A workflow per record is useful when the processing of each record is pretty complex. I don’t think it is needed in your case.

Thank you maxim for the detailed explanation.