Scaling Task Queues | Correlation between numHistoryShards, taskQueuePartitions, number of matching service hosts

Could you explain the significance of taskQueuePartitions when there is high load on a single task queue?
From @maxim ’s talk on How to Design a Workflow Engine from First Principles, I inferred that a single task queue must reside on a single shard (which is why local transfer queues exist: to transfer all the tasks from individual shards to the central shard housing the task queue in question). If the task queue were spread across multiple shards, the fanout would increase, and so would the time taken to fetch tasks for that task queue from all the shards.

  • Is shard = partition? (shards as in the value set via numHistoryShards)
  • How would more taskQueuePartitions help during high load?
  • What would be a reasonable value for read and write partitions for task queues if numHistoryShards=4096? Do numHistoryShards and taskQueuePartitions need to be proportional?
  • Can a single instance of the matching service handle a large taskQueuePartitions value, or should the number of matching service hosts also be proportional to taskQueuePartitions?
  • Can taskQueuePartitions be different for different task queues, depending on load per task queue?

Thanks!

EDIT: Our issue is essentially the same as the one described in Tuning Temporal setup for better performance - #2 by maxim.

I think there is some terminology confusion. The Temporal history service divides the workflow ID space into numHistoryShards shards. Each shard contains a transfer queue. This queue implements the outbox pattern: tasks are inserted into it in the same transaction that updates the workflow state.

Task queues are maintained by the matching service. They are partitioned independently. By default, each task queue uses 4 partitions.

When an activity or workflow task is scheduled, it is first added transactionally to the shard's transfer queue. Then the task is consumed from that queue and pushed to the matching engine. The choice of which partition to push a task to is based on the task ID and is completely independent of the history service sharding logic.

  • Is shard = partition? (shards as in the value set via numHistoryShards)

No. A shard is a history service concept; a partition is a matching engine concept.
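
To make the distinction concrete, the two settings also live in different places: numHistoryShards is part of the static server configuration (fixed at cluster creation time), while partition counts are dynamic config properties of the matching service. A minimal sketch, using the 4096 shards mentioned in this thread and the default of 4 partitions; the exact file layout depends on your deployment:

# Static server configuration (fixed at cluster creation time):
persistence:
  numHistoryShards: 4096

# Dynamic configuration (adjustable at runtime):
matching.numTaskqueueReadPartitions:
  - value: 4
matching.numTaskqueueWritePartitions:
  - value: 4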

  • How would more taskQueuePartitions help during high load?

On average, you expect 100 tasks per second per task queue partition. This heavily depends on the service and DB hardware.

  • What would be a reasonable value for read and write partitions for task queues if numHistoryShards=4096? Do numHistoryShards and taskQueuePartitions need to be proportional?

They are absolutely independent.

  • Can a single instance of the matching service handle a large taskQueuePartitions value, or should the number of matching service hosts also be proportional to taskQueuePartitions?

Each matching host can handle a certain throughput. If the partitions that share a matching host don't overload it, then it is OK. The number of matching hosts should be proportional to the total traffic across all task queues.

  • Can taskQueuePartitions be different for different task queues, depending on load per task queue?

Yes. Only task queues that need to support high traffic (over 400 tasks per second) need an increased number of task queue partitions.


Thanks @maxim, that clears up a lot of things.

Only task queues that need to support high traffic (over 400 tasks per second) need an increased number of task queue partitions

Could you point us to how to achieve this? I was of the opinion that dynamicConfig set at the server level applies to all task queues in the system (across all namespaces).

  • Is it possible to assign a few partitions (say 4) to task queue X and a large number of partitions (say 32) to task queue Y? If yes, since a task queue can be created either at worker startup time or at workflow creation time, which place would be appropriate to do this?

  • Say there was an existing task queue with the default partitions (4 partitions), and you changed dynamicConfig to update the partitions to 32. Will the existing task queue pick up this change and spread itself across 32 partitions, or will the change apply only to new task queues created after it was applied?

On average, you expect 100 tasks per second per task queue partition.

  • Is this because all requests to a given partition are serialized (the service acquires a lock on a given partition), and the only way to increase throughput (given this condition) is to increase the number of partitions?
  • Will increasing the number of partitions increase the number of database calls (say, to execute the query "get me all pending tasks from task queue X")? Would this be counterproductive to our attempt to increase task queue throughput? Or, more importantly, is there any downside to increasing task queue partitions that we should be wary of?

Thanks!

Could you point us to how to achieve this? I was of the opinion that dynamicConfig set at the server level applies to all task queues in the system (across all namespaces).

For certain dynamic config properties, such as task queue read/write partitions, you can define constraints, in this case the namespace and task queue name, for example:


matching.numTaskqueueWritePartitions:
  - value: X
    constraints:
      namespace: "..."
      taskQueueName: "..."

The same applies to matching.numTaskqueueReadPartitions.

Just note that to increase the number of partitions on a running cluster, first increase matching.numTaskqueueReadPartitions and then matching.numTaskqueueWritePartitions. If you want to decrease, do it in the reverse order.
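
As a sketch of that two-step increase, assuming a hypothetical task queue "orders" in namespace "default" (names and values are placeholders):

# Step 1: raise read partitions first, so pollers can read from the
# new partitions before any tasks are written to them.
matching.numTaskqueueReadPartitions:
  - value: 32
    constraints:
      namespace: "default"
      taskQueueName: "orders"

# Step 2 (after step 1 has taken effect): raise write partitions to match.
matching.numTaskqueueWritePartitions:
  - value: 32
    constraints:
      namespace: "default"
      taskQueueName: "orders"

To decrease, apply the same changes in the opposite order (write partitions first, then read partitions).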

is there any downside to increasing task queue partitions

Yes, you can have too many partitions. The typical recommendation, even for larger loads, is to increase the default to 8 or 16 (no more).


Thanks @tihomir.

I'm slightly confused by the 8-to-16 limit on task queue partitions.

Let's say you're expecting 10,000 tasks (workflow or activity) per second on a given task queue. If a single partition gives me 100 tasks per second, I wouldn't get more than 1,600 tasks per second on that queue, right?

Perhaps I am banking too much on task queue partitions to achieve this scale. Are there any known alternative ways?

You have to configure 100 partitions to support 10k tasks per second.

After bringing partitions down from 128 to 8, taking @tihomir's suggestion into account, I did see some improvements…

[workflow tasks graph attached in the original post]
Even the workflow completion rate increased from 150/s to 200/s.

The activity tasks graph is very similar to the workflow tasks one (attached above), so it's a total of ~4,000 tasks per second, which should have needed at least 40 partitions per task queue (but it works well even with 8 partitions per task queue).

@maxim, could you shed some light on why this disparity exists?

I'm slightly confused by the 8-to-16 limit on task queue partitions.

Yes, it is a "typical" recommendation and not a hard one.

That many partitions would require a lot of worker polling threads to be saturated. So when you reduced the number of partitions, you improved matching engine performance for the number of poller threads you specified.


I see. So workers aren't really able to scale independently of the Temporal cluster services (including Cassandra)? Too many workers increase the number of polling threads and degrade matching engine performance. Is that correct?
I guess, when it comes to it, we have to use rate limiting to prevent workers from overwhelming the cluster.
We're looking for a way for other teams (who develop workflows) to safely scale the workers they manage up or down, without knowing about the central cluster's internal configs. Is there any parameter other than rate limiting that can help with this?

Too many workers increase the number of polling threads and degrade matching engine performance. Is that correct?

Your workers poll the frontend service, so yes, you would need to think about the RPS limits of the frontend and downstream services (history, matching):

frontend.rps
frontend.namespaceRPS
history.rps
matching.rps

and the QPS limits that protect the persistence layer (a combined sketch of these settings follows the list):

frontend.persistenceMaxQPS
history.persistenceMaxQPS
matching.persistenceMaxQPS
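
For reference, a minimal dynamic config sketch combining these limits; the numbers are illustrative and need to be tuned for your hardware, and the namespace name is hypothetical:

frontend.rps:
  - value: 1200                   # example value
frontend.namespaceRPS:
  - value: 800                    # example value; can be constrained per namespace
    constraints:
      namespace: "my-namespace"   # hypothetical namespace
history.rps:
  - value: 3000                   # example value
matching.rps:
  - value: 1200                   # example value
frontend.persistenceMaxQPS:
  - value: 2000                   # example value
history.persistenceMaxQPS:
  - value: 3000                   # example value
matching.persistenceMaxQPS:
  - value: 3000                   # example value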

Is there any parameter other than rate limiting that can help with this?

The sync match rate and task_schedule_to_start_latency (server metric) are good indicators of under-provisioned workers. These, along with the workflow and activity task schedule-to-start latencies (SDK metrics) and your workers' CPU/memory utilization, can give you an indication of whether you need to scale the number of workers and/or increase the number of workflow/activity task pollers.


@maxim, could you explain what you mean by “a lot of worker polling threads to be saturated”?
Upon increasing the number of partitions, I see an increase in task pile-up that does not go down even if workers are added. Do you have to increase poller threads if partitions are increased? Isn't increasing the number of workers the same as increasing the polling threads on a single worker?

The matching engine performs best when each partition has enough waiting polls for each new task to be matched immediately. If the number of pollers is insufficient, then tasks have to be stored in the DB and forwarded to other partitions, which increases matching latency and reduces throughput. As a single partition can support at least 100 tasks per second, there is no need to have more than 4-5 partitions for 400 tasks per second.


Thanks @maxim. Using this formula of 100 tasks/sec per partition, we are able to reach 900 workflows/sec on 64 partitions (1 workflow = 4 workflow tasks + 3 activity tasks = 7 tasks; 900 workflows/sec = 6,300 tasks/sec, so ~64 partitions).

However, the results are the same at 4K shards and 8K shards. We expected the performance to increase with more shards. We are indeed maintaining ~512 shards per history node, so appropriate scaling is being taken care of.

Is there an explanation for why increasing the shard count does not yield a performance improvement?

You don't need that many partitions for workflow tasks. The reason is that workflow tasks use host-specific task queues most of the time; this is to support caching of their state. So in most cases, the rate of tasks on the main workflow task queue is more or less proportional to the number of workflow starts. In your case, I believe 10-15 partitions for workflows would be enough. For activity tasks, 900*3 = 2,700 per second, so you might need 25-30 partitions.


We will keep this in mind for when we have two task queues for a workflow: one for its workflow tasks and one for its activity tasks.

But does this apply when we use the same task queue for everything pertaining to a given workflow definition (activity tasks and workflow tasks)? Is using the same task queue a bad practice, or perhaps a performance deterrent? So far it has been easy and intuitive to provide just one task queue per workflow definition.

Also, doesn't sticky execution (Tasks | Temporal Documentation) ensure that activity tasks of the same workflow execute on the same worker, IF the worker has registered all the activities of said workflow?

No, sticky execution is only for workflow tasks. For activities, you have to use sessions in the Go SDK or create a host-specific task queue yourself in all other SDKs. See the fileprocessing sample for the SDK of your choice.


It is never a single task queue. When you create a worker for a task queue that hosts both workflows and activities, at least three real task queues are created: two with the same name as the task queue passed to the worker (one for activity tasks and one for workflow tasks), and another host-specific one for sticky workflow tasks.

I believe dynamic config allows overriding partitions per task type, not only per task queue name.
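
If so, a hedged sketch of what such an override might look like, assuming the constraint key is taskType and using hypothetical namespace and task queue names; the accepted values vary by server version (some versions use numeric task types), so check the dynamic config reference for your release:

matching.numTaskqueueReadPartitions:
  - value: 16                         # example: more partitions for activity tasks
    constraints:
      namespace: "my-namespace"       # hypothetical namespace
      taskQueueName: "my-task-queue"  # hypothetical task queue
      taskType: "Activity"            # assumed constraint key/value
  - value: 8                          # example: fewer partitions for workflow tasks
    constraints:
      namespace: "my-namespace"
      taskQueueName: "my-task-queue"
      taskType: "Workflow"

The same shape would apply to matching.numTaskqueueWritePartitions.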
