How big are the files? Temporal and gRPC have size limits (see here).
Can you explain more about the millions of IDs? Temporal has limits on a single workflow’s history (see here), but it scales very well on the number of workflows.
How big are the files? Temporal and gRPC have size limits (see here).
The files are 2 MB on average.
Can you explain more about the millions of IDs? Temporal has limits on a single workflow’s history (see here), but it scales very well on the number of workflows.
When I say millions of IDs, I mean roughly 100K unique GUIDs like:
('bf3f200a-9a5c-478e-a7bf-a141ed633a3d'),
('c8c30a68-8ad0-4d11-9b81-469ea2a94984'),
('c98717d1-aabd-4811-aaf6-17a7b1fe9f5e')
Currently, I am thinking of using child workflows (with a fan-out pattern) for all the activities, and the parent workflow will fetch the IDs from an external DB.
After running this approach, we got this error:
"Workflow history count exceeds limit"
@GSmithAppsTemporal Could you help or suggest how we should move forward, or whether we can go with the approach mentioned?
Thanks in advance
Yes, those are big files, so you may have to use the techniques described in "Troubleshoot the blob size limit error", in particular passing references to the files rather than passing the files themselves.
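As a minimal sketch of that idea (hypothetical class and method names, not from your code): the download activity stages the file in S3 and returns only a short reference such as the object key, and downstream activities read the object from that key, so the file bytes never pass through workflow history.

using System.Threading.Tasks;
using Temporalio.Activities;

// Hypothetical activities: only small strings cross the workflow/activity boundary
public class FileActivities
{
    // Returns an S3 object key (a short string), not the file bytes
    [Activity]
    public Task<string> StageFileInS3Async(string recordId)
    {
        // ... upload or locate the file in S3 (elided) and return only its key
        return Task.FromResult($"staging/{recordId}");
    }

    // Takes the key and streams the object inside the activity to compute the hash
    [Activity]
    public Task<string> ComputeHashFromS3Async(string s3Key)
    {
        // ... stream the object from S3 and hash it here (elided)
        return Task.FromResult("sha256:placeholder");
    }
}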
Regarding that error, yes, you’re running into the error I was mentioning in the second statement above about history size limits.
I’m not sure which SDK you’re using, but I’d suggest looking through the batch samples
Thanks for the reply.
I am using the .NET SDK.
One thing I am wondering about: I was able to download a 10 MB file from an S3 bucket and generate a hash from it. It was slow, but it didn't produce any error.
For the history limit error, I used this code:
// If workflow history length exceeds threshold, continue as new to avoid exceeding limits
if (Workflow.CurrentHistoryLength > maxHistoryLength)
{
    Console.WriteLine("\u001b[1m\u001b[31mHistory length {0} exceeded limit, continuing as new at batchStart={1}\u001b[0m", Workflow.CurrentHistoryLength, batchStart + totalProcessed);
    throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
}

// If we processed a full window, continue with next window
if (totalProcessed == windowSize * batchSize)
{
    Console.WriteLine($"Sliding window complete, continuing to next window: start={batchStart + windowSize * batchSize}");
    throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + windowSize * batchSize));
}
I am using child workflows with a sliding window and fan-out.
Is this the correct approach for scaling when we deploy this in production?
Looking forward to hearing from you.
[WorkflowRun]
public async Task<string> RunAsync(PaymentDetails details, int batchStart = 0)
{
    int totalProcessed = 0;
    for (int w = 0; w < windowSize; w++)
    {
        int offset = batchStart + w * batchSize;
        var recordIds = await Workflow.ExecuteChildWorkflowAsync(
            (FetchRecordIdsWorkflow wf) => wf.RunAsync(offset, batchSize)
        );
        Console.WriteLine($"Fetched {recordIds?.Count ?? 0} record IDs for batch starting at index {offset}");
        if (recordIds == null || recordIds.Count == 0)
            break;

        // FAN-OUT: process all records in parallel
        var tasks = new List<Task>();
        foreach (var recordId in recordIds)
        {
            tasks.Add(ProcessRecordAsync(recordId));
        }
        await Task.WhenAll(tasks); // FAN-IN: wait for all to finish

        totalProcessed += recordIds.Count;
        if (recordIds.Count < batchSize)
            break;
    }

    // If workflow history length exceeds threshold, continue as new to avoid exceeding limits
    if (Workflow.CurrentHistoryLength > maxHistoryLength)
    {
        Console.WriteLine("\u001b[1m\u001b[31mHistory length {0} exceeded limit, continuing as new at batchStart={1}\u001b[0m", Workflow.CurrentHistoryLength, batchStart + totalProcessed);
        throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
    }

    // If we processed a full window, continue with next window
    if (totalProcessed == windowSize * batchSize)
    {
        Console.WriteLine($"Sliding window complete, continuing to next window: start={batchStart + windowSize * batchSize}");
        throw Workflow.CreateContinueAsNewException((MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + windowSize * batchSize));
    }

    return "Done";
}
private async Task ProcessRecordAsync(string recordId)
{
    try
    {
        Console.WriteLine($"Processing record ID: {recordId}");
        var downloadResult = await Workflow.ExecuteChildWorkflowAsync(
            (DownloadFileWorkflow wf) => wf.RunAsync(recordId)
        );
        Console.WriteLine($"Downloaded file for record ID: {recordId}, result: {downloadResult}");

        var hashMimeResult = await Workflow.ExecuteChildWorkflowAsync(
            (ComputeHashAndMimeWorkflow wf) => wf.RunAsync(downloadResult)
        );
        Console.WriteLine($"Computed hash and mime for record ID: {recordId}, result: {hashMimeResult}");

        var vrnResult = await Workflow.ExecuteChildWorkflowAsync(
            (GetVRNFromApiWorkflow wf) => wf.RunAsync(hashMimeResult)
        );
        Console.WriteLine($"Received VRN for record ID: {recordId}, VRN: {vrnResult}");

        var size = await Workflow.ExecuteChildWorkflowAsync(
            (SizeFileWorkflow wf) => wf.RunAsync(downloadResult)
        );
        Console.WriteLine($"Computed file size for record ID: {recordId}, size: {size}");

        await Workflow.ExecuteChildWorkflowAsync(
            (UpdateDbWithVRNWorkflow wf) => wf.RunAsync(recordId, vrnResult, size)
        );
    }
    catch (Exception ex)
    {
        // Log the error and continue with other records
        Console.WriteLine($"Error processing record {recordId}: {ex.Message}");
        // Optionally: return, or set a status for this record
    }
}
I don’t see any activities in here. Are you doing activities in the child workflows?
In this line (ComputeHashAndMimeWorkflow wf) => wf.RunAsync(downloadResult) I believe you are passing the downloaded file as an argument to a workflow. I would not do this with these large files.
Instead of this Workflow.CurrentHistoryLength > maxHistoryLength you should use ContinueAsNewSuggested
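For example, a minimal sketch of that check (assuming an SDK version that exposes Workflow.ContinueAsNewSuggested), reusing the names from your workflow:

// Let the server signal when a new run is advisable, instead of comparing
// Workflow.CurrentHistoryLength against a hand-picked maxHistoryLength
if (Workflow.ContinueAsNewSuggested)
{
    throw Workflow.CreateContinueAsNewException(
        (MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
}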
The continue as new checks are outside the for loop, so I don’t believe they will be used
This one isn’t as significant, but in workflow code, you want to use the workflow logger rather than writing to the console.
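For example, a minimal sketch of drop-in replacements for the Console.WriteLine calls in your workflow above (Workflow.Logger is an ILogger, so the usual Microsoft.Extensions.Logging extension methods apply):

// Requires: using Microsoft.Extensions.Logging;
Workflow.Logger.LogInformation(
    "Fetched {Count} record IDs for batch starting at index {Offset}", recordIds.Count, offset);
Workflow.Logger.LogError(
    ex, "Error processing record {RecordId}", recordId);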
Also, what triggers this? And is this a one-time job, or will it run many times in production?
I don’t see any activities in here. Are you doing activities in the child workflows?
Yes, I am calling activities in the child workflows and have created separate methods with the [Activity] annotation. See below:
[Workflow]
public class DownloadFileWorkflow
{
    [WorkflowRun]
    public async Task<string> RunAsync(string recordId)
    {
        Console.WriteLine($"Record: {recordId}");
        return await Workflow.ExecuteActivityAsync(
            (IMigrationActivities a) => a.DownloadFileFromS3Async(recordId),
            new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(5) }
        );
    }
}
In this line (ComputeHashAndMimeWorkflow wf) => wf.RunAsync(downloadResult) I believe you are passing the downloaded file as an argument to a workflow. I would not do this with these large files.
This is just a string value (i.e., the file name); I should probably change the variable name.
Instead of this Workflow.CurrentHistoryLength > maxHistoryLength you should use ContinueAsNewSuggested
I tried with Temporal SDK 1.7.0, but I was getting "Failed finding child for sequence", so I reverted back to 1.0.0.
The continue as new checks are outside the for loop, so I don’t believe they will be used
This one isn’t as significant, but in workflow code, you want to use the workflow logger rather than writing to the console.
Also, what triggers this? And is this a one-time job, or will it run many times in production?
This is a one-time job that may be run again once in a while. However, we would like to run it on Kubernetes pods.
I see this related post and PR that may help fix this in the next .NET SDK release.
I don’t quite understand the conditions being used to break out of the loop. Those look to me like they’ll break out of the loop if the overall process is done, correct?
On a related note, I’m not sure I understand the two conditions and the continue as new:
If the code gets to these two conditions and it is not totally finished, I think these two conditions should behave the same (i.e. continue as new).
Similarly, I actually think they already do behave the same: isn’t batchStart + windowSize * batchSize the same as batchStart + totalProcessed at this point in the code?
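In other words, if those two branches really are equivalent at that point, a minimal sketch of what the tail of RunAsync could collapse to (using the names from your code) would be:

// After the sliding window: either everything is processed, or hand the
// remaining records to a fresh run so this run's history stays bounded
if (totalProcessed == windowSize * batchSize)
{
    throw Workflow.CreateContinueAsNewException(
        (MoneyTransferWorkflow wf) => wf.RunAsync(details, batchStart + totalProcessed));
}
return "Done";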
The two conditions were a fix to overcome the history issue, and I think we are good. To explain the conditions:
First condition: Prevents workflow history from getting too large by starting a new workflow run.
Second condition: Continues processing the next batch/window of records in a new workflow run.
@GSmithAppsTemporal So, I have now moved to another approach that uses a single workflow only, i.e. no child workflows. I think this works better.
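For reference, a minimal sketch of that single-workflow shape (hypothetical method names apart from DownloadFileFromS3Async, which appears earlier in the thread): one workflow fetches a batch of IDs via an activity, fans out over activities directly, and continues as new between batches.

// Requires: using System.Linq; using Temporalio.Workflows;
[Workflow]
public class MigrationWorkflow
{
    private const int BatchSize = 100;  // assumed batch size

    [WorkflowRun]
    public async Task RunAsync(int batchStart = 0)
    {
        // Fetch the next batch of record IDs via an activity (no child workflow)
        var recordIds = await Workflow.ExecuteActivityAsync(
            (IMigrationActivities a) => a.FetchRecordIdsAsync(batchStart, BatchSize),
            new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1) });

        // FAN-OUT over activities for this batch, FAN-IN with Task.WhenAll
        await Task.WhenAll(recordIds.Select(recordId =>
            Workflow.ExecuteActivityAsync(
                (IMigrationActivities a) => a.DownloadFileFromS3Async(recordId),
                new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(5) })));

        // If this batch was full, more records may remain: continue as new
        // so the next batch starts with an empty history
        if (recordIds.Count == BatchSize)
        {
            throw Workflow.CreateContinueAsNewException(
                (MigrationWorkflow wf) => wf.RunAsync(batchStart + recordIds.Count));
        }
    }
}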