Scale Temporal workflow

I am new to Temporal.
I am trying to build a migration tool using Temporal with the following steps:

  1. It will connect to an external database to fetch records; the record IDs can number in the millions.
  2. Once we receive the record IDs, it should connect to the source AWS S3 bucket to get each file.
  3. Once a file is found, it has to be downloaded so we can compute a hash at runtime and extract the MIME type.
  4. It should then call a REST API to submit the data (hash, MIME type); this API returns a unique ID called the URNID.
  5. Next, the file should be copied to the destination S3 bucket, using the URNID as the object name.
  6. Finally, the URNID should be written back to the external database mentioned in step 1.

Which approach would you suggest if we have to deploy this in a pod to handle high load?
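
In code terms, I imagine the steps mapping onto activities roughly like this (just a sketch; the interface, record, and method names are made up for illustration):

// Illustrative only: one activity per external interaction, so the workflow itself
// only passes small values (record IDs, hashes, object keys) between steps.
using System.Collections.Generic;
using System.Threading.Tasks;
using Temporalio.Activities;

public record FileMetadata(string SourceKey, string Hash, string MimeType, long SizeBytes);

public interface IMigrationPipelineActivities
{
    [Activity]
    Task<List<string>> FetchRecordIdsAsync(int offset, int limit);             // step 1: external DB

    [Activity]
    Task<FileMetadata> DownloadAndHashAsync(string recordId);                  // steps 2-3: source S3, hash, MIME type

    [Activity]
    Task<string> RegisterFileAsync(FileMetadata metadata);                     // step 4: REST API, returns the URNID

    [Activity]
    Task CopyToDestinationAsync(string sourceKey, string urnId);               // step 5: destination S3

    [Activity]
    Task UpdateSourceDatabaseAsync(string recordId, string urnId);             // step 6: write URNID back
}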

How big are the files? Temporal and gRPC have size limits (see here).

Can you explain more about the millions of IDs? Temporal has limits on a single workflow’s history (see here), but it scales very well on the number of workflows.

How big are the files? Temporal and gRPC have size limits (see here).

The files are 2 MB on average.

Can you explain more about the millions of IDs? Temporal has limits on a single workflow’s history (see here), but it scales very well on the number of workflows.

When I say millions of IDs, I mean roughly 100K unique GUIDs, like:
('bf3f200a-9a5c-478e-a7bf-a141ed633a3d'),
('c8c30a68-8ad0-4d11-9b81-469ea2a94984'),
('c98717d1-aabd-4811-aaf6-17a7b1fe9f5e')

Currently I was thinking of using child workflows (with a fan-out pattern) for all the activities, with the parent workflow fetching the IDs from the external DB.
After running this approach, we got this:

"Workflow history count exceeds limit"

@GSmithAppsTemporal could you help or suggest how we can move forward, or whether the mentioned approach is workable?
Thanks in advance

Yes, those are big files, so you may have to use the techniques described in Troubleshoot the blob size limit error, in particular passing references to the files rather than passing the files themselves.

Regarding that error: yes, you're running into the history size limit I mentioned in my second question above.

I'm not sure which SDK you're using, but I'd suggest looking through the batch samples.
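
Whichever SDK it turns out to be, the idea behind passing references rather than the files themselves looks about the same. Here is a rough C# sketch for concreteness (the record, class, and helper names are made up, not from your project): the activity downloads and hashes the file itself and returns only small metadata, so the file bytes never go through workflow history.

using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Tasks;
using Temporalio.Activities;

public record FileRef(string SourceKey, string Sha256, string MimeType, long SizeBytes);

public class FileActivities
{
    [Activity]
    public async Task<FileRef> DownloadAndDescribeAsync(string sourceKey)
    {
        // Download to a temp file; only the small metadata below is returned to the workflow.
        string tempPath = Path.GetTempFileName();
        await DownloadFromS3Async(sourceKey, tempPath);      // placeholder helper

        await using var stream = File.OpenRead(tempPath);
        using var sha = SHA256.Create();
        byte[] hash = await sha.ComputeHashAsync(stream);

        return new FileRef(
            SourceKey: sourceKey,
            Sha256: Convert.ToHexString(hash),
            MimeType: DetectMimeType(tempPath),              // placeholder helper
            SizeBytes: new FileInfo(tempPath).Length);
    }

    // Placeholder: a real implementation would use the AWS SDK's S3 client.
    private static Task DownloadFromS3Async(string key, string destinationPath) =>
        throw new NotImplementedException();

    // Placeholder: a real implementation would inspect the file's magic bytes.
    private static string DetectMimeType(string path) =>
        throw new NotImplementedException();
}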

Thanks for the reply.
I am using the .NET SDK.
I am wondering about the size limits: I was able to download a 10 MB file from the S3 bucket and generate a hash from it. It was slow, but there was no error.

For the history limit error I used this code:

 // If workflow history length exceeds threshold, continue as new to avoid exceeding limits
  if (Workflow.CurrentHistoryLength > maxHistoryLength)
  {
      Console.WriteLine("\u001b[1m\u001b[31mHistory length {0} exceeded limit, continuing as new at batchStart={1}\u001b[0m", Workflow.CurrentHistoryLength, batchStart + totalProcessed);
      throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
  }

  // If we processed a full window, continue with next window
  if (totalProcessed == windowSize * batchSize)
  {
      Console.WriteLine($"Sliding window complete, continuing to next window: start={batchStart + windowSize * batchSize}");
      throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + windowSize * batchSize));
  }

I am using child workflows with a sliding window and fan-out.
Is this the correct approach to scale when we deploy this to production?
Looking forward to hearing from you.

@GSmithAppsTemporal this is the sample code for the POC:

 [WorkflowRun]
 public async Task<string> RunAsync(PaymentDetails details, int batchStart = 0)
 {
     int totalProcessed = 0;
     for (int w = 0; w < windowSize; w++)
     {
         int offset = batchStart + w * batchSize;
         var recordIds = await Workflow.ExecuteChildWorkflowAsync(
             (FetchRecordIdsWorkflow wf) => wf.RunAsync(offset, batchSize)
         );
         Console.WriteLine($"Fetched {recordIds?.Count ?? 0} record IDs for batch starting at index {offset}");

         if (recordIds == null || recordIds.Count == 0)
             break;

         // FAN-OUT: process all records in parallel
         var tasks = new List<Task>();
         foreach (var recordId in recordIds)
         {
             tasks.Add(ProcessRecordAsync(recordId));
         }
         await Task.WhenAll(tasks); // FAN-IN: wait for all to finish

         totalProcessed += recordIds.Count;
         if (recordIds.Count < batchSize)
             break;
     }

     // If workflow history length exceeds threshold, continue as new to avoid exceeding limits
     if (Workflow.CurrentHistoryLength > maxHistoryLength)
     {
         Console.WriteLine("\u001b[1m\u001b[31mHistory length {0} exceeded limit, continuing as new at batchStart={1}\u001b[0m", Workflow.CurrentHistoryLength, batchStart + totalProcessed);
         throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
     }

     // If we processed a full window, continue with next window
     if (totalProcessed == windowSize * batchSize)
     {
         Console.WriteLine($"Sliding window complete, continuing to next window: start={batchStart + windowSize * batchSize}");
         throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + windowSize * batchSize));
     }

     return "Done";
 }

 private async Task ProcessRecordAsync(string recordId)
 {
     try
     {
         Console.WriteLine($"Processing record ID: {recordId}");
         var downloadResult = await Workflow.ExecuteChildWorkflowAsync(
             (DownloadFileWorkflow wf) => wf.RunAsync(recordId)
         );
         Console.WriteLine($"Downloaded file for record ID: {recordId}, result: {downloadResult}");

         var hashMimeResult = await Workflow.ExecuteChildWorkflowAsync(
             (ComputeHashAndMimeWorkflow wf) => wf.RunAsync(downloadResult)
         );
         Console.WriteLine($"Computed hash and mime for record ID: {recordId}, result: {hashMimeResult}");

         var vrnResult = await Workflow.ExecuteChildWorkflowAsync(
             (GetVRNFromApiWorkflow wf) => wf.RunAsync(hashMimeResult)
         );
         Console.WriteLine($"Received VRN for record ID: {recordId}, VRN: {vrnResult}");

         var size = await Workflow.ExecuteChildWorkflowAsync(
             (SizeFileWorkflow wf) => wf.RunAsync(downloadResult)
         );
         Console.WriteLine($"Received VRN for record ID: {recordId}, VRN: {vrnResult}");

         await Workflow.ExecuteChildWorkflowAsync(
             (UpdateDbWithVRNWorkflow wf) => wf.RunAsync(recordId, vrnResult, size)
         );
     }
     catch (Exception ex)
     {
         // Log the error and continue with other records
         Console.WriteLine($"Error processing record {recordId}: {ex.Message}");
         // Optionally: return, or set a status for this record
     }
 }
  • I don’t see any activities in here. Are you doing activities in the child workflows?
  • In this line (ComputeHashAndMimeWorkflow wf) => wf.RunAsync(downloadResult) I believe you are passing the downloaded file as an argument to a workflow. I would not do this with these large files.
  • Instead of checking Workflow.CurrentHistoryLength > maxHistoryLength yourself, you should use ContinueAsNewSuggested (see the sketch after this list)
  • The continue as new checks are outside the for loop, so I don’t believe they will be used
  • This one isn’t as significant, but in workflow code, you want to use the workflow logger rather than writing to the console.
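
For example, the check inside your loop could look roughly like this (just a sketch against the .NET SDK; it assumes using Microsoft.Extensions.Logging; is in scope for LogInformation):

// Inside the workflow's processing loop: let the server suggest when to continue-as-new,
// and log through the workflow logger instead of the console.
if (Workflow.ContinueAsNewSuggested)
{
    Workflow.Logger.LogInformation(
        "Continue-as-new suggested (history length {Length}); restarting at batchStart={BatchStart}",
        Workflow.CurrentHistoryLength,
        batchStart + totalProcessed);

    throw Workflow.CreateContinueAsNewException(
        (MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
}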

Also, what triggers this? And is this a one-time job, or will it run many times in production?

  • I don’t see any activities in here. Are you doing activities in the child workflows?

Yes, I am calling activities in the child workflows and have created separate methods with the [Activity] annotation. See below:

[Workflow]
public class DownloadFileWorkflow
{
    [WorkflowRun]
    public async Task<string> RunAsync(string recordId)
    {
        Console.WriteLine("Record", recordId);
       
        return await Workflow.ExecuteActivityAsync(
            (IMigrationActivities a) => a.DownloadFileFromS3Async(recordId),
            new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(5) }
        );
    }
}
  • In this line (ComputeHashAndMimeWorkflow wf) => wf.RunAsync(downloadResult) I believe you are passing the downloaded file as an argument to a workflow. I would not do this with these large files.

This is just a string value (i.e., the file name); I should probably rename the variable.

I tried with Temporal SDK 1.7.0 but I was getting "Failed finding child for sequence", so I reverted back to 1.0.0.

  • The continue as new checks are outside the for loop, so I don’t believe they will be used
  • This one isn’t as significant, but in workflow code, you want to use the workflow logger rather than writing to the console.

Also, what triggers this? And is this a one-time job, or will it run many times in production?

This is a one-time job that might be run once in a while. However, we would like to run it on Kubernetes pods.

“Failed finding child for sequence”

I see this related post and PR that may help fix this in the next .NET SDK release.

I don’t quite understand the conditions being used to break out of the loop. Those look to me like they’ll break out of the loop if the overall process is done, correct?

On a related note, I’m not sure I understand the two conditions and the continue as new:

  • If the code gets to these two conditions and it is not totally finished, I think these two conditions should behave the same (i.e. continue as new).
  • Similarly, I actually think they already do behave the same: isn’t batchStart + windowSize * batchSize the same as batchStart + totalProcessed at this point in the code?

The two conditions were a fix to get past the history issue, and I think we are good. To clarify the conditions:

First condition: prevents the workflow history from getting too large by starting a new workflow run.
Second condition: continues processing the next batch/window of records in a new workflow run.

@GSmithAppsTemporal so now I have moved to another approach, using a single workflow only, i.e. no child workflows. I think this works better :slight_smile:
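
Roughly, the new shape is a single workflow that calls activities directly, fans out per record, and continues-as-new when the server suggests it. A simplified sketch, re-using the illustrative activity interface from my first post (not the exact production code):

using System;
using System.Linq;
using System.Threading.Tasks;
using Temporalio.Workflows;

[Workflow]
public class MigrationWorkflow
{
    private static readonly ActivityOptions Opts =
        new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) };

    [WorkflowRun]
    public async Task RunAsync(int batchStart = 0)
    {
        int offset = batchStart;
        while (true)
        {
            // Step 1: fetch the next batch of record IDs via an activity (no child workflow).
            var recordIds = await Workflow.ExecuteActivityAsync(
                (IMigrationPipelineActivities a) => a.FetchRecordIdsAsync(offset, 100), Opts);
            if (recordIds == null || recordIds.Count == 0)
                return;

            // Fan-out: process the records in parallel; fan-in with WhenAll.
            await Task.WhenAll(recordIds.Select(ProcessRecordAsync));
            offset += recordIds.Count;

            // Hand off to a fresh run before the history grows too large.
            if (Workflow.ContinueAsNewSuggested)
            {
                throw Workflow.CreateContinueAsNewException(
                    (MigrationWorkflow wf) => wf.RunAsync(offset));
            }
        }
    }

    private async Task ProcessRecordAsync(string recordId)
    {
        var meta = await Workflow.ExecuteActivityAsync(
            (IMigrationPipelineActivities a) => a.DownloadAndHashAsync(recordId), Opts);                // steps 2-3
        var urnId = await Workflow.ExecuteActivityAsync(
            (IMigrationPipelineActivities a) => a.RegisterFileAsync(meta), Opts);                       // step 4
        await Workflow.ExecuteActivityAsync(
            (IMigrationPipelineActivities a) => a.CopyToDestinationAsync(meta.SourceKey, urnId), Opts); // step 5
        await Workflow.ExecuteActivityAsync(
            (IMigrationPipelineActivities a) => a.UpdateSourceDatabaseAsync(recordId, urnId), Opts);    // step 6
    }
}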

Okay cool, is there anything remaining to help with, or is it good on your end?