Aggregating many JSON files into Parquet

Hi,

Let’s say I have millions of workflows running concurrently, each creating a structure that represents a JSON file. I want to have an aggregator component that receives these JSON files and aggregates them into Parquet files.

A Parquet file needs to be big, so the aggregator needs to wait until it has received about 500,000 JSON files, or until a timeout. Is there a way to achieve this fan-in strategy in Temporal?

Here’s a pretty quick and dirty idea. Please validate this logic, but this is how I’d start approaching the problem. I think it would work for aggregating the JSON files when some threshold is reached or a timer has expired.

Here we generate N JSON objects concurrently. As each completes, we append it to a list. When the list reaches a specific size, or the timer has expired, we kick off another activity to generate a Parquet file from the accumulated list of JSON objects. We then wait for all the Parquet files to be generated.

We assume the JSON objects and the Parquet files are stored somewhere external, like blob storage. You don’t need to use URI specifically, but you do need to pass around some reference to where the JSON objects and Parquet files are stored. You might also be able to leverage a task queue dedicated to a specific worker, so that worker could store the JSON objects on local disk. Probably a future optimisation, but it might work.

I’m also not sure whether you need to cancel the timer if it’s still running when the size threshold is reached.

import io.temporal.activity.ActivityInterface
import io.temporal.activity.ActivityOptions
import io.temporal.workflow.Async
import io.temporal.workflow.Promise
import io.temporal.workflow.Workflow
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod
import java.net.URI
import java.time.Duration

@WorkflowInterface
interface JsonAggregator {
  @WorkflowMethod
  fun start(count: Long, timeout: Long)
}

@ActivityInterface
interface JsonGenerator {
  fun generate(idx: Long): URI
}

@ActivityInterface
interface ParquetAggregator {
  fun aggregate(uris: List<URI>): URI
}

class JsonAggregatorImpl : JsonAggregator {
  // Activity stubs require options with at least a start-to-close timeout;
  // five minutes is a placeholder, tune for real workloads.
  private val activityOptions = ActivityOptions.newBuilder()
    .setStartToCloseTimeout(Duration.ofMinutes(5))
    .build()

  private val jsonGenerator =
    Workflow.newActivityStub(JsonGenerator::class.java, activityOptions)
  private val parquetAggregator =
    Workflow.newActivityStub(ParquetAggregator::class.java, activityOptions)

  override fun start(count: Long, timeout: Long) {
    // Launch the JSON generators concurrently. `until` avoids the
    // off-by-one that `0..count` would introduce.
    val generators = (0 until count).map { idx ->
      Async.function { jsonGenerator.generate(idx) }
    }.toMutableList()

    val parquetPromises = mutableListOf<Promise<URI>>()
    var jsonUris = mutableListOf<URI>()

    var timer = Workflow.newTimer(Duration.ofSeconds(timeout))

    while (generators.isNotEmpty()) {
      // anyOf returns a new Promise, so it can't identify which generator
      // finished; wait on it, then drain every completed generator.
      Promise.anyOf(generators).get()
      val completed = generators.filter { it.isCompleted }
      generators.removeAll(completed)
      completed.forEach { jsonUris.add(it.get()) }

      // Note: the timer is only observed when a generator completes; flushing
      // on timer expiry alone would need something like Workflow.await.
      if (jsonUris.size >= MAX_JSON_FILES || timer.isCompleted) {
        // Capture the current batch in a val: the async lambda runs later,
        // after jsonUris has been reassigned to a fresh list.
        val batch = jsonUris
        parquetPromises.add(Async.function { parquetAggregator.aggregate(batch) })
        jsonUris = mutableListOf()
        timer = Workflow.newTimer(Duration.ofSeconds(timeout))
      }
    }

    if (jsonUris.isNotEmpty()) {
      val batch = jsonUris
      parquetPromises.add(Async.function { parquetAggregator.aggregate(batch) })
    }

    // allOf only builds the combined promise; get() actually waits for it.
    Promise.allOf(parquetPromises).get()
  }

  companion object {
    private const val MAX_JSON_FILES = 500_000
  }
}

Thank you very much for the detailed answer.
I think this doesn’t work for my use case. I don’t generate all of the JSON files together at the start: each JSON file is created in a different workflow run, and the outputs need to be aggregated. A single workflow that receives these JSON files (or references to them) via signals doesn’t work either, because we’d need more signals than a workflow’s history size limit allows. There may be another solution where an activity runs whenever a JSON file is completed (from its own workflow), but I’m not quite sure how that would work. Does anyone have a recommendation for this use case in Temporal?

You can use ContinueAsNew to avoid the history size limit issue. You can still have other Workflows signal the Workflow doing the aggregation, and periodically call ContinueAsNew to prevent a large history. Here is an example of how to use ContinueAsNew.
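For illustration, here is a minimal plain-Kotlin sketch of the bookkeeping such an aggregator would keep. The Temporal wiring (the @SignalMethod handler and the Workflow.continueAsNew call) is indicated only in comments, and the per-run signal budget is an assumption:

```kotlin
// Plain-Kotlin sketch of the per-run batching state; Temporal wiring is
// indicated in comments. The 100,000 budget is an assumed history limit.
class BatchState(private val maxSignalsPerRun: Int = 100_000) {
    val received = mutableListOf<String>() // references to JSON blobs

    // Called from the @SignalMethod handler in the real workflow.
    fun onSignal(uri: String) {
        received.add(uri)
    }

    // Checked in the workflow loop: when true, persist `received` to blob
    // storage and call Workflow.continueAsNew(...) with the storage key so
    // the next run starts with a fresh history.
    fun shouldContinueAsNew(): Boolean = received.size >= maxSignalsPerRun
}
```

The key point is that nothing except the small carried-over reference survives the ContinueAsNew boundary; the accumulated list itself lives in blob storage.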

If I use ContinueAsNew, I need to split the 500,000 signals into chunks small enough for a single run to handle. Let’s say a workflow history can contain 100,000 signals. So I receive 100,000 signals, save the list of files in blob storage, call ContinueAsNew for the next batch, and do this 5 times. On the 5th run, I create the Parquet file. I have some issues with this solution:

  1. You’ll have a workflow with 100,000 signals, and you can’t really see anything in the UI or when querying.
  2. From my understanding, ContinueAsNew is intended more for periodic work, where the runs are not related to each other. For this use case, splitting the signals across 5 runs can mean a loss of visibility (a single workflow run no longer corresponds to a Parquet file), and you have to drain any buffered signals at the end of each run so none are lost.

What do you think?

I would use an activity to perform the aggregation. The activity would receive the invocation with a reference to the JSON doc (or even the whole doc, if it is not large) and append it to a local file. The activities shouldn’t be acknowledged until the file is completed and uploaded to its final destination.

How do you suggest acknowledging the activities? If I want to acknowledge 500,000 activities, I need to make sure I acknowledge all of them. Is running a workflow that runs an activity which acknowledges them one by one, heartbeating after each acknowledgement, a good solution? Or is there another method for this in Temporal?

You can save their IDs to another file on the same machine and then use that file to ack them.
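As a sketch of that idea (file layout and helper names are hypothetical): append each activity's task token to a companion file next to the growing output file, and replay it once the Parquet upload succeeds:

```kotlin
import java.io.File

// Hypothetical helpers: keep a companion file of task tokens on the same
// disk as the Parquet input file. After the Parquet file is uploaded, replay
// the tokens to complete the corresponding activities.
fun recordPendingAck(ackFile: File, taskToken: String) {
    ackFile.appendText(taskToken + "\n")
}

fun pendingAcks(ackFile: File): List<String> =
    if (ackFile.exists()) ackFile.readLines().filter { it.isNotBlank() }
    else emptyList()
```

In the real system each token would be fed to the activity completion client; the sketch only shows the local bookkeeping.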

Thank you.

When I use a local file to ack them, what happens if the worker goes down and the file is lost? Some of the activities would be retried and their output built again into another Parquet file. Would you say keeping the file in a blob store, where it’s more durable, is better for preventing this? And do you recommend acknowledging them from a new workflow run that executes a heartbeating activity, since this is a batch operation?

Also, you recommended building a local file, which restricts the work to one worker instance. How do I make sure the activities get to that worker? They come from different workflows, so I can’t use a session.

Would you say keeping the file in a blob store, where it’s more durable, is better for preventing this?

Yes, if you can append directly to the blob store then the design is simpler as you can ack activities as soon as their records are added to the file.

Also, you recommended building a local file, which restricts the work to one worker instance. How do I make sure the activities get to that worker? They come from different workflows, so I can’t use a session.

You can use a host specific task queue directly. Session internally does the same thing.

Thank you very much.

How do I make sure only one worker listens to this queue? If I have a microservice with several instances, does each of them need to try to listen to this queue, and the first one succeeds? Is there a better way than this locking mechanism?

This batching use case seems like a very common one. Do you plan to add this ability to Temporal in the future by using Temporal primitives?

How do I make sure only one worker listens to this queue? If I have a microservice with several instances, does each of them need to try to listen to this queue, and the first one succeeds? Is there a better way than this locking mechanism?

Each worker gets its own queue by using its host name as the queue name. Also if you are using a blob store that supports independent appends then you can call it from multiple hosts.
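Deriving the per-host queue name could look like this (the queue-name prefix is an assumption for illustration):

```kotlin
import java.net.InetAddress

// Sketch: a worker derives its own task queue name from its host name and
// polls that queue, so activities routed to the queue always land on this
// host. The "parquet-agg-" prefix is an assumption.
fun hostTaskQueue(prefix: String = "parquet-agg-"): String =
    prefix + InetAddress.getLocalHost().hostName
```

The worker would register an activity worker on `hostTaskQueue()` at startup, and producers would schedule activities on that queue name.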

This batching use case seems like a very common one. Do you plan to add this ability to Temporal in the future by using Temporal primitives?

Yes, we have such plans, but no timeline for this feature yet.

Thanks, these answers have been very helpful.

If I have workflows running on several instances, how do I route each activity, at any point in time, to the queue with the right host name? The worker can go down and another one can register with a different queue name. (If I have several instances, how do they know which one should register, given only one worker is supposed to?)

The workflow has to query for the queue name before sending activities to it. You can have a separate workflow that manages which host is currently used to generate the output file. All other workflows would query this workflow for the queue name, and workers, upon start, would signal that workflow with their queue names.
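The state that registry workflow carries is tiny; a plain-Kotlin model of it (in the real workflow these two methods would be a @SignalMethod and a @QueryMethod):

```kotlin
// Plain-Kotlin model of the registry workflow's state. In Temporal,
// registerWorker would be a @SignalMethod invoked by workers on startup,
// and currentTaskQueue a @QueryMethod called by producer workflows before
// they schedule an aggregation activity.
class QueueRegistryState {
    private var currentQueue: String? = null

    fun registerWorker(taskQueue: String) {
        currentQueue = taskQueue
    }

    fun currentTaskQueue(): String? = currentQueue
}
```

A later signal from a replacement worker simply overwrites the queue name, which is how failover to a new host would propagate.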

Curious how this helps solve the issue of potentially 500,000 signals referencing these JSON objects (or containing them). Don’t we still run into the same issues with Workflow history size? If a Workflow receives a Signal containing a reference to a JSON object and then invokes an Activity to aggregate it, do we still need to use ContinueAsNew to stop the history growing toward 500,000? I’m working through a similar problem, so I'm curious how this type of problem might be solved.

@Tucker_Barbour the original question stated that each file is generated by a different workflow execution (instance). So no single workflow would need to process all the data:

If a Workflow receives a Signal containing a reference to a JSON object and then invokes an Activity to aggregate it, do we still need to use ContinueAsNew to stop the history growing toward 500,000? I’m working through a similar problem, so I'm curious how this type of problem might be solved.

If all the signals have to be processed by a single workflow then continue as new is required. I don’t think the design that relies on a single workflow execution for 500k signals is going to work anyway. The problem is performance. A single workflow has limited throughput on the number of signals it can process per second. Each signal call grabs the lock on the workflow execution and performs a database write. So if a single write is 20ms then no more than 50 signals per second can be processed. In reality, this number is probably even lower. The solution is to use multiple workflow executions as they are distributed across multiple DB shards and have their own locks.
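The arithmetic behind that estimate, as a sketch:

```kotlin
// If each signal holds the workflow execution's lock for one serialized DB
// write of `writeLatencyMs` milliseconds, a single workflow can absorb at
// most 1000 / writeLatencyMs signals per second; e.g. a 20 ms write caps
// throughput at 50 signals/second, so 500,000 signals would take hours.
fun maxSignalsPerSecond(writeLatencyMs: Int): Int = 1000 / writeLatencyMs
```

Sharding across many workflow executions multiplies this limit, since each execution has its own lock and may live on a different DB shard.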

Ah yes. That makes sense now. Thanks for the clarification.

Do queries have the same performance concerns as signals? If each Workflow generating a JSON object queries a single Workflow responsible for managing which Worker to use for generating the output parquet file, would we still need to be concerned with throughput? I’m imagining a Workflow responsible for generating the JSON objects to look something like:

  • Invoke Activity to generate a JSON object and return a reference to the JSON object.
  • Invoke Activity to query another Workflow for the TaskQueue used to aggregate JSON objects. The TaskQueue will be unique to an individual Worker.
  • Create a new ActivityStub using the TaskQueue returned from the previous step
  • Invoke Activity with the stub created in the previous step, providing the reference to the JSON object from step 1. This Activity will finish with doNotCompleteOnReturn so we can complete it later, once the JSON object has been included in the final parquet output.

I believe queries are batched, so their throughput should be much better. In the worst case you can create hierarchical queries: for example, have 100 workflows that are queried (by hashing the caller ID to one of their IDs), and those 100 in turn query the single workflow that actually controls the host assignment.
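The hashing step could be sketched like this (the workflow-ID naming scheme is an assumption):

```kotlin
import kotlin.math.abs

// Hash the calling workflow's ID onto one of `fanOut` intermediate registry
// workflows; only those intermediaries query the single workflow that
// controls the host assignment. The "queue-registry-" naming is an
// assumption for illustration.
fun registryWorkflowId(callerWorkflowId: String, fanOut: Int = 100): String =
    "queue-registry-" + (abs(callerWorkflowId.hashCode()) % fanOut)
```

Because the mapping is deterministic, every workflow always talks to the same intermediary, and query load on the root workflow is bounded by the fan-out factor rather than the number of callers.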