Thoughts on global coordination of workflow activities

Temporal scales to millions of workflows running in parallel.

A common question here in the forums is how to coordinate the execution of activities across workflows. For example:

  • How do I run workflows, or activities, one at a time?
  • How do I rate limit activities?
  • How would I implement fair scheduling of activities?

Conceptually, if networking and CPUs were infinitely fast and computers had infinite memory, we could have a single workflow instance do the coordination. Workflows that wanted to do something would signal the single coordinating workflow, and the coordinating workflow would signal back when the conditions were met for the requesting workflow to proceed.
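
As a minimal sketch of that pattern (Temporal Go SDK; the workflow IDs, signal names, and one-permit-at-a-time policy below are all made up for illustration):

```go
package coordination

import (
	"go.temporal.io/sdk/workflow"
)

// PermitRequest is sent by a workflow that wants permission to proceed.
type PermitRequest struct {
	WorkflowID string // workflow to signal back when it may proceed
}

// CoordinatorWorkflow serializes requests: it grants one permit at a time,
// in arrival order. All of the coordination logic is ordinary workflow code.
func CoordinatorWorkflow(ctx workflow.Context) error {
	requests := workflow.GetSignalChannel(ctx, "request")
	for i := 0; i < 1000; i++ { // continue-as-new periodically to keep history bounded
		var req PermitRequest
		requests.Receive(ctx, &req)

		// Tell the requester it may proceed. A real implementation would also
		// wait for a "done" signal before granting the next permit.
		_ = workflow.SignalExternalWorkflow(ctx, req.WorkflowID, "", "grant", nil).Get(ctx, nil)
	}
	return workflow.NewContinueAsNewError(ctx, CoordinatorWorkflow)
}

// RequestingWorkflow asks the coordinator for a permit and blocks until granted.
func RequestingWorkflow(ctx workflow.Context) error {
	self := workflow.GetInfo(ctx).WorkflowExecution.ID
	err := workflow.SignalExternalWorkflow(ctx, "coordinator", "", "request",
		PermitRequest{WorkflowID: self}).Get(ctx, nil)
	if err != nil {
		return err
	}
	workflow.GetSignalChannel(ctx, "grant").Receive(ctx, nil)
	// ...now do the work that required coordination...
	return nil
}
```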

In reality, of course, computers aren’t infinitely fast, which is why we have distributed systems, and any single durable workflow execution is limited in the volume of tasks it can execute per second.

Still, if our coordination requirements aren’t truly global, and they can be partitioned across multiple instances of a coordination workflow so that no single instance is overwhelmed, this can often be a good option. You get all the advantages of Temporal workflow execution: durability, the ability to code whatever coordination logic you might need, and so on.
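
A sketch of what that partitioning might look like, assuming the coordination can be keyed by something like a tenant ID: hash the key to one of N coordinator workflow IDs and use signal-with-start so each per-partition coordinator is created on demand. The names and the partition count are invented for illustration.

```go
package coordination

import (
	"context"
	"fmt"
	"hash/fnv"

	"go.temporal.io/sdk/client"
)

const coordinatorPartitions = 32 // assumption: enough partitions that no single coordinator is overwhelmed

// coordinatorIDFor maps a partition key (e.g. a tenant ID) to one of N coordinator workflow IDs.
func coordinatorIDFor(key string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	return fmt.Sprintf("coordinator-%d", h.Sum32()%coordinatorPartitions)
}

// RequestPermit signals the partition's coordinator (starting it if it isn't running yet)
// with the ID of the workflow that wants permission to proceed.
func RequestPermit(ctx context.Context, c client.Client, tenantID, requesterWorkflowID string) error {
	opts := client.StartWorkflowOptions{TaskQueue: "coordination"}
	// "CoordinatorWorkflow" and the "request" signal are hypothetical, as in the sketch above.
	_, err := c.SignalWithStartWorkflow(ctx, coordinatorIDFor(tenantID), "request",
		requesterWorkflowID, opts, "CoordinatorWorkflow")
	return err
}
```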

So the rest of this post considers the scenario where, even if we could partition the coordination to some extent, it wouldn’t be enough to keep a coordinating workflow from being overwhelmed.

For the first question, how to run workflows or activities one at a time, people often reach for a solution that restricts the workflow or activity worker itself. But along with being complicated, this costs us availability. Ideally, when a workflow or activity task is ready to be run, we’d like a worker to be available to run it. Letting a task be ready to run but then deliberately not having a worker available to execute it mixes up the functional and operational requirements of our system. I suppose it might work in simple cases, but I suspect that as the system evolves we’d be likely to end up saying, “Whoops, we actually did want that task to execute, but it’s being throttled at the worker level.”
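
For concreteness, these are the kinds of worker-side knobs (shown here with the Go SDK) that people usually have in mind; the point is that they’re operational settings on the worker, not business rules:

```go
package main

import (
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func main() {
	c, err := client.Dial(client.Options{}) // connects to localhost:7233 by default
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	w := worker.New(c, "my-task-queue", worker.Options{
		// At most one activity runs at a time on *this worker process*;
		// another worker polling the same task queue would still run more.
		MaxConcurrentActivityExecutionSize: 1,
		// Server-enforced rate limit across all workers polling this task queue.
		TaskQueueActivitiesPerSecond: 10,
	})
	// w.RegisterWorkflow(...) / w.RegisterActivity(...) would go here.
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatal(err)
	}
}
```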

For rate limiting and fair scheduling, a common thought here in the forums is “maybe I can use Redis”. Let’s explore this a bit.

Redis popularized keeping the entire database in memory. When that’s feasible, it can be a dramatic simplification. Suppose we have a lot of complicated business logic about how we need to coordinate: for example, we’re doing fair scheduling, and we want to schedule fairly first at the tenant level and then by activity type, and so on. When the whole picture is in memory at once, that’s easy to do, and it might often be feasible: coordinating a few million workflows at, say, a kilobyte of coordination data per workflow is only a few GB, which comfortably fits into memory.
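
As a tiny illustration of coordination state living entirely in Redis, here’s a per-tenant, fixed-window rate-limiter sketch using go-redis. The key scheme and budget are invented, and a real fair scheduler would be considerably more involved:

```go
package coordination

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// AllowActivity reports whether tenantID is still under its per-second budget,
// using one Redis counter per tenant per second. It could be called from an
// activity (or an interceptor) before doing the real work.
func AllowActivity(ctx context.Context, rdb *redis.Client, tenantID string, perSecond int64) (bool, error) {
	key := fmt.Sprintf("rate:%s:%d", tenantID, time.Now().Unix())
	n, err := rdb.Incr(ctx, key).Result()
	if err != nil {
		return false, err
	}
	if n == 1 {
		// First hit in this window: let the counter expire on its own.
		rdb.Expire(ctx, key, 2*time.Second)
	}
	return n <= perSecond, nil
}
```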

If the data needed for the coordination requirements doesn’t all fit into memory, that may add a fair bit of implementation complexity… Of course, we can use a database, but now the database has to spill to disk (we know the dataset is too large to be entirely cached in memory)… and would the database actually be fast enough that the coordination service doesn’t itself become a bottleneck?

I suspect that any disk-based database solution would need to be engineered for the particular coordination requirements being implemented. As an example, here’s a design for a CockroachDB priority task queue: Design for a Simple, Scalable, Priority Task Queue. It handles the simple case of scheduling tasks by time (the author describes it as a “priority” queue in the sense that you can make a task higher priority by queuing it for an earlier time). If your coordination requirements happen to be met by that particular feature, i.e., pulling tasks in time order, the design promises not only to allow multiple producers and consumers for scaling, but also to usefully permit scaling across multiple database worker instances as well.
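
This isn’t the linked design, but to illustrate the general shape of “pull tasks in time order” with multiple producers and consumers, here’s a sketch assuming a Postgres-compatible database that supports SELECT … FOR UPDATE SKIP LOCKED; the table and column names are made up:

```go
package taskqueue

import (
	"context"
	"database/sql"
	"time"
)

// ClaimNextTask pops the earliest due task, if any. SKIP LOCKED lets concurrent
// consumers skip rows that another transaction has already claimed.
func ClaimNextTask(ctx context.Context, db *sql.DB) (id int64, payload []byte, err error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return 0, nil, err
	}
	defer tx.Rollback() // no-op if we commit

	row := tx.QueryRowContext(ctx, `
		SELECT id, payload FROM tasks
		WHERE run_at <= $1
		ORDER BY run_at
		LIMIT 1
		FOR UPDATE SKIP LOCKED`, time.Now())
	if err := row.Scan(&id, &payload); err != nil {
		return 0, nil, err // sql.ErrNoRows means nothing is due yet
	}
	if _, err := tx.ExecContext(ctx, `DELETE FROM tasks WHERE id = $1`, id); err != nil {
		return 0, nil, err
	}
	return id, payload, tx.Commit()
}
```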

I’ve heard here in the forums that fair scheduling is on the Temporal roadmap. I’m just speculating, but my guess is it would end up being similar to other Temporal services: there would be some configuration you can specify, and within the universe of those configuration options, the backend would be engineered with whatever database indexes, etc. are needed to scale to whatever volume is needed while satisfying those particular requirements.

And perhaps someday Temporal, or someone else, might provide, for example, a rate limiting feature, again specifically engineered with whatever database indexes, sharding, etc. are needed to provide that service at whatever scale is needed.

OK, so at the low end, where the volume is low enough or the coordination requirements can be partitioned finely enough to be handled by a coordination workflow, you get all the usual Temporal advantages: your coordination business logic can be as complicated as you need it to be, you can write it in your favorite programming language, you automatically get durable execution, and so on.

At the high end, we can imagine that eventually Temporal or others will provide specific services such as rate limiting or fair scheduling, and if your requirements fit within what’s offered, great: plug it in and go, and scaling is taken care of.

Getting back to “what if your coordination data can all fit in memory, like Redis”: this might cover a lot of ground. We can’t scale infinitely, but we can handle a great deal more than an individual workflow instance could.

Redis, like PostgreSQL for example, has a primary-follower replication model: clients write to the primary, which replies to the client once the write is applied (and, depending on configuration, durably stored) on the primary; the writes are then delivered asynchronously to the followers. If the primary dies, one of the followers can be promoted to primary… however, there’s still a window where a write can be lost: the client gets confirmation of the write, but the primary crashes before the write makes it to a follower.

One perspective might be, “well, this is super rare (a Redis cluster may run for years and never have the primary crash), and it’s only a tiny bit of data, so not really worth worrying about.”

I… tend to get grumpy when systems go from “always works” to “almost always works”. If I get a bug report because something didn’t work, and the system always works, then I know it’s a bug in my code. If the system sometimes doesn’t work, I can waste a lot of time trying to find my bug before finally realizing that it was the system that had a “rare” failure. So I’m motivated to at least consider whether there’s an option that gets us into the “always works” category.

I’m also familiar with Kafka, which can be configured for durable writes (a write is stored on multiple broker instances before the client gets confirmation of the write). A Kafka Streams application can store local state on disk on the application workers (in our case the state would be the coordination data), so that after a worker crash the application’s state can be read back into memory more quickly.
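
The producer side of “durable writes” looks roughly like this, assuming the topic is created with replication.factor=3 and min.insync.replicas=2 (broker/topic settings not shown here):

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"acks":               "all", // wait for the in-sync replicas, not just the leader
		"enable.idempotence": true,  // avoid duplicates on retry
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	topic := "coordination-state"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("example coordination update"),
	}, nil)
	if err != nil {
		log.Fatal(err)
	}
	p.Flush(5000) // wait up to 5s for delivery reports
}
```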

A question here is: can we partition the coordination data at all? We’re here because we weren’t able to partition it finely enough to just have a workflow handle it… but some kinds of coordination, like rate limiting or fair scheduling, might really be global: it’s not just that the chunks are too big for a workflow to handle; perhaps all the data needs to fit in the memory of a single instance.

One can run a Kafka Streams application with just one partition… but then after a crash the new worker needs to read the entire state from disk before it can get going again. (Usually we’d run a Kafka Streams application on data that can be partitioned, so that we can add more workers as data volume increases, which keeps recovery time lower for any particular worker.) An advantage of Redis is that the followers keep a copy of the state in memory, which makes for a much faster recovery when we’re talking about a lot of non-partitioned data.

AWS has an intriguing option here. MemoryDB has a Redis API and runs Redis instances, but outside of Redis it also durably stores writes in a transaction log. This means that if the primary crashes and one of the followers is promoted, all the confirmed writes will be replayed on the follower before it takes over, and no data will be lost. AWS advertises that failover “usually takes just a few seconds until you can write to the cluster again” (Minimizing downtime in MemoryDB with Multi-AZ - Amazon MemoryDB).

Due to my grumpiness, I personally would be reluctant to use standard Redis and would be happier running something on MemoryDB; however, since MemoryDB exposes a standard Redis API, any solution would also run on standard Redis for anyone who didn’t mind the small risk of a rare primary crash causing a bit of data loss.