Limit workflow-based concurrent executions?

Hi, I am looking for a way to limit the number of concurrent executions of a particular workflow type. This is often needed because the workflow code must coordinate access to a shared resource through some kind of semaphore.

There is no server-side support for this feature. We have plans to add such support in the future, but currently you have to implement it in application code.

What is the use case? Such a limitation is very rarely needed.

Hi Maxim, my workflow uses activities to instrument an external service that can only process 1 task concurrently.

For such a use case, I would use a mutex workflow. The basic idea is that for each such resource there exists a single workflow execution that invokes the activity. All other workflows send signals to this workflow to request the activity execution. Upon receiving a signal, the mutex workflow either invokes the activity or buffers the request until the previous execution is done.
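As a rough illustration of that buffering idea, here is a plain-Go sketch (channels stand in for Temporal signal channels; this is not SDK code, just the shape of the pattern): a single consumer owns the resource and executes requests strictly one at a time, in arrival order.

```go
package main

import "fmt"

// runMutex models the mutex workflow: requests arrive as "signals" on a
// channel, and the single owner of the resource executes them one at a
// time, buffering the rest until the previous execution is done.
func runMutex(tasks []string) []string {
	requests := make(chan string, len(tasks)) // buffered "signal" channel
	for _, t := range tasks {
		requests <- t // other workflows "signal" their requests
	}
	close(requests)

	var executed []string
	for task := range requests { // strictly one at a time, in order
		// in a real mutex workflow this is where the activity would run
		executed = append(executed, task)
	}
	return executed
}

func main() {
	fmt.Println(runMutex([]string{"task-1", "task-2", "task-3"}))
	// prints [task-1 task-2 task-3]
}
```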

Here is a sample of such a workflow.

Hi maxim,

Thanks for this tip, as it was very helpful!

I have a similar situation, but with an additional constraint: there is a “sequence number” of the tasks sent to the external service, and it is not allowed to send tasks out of order: I need to ensure that a task is not sent to the external service if a task with a higher sequence number has already been sent (I should just discard the lower numbered task).

I tried to implement this with a permanent mutex workflow that remembers the highest sequence number sent and refuses to give out locks for a lower sequence number. The issue is that, because this workflow lives forever, I need to use ContinueAsNew from time to time (passing the sequence number on to the new run). I found that occasionally, when I returned my NewContinueAsNewError, there happened to be a signal already queued at that moment, which I think resulted in the signal being thrown away.

Is there a way to have a workflow that receives signals and calls ContinueAsNew while guaranteeing that no signals are missed?

For anyone who might find this thread on Google and be wondering about my question: see this later thread in which I provided some pared-down example code. The bottom line is that you should drain all open signal channels immediately before returning if you wish to ensure that you did not miss any signals.
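That drain-before-returning idea can be sketched in plain Go (a buffered channel stands in for the workflow's signal channel, with the highest-sequence-number rule from above folded in; illustrative only, not SDK code): do non-blocking receives until the channel is empty, and only then return.

```go
package main

import "fmt"

// drainHighest performs non-blocking receives until the channel is empty,
// keeping the highest sequence number seen. A workflow would do the
// equivalent over each of its open signal channels immediately before
// returning ContinueAsNew, then pass the result on to the next run.
func drainHighest(signals chan int, highest int) int {
	for {
		select {
		case seq := <-signals:
			if seq > highest {
				highest = seq // lower-numbered tasks are simply discarded
			}
		default:
			return highest // channel empty: now safe to continue-as-new
		}
	}
}

func main() {
	signals := make(chan int, 8)
	for _, seq := range []int{3, 1, 7, 5} {
		signals <- seq
	}
	fmt.Println(drainHighest(signals, 0)) // prints 7
}
```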


Hi maxim,
We are interested in this feature, too. Our use case: given the resource (CPU, memory) limits of the worker process, we want to limit the concurrency of a specific workflow type. For example, we have a workflow that exports data as an Excel file, and at most 4 workflow tasks of this type should be allowed to run at the same time.
This requirement is similar to the worker option maxConcurrentWorkflowTaskExecutionSize, but scoped to a specific workflow type.
Is this a meaningful feature for workflow task management?

You would have to use different workers for that, similar to how WorkerOptions setMaxConcurrentActivityExecutionSize currently works.

Feel free to open an issue in the sdk-features repo for a per-workflow-type setting, and let’s see whether the SDK team thinks it would be feasible to add.


Just to expand on @tihomir’s answer: you don’t want to restrict the number of workflows running in parallel. You want to restrict the number of activities running on a worker, since it is the activities that are resource intensive, not the workflow code.

Such a limitation is very rarely needed.

All rate-limited API calls to a third party will have this type of limitation. I send out 6 workflows, and each workflow calls an API that rate limits to 3 concurrent calls. I can of course “retry until success” against my rate-limited API, which is a bit of a hack. I was hoping there was a global concurrent limit (not at the worker level but globally) on either a workflow or an activity.

This topic was raised in 2022. Did anything change? Thank you.

The global rate limit (activities per second) per task queue has always been supported. A global limit on the number of parallel activities is still not supported.

May I know how to set the global per-second limit in the Go SDK?

Use worker.Options.TaskQueueActivitiesPerSecond.
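For reference, here is a sketch of where that option goes when constructing a worker (the client setup and the "export" task-queue name are illustrative, not from this thread):

```go
package main

import (
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func main() {
	c, err := client.Dial(client.Options{}) // assumes a reachable Temporal server
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// TaskQueueActivitiesPerSecond is enforced server-side for the whole
	// task queue, so it applies across ALL workers polling "export".
	w := worker.New(c, "export", worker.Options{
		TaskQueueActivitiesPerSecond: 10,
	})
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatal(err)
	}
}
```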

Thanks Maxim, but I think I meant globally across all workers (all hosts). Worker Options only affect that worker.

I think I have to try out the other alternatives you and tihomir have suggested such as sliding-window-batch or mutex. It seems like that’s the only way you can control a workflow on a “global” basis.

Thanks again.

Your assumption is not correct. This specific setting affects the overall task-queue rate across all the workers. The single-worker rate limit is specified through worker.Options.WorkerActivitiesPerSecond.

Thank you Maxim.

This is cognitively mind-bending. What happens if two workers set worker.Options.TaskQueueActivitiesPerSecond to two different numbers? I always thought a worker option was specific to that worker only. Is there documentation on which worker options are global and which are local? Am I completely misunderstanding something fundamental here?

I believe the rate limit will flicker between different values. In practice, it only happens during rolling worker restarts and is not a real issue.

Currently this is the only global worker option.

This approach was the fastest way to put this feature in production without introducing a separate per task-queue metadata store.