Creating 10m+ workflows Best Practices

I am planning on transitioning our company’s architecture to use Temporal for bullet-proof resiliency. For starters, we will self-host the Temporal server on our side.

Use Case/Architecture:
We have a micro-service (A) that is responsible for ingesting giant CSV files (sometimes up to 10m rows) from several different sources. Currently, this job runs on a cron, checks if new files are available, and then ingests and persists the 10m rows in the DB. We do some validations and checks in between to ensure data integrity, sometimes even by calling external services. Each row that is parsed from the CSV is pushed to a queue (so 10m+ messages) for another micro-service to consume.

We have 2 types of files:

  • Order details
  • Shipping details

Micro-service (B) is responsible for consuming these messages and acting on them. The main goal of this service is that it must wait for the data from both sources, which can unfortunately arrive in any order. It then does some external lookups, validates that the data from both sources matches, and updates the status in the DB accordingly.

Here is what I am thinking for my Temporal architecture:

Micro-service (A): Every time the cron finds a new file, start a new FileProcessorWorkflow.

The FileProcessorWorkflow does the following (rough sketch after the list):

  1. Chunk the file into manageable chunks inside the workflow
  2. Call an activity which takes a chunk and runs validations
  3. Call an activity that pushes each row in the chunk to an external queue, with all relevant metadata
  4. Call an activity to update the status of the file in the DB to processed
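Roughly, I am imagining something like this with the .NET SDK (just a sketch; FileActivities and the chunk references are placeholders, and the actual chunking lives in an activity since the workflow itself can’t touch the filesystem):

    using System;
    using System.Threading.Tasks;
    using Temporalio.Workflows;

    // Sketch only; FileActivities and its methods are placeholders for our real activities
    [Workflow]
    public class FileProcessorWorkflow
    {
        [WorkflowRun]
        public async Task RunAsync(string fileId)
        {
            var opts = new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(5) };

            // 1. An activity splits the file and returns chunk references to iterate over
            var chunkIds = await Workflow.ExecuteActivityAsync(
                (FileActivities a) => a.ChunkFileAsync(fileId), opts);

            foreach (var chunkId in chunkIds)
            {
                // 2. Validate the chunk (may call external services)
                await Workflow.ExecuteActivityAsync(
                    (FileActivities a) => a.ValidateChunkAsync(chunkId), opts);

                // 3. Push each row in the chunk to the external queue with its metadata
                await Workflow.ExecuteActivityAsync(
                    (FileActivities a) => a.PushChunkAsync(chunkId), opts);
            }

            // 4. Mark the file as processed in the DB
            await Workflow.ExecuteActivityAsync(
                (FileActivities a) => a.MarkFileProcessedAsync(fileId), opts);
        }
    }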

Micro-service (B): For each unique key (the matching key between the two sources), create a Temporal workflow called DataComparisonWorkflow.

  1. Create a new workflow the first time a message with that key arrives
  2. Call an activity to perform any validations as needed, by calling external services
  3. Wait for the data from the other source(s) to come through
  4. When the next message comes through with the same key, progress the state of the existing workflow, do the data comparison between the two sources, and update the status in the DB accordingly. In the future we might extend this to 10+ sources and match all of them (not sure which paradigm is best to achieve this)

Here are some concerns/questions I have:

  • Is it good practice to adopt this queue-based approach, which will rapidly create 10m+ workflows per file?
  • Is there a recommended way to “wait for the data from the other source to come through”? I read about signals, but I was not sure how they fit into the above architecture
  • Any suggestions to make the architecture process more in parallel, so it scales to multiple files coming in at once? Imagine 1000s of files, each with 10m+ rows, being parsed, transformed, validated, and then pushed to a queue for matching.
  • Is it better to adopt this queue-based approach for Micro-service (B), versus creating a child workflow for each row in the file inside Micro-service (A)? The one concern I have with the queue-based approach is that I could lose the reference to the parent file workflow. If matching fails, I might want to do some sort of ‘compensating functions’ (Micro-service (B) calling Micro-service (A)) to update the state of the file so it gets re-pulled the next day.

I don’t think you need a queue. At step (3) the activity can call signalWithStart directly to start DataComparisonWorkflow. The activity should rate-limit these calls so as not to overload the Temporal cluster; the specific rate limit depends on the cluster size. Ideally this activity would back off if it gets any errors/timeouts back.
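To make the “wait for the data from the other source” part concrete: DataComparisonWorkflow can expose a signal and block on a wait condition until every source has reported in. A rough sketch with the .NET SDK (SourceRecord, MatchingActivities, and the signal name are placeholders, not a prescribed design):

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Temporalio.Workflows;

    [Workflow]
    public class DataComparisonWorkflow
    {
        // Keyed by source name, e.g. "order" or "shipping"; extends naturally to 10+ sources
        private readonly Dictionary<string, SourceRecord> received = new();

        [WorkflowSignal]
        public Task SubmitRecordAsync(SourceRecord record)
        {
            received[record.Source] = record;
            return Task.CompletedTask;
        }

        [WorkflowRun]
        public async Task RunAsync(string matchingKey)
        {
            // Block until both sources have arrived, in whatever order they show up
            await Workflow.WaitConditionAsync(() => received.Count >= 2);

            // Compare the sources and persist the result via an activity
            var records = received.Values.ToList();
            await Workflow.ExecuteActivityAsync(
                (MatchingActivities a) => a.CompareAndPersistAsync(matchingKey, records),
                new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1) });
        }
    }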

For starters, we will self-host the Temporal server on our side.

I would recommend talking to our sales team before going the self-hosting route. You want to spend your time implementing your business logic, not learning to operate a high-scale stateful service reliably.


Thanks @maxim, I might be doing something similar right now from within the activity:

    // Instance (non-static) method so the injected _client field is in scope; rows are
    // typed rather than raw strings so transactionRow.Id resolves.
    [Activity]
    public async Task ProcessingChunkMatchingAsync(List<TransactionRow> results, FileType fileType)
    {
        foreach (var transactionRow in results)
        {
            // Use the activity logger here; Workflow.Logger is only valid inside workflow code
            ActivityExecutionContext.Current.Logger.LogInformation($"Sending for data integration check: {transactionRow}");

            var options = new WorkflowOptions
            {
                Id = transactionRow.Id,
                TaskQueue = "FILE_TRANSACTION_QUEUE",
            };

            var handle = _client.GetWorkflowHandle<MatchingWorkflow>(transactionRow.Id);

            try
            {
                // Try to signal the workflow
                await handle.SignalAsync(wf => wf.TransactionSignal(transactionRow));
            }
            catch (Exception)
            {
                // Workflow doesn't exist (or the signal failed), so start it instead
                ActivityExecutionContext.Current.Logger.LogInformation($"Workflow does not exist, starting new workflow for: {transactionRow}");

                await _client.StartWorkflowAsync(
                    (MatchingWorkflow wf) => wf.RunAsync(transactionRow),
                    options);
            }
        }
    }

I tried playing around with signalWithStart, but couldn’t get the hang of how to use it with the .NET SDK. Any suggestions/recommendations on the above approach? I think I am exposing myself to some sort of race condition where two transactions with the same id come in different chunks, causing it to attempt to create two new workflows (when it should really create a workflow for one, and signal for the other).

Would also love some input on how to incorporate signalWithStart, and why it’s a better solution.

Please add rate limiting to the activity loop to avoid overloading the cluster with starts.
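Something along these lines, if I remember the .NET API correctly: WorkflowOptions has a SignalWithStart helper that attaches the start signal, so the signal-or-start decision becomes a single atomic server-side call and the two-chunk race goes away. The 20 ms delay is just a stand-in for whatever rate limit your cluster can actually take:

    [Activity]
    public async Task ProcessingChunkMatchingAsync(List<TransactionRow> results, FileType fileType)
    {
        foreach (var transactionRow in results)
        {
            var options = new WorkflowOptions
            {
                Id = transactionRow.Id,
                TaskQueue = "FILE_TRANSACTION_QUEUE",
            };

            // Attach the signal to the start request. If a workflow with this ID is already
            // running it only receives the signal; otherwise it is started and then signalled.
            options.SignalWithStart((MatchingWorkflow wf) => wf.TransactionSignal(transactionRow));

            await _client.StartWorkflowAsync(
                (MatchingWorkflow wf) => wf.RunAsync(transactionRow), options);

            // Crude rate limit: roughly 50 starts/second per running activity. A shared
            // limiter, retry-with-backoff on errors, or worker-level tuning is likely
            // better in practice.
            await Task.Delay(TimeSpan.FromMilliseconds(20));
        }
    }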