Workflow orchestration for data pipeline activities

Hi,

I’m trying to implement a data pipeline workflow via Temporal Java-SDK.

Here are the sequential steps in my mind:

  1. First, read the rows from a big table (let’s say ten million rows in total) one by one via a JDBC streaming query.
  2. Pass each row on to the next handlers, for example Elasticsearch for indexing.

The rough idea is:

  1. Create a DB-reading activity that reads the rows from the big table.
  2. Create the required Data-Pipeline activities for each row handler, using each row as the activity input argument (a rough sketch of what I mean follows below).
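To make that concrete, here is roughly the naive shape I started from (all names, and the Row DTO, are just placeholders); the list returned by the reading activity is exactly where it falls apart:

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.List;

// Placeholder DTO for one table row.
class Row {
  private long id;
  public long getId() { return id; }
}

@ActivityInterface
interface DbReadingActivity {
  // Returning the whole table as a list is exactly what cannot work for ten million rows.
  List<Row> readAllRows();
}

@ActivityInterface
interface RowHandlerActivity {
  void indexIntoElasticsearch(Row row);
}

@WorkflowInterface
interface PipelineWorkflow {
  @WorkflowMethod
  void run();
}

class PipelineWorkflowImpl implements PipelineWorkflow {
  private final DbReadingActivity reader =
      Workflow.newActivityStub(
          DbReadingActivity.class,
          ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofHours(1)).build());

  private final RowHandlerActivity handler =
      Workflow.newActivityStub(
          RowHandlerActivity.class,
          ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(5)).build());

  @Override
  public void run() {
    // The activity result would be far too large, and looping over ten million rows
    // in a single workflow would also blow up the workflow history.
    for (Row row : reader.readAllRows()) {
      handler.indexIntoElasticsearch(row);
    }
  }
}
```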

But when I tried to implement the activity calls inside the main workflow function, I hit a problem: because the dataset is huge, it is impossible to iterate over every row, collect the rows into a list, and return that list from the DB-reading activity to the workflow. So I’m looking for a way for the DB-reading activity to send data to the workflow continuously, without blocking either the DB-reading process or the workflow, so that I can receive each streamed row and pass it on to the next Data-Pipeline activities.

I tried to search such topic in this community, and found these:

  1. Reactive not supported: Reactive support within activity.

  2. Send data to the workflow via signal: Passing Activity Stub to (other) Activity Method Blows Call Stack - #3 by alec

From link 2, it seems I could use signals to send these millions of rows to the workflow one by one. However, that answer assumes the total number of such signals is bounded.

So, my questions here are:

  1. Is the number of signals bounded in the DB-reading activity case? (I guess not.)
  2. If I can’t use signals to send data from the activity to the workflow, how can I achieve my use case? Any suggestions?

Many thanks.

Hi, I found an alternative to using signals, from here:

Have two workflows:

  1. The first one runs the long polling activity against the DB.
  2. From inside that polling workflow, start another, row-based workflow for each row.

The downside of this approach is that it’s not easy to track the association between the two workflows, but in our case that doesn’t seem to matter much.
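For concreteness, a minimal sketch of the outer (polling) workflow in this setup, reusing the Row placeholder from the first sketch; all names here are made up, and the per-row fan-out happens inside the activity implementation (sketched further down the thread):

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

// Per-row workflow started for every streamed row.
@WorkflowInterface
interface RowWorkflow {
  @WorkflowMethod
  void process(Row row);
}

// Long-running activity that streams the table and starts one RowWorkflow per row.
@ActivityInterface
interface DbStreamingActivity {
  void streamAndFanOut();
}

// Outer "polling" workflow: its only job is to run the long DB-reading activity.
@WorkflowInterface
interface PollingWorkflow {
  @WorkflowMethod
  void poll();
}

class PollingWorkflowImpl implements PollingWorkflow {
  private final DbStreamingActivity reader =
      Workflow.newActivityStub(
          DbStreamingActivity.class,
          ActivityOptions.newBuilder()
              // Long-running activity: a generous timeout plus a heartbeat timeout
              // so a dead worker is detected (heartbeating is discussed below).
              .setStartToCloseTimeout(Duration.ofHours(12))
              .setHeartbeatTimeout(Duration.ofMinutes(1))
              .build());

  @Override
  public void poll() {
    reader.streamAndFanOut();
  }
}
```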

Hi,
I’m exploring Temporal for the first time, for a use case similar to what’s described here. I’d like to read a potentially large number of database rows and perform some processing on each one.

If I understand correctly, the approach described here and the linked post would be as follows:

  • Workflow A, which calls Activity A
  • Activity A queries the DB and streams the results
  • Within Activity A, create an instance of workflow B for each row.

I put together a quick proof of concept and it looks like it works, though I’m not sure whether there are any caveats to be aware of. Are there any limits, for example, on activities invoking other workflows?
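For reference, the relevant part of my proof of concept looks roughly like this (simplified; RowSource is a placeholder for the streaming DB query, and Row/RowWorkflow/DbStreamingActivity come from the sketches above). Activity A is handed a WorkflowClient and starts one Workflow B execution per streamed row:

```java
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowExecutionAlreadyStarted;
import io.temporal.client.WorkflowOptions;
import java.util.stream.Stream;

// Placeholder abstraction over the streaming DB query.
interface RowSource {
  Stream<Row> streamAll();
}

// Activity A: streams the rows and starts a Workflow B (RowWorkflow) execution per row.
class DbStreamingActivityImpl implements DbStreamingActivity {
  private final WorkflowClient client;
  private final RowSource rowSource;

  DbStreamingActivityImpl(WorkflowClient client, RowSource rowSource) {
    this.client = client;
    this.rowSource = rowSource;
  }

  @Override
  public void streamAndFanOut() {
    try (Stream<Row> rows = rowSource.streamAll()) {
      rows.forEach(row -> {
        RowWorkflow wf = client.newWorkflowStub(
            RowWorkflow.class,
            WorkflowOptions.newBuilder()
                .setTaskQueue("row-pipeline")
                // A deterministic id prevents duplicate executions if this activity is
                // retried, and keeps the row-to-workflow association visible.
                .setWorkflowId("row-" + row.getId())
                .build());
        try {
          // Start asynchronously; do not block the stream waiting for completion.
          WorkflowClient.start(wf::process, row);
        } catch (WorkflowExecutionAlreadyStarted e) {
          // Row was already dispatched by a previous attempt of this activity; skip it.
        }
      });
    }
  }
}
```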

Do you need to poll the DB periodically, or do you read it once?

Are there any limits for example on Activities invoking other workflows

Depending on the rate at which you are creating new executions, you should set appropriate namespace RPS limits; see here.
The same goes for your DB, meaning you might need to protect it from overload via dynamic config:

frontend.persistenceMaxQPS - frontend persistence max qps, default 2000
history.persistenceMaxQPS - history persistence max qps, default 9000
matching.persistenceMaxQPS - matching persistence max qps, default 3000

I only need to query the DB once per workflow execution. I’ll probably use Spring Data JPA’s streaming capabilities to process the data in a streaming fashion, as described in this post.
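Roughly, what I have in mind for the streaming query is something like this (entity, repository, and service names are all made up, and whether the persistence imports are javax or jakarta depends on the Spring Boot version); the stream has to be consumed inside an open, read-only transaction and closed when done:

```java
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.persistence.Entity;     // jakarta.persistence on newer Spring Boot versions
import javax.persistence.Id;
import javax.persistence.QueryHint;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.jpa.repository.QueryHints;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Placeholder JPA entity mapped to the big table.
@Entity
class RowEntity {
  @Id
  private Long id;
  public Long getId() { return id; }
}

interface RowRepository extends JpaRepository<RowEntity, Long> {

  // Hint Hibernate to stream rather than buffer the whole result set.
  // Note: with MySQL Connector/J, true streaming typically needs a fetch size of
  // Integer.MIN_VALUE or useCursorFetch=true on the JDBC URL.
  @QueryHints(@QueryHint(name = "org.hibernate.fetchSize", value = "1000"))
  @Query("select r from RowEntity r")
  Stream<RowEntity> streamAll();
}

@Service
class RowStreamingService {
  private final RowRepository repo;

  RowStreamingService(RowRepository repo) {
    this.repo = repo;
  }

  // The stream must be consumed inside an open (read-only) transaction and closed afterwards.
  @Transactional(readOnly = true)
  public void forEachRow(Consumer<RowEntity> handler) {
    try (Stream<RowEntity> rows = repo.streamAll()) {
      rows.forEach(handler);
    }
  }
}
```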

I’ve tried 2 separate approaches, which both seem to work for a simple POC:

  1. Create a workflow for each DB row, within the data loading activity, as described above.
  2. The data-loading activity calls a signal method on Workflow A for each DB row, which in turn executes a child workflow.

Any thoughts on which would be the better design?
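For reference, approach 2 looks roughly like this in my POC (simplified, reusing the Row and RowWorkflow placeholders from the earlier sketches). The signal handler only buffers the row; the workflow method drains the buffer and starts each child asynchronously. The data-loading activity obtains a stub via client.newWorkflowStub(ParentWorkflow.class, workflowId), calls newRow(row) per streamed row, and finally loadingFinished(). The earlier caveat about an unbounded number of signals to a single workflow still applies, of course.

```java
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.Async;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.ArrayDeque;
import java.util.Queue;

@WorkflowInterface
interface ParentWorkflow {
  @WorkflowMethod
  void run();

  @SignalMethod
  void newRow(Row row);

  @SignalMethod
  void loadingFinished();
}

class ParentWorkflowImpl implements ParentWorkflow {
  private final Queue<Row> pending = new ArrayDeque<>();
  private boolean done = false;

  @Override
  public void run() {
    while (true) {
      // Wake up whenever a row arrives or loading is finished.
      Workflow.await(() -> !pending.isEmpty() || done);
      while (!pending.isEmpty()) {
        Row row = pending.poll();
        RowWorkflow child = Workflow.newChildWorkflowStub(
            RowWorkflow.class,
            ChildWorkflowOptions.newBuilder()
                .setWorkflowId("row-" + row.getId())
                // Let the child keep running after this parent closes.
                .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
                .build());
        // Fire and forget: start the child asynchronously, wait only for it to be started.
        Async.procedure(child::process, row);
        Workflow.getWorkflowExecution(child).get();
      }
      if (done && pending.isEmpty()) {
        return;
      }
    }
  }

  @Override
  public void newRow(Row row) {
    pending.add(row);
  }

  @Override
  public void loadingFinished() {
    done = true;
  }
}
```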

I only need to query the DB once for a single workflow execution.

One idea is to have an activity that reads one “page” of rows and returns it as its result; then, for each row, have your workflow process the data via an activity or child workflow (async seems best for you, as it looks like you do not need to wait for those operations to complete inside your workflow, right?).

This workflow would then need to call continueAsNew after starting the activities/child workflows and pass the current page number (or the number of processed rows) to the next execution. On the next run, the activity can use this to return the next “page” of rows, and so on until all the data has been processed.
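A rough sketch of that pattern, assuming a hypothetical PagedReadActivity and reusing the Row/RowWorkflow placeholders from earlier:

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.Async;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.List;

@ActivityInterface
interface PagedReadActivity {
  // Returns one page of rows starting at the given offset; empty list when done.
  List<Row> readPage(long offset, int pageSize);
}

@WorkflowInterface
interface PagedPipelineWorkflow {
  @WorkflowMethod
  void run(long offset);
}

class PagedPipelineWorkflowImpl implements PagedPipelineWorkflow {
  private static final int PAGE_SIZE = 1000;

  private final PagedReadActivity reader =
      Workflow.newActivityStub(
          PagedReadActivity.class,
          ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(5)).build());

  @Override
  public void run(long offset) {
    List<Row> page = reader.readPage(offset, PAGE_SIZE);
    if (page.isEmpty()) {
      return; // all rows processed
    }
    for (Row row : page) {
      RowWorkflow child = Workflow.newChildWorkflowStub(
          RowWorkflow.class,
          ChildWorkflowOptions.newBuilder()
              .setWorkflowId("row-" + row.getId())
              .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
              .build());
      // Start the child asynchronously and wait only for it to be started.
      Async.procedure(child::process, row);
      Workflow.getWorkflowExecution(child).get();
    }
    // Keep the history small: carry the new offset into a fresh execution.
    Workflow.continueAsNew(offset + page.size());
  }
}
```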

Paging would indeed make the workflow simpler to implement. It would be a less efficient way to load the data, though, and might not perform well. We are using a MySQL database, and if we use LIMIT-based queries, performance tends to suffer at high offsets. Hence the thinking of using a single query and streaming the data.

async seems best for you, as it looks like you do not need to wait for those operations to complete inside your workflow, right?

That’s correct, everything can be async and we don’t need to wait for operations to complete.

OK, so I assume you need to do the streaming inside a long-running activity?
In that case, make sure the activity heartbeats. In case of a failure or timeout, you can use the last heartbeat details to continue your streaming from the point where it previously failed.
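For example, the streaming activity could heartbeat the number of rows processed so far and, on retry, resume from that position (a sketch; streamFrom is a hypothetical method on the row source, and how you actually skip ahead depends on your query):

```java
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical variant of the row source that can skip already-processed rows.
interface ResumableRowSource {
  Stream<Row> streamFrom(long alreadyProcessed);
}

class ResumableStreamingActivityImpl implements DbStreamingActivity {
  private final ResumableRowSource rowSource; // placeholder for the streaming query
  private final Consumer<Row> perRowHandler;  // placeholder: starts the per-row work

  ResumableStreamingActivityImpl(ResumableRowSource rowSource, Consumer<Row> perRowHandler) {
    this.rowSource = rowSource;
    this.perRowHandler = perRowHandler;
  }

  @Override
  public void streamAndFanOut() {
    ActivityExecutionContext ctx = Activity.getExecutionContext();
    // On a retry, resume from the last heartbeated position instead of row zero.
    long resumeFrom = ctx.getHeartbeatDetails(Long.class).orElse(0L);

    long[] processed = {resumeFrom};
    try (Stream<Row> rows = rowSource.streamFrom(resumeFrom)) {
      rows.forEach(row -> {
        perRowHandler.accept(row);
        processed[0]++;
        // Record progress; the SDK throttles how often heartbeats are actually sent.
        ctx.heartbeat(processed[0]);
      });
    }
  }
}
```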

For passing data as an activity result, note there is a 2MB blob size limit (which applies to a single activity input/result), just FYI.

I put together some sample code to show the two approaches I mentioned.

1. Workflow created by Activity
2. Activity signals parent workflow

I’m just generating random data in these examples but the stream would be replaced with a DB query.

I’m not using heartbeats in these but I’ll look more into those.