Limit Concurrent Activities in Parallel

In Go, worker pools are a popular pattern (see Go by Example: Worker Pools). Is there a way to implement this in Temporal?

Say you have a batch of activities to execute but want only 4 of them running at the same time. Is this possible?


Set WorkerOptions.MaxConcurrentActivityExecutionSize to limit the number of activities that execute at the same time per worker.
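For illustration, a minimal sketch of setting this limit with the Java SDK (in the Go SDK the equivalent is the MaxConcurrentActivityExecutionSize field of worker.Options). It assumes the io.temporal:temporal-sdk dependency; the task queue name is hypothetical.

```java
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

public class LimitedWorker {
    // Hypothetical task queue name.
    static final String TASK_QUEUE = "batch-task-queue";

    // At most 4 activity executions run at the same time in THIS worker process.
    // Other workers polling the same task queue have their own, independent limits.
    static WorkerOptions workerOptions() {
        return WorkerOptions.newBuilder()
            .setMaxConcurrentActivityExecutionSize(4)
            .build();
    }

    // Creates a worker on the (hypothetical) task queue with the limit applied.
    static Worker newLimitedWorker(WorkerFactory factory) {
        return factory.newWorker(TASK_QUEUE, workerOptions());
    }
}
```

Because the limit is per worker process, the effective cap across a task queue is this value multiplied by the number of workers polling it.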

Hmm, this only lets us limit activities per worker. What if we want to limit the activities of a single workflow?

For example, say a workflow needs to execute 64 activities, but only 4 of them should run concurrently. Is that possible?

Also, is there a limit on how many activities you can have running in parallel at any point in time?

Actually, is that what the “swarm” example in temporalio/samples-go (swarm.go at commit e89517ba16b481a03b5a0b4edb8733aab6b135e3) does?

I haven’t heard of swarm before, so maybe that terminology is why I didn’t jump to this. Do you have a rough description of what this attempts to do?

Hmm - this only allows us to limit activities per worker, but what if you wanted to limit the activities on a workflow?
For example, if you have a workflow that needs to execute 64 activities, but only want 4 activities executing concurrently. Is that possible?

Start 4 activities, then start another one each time one of the running activities completes. Note that in the Go SDK, ExecuteActivity returns a Future, so the call is already asynchronous. Use a Selector to wait on multiple Futures.
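A minimal sketch of this sliding-window pattern, written with the Java SDK for consistency with the other Java code in this thread: Async.function plays the role of the Go Future, and Promise.anyOf plays the role of the Selector. The ItemActivities interface, the timeout, and MAX_PARALLEL = 4 are hypothetical; it assumes the io.temporal:temporal-sdk dependency.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BoundedBatch {
    @ActivityInterface
    public interface ItemActivities {
        String processItem(String item); // hypothetical activity
    }

    @WorkflowInterface
    public interface BatchWorkflow {
        @WorkflowMethod
        List<String> run(List<String> items);
    }

    public static class BatchWorkflowImpl implements BatchWorkflow {
        static final int MAX_PARALLEL = 4; // hypothetical cap per workflow execution

        private final ItemActivities activities = Workflow.newActivityStub(
            ItemActivities.class,
            ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(5)).build());

        @Override
        public List<String> run(List<String> items) {
            List<Promise<String>> all = new ArrayList<>();
            List<Promise<String>> running = new ArrayList<>();
            for (String item : items) {
                if (running.size() >= MAX_PARALLEL) {
                    // Block until at least one in-flight activity finishes
                    // (the Java counterpart of Selector.Select in Go).
                    // get() rethrows if that activity failed after its retries.
                    Promise.anyOf(running).get();
                    running.removeIf(Promise::isCompleted);
                }
                // Schedules the activity without waiting for it.
                Promise<String> p = Async.function(activities::processItem, item);
                running.add(p);
                all.add(p);
            }
            // Wait for the remaining activities and collect results in input order.
            List<String> results = new ArrayList<>();
            for (Promise<String> p : all) {
                results.add(p.get());
            }
            return results;
        }
    }
}
```

Because the limit lives in workflow code, it holds for each workflow execution regardless of how many workers poll the task queue, but it is not a global limit across workflow executions.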

Also, is there a limit on how many activities you can have running in parallel at any point in time?

Limiting the number of executing activities across multiple workers is not supported. Only a global rate limit per task queue is supported.
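For the per-task-queue rate limit, a sketch with the Java SDK, assuming the io.temporal:temporal-sdk dependency. Note that this caps how many activity tasks per second are dispatched from the whole task queue (across all its workers), not how many execute concurrently; the value 3.0 is hypothetical.

```java
import io.temporal.worker.WorkerOptions;

public class TaskQueueRateLimit {
    // Rate limit managed by the server for the entire task queue:
    // roughly 3 activity tasks per second, shared by all workers polling it.
    // This limits throughput, NOT the number of concurrently running activities.
    static WorkerOptions workerOptions() {
        return WorkerOptions.newBuilder()
            .setMaxTaskQueueActivitiesPerSecond(3.0)
            .build();
    }
}
```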

I haven’t heard of swarm before, so maybe that terminology is why I didn’t jump to this. Do you have a rough description of what this attempts to do?

It was contributed, so I’m not an expert in the mathematical model it implements.

I see - so you could start 4 activities, then with the Selector, you can just kick off another one as an activity finishes. This seems to be a good fit for our case, but is there a reason you wouldn’t call this a cap on the activity parallelism? Is it because this isn’t a global activity limit (i.e. across all workflows) but only a limit on the parallelism of the single workflow?

This seems to be a good fit for our case, but is there a reason you wouldn’t call this a cap on the activity parallelism? Is it because this isn’t a global activity limit (i.e. across all workflows) but only a limit on the parallelism of the single workflow?

Yes, I was talking about the global cap on activity parallelism which applies to multiple workflows.

On this topic, in Java, is this a reasonable way to pause the workflow if too many activities are queued up, or is it a busy-wait?

Workflow.await(() -> activities.stream().filter(p -> !p.isCompleted()).count() < MAX_RUNNING);

This is in a CPU-intensive workflow, where WorkerOptions.MaxConcurrentActivityExecutionSize is set to the number of CPU cores. The list of activities far exceeds that, and I’d rather not have the workflow get too far ahead of the queue.

That await should work fine. It is not a busy-wait: the condition is re-evaluated only when the workflow state changes because of an external event, such as an activity completion.

Hi Maxim,
can I set MaxConcurrentActivityExecutionSize for a specific activity?
I have a case in which I launch activities based on an input list, and I’d like to limit them to at most 5 running asynchronously at a time. Example:
definition:

    @ActivityStub(retryOptions = RetryActivityOptions(maximumAttempts = 5, maximumInterval = 10))
    lateinit var bulkEditActivities: BulkEditActivities

execution:

val promises = itemsList.map { sessionId ->
    Async.function {
        bulkEditActivitiesDB.updateSessionToDB(
            bulkId = bulkId,

How can I apply this property correctly?

Thanks,
Shai

can I set: MaxConcurrentActivityExecutionSize to a specific activity?

It’s a worker-specific limit, set via WorkerOptions, on the number of activities that can execute in parallel. You would need a dedicated worker for this activity, with the limit set to 5 in your case.

Schedule this activity using a specific task queue and create a worker that listens on that task queue.
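A minimal Java sketch of those two steps (the snippet above is Kotlin, but the builder calls are the same from Kotlin). The queue name, timeout, and helper method names are hypothetical, and it assumes the io.temporal:temporal-sdk dependency. The cap of 5 holds only if a single worker process polls this queue.

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;
import java.time.Duration;

public class BulkEditQueue {
    // Hypothetical task queue used only by the limited activity.
    static final String BULK_EDIT_QUEUE = "bulk-edit-activities";

    // Workflow side: pass these options to Workflow.newActivityStub so the
    // activity is scheduled on the dedicated task queue.
    static ActivityOptions bulkEditActivityOptions() {
        return ActivityOptions.newBuilder()
            .setTaskQueue(BULK_EDIT_QUEUE)
            .setStartToCloseTimeout(Duration.ofMinutes(1)) // hypothetical timeout
            .build();
    }

    // Worker side: a worker that polls only the dedicated queue and runs
    // at most 5 of its activities at the same time.
    static Worker startBulkEditWorker(WorkerFactory factory, Object activitiesImpl) {
        Worker worker = factory.newWorker(
            BULK_EDIT_QUEUE,
            WorkerOptions.newBuilder().setMaxConcurrentActivityExecutionSize(5).build());
        worker.registerActivitiesImplementations(activitiesImpl);
        return worker;
    }
}
```

Call startBulkEditWorker before factory.start(); the workflow's other activities can stay on the default task queue.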

thanks!
Shai

In case anyone is interested in the Go implementation, there’s a discussion in this closed PR:

A good example to start is

To limit the maximum number of concurrent activities, it only needs to start a new activity each time one completes.

We have a limit of at most 5 concurrent executions, but we run 10 Kubernetes pods, and each pod registers one worker.
If we set the maximum concurrency to 5 per worker, there will be 10 workers in total, so the total concurrency will be 5 * 10 = 50.

So I was thinking of creating a separate deployment with a single pod that has this worker enabled, while the other pods have it disabled.
Is there a better solution for this?

Are your activities CPU-intensive? If not, a single activity-specific worker process should be OK. Another option is to run 5 worker processes with MaxConcurrentActivityExecutionSize set to 1 on each, but that’s probably more expensive.

We have limitation on maximum 5 concurrency, but we have 10 kubernetes pods running, each pod registers one worker.

What is the rate per second for this use case? If it is low, you can have a single workflow execution (instance) that executes all of those activities. A single workflow execution can easily enforce a limit on their parallelism, completely independently of the worker configuration.

It averages 2-3 requests per second.

Does “single workflow” mean a dedicated wrapper workflow that executes a bunch of activities?
Is there a sample or code snippet for a single workflow execution that would help me understand?

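Something like this:

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    // Callers add work by signaling this single long-running workflow.
    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task task);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    // Activity stubs need at least a timeout; one minute is an example value.
    private final TaskProcessorActivity processor = Workflow.newActivityStub(
        TaskProcessorActivity.class,
        ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build());

    @Override
    public void execute() {
        while (true) {
            // Block (without busy-waiting) until a signal delivers a task.
            Workflow.await(() -> !taskQueue.isEmpty());
            // Add logic to limit parallelism
            processor.process(taskQueue.poll());
            // Add continue as new logic.
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}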
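One way to fill in the two placeholder comments, sketched under assumptions: tasks arrive by signal, at most MAX_PARALLEL = 5 activities run at once (tracked with an in-flight counter and Workflow.await, the same non-busy-wait mechanism discussed earlier in the thread), and after a hypothetical MAX_TASKS_PER_RUN tasks the workflow continues as new, carrying unstarted tasks over as its argument. The Task payload, both limits, and the changed execute(List<Task>) signature are hypothetical; it assumes the io.temporal:temporal-sdk dependency.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BoundedSerializedExecution {
    public static class Task { // hypothetical payload
        public String id;
    }

    @WorkflowInterface
    public interface BoundedExecutionWorkflow {
        @WorkflowMethod
        void execute(List<Task> pending); // tasks carried over from the previous run

        @SignalMethod
        void addTask(Task t);
    }

    @ActivityInterface
    public interface TaskProcessorActivity {
        void process(Task task);
    }

    public static class BoundedExecutionWorkflowImpl implements BoundedExecutionWorkflow {
        static final int MAX_PARALLEL = 5;          // hypothetical concurrency cap
        static final int MAX_TASKS_PER_RUN = 1000;  // hypothetical history bound

        private final Queue<Task> taskQueue = new ArrayDeque<>();
        private final TaskProcessorActivity processor = Workflow.newActivityStub(
            TaskProcessorActivity.class,
            ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build());
        private int inFlight = 0;
        private int started = 0;

        @Override
        public void execute(List<Task> pending) {
            if (pending != null) {
                taskQueue.addAll(pending);
            }
            while (started < MAX_TASKS_PER_RUN) {
                // Wake up only when there is a task AND a free slot.
                Workflow.await(() -> !taskQueue.isEmpty() && inFlight < MAX_PARALLEL);
                Task task = taskQueue.poll();
                inFlight++;
                started++;
                // Start the activity asynchronously; free the slot when it
                // completes, whether it succeeded or failed.
                Async.procedure(processor::process, task)
                    .handle((ignored, failure) -> {
                        inFlight--;
                        return null;
                    });
            }
            // Let running activities finish, then start a fresh run with the leftovers.
            Workflow.await(() -> inFlight == 0);
            Workflow.continueAsNew(new ArrayList<>(taskQueue));
        }

        @Override
        public void addTask(Task t) {
            taskQueue.add(t);
        }
    }
}
```

In this sketch a task whose activity fails after all retries only frees its slot and is then dropped; it also does not try to cover every edge case around signals that arrive just before continue-as-new.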

In this example, tasks are added via the signal method and the activity output is ignored. If we also want to get the output of the activity, what are the options?

One way is to signal the original (calling) workflow after the activity completes. Are there any better ways?

Can this single workflow be modified to do that?
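A minimal sketch of the signal-back approach described above, assuming each task carries the ID of the workflow that submitted it. All type, field, and method names here are hypothetical, and it assumes the io.temporal:temporal-sdk dependency. The caller signals the task to the serializing workflow and then waits with Workflow.await until the serializer signals the result back through an external workflow stub.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

public class ResultCallback {
    // Hypothetical payloads: each task records which workflow wants the result.
    public static class Task {
        public String id;
        public String replyToWorkflowId;
    }

    public static class TaskResult {
        public String taskId;
        public String output;
    }

    // The single serializing workflow (only what this sketch uses is shown).
    @WorkflowInterface
    public interface SerializerWorkflow {
        @WorkflowMethod
        void execute();

        @SignalMethod
        void addTask(Task t);
    }

    // Any workflow that wants a result back exposes a signal to receive it.
    @WorkflowInterface
    public interface CallerWorkflow {
        @WorkflowMethod
        String run(String serializerWorkflowId);

        @SignalMethod
        void onTaskResult(TaskResult result);
    }

    @ActivityInterface
    public interface TaskProcessorActivity {
        String process(Task task);
    }

    // Called from the serializing workflow for each task: run the activity,
    // then signal the output back to the workflow that submitted the task.
    static void processAndReply(TaskProcessorActivity processor, Task task) {
        TaskResult result = new TaskResult();
        result.taskId = task.id;
        result.output = processor.process(task);
        CallerWorkflow caller =
            Workflow.newExternalWorkflowStub(CallerWorkflow.class, task.replyToWorkflowId);
        caller.onTaskResult(result);
    }

    public static class CallerWorkflowImpl implements CallerWorkflow {
        private TaskResult result;

        @Override
        public String run(String serializerWorkflowId) {
            Task task = new Task();
            task.id = Workflow.randomUUID().toString(); // deterministic UUID
            task.replyToWorkflowId = Workflow.getInfo().getWorkflowId();
            // Submit the task to the serializing workflow via its signal.
            SerializerWorkflow serializer =
                Workflow.newExternalWorkflowStub(SerializerWorkflow.class, serializerWorkflowId);
            serializer.addTask(task);
            // Block (not busy-wait) until the serializer signals the result back.
            Workflow.await(() -> result != null);
            return result.output;
        }

        @Override
        public void onTaskResult(TaskResult r) {
            result = r;
        }
    }
}
```

As written, processAndReply handles one task at a time; combining it with a parallelism limit is left out to keep the sketch short.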