Validation of app architecture using Temporal

Problem statement

We are trying to rebuild our OMS using Temporal. The system receives orders, shards them based on certain properties, and processes the orders collected over time for each shard. If a batch takes too long to process because of load on the downstream services, we fall back to processing orders one at a time until downstream scales up or stabilizes.

We want to test Temporal before we migrate over from the current system.

Requirements

Sharding will be based on geolocation and vehicle properties. There are around 100 distinct locations and 4 vehicle properties, and each property can have 1-10 distinct values. Properties can be added or removed, so the number of shards may increase or decrease accordingly.

Sharding is done this way because the downstream service requires each batch of orders to have similar properties.

Estimated number of shards

Since a shard can be any combination of geo and vehicle properties, the number of shards grows over time and can reach 33,000 or more in the worst case. To begin with, there will be around 300-500 shards. Shards can be dynamically added or removed; when the shard set changes, orders are rebalanced from the next cycle.

Estimated min:
100 (geo locations)

Estimated max:
100 (geo locations)
x 2 (vehicle property 1)
x 5 (vehicle property 2)
x 3 (vehicle property 3)
x 11 (vehicle property 4)
= 33,000

The number of shards will grow by around 10-20% each year.
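To make the estimate concrete, here is a small Go sketch of how a shard key and the shard-count bound could be derived from the order properties. The property values and the delimiter-joined key layout are illustrative assumptions, not the actual schema.

```go
package main

import (
	"fmt"
	"strings"
)

// ShardKey builds a deterministic shard identifier from the order's geo
// location and vehicle properties. The "geo|prop1|prop2|..." layout is an
// illustrative assumption, not the actual schema.
func ShardKey(geo string, vehicleProps []string) string {
	return strings.Join(append([]string{geo}, vehicleProps...), "|")
}

// ShardCount multiplies the cardinality of each sharding dimension,
// matching the worst-case estimate above: 100 x 2 x 5 x 3 x 11 = 33,000.
func ShardCount(cardinalities ...int) int {
	total := 1
	for _, c := range cardinalities {
		total *= c
	}
	return total
}

func main() {
	fmt.Println(ShardKey("blr", []string{"van", "refrigerated"})) // blr|van|refrigerated
	fmt.Println(ShardCount(100, 2, 5, 3, 11))                     // 33000
}
```

Adding or removing a property dimension only changes the list of cardinalities, which is why the shard count can swing between the minimum of 100 and the 33,000 worst case.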

Implementation

  • The current OMS would enqueue the orders in RabbitMQ.
  • An Order Collector workflow will run continuously, collecting the orders and pushing them to a data store.
  • The Order Collector workflow will also trigger a periodic workflow for each shard as and when it receives an order for that shard. We will generate a deterministic Workflow ID for each shard, so if a workflow with that ID already exists, Temporal should reject the ExecuteWorkflow request.
  • This periodic workflow will read all the currently open orders and pass the ID(s) to a child workflow.
  • If the processing mode is batch, the periodic workflow waits for the child workflow execution to complete or time out, then continues-as-new to process the next set of orders in the same shard.
  • If the processing mode is not batch, it will trigger a single workflow for each order; the periodic workflow will not wait for the child workflow executions to complete before picking up the next set of orders.
  • The child workflow calls several activities that invoke multiple external services.
  • The activities will only make the external calls; the processing will be done in the workflow itself.
  • Each activity will have its own timeouts and retry policy.
  • If an order is canceled mid-flight, we will send a signal to the periodic workflow for the shard handling that order, which in turn will signal the child workflow and remove the order from the list of orders currently being processed.
  • Workers will run on VMs and will be scaled as and when required. The Temporal server will run on Kubernetes with autoscaling enabled.
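As a stand-alone illustration of the dedupe step above (a deterministic Workflow ID per shard, with duplicate starts rejected), here is a Go sketch. In a real deployment the Temporal server enforces Workflow ID uniqueness; the in-memory map here only mimics that rejection semantics, and the `shard-` ID prefix is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrAlreadyStarted mirrors the "workflow execution already started" error
// Temporal returns when a workflow with the same ID is already running.
var ErrAlreadyStarted = errors.New("workflow execution already started")

// starter simulates start-workflow-if-not-running, which the Order Collector
// relies on. In production the Temporal server enforces this server-side;
// the map only illustrates the semantics.
type starter struct {
	mu      sync.Mutex
	running map[string]bool
}

func (s *starter) StartShardWorkflow(shardKey string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	id := "shard-" + shardKey // deterministic Workflow ID per shard
	if s.running[id] {
		return ErrAlreadyStarted
	}
	s.running[id] = true
	return nil
}

func main() {
	s := &starter{running: map[string]bool{}}
	fmt.Println(s.StartShardWorkflow("blr|van")) // <nil>
	fmt.Println(s.StartShardWorkflow("blr|van")) // workflow execution already started
}
```

Because the ID is derived deterministically from the shard key, the Order Collector can blindly attempt the start on every order and treat the rejection as a no-op.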

We have attached the flow diagram as well to make this clearer.

Scale

  • Order throughput ranges between 5K-10K orders per minute.
  • The timeout for each batch-processing cycle would be around 5 sec.
  • Assuming each shard cycle takes a minimum of 1 sec, there can be up to 60 cycles per minute per shard.
  • The input to each activity could range between 4MB and 10MB.

We are looking to scale by 10-20% each year at the minimum.

Questions

  • Are we using Temporal the correct way to solve our problem?
  • Will Temporal be able to scale based on the projected throughput?
  • Will the amount of data being passed around workflows and Activities pose a problem? What are the alternatives?
  • Is the use of signals correct?
  • Will Temporal be able to scale to the number of parallel workflows that would be created as per our implementation? If not, what is the alternative?
  • Are we using Temporal the correct way to solve our problem?

I believe Temporal is a good fit for your problem.

Your design has a bottleneck as it is. Temporal scales out by running many workflows in parallel. It doesn’t scale up by pushing more load through a single workflow instance. If each order is passed through the “Order Collector Workflow” then it wouldn’t be able to keep up. If the “Order Collector Workflow” just runs activities that receive messages from RabbitMQ and perform their operations without returning to the workflow code then it would work.

Another question is why do you have RabbitMQ in front of Temporal? You could start workflows (possibly using SignalWithStart) directly from the current OMS.

Instead of RabbitMQ and the OrderCollectorWorkflow, I would recommend starting a workflow per order and letting that workflow communicate with batch workflows through signals.

  • Will Temporal be able to scale based on the projected throughput?

Yes, this is not a very high throughput use case for Temporal.

  • Will the amount of data being passed around workflows and Activities pose a problem? What are the alternatives?

Yes, the maximum input/output size is 2 megabytes. But on average it should be much lower for Temporal to perform well. An alternative is to pass around ids and store the order data in an external data store.
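To illustrate that alternative, here is a hedged Go sketch in which only order IDs cross the workflow/activity boundary and the activity hydrates full payloads from an external store. The store type and function names are made up for the example; in practice the store would be a real database and the downstream call is elided.

```go
package main

import "fmt"

// orderStore stands in for the external data store that holds full order
// payloads. Only IDs cross the workflow/activity boundary, keeping event
// history small and well under the 2 MB payload limit.
type orderStore map[string]string

// ProcessBatchActivity receives just the order IDs and hydrates each order
// from the store before calling downstream services (elided here).
func ProcessBatchActivity(store orderStore, orderIDs []string) ([]string, error) {
	payloads := make([]string, 0, len(orderIDs))
	for _, id := range orderIDs {
		p, ok := store[id]
		if !ok {
			return nil, fmt.Errorf("order %s not found", id)
		}
		payloads = append(payloads, p)
	}
	return payloads, nil
}

func main() {
	store := orderStore{"o1": "payload-1", "o2": "payload-2"}
	payloads, err := ProcessBatchActivity(store, []string{"o1", "o2"})
	fmt.Println(payloads, err) // [payload-1 payload-2] <nil>
}
```

The activity's return value should follow the same rule: write results back to the store and return only IDs or small status values to the workflow.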

  • Is the use of signals correct?

It looks reasonable.

  • Will Temporal be able to scale to the number of parallel workflows that would be created as per our implementation? If not, what is the alternative?

We have tested up to a few hundred million always-running parallel workflows. Given enough DB disk, I believe the system can go even higher. Temporal scalability depends mostly on the number of updates per second the workflows need, not on the number of parallel workflows.

Could you elaborate a bit more on what you meant by this? Do you mean that we should not return data from the activity, thus not bloating the workflow history? Or do you mean to fire another Order Collector periodic workflow without waiting for the activity to finish?

I guess we failed to mention that the Order Collector will be a periodic workflow which will query “n” (100 to start with) orders from RabbitMQ and then ContinueAsNew once the orders have been pushed to the datastore and the Scheduler triggered.
But from this discussion it feels like “n” should be 1 for maximum throughput. So the flow for OrderCollector would be to fetch one order at a time and then, without waiting on the ActivityFuture, schedule itself with ContinueAsNew. This way we’ll have more parallel workflows, one for each order. Is this understanding correct?

We were thinking of going this route because we don’t want to add a dependency on Temporal (a relatively new technology for us) to the current production OMS. The current OMS already supports RabbitMQ functionality.

Does it mean that the more tasks (activities) there are, the more updates there are on the workflow, thus affecting scaling?

I think the Temporal approach would be a workflow-per-order as @maxim suggested.

You could trigger this asynchronously from an OrderMessageConsumer workflow that reads each message from Rabbit and then starts a workflow without waiting for the result.

Then once you’re happy with Temporal remove the queue consumer workflow in favour of starting a workflow from your existing application.

Could you elaborate a bit more on what you meant by this? Do you mean that we should not return data from the activity, thus not bloating the workflow history? Or do you mean to fire another Order Collector periodic workflow without waiting for the activity to finish?

I mean that the activity keeps running without ever returning to its workflow. Its implementation consumes messages from RabbitMQ in a loop and starts workflows using the Temporal Client.
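The consumer loop described here might look roughly like this in Go. The channel stands in for a RabbitMQ subscription and `startFn` for a Temporal client start-workflow call; both are placeholders for this sketch, not real SDK APIs.

```go
package main

import "fmt"

// startFn stands in for a Temporal client's start-workflow call; the real
// implementation would call client.ExecuteWorkflow with a per-order
// Workflow ID so duplicate deliveries are rejected by the server.
type startFn func(orderID string) error

// ConsumeAndStart is the long-running consumer loop: it drains messages
// (the channel stands in for a RabbitMQ subscription) and starts one
// workflow per order, never returning results to a parent workflow.
// It returns the number of workflows successfully started.
func ConsumeAndStart(messages <-chan string, start startFn) int {
	started := 0
	for orderID := range messages {
		if err := start(orderID); err != nil {
			continue // real code would log; already-started errors are benign
		}
		started++
	}
	return started
}

func main() {
	msgs := make(chan string, 3)
	msgs <- "o1"
	msgs <- "o2"
	msgs <- "o3"
	close(msgs)
	n := ConsumeAndStart(msgs, func(id string) error {
		fmt.Println("started workflow for", id)
		return nil
	})
	fmt.Println(n, "workflows started")
}
```

Note the loop never hands anything back to a workflow, which is what keeps it out of the single-workflow-throughput bottleneck discussed earlier.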

I guess we failed to mention that the Order Collector will be a periodic workflow which will query “n” (100 to start with) orders from RabbitMQ and then ContinueAsNew once the orders have been pushed to the datastore and the Scheduler triggered.

I don’t understand why a queue consumer is periodic. I don’t even think that the queue consumer should be a workflow. It can be a standalone process that starts a workflow for every message received.

But from this discussion it feels like “n” should be 1 for maximum throughput. So the flow for OrderCollector would be to fetch one order at a time and then, without waiting on the ActivityFuture, schedule itself with ContinueAsNew. This way we’ll have more parallel workflows, one for each order. Is this understanding correct?

I don’t think you need the OrderCollector workflow at all.

We were thinking of going this route because we don’t want to add a dependency on Temporal (a relatively new technology for us) to the current production OMS. The current OMS already supports RabbitMQ functionality.

Makes sense. Just make sure that any queue consumer is throttled: I’ve seen cases where a queue accumulated a backlog and the unthrottled consumer overloaded the Temporal cluster with 10k workflow starts per second.

Does it mean that the more tasks (activities) there are, the more updates there are on the workflow, thus affecting scaling?

Yes, the number of activities per second directly corresponds to the number of DB updates.

The queue throughput would be the same as the production traffic throughput in our case, so throttling wouldn’t make sense. And there is a good probability that we might reach a scale where we get more than 10k orders per second at some point. I’m hoping that scaling Temporal will help us reach this scale without any downtime.


Based on the above discussion we made some changes to our initial architecture.

  • We have replaced RabbitMQ with an API server.
  • The API server will create a workflow for each order.
  • Each OrderWorkflow will signal the SchedulerWorkflow.
  • The SchedulerWorkflow will accumulate orders until the current batch is finished.
  • Once the current batch has finished executing, it will start a BatchWorkflow for the accumulated orders.
  • We will maintain a datastore for all the enriched data, and the activities will query/update it.
  • The data passed to activities will be just the order IDs.
  • Once the BatchWorkflow completes, we will signal the OrderWorkflow to continue with the order-related individual flow.
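The accumulate-then-flush behaviour of the SchedulerWorkflow described in the list can be sketched as plain Go logic. Signal delivery and child-workflow mechanics are elided; the type and method names are invented for the sketch.

```go
package main

import "fmt"

// Scheduler mirrors the SchedulerWorkflow: order signals accumulate while a
// batch is in flight, and when the batch finishes the buffer becomes the
// next BatchWorkflow's input.
type Scheduler struct {
	pending []string // order IDs signalled while the current batch runs
}

// OnOrderSignal is what an OrderWorkflow's signal would drive.
func (s *Scheduler) OnOrderSignal(orderID string) {
	s.pending = append(s.pending, orderID)
}

// OnBatchComplete drains the buffer and returns the order IDs that should
// form the next BatchWorkflow; an empty result means nothing to start.
func (s *Scheduler) OnBatchComplete() []string {
	next := s.pending
	s.pending = nil
	return next
}

func main() {
	s := &Scheduler{}
	s.OnOrderSignal("o1")
	s.OnOrderSignal("o2")
	fmt.Println(s.OnBatchComplete()) // [o1 o2]
	fmt.Println(s.OnBatchComplete()) // []
}
```

Note that because all signals funnel into one SchedulerWorkflow per shard, this buffer is also where the signals-per-second limit discussed below will bite.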

Hoping our understanding is correct and this is the right way to implement our app using Temporal.
Thanks a ton.

The queue throughput would be the same as the production traffic throughput in our case, so throttling wouldn’t make sense. And there is a good probability that we might reach a scale where we get more than 10k orders per second at some point. I’m hoping that scaling Temporal will help us reach this scale without any downtime.

The problem is that when a consumer is down for some time, the queue can accumulate a backlog. Then an unthrottled consumer can consume at a much higher rate than the usual happy-path traffic.

Hoping our understanding is correct and this is the right way to implement our app using Temporal.
Thanks a ton.

Your design works assuming that the batch workflow is not going to be a bottleneck. How many signals per second do you expect it to receive, and what is the maximum batch size?

Currently the max number of bookings per second that we see is ~150 per shard. If our batch processing takes, let’s say, 5 seconds, and assuming around 100 orders would be re-queued, the max batch size would be 150 x 5 + 100 = 850. So roughly it will be in the range of 750-1000.
Given this, we expect it to grow by 10% every year.

Then the batch workflow is going to be a bottleneck as 150 signals per second is too much for a single workflow execution. In this case, I recommend not having a batch workflow. Here is a much more scalable approach to batching:

  • Each OrderWorkflow executes an activity on a special “batch” task queue.
  • An activity worker listens on that queue and batches multiple activity invocations. The activities complete only when the whole batch has been successfully sent out.
  • You need to ensure that there is only one worker process listening on that queue. This can be done through your deployment infra, or by running that worker from an activity of a special workflow that controls it.

I’m not sure I get this. How is the activity going to accumulate the order IDs? Do we need to maintain them in a persistent store for each shard ID? Each shard’s completion is independent of the others.

How would I achieve this?
Could you point me to any example code?

Didn’t understand the reasoning here.