I’ve been looking at Temporal for a while now and I still don’t see any feature giving us the ability to easily process activities in a fair way between tenants. Am I missing something? There are a couple of posts on the forum related to this issue, but I haven’t found a good answer…
I’m building a SaaS and each tenant needs to make progress w/o blocking the others and with minimal complexity. For example, tenants T_1 and T_2 each generate 1k activities of type A_1. Say we only have one worker where activity A_1 can be processed. As far as I know, Temporal task queues are FIFO-like, so all 1k tasks of T_1 would need to be processed before T_2’s. That’s a huge issue for us, especially when A_1 is long running. Is there a good way for the workers to alternate between T_1 and T_2?
Namespaces? We could probably use namespaces, but we do not want to create one ns per tenant since it’s gonna become an ops nightmare to manage.
Task queues? I don’t think this is their intended use case since they must be known in advance. Say we suffix every task queue with the tenant id; we would need to register workers for each queue. If you have thousands of tenants, I’m not sure that’s gonna scale. It might also become an ops nightmare to manage. One way to solve this could be to support wildcards when registering a worker. This way we could schedule workflows on a dynamic task queue, e.g. default:{{tenant_id}}, and use default:* in the worker’s config, which would internally alternate between the queues. It doesn’t give us activity-level QoS, but workflow-level is already much better than what we have right now. At least each tenant’s workflows can progress concurrently. I do believe that Temporal needs a better queue/scheduling system, but for now that would help a lot.
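To make the dynamic-queue part concrete, here’s a minimal sketch of the client side (the default:* wildcard on the worker side is the part that doesn’t exist today); processTenantBatch is a hypothetical workflow of ours, not anything from the SDK:

```ts
// Sketch only: starting a workflow on a tenant-suffixed task queue.
import { Connection, Client } from '@temporalio/client';
import { processTenantBatch } from './workflows'; // hypothetical workflow

async function startForTenant(tenantId: string) {
  const connection = await Connection.connect({ address: 'localhost:7233' });
  const client = new Client({ connection });
  return client.workflow.start(processTenantBatch, {
    // Dynamic, tenant-specific task queue, e.g. "default:t_42"
    taskQueue: `default:${tenantId}`,
    workflowId: `batch-${tenantId}-${Date.now()}`,
    args: [tenantId],
  });
}
```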
I don’t see a good solution to this problem in Temporal, and I’m really surprised since QoS between tenants is not a special edge case, especially in SaaS apps… Like I said earlier, am I missing something?
You are correct. Fair scheduling is not yet supported. We are working on it.
Good! Any rough ETA? 2025? Or is it still in the research stage? I know it’s complex to implement; especially on Cassandra/Scylla you might be limited in your design.
Are you aware of any open source durable queue implementations that support fair scheduling?
You raise a good point. My comment wasn’t necessarily only about Temporal in the end. It’s such a core issue for SaaS that I’m just surprised we still don’t have good solutions in place for this (I’ve been looking for years now). It’s kind of forcing the space to go single-tenant, and even Kafka/Redpanda doesn’t support this. Single-tenancy doesn’t even solve the actual issue, since you might want fairness between features inside the same tenant anyway. Please keep that in mind in your implementation. In the end, the feature I need is fairness at multiple levels, not just per tenant. In the context of Temporal, the workload should be fairly distributed between workflows, activities, and tenants so that everything slowly progresses concurrently. And some use cases might require other strategies, like priorities or weights.
As for existing open source solutions for a distributed/durable task queue (not message queue), I found https://hatchet.run/ which is built on top of Postgres. There’s also the BullMQ Pro “Group” feature, but it’s Redis based, so not really distributed/durable.
There are also other features I would like to see:
global back pressure (shared external service throttled)
specific back pressure (a tenant of an external service getting throttled)
circuit breaker (shared service is dead)
dynamic rate limit (give time to an external service to scale up)
Good to know! And ofc someone can contact me to gather my requirements and use cases. Just send me an email linked to this account.
In the meantime, do you have any suggestions on how to achieve this? We are using JS/TS. We’re pretty familiar with Redis, so maybe we can hack something in.
Just thinking this through: without fair scheduling on the activity queue, the activity workers need to pull all the tasks, e.g. to even see the T_2 tasks when there’s a backlog of T_1 tasks ahead of them. With more tasks pulled than the activity worker can run concurrently, the next point of intervention is for the activity worker to hold off on lower-priority tasks until enough higher-priority tasks have been completed.
What you’re essentially describing, @awwx, is offloading the tasks to another task queue that can do fair scheduling.
That’s what I intended to do by using Redis and Asynchronous Activity Completion or Signals. We already have an in-house task queue built on Redis, so that part shouldn’t be an issue.
The first part is to have a process that dequeues from Temporal and enqueues to Redis, which seems trivial to implement.
The second part might be tricky: adapting the true activity worker to pull tasks from Redis and make the link with Temporal. @maxim, do you see a good way to do the second part? Say we have the activity task token, is there a way in the TS SDK to call an activity function as if it was called from the worker? My goal is to make the activities agnostic of the fact that they’re going through Redis, so that when Temporal supports fair scheduling we can quickly make the switch.
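Here’s roughly what I have in mind, as a sketch only: fairQueue and realA1 are placeholders for our in-house Redis queue and the unchanged activity logic, not real APIs. The registered activity just records the task and throws CompleteAsyncError; a separate queue worker pulls fairly from Redis and reports back through the async completion client:

```ts
// Sketch of the "Redis in the middle" idea.
import { Context, CompleteAsyncError } from '@temporalio/activity';
import { Connection, Client } from '@temporalio/client';
import { fairQueue } from './in-house-queue'; // hypothetical in-house fair queue
import { realA1 } from './real-activities';   // hypothetical: the actual business logic

// Registered with the Temporal worker under the A_1 activity type: it only
// records the task and defers the result.
export async function a1(input: { tenantId: string; payload: unknown }): Promise<never> {
  const { taskToken } = Context.current().info;
  await fairQueue.enqueue({
    tenantId: input.tenantId,
    taskToken: Buffer.from(taskToken).toString('base64'),
    payload: input.payload,
  });
  // Tell Temporal the result will be reported later via the completion client.
  throw new CompleteAsyncError();
}

// Separate process: pulls fairly across tenants from Redis and reports back.
export async function runQueueWorker(): Promise<void> {
  const connection = await Connection.connect({ address: 'localhost:7233' });
  const client = new Client({ connection });
  for (;;) {
    const task = await fairQueue.dequeue(); // fair scheduling happens here
    const taskToken = Buffer.from(task.taskToken, 'base64');
    try {
      const result = await realA1(task.payload); // unchanged activity logic
      await client.activity.complete(taskToken, result);
    } catch (err) {
      await client.activity.fail(taskToken, err as Error);
    }
  }
}
```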
I was also thinking of another implementation using prefixed activity task queues per tenant, controlling the fairness at the activity level instead of using Redis in between, which reduces complexity and potential failure points. We don’t need exact fair scheduling; something good enough would work too.
We currently have ~1k tenants and maybe a handful of topics (task queue names), queue-name:{{tenant_id}}, so maybe a total of ~5k-10k activity task queues. Is there a limit to the number of task queues in Temporal? What if we go with something like queue-name:{{activity_type}}:{{tenant_id}}? That would explode the number of task queues but would give us better fairness groups. From the docs, I don’t think it’s an issue.
As I said in my OP, it’s now more of an ops issue. The next challenge is to create Worker Entities dynamically when we detect a new queue. I assume that having a few thousand Workers in the same process isn’t an issue if they all share the same client connection? Is there a way to have more control over the worker’s polling mechanism in the TS SDK? Like, can we manually trigger a fetch? Can we pause/stop a worker? Something else? What would be the better approach?
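Something like the following is what I’m imagining, as a rough sketch; discoverTenantQueues is a placeholder for however we end up tracking queue names, and the tuning numbers are guesses:

```ts
// Rough sketch: one process hosting a Worker per discovered tenant queue,
// all sharing a single NativeConnection.
import { Worker, NativeConnection } from '@temporalio/worker';
import * as activities from './activities';
import { discoverTenantQueues } from './registry'; // hypothetical

async function main() {
  const connection = await NativeConnection.connect({ address: 'localhost:7233' });
  const workers = new Map<string, Worker>();

  for (const taskQueue of await discoverTenantQueues()) {
    const worker = await Worker.create({
      connection,
      taskQueue, // e.g. "queue-name:t_42"
      activities,
      // Keep per-worker concurrency low so no single tenant hogs the process.
      maxConcurrentActivityExecutionSize: 2,
    });
    workers.set(taskQueue, worker);
    void worker.run();
  }

  // As far as I can tell there is no public pause / "fetch one task now" API;
  // stopping a tenant's worker means shutting it down:
  // await workers.get('queue-name:t_42')?.shutdown();
}
```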
I also don’t see a way to list the queues in the docs; is there one? Since we are probably going to self-host, I was thinking of maybe poking the database, which is probably gonna be PG. It’s not great since we’d be playing with the internals of Temporal, but it’s one way to do it. I could also track the queues by registering them before each workflow start, but then I wouldn’t be able to derive stats like queue lengths, which could be helpful.
Anyway, I would really like your input on this @maxim; I think I like that better than having to deal with Redis in the middle.
I wonder if an easy step might be to implement fair scheduling within each individual activity worker? It wouldn’t be exact because it wouldn’t be global fair scheduling, but perhaps it might be good enough while you wait for Temporal’s fair scheduling?
@maxim, suppose we’re running, for example, 10 activity workers, and each worker eagerly consumes activity tasks from the activity queue as rapidly as it can (e.g. maxConcurrentActivityExecutionSize, maxConcurrentActivityTaskPollers, etc. are set to some large number, larger than the total number of tasks waiting in the activity queue).
Is there anything that would prevent one of the workers from grabbing all or most of the tasks? A worker is advertising that it can handle all of the tasks in the queue, so if it happens to run a little faster than the others, it might dequeue first and grab all the tasks that it says it can handle. Or does Temporal do some kind of round-robin allocation of tasks across available workers, even if each worker says it could handle all of them?
I don’t like this approach; too many issues with it imo. It could probably work alright if the number of tasks is somewhat small and they’re fast, but at that point you might not need fair scheduling.
If you have a lot of tasks, it might eat too much memory. You need to send heartbeats for each one of them (also an issue with Redis in the middle). You might get an unbalanced workload where some workers are hoarding tasks that others could do (work stealing, somehow?), especially if tasks are slow and inconsistent.
Seems to me like an awful lot of complexity with little benefit, if any. Imo, using a queue in the middle with fair scheduling built in sounds like a better option if you wanna go the async activity route, which isn’t simple either. The only reason I’m considering it is that I already built one and could probably adapt it with a small amount of work.
Hmm, suppose hypothetically we did have access to a durable queue system with fair scheduling. To check my own understanding of what we’d do then:
The workflow calls an activity.
The activity worker pulls the activity task, queues the task along with the tenant id and the activity task token (for asynchronous completion), and returns indicating that the activity will be completed asynchronously. At this point the workflow is still waiting on activity completion.
Separately, queue workers pull tasks from the task queue with fair scheduling. When done with the task, the queue worker calls the Temporal client to complete the activity.
We would also want some mechanism to keep track of which tasks are in the queue so that we can heartbeat on them.
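Something like this is what I’m picturing for that part, as a sketch; inFlightTaskTokens is a placeholder for however the in-flight task tokens get tracked:

```ts
// Sketch: heartbeating in-flight queued tasks from outside the activity worker.
import { Connection, Client } from '@temporalio/client';
import { inFlightTaskTokens } from './tracking'; // hypothetical

export async function heartbeatLoop(): Promise<void> {
  const connection = await Connection.connect({ address: 'localhost:7233' });
  const client = new Client({ connection });
  setInterval(async () => {
    for (const tokenB64 of await inFlightTaskTokens()) {
      try {
        await client.activity.heartbeat(Buffer.from(tokenB64, 'base64'));
      } catch {
        // Activity may have timed out or been cancelled; drop it from tracking.
      }
    }
  }, 10_000);
}
```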
Fairness: When you have multiple clients or users triggering workflows, concurrency control can help ensure fair access to resources. By limiting the number of concurrent runs per client or user, you can prevent a single client from monopolizing the system and ensure that all clients get a fair share of the available resources.
Which maybe isn’t quite what we want? If the overall load happens to be low at the moment, perhaps we wouldn’t want to throttle a tenant if there’s capacity to spare.
Yes. But one thing I would like to do is to make sure that the worker pulling from the fair shared queue executes the task as if it was coming from Temporal as an activity handler, to minimize the transition when Temporal supports it. Basically, from the handler’s perspective nothing really changed; we’ve simply created a transport protocol through another queue (a man in the middle).
As for Hatchet, I think their formulation is a bit confusing. They explain it better in the Round Robin section. In the end they are essentially splitting the queue into multiple sub-queues internally, like per tenant, and round-robining between them. As far as I know they don’t support multi-level (like per tenant per task/activity type), which I would like, at least not yet.
Well, the activity can get the activity execution context and use it for a few things like heartbeats and getting info about the activity execution (the activity id, etc.). If it were me, I’d probably write, for example, a “heartbeat” function that would call the activity execution context’s heartbeat method if running in an activity worker, or would instead call the heartbeat method on the activity completion client if running in a queue worker. That wouldn’t be writing totally unaltered activity code of course, but it’d only be a few functions. If you really wanted to be able to write totally pure Temporal activity code, I suppose you could get into the SDK internals and supply your own activity execution context, though I don’t think I’d find it worth it.
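As a rough sketch of that helper (the QueueWorkerEnv shape and how the task token reaches the queue worker are assumptions about the setup, not anything from the SDK):

```ts
// Sketch of the dual-mode heartbeat helper.
import { Context } from '@temporalio/activity';
import { Client } from '@temporalio/client';

export interface QueueWorkerEnv {
  client: Client;        // Temporal client available in the queue worker
  taskToken: Uint8Array; // token of the activity being completed asynchronously
}

// Pass `env` when running in the queue worker; omit it inside a real activity.
export async function heartbeat(details?: unknown, env?: QueueWorkerEnv): Promise<void> {
  if (env) {
    await env.client.activity.heartbeat(env.taskToken, details);
  } else {
    Context.current().heartbeat(details);
  }
}
```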
Would this be, for example: suppose T_2 is next up to have one of its tasks executed and there are multiple T_2 tasks in the queue, and you’d like to additionally be able to prioritize which T_2 task you’d do first?
Deciding which tenant to do next would, I think, require keeping some history, e.g. the reason why we’re deciding to do T_2 next is because we’ve more recently executed a T_1 task than a T_2 one. But within a tenant’s tasks, prioritization could simply be based on the tenant’s tasks in the queue?
In the end I’m gonna need to do some hack somewhere, so the question is where and how simply I can do it. I’m gonna take the time to look at the SDK at some point. Sadly, adding Temporal isn’t our priority for now; we have a couple of other things to address. Hopefully, early next year I’ll have time, or Temporal will have solved the issue.
As for multi-level, see it as a double round robin in my use case: first you choose a tenant, then one of the activity types that are enqueued. A simple example of the resulting order would be T1A1, T2A1, T1A2, T2A2. There are many strategies we could use! Maybe instead of round robin we track the processing time of each task and make it more fair, or we add a weight per task type for priority. But this can add a lot of complexity real quick.
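To make the double round robin concrete, here’s a toy in-memory version of the ordering I mean; a real implementation would obviously live in Redis or Postgres and be durable:

```ts
// Toy illustration of "tenant first, then activity type" round robin.
type Task = { tenant: string; activityType: string; payload: unknown };

class DoubleRoundRobin {
  // tenant -> activityType -> FIFO of tasks
  private queues = new Map<string, Map<string, Task[]>>();
  private tenantCursor = 0;
  private typeCursors = new Map<string, number>();

  enqueue(task: Task) {
    const byType = this.queues.get(task.tenant) ?? new Map<string, Task[]>();
    const fifo = byType.get(task.activityType) ?? [];
    fifo.push(task);
    byType.set(task.activityType, fifo);
    this.queues.set(task.tenant, byType);
  }

  dequeue(): Task | undefined {
    const tenants = [...this.queues.keys()];
    for (let i = 0; i < tenants.length; i++) {
      const tenant = tenants[(this.tenantCursor + i) % tenants.length];
      const byType = this.queues.get(tenant)!;
      const types = [...byType.keys()];
      const cursor = this.typeCursors.get(tenant) ?? 0;
      for (let j = 0; j < types.length; j++) {
        const type = types[(cursor + j) % types.length];
        const fifo = byType.get(type)!;
        const task = fifo.shift();
        if (task) {
          if (fifo.length === 0) byType.delete(type);
          if (byType.size === 0) this.queues.delete(tenant);
          this.typeCursors.set(tenant, (cursor + j + 1) % Math.max(types.length, 1));
          this.tenantCursor = (this.tenantCursor + i + 1) % Math.max(tenants.length, 1);
          return task;
        }
      }
    }
    return undefined;
  }
}
```

With T1A1, T1A2, T2A1, T2A2 enqueued, dequeue order comes out as T1A1, T2A1, T1A2, T2A2, matching the example above.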
Oh, a thought. The workflow wants to run an activity, but it should hold off on executing it if there are other, higher priority activities that need to run first. Suppose the workflow does this in multiple steps: it calls a scheduling activity, an execution activity, and a completion activity. The execution activity is the activity we’ve been talking about previously, the one that does the actual work that needs to be fairly scheduled.
The workflow runs a scheduling activity which makes an API call to the fair scheduling service to request an execution slot, passing along the parameters that need to be taken into consideration such as the tenant id and the execution activity type. This activity returns immediately.
The fair scheduling service waits until the work should be executed, taking into account the fair scheduling and prioritization considerations. Then the service signals the workflow telling it that it can proceed. [Edit: #1 and #2 could be combined into an activity that asynchronously completes when the workflow can proceed; the workflow can then wait for the scheduling activity instead of waiting for a signal.]
The workflow calls the execution activity. Like any other regular activity, this might take a long time, or return an error, or timeout, etc.
When the execution activity completes (whether successfully, or with an error or timeout, etc), the workflow calls the completion activity. This makes an API call to the fair scheduling service which releases the execution slot. This activity also returns immediately.
If other tasks were waiting, the fair scheduling service gives the execution slot to the next highest priority task.
To ensure that the workflow releases its execution slot even if it’s terminated (which prevents the workflow from taking any further actions itself), the workflow could, when it starts, run a child workflow with the parent close policy set to “request cancel”. Then the child will receive a cancellation request when the parent workflow closes for any reason (including forced termination). The child can respond to the cancellation request by calling the fair scheduling API with the parent’s workflow id to release any execution slots the parent workflow may have been holding.
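Putting it together, a hedged sketch of the workflow side, using the combined async-completion variant from the edit note above; acquireSlot, doWork, releaseSlot, and releaseAllSlots are hypothetical activities that would call the fair scheduling service, and slotJanitor is the “request cancel” safety-net child:

```ts
import {
  proxyActivities,
  startChild,
  workflowInfo,
  condition,
  isCancellation,
  CancellationScope,
  ParentClosePolicy,
} from '@temporalio/workflow';
import type * as activities from './activities'; // hypothetical implementations

const { acquireSlot, doWork, releaseSlot, releaseAllSlots } =
  proxyActivities<typeof activities>({ startToCloseTimeout: '1 hour' });

export async function fairWorkflow(tenantId: string, payload: unknown): Promise<void> {
  // Safety net: the child gets a cancellation request when this workflow closes
  // for any reason, including forced termination.
  await startChild(slotJanitor, {
    args: [workflowInfo().workflowId],
    parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
  });

  // Completes asynchronously once the fair scheduling service grants a slot.
  await acquireSlot(tenantId, 'A_1');
  try {
    await doWork(payload); // the execution activity: may run long, fail, time out...
  } finally {
    await releaseSlot(tenantId, 'A_1'); // returns immediately
  }
}

export async function slotJanitor(parentWorkflowId: string): Promise<void> {
  try {
    await condition(() => false); // block until cancelled by the parent closing
  } catch (err) {
    if (!isCancellation(err)) throw err;
    // Cleanup is still allowed inside a non-cancellable scope.
    await CancellationScope.nonCancellable(() => releaseAllSlots(parentWorkflowId));
  }
}
```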
This approach has some nice features:
The durable nature of workflow execution means that we don’t need to worry about how to release the execution slot if a worker crashes.
The execution activity is a standard Temporal activity that runs on a normal Temporal activity worker: no need to write different code to heartbeat etc. in a different worker environment.
The fair scheduling service only needs a few pieces of data: the workflow id and whatever is needed for scheduling, such as the tenant id. Unlike having a separate durable queue which supports fair scheduling, other data such as the activity parameters don’t need to go through the fair scheduling service.
If/when Temporal implements a fair scheduling system that does what you want, all you need to do is remove the scheduling and completion activity calls from the workflow.