Clarification: combination of max_concurrent_activities and start_to_close_timeout

Hi Temporal guys!

I don’t fully understand how the two configuration options
max_concurrent_activities/max_concurrent_workflow_tasks interact with start_to_close_timeout.

In my scenario, I need to limit each worker instance to process only a set number of activities, based on my settings.TEMPORAL_NUM_WORKERS, because my machines have limited resources and the operations are CPU-intensive. Workflows and activities are currently 1-to-1.

The problem is that, for example, when a client starts 8 workflows against a single worker instance with settings.TEMPORAL_NUM_WORKERS set to 2, all 8 workflows start their timeout countdown immediately. I want only 2 workflows to run at a time, and the timeout for the others to start only after a previous one finishes.

Since the workflows can take anywhere from 0 to 5 minutes, if the timeout countdown starts before they’re actually processed, the other activities end up accumulating the execution time of the previous ones.

My code is this:

from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta
import multiprocessing

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.worker import SharedStateManager, Worker


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, data: WorkflowData) -> str:
        result = await workflow.execute_activity(
            my_cpu_intensive_activity,
            data,
            start_to_close_timeout=timedelta(minutes=5),
            task_queue=settings.TEMPORAL_QUEUE,
            retry_policy=RetryPolicy(maximum_attempts=1),
        )
        return result


# Worker setup (run elsewhere, e.g. in the service entry point):
async def run_worker(sdk_client):
    worker = Worker(
        sdk_client,
        task_queue=settings.TEMPORAL_QUEUE,
        activities=[my_cpu_intensive_activity],
        workflows=[MyWorkflow],
        activity_executor=ProcessPoolExecutor(
            max_workers=settings.TEMPORAL_NUM_WORKERS
        ),
        max_concurrent_workflow_tasks=settings.TEMPORAL_NUM_WORKERS,
        max_concurrent_workflow_task_polls=settings.TEMPORAL_NUM_WORKERS,
        max_concurrent_activities=settings.TEMPORAL_NUM_WORKERS,
        max_concurrent_activity_task_polls=settings.TEMPORAL_NUM_WORKERS,
        shared_state_manager=SharedStateManager.create_from_multiprocessing(
            multiprocessing.Manager()
        ),
    )
    await worker.run()


My mental model was that if I set max_concurrent_activities and max_concurrent_workflow_tasks to 2 and start 8 workflows (with temporal_client.start_workflow(...)), only 2 would start, with the remaining 6 scheduled but waiting for the running ones to finish before starting.
Am I misunderstanding how this works, or is there something wrong with my configuration?

expected:
-------------------------------------------------------------------------------------------------------
STARTED | STARTED | SCHEDULED | SCHEDULED | SCHEDULED | SCHEDULED | SCHEDULED | SCHEDULED |  
-------------------------------------------------------------------------------------------------------
current:
-------------------------------------------------------------------------------------------------------
STARTED | STARTED | STARTED | STARTED | STARTED | STARTED | STARTED | STARTED |  
-------------------------------------------------------------------------------------------------------

The confusion comes from misunderstanding what the workflow task is. Each workflow is executed as a sequence of workflow tasks that are created when there are new events to process. Limiting the number of parallel workflow tasks does not limit the number of parallel workflows. It limits the throughput and resource utilization (only for workflow code) per worker. We recommend not changing the default value of max_concurrent_workflow_tasks for use cases like yours. Changing the number of pollers is also not needed unless you need to run a very high throughput scenario.

Currently, Temporal doesn’t directly support limiting the number of parallel workflow instances. One possible workaround is to create a workflow that starts the processing workflows in a controlled manner. See batch samples from the java-samples repository.
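
For the Python SDK, the idea could be sketched roughly as follows. This is only an illustration of "starting the processing workflows in a controlled manner", not the actual sample code: BatchControllerWorkflow and MyProcessingWorkflow are hypothetical names, and an in-workflow asyncio.Semaphore caps how many child workflows run at once.

import asyncio

from temporalio import workflow


@workflow.defn
class BatchControllerWorkflow:
    @workflow.run
    async def run(self, items: list, max_parallel: int) -> None:
        # The semaphore lives inside a single workflow run, so it stays deterministic.
        semaphore = asyncio.Semaphore(max_parallel)

        async def run_one(item) -> None:
            async with semaphore:
                # Hypothetical per-item workflow (what MyWorkflow does today).
                await workflow.execute_child_workflow(MyProcessingWorkflow.run, item)

        await asyncio.gather(*(run_one(item) for item in items))

The client would then start only the controller workflow, so at most max_parallel processing workflows run at any moment; for very large or open-ended item lists the controller would also need continue-as-new to keep its history bounded.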


hi @maxim,

Thanks so much for your answer! That makes things much clearer.

Now, a few points:

Batch samples

I couldn’t fully understand this. I tried to figure it out by looking at the batch samples, but from what I can tell, the samples seem to process elements within the same workflow execution “message”. I might be wrong (I still need to get over my Temporal learning curve), but in my scenario I can’t group my items to be processed into pairs or triples… from the client, as I don’t have control over them. Let me know if my intuition is off.

A possible solution?

I found a solution, but it requires a global worker semaphore, something like:

with workflow.unsafe.imports_passed_through():
    from temporal_project.multiprocessing_worker_semaphore import WorkerSemaphore


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, data: WorkflowData) -> str:
        await WorkerSemaphore.acquire()
        try:
            result = await workflow.execute_activity(
                my_cpu_intensive_activity,
                data,
                start_to_close_timeout=timedelta(minutes=5),
                task_queue=settings.TEMPORAL_QUEUE,
                retry_policy=RetryPolicy(maximum_attempts=1),
            )
        finally:
            WorkerSemaphore.release()

        return result

To implement this, I wanted to use Python’s Semaphore from asyncio, but since each workflow has its own event loop, an event loop-level lock isn’t suitable. So, I used a multiprocessing lock, hoping it won’t interfere with the underlying workings of the Temporal worker.

I don’t know if it’s an optimal solution:

import asyncio
import multiprocessing


class WorkerSemaphore:
    free_workflows_number: multiprocessing.Value = None
    lock: multiprocessing.Lock = None

    @staticmethod
    def set_max_parallel_workflows_number(
        manager: multiprocessing.Manager, max_parallel_workflow_number: int
    ):
        WorkerSemaphore.lock = manager.Lock()
        WorkerSemaphore.free_workflows_number = manager.Value(
            "i", max_parallel_workflow_number
        )

    @staticmethod
    async def acquire():
        # Poll every 20 seconds until a slot frees up.
        while True:
            with WorkerSemaphore.lock:
                if WorkerSemaphore.free_workflows_number.value > 0:
                    WorkerSemaphore.free_workflows_number.value -= 1
                    return
            await asyncio.sleep(20)

    @staticmethod
    def release():
        with WorkerSemaphore.lock:
            WorkerSemaphore.free_workflows_number.value += 1

In my tests it appears to work well, but I’d like feedback from experts. Using a semaphore for this seems a bit wrong to me… Also, in the web UI I can see the waiting workflows polling every sleep interval to check whether they can process their activities. While this works, I’m worried I might be falling into an anti-pattern. What do you think?

Worker configuration

So, to better understand the worker configuration for my use case:

activity_executor=ProcessPoolExecutor(max_workers=settings.TEMPORAL_NUM_WORKERS),

max_concurrent_workflow_tasks=settings.TEMPORAL_NUM_WORKERS,
max_concurrent_workflow_task_polls=settings.TEMPORAL_NUM_WORKERS,
max_concurrent_activities=settings.TEMPORAL_NUM_WORKERS,
max_concurrent_activity_task_polls=settings.TEMPORAL_NUM_WORKERS,

Given that my activity involves thread blocking, thread spawning, and CPU-intensive work, and since I don’t control its internals (it comes from a third-party library, so I can’t add a heartbeat in the middle of its execution), I need to configure the activity_executor accordingly. I plan to set TEMPORAL_NUM_WORKERS to a 1:2 ratio relative to the number of cores on the worker machine.
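
For illustration, a minimal sketch of that 1:2 ratio (just an assumption about how settings.TEMPORAL_NUM_WORKERS could be derived):

import os

# One activity slot per two cores, per the 1:2 ratio mentioned above.
TEMPORAL_NUM_WORKERS = max(1, (os.cpu_count() or 2) // 2)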

So, the only setting I’m unsure about is max_concurrent_activities. Although (as I understand it) it controls the number of concurrent activities within a single workflow, the SDK issues a warning if this number is bigger than the ProcessPoolExecutor(max_workers=) value.

So, is this configuration okay?

activity_executor=ProcessPoolExecutor(max_workers=settings.TEMPORAL_NUM_WORKERS),
max_concurrent_activities=settings.TEMPORAL_NUM_WORKERS,

You cannot use any global variables inside the workflow code. Also, note that workflows are distributed across multiple hosts, so such a semaphore cannot work by definition.

What is the maximum rate (per second) and number of parallel workflows that you need to support for this use case?

It’s fine for me to limit the number per worker instance. The idea is that if I have 4 worker instances and each can process 2 workflows at a time, then all workers combined can handle 8 workflows simultaneously.

It’s difficult to determine the exact rate per second because, in my scenario, users have a cap on the maximum number of jobs they can run in parallel, with each job lasting anywhere from 0 to 5 minutes.

So, if I have 5 users with a maximum of 10 background operations each, they will likely want to use their full capacity. I could receive 50 requests initially, with new requests arriving as soon as previous jobs finish.

The issue is that the CPU-intensive activity slows down or crashes the application if more than 2 activities are run in parallel on a machine with 4 cores, for example.

But in practice, the timeout countdown for each task starts immediately, instead of waiting for the compute resources needed to process the next activity to become available.

But in practice, the timeout countdown for each task starts immediately, instead of waiting for the compute resources needed to process the next activity to become available.

If you limit the number of activities per worker then this cannot happen. The activity StartToClose timeout starts only when there is a free slot in that worker.

It’s fine for me to limit the number per worker instance. The idea is that if I have 4 worker instances and each can process 2 workflows at a time, then all workers combined can handle 8 workflows simultaneously.

There is no way to limit the number of workflows per worker instance. You have to do it yourself. If the rate is low, you can create a special “mutex” workflow to act as a semaphore.

  1. If I understand correctly, the activity limit applies within a single workflow. However, since each of my activities corresponds to one workflow, 8 workflows mean 8 activities. The configuration doesn’t seem to affect activities across different workflows.

  2. Do you have any guidance or documentation on creating a semaphore/mutex workflow without using a global variable?

  1. Incorrect. The worker configuration applies to activities that are executed by a worker and is not related to workflows in any way.
  2. Here is a Go sample.
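
For readers working in Python: the linked sample is in Go, but the idea behind such a mutex/semaphore workflow might look roughly like the sketch below. All names are hypothetical; requesters would signal-with-start this workflow with their own workflow ID, block until they receive a "permit_granted" signal, run their activity, and send "release" when done.

from collections import deque

from temporalio import workflow


@workflow.defn
class SemaphoreWorkflow:
    def __init__(self) -> None:
        self._waiting: deque = deque()  # workflow IDs waiting for a permit
        self._released = 0              # permits handed back since the last pass

    @workflow.signal
    def acquire(self, requester_workflow_id: str) -> None:
        self._waiting.append(requester_workflow_id)

    @workflow.signal
    def release(self) -> None:
        self._released += 1

    @workflow.run
    async def run(self, max_permits: int) -> None:
        in_use = 0
        while True:
            await workflow.wait_condition(
                lambda: self._released > 0
                or (len(self._waiting) > 0 and in_use < max_permits)
            )
            # Reclaim permits from requesters that finished.
            in_use -= self._released
            self._released = 0
            # Hand out permits to waiting requesters, up to the limit.
            while self._waiting and in_use < max_permits:
                requester = self._waiting.popleft()
                await workflow.get_external_workflow_handle(requester).signal(
                    "permit_granted"
                )
                in_use += 1

A real version would also need continue-as-new once the history grows, plus some handling for requesters that crash without releasing, so treat this only as a starting point.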

If that’s true, I wouldn’t need a mutex :). However, with my current worker I just started 4 workflows, and all 4 activities started instantly, even though max_concurrent_activities is set to 2. Could I have misconfigured the worker (something around synchronous tasks, perhaps), or could something else be causing this effect?

Please check the identity field on the ActivityTaskStartedEvent for these four activities. I suspect you have more than one worker running.




No, there is only one worker, but I notice that the start timestamps are accurate :thinking:.

My issue is with the timeout shown for pending activities: could it be unrelated to start_to_close_timeout? I just modified start_to_close_timeout, but this expiration remains unchanged.

The above means not to retry the activity in case of failure and wait 5 minutes before reporting the timeout. I’m not sure what the question is.

hi @maxim,

Nothing. Thanks for all your support; you’ve been super helpful. The problem was the run_timeout in the client code, which was overriding the timeouts set in the activity code (this configuration was setting both ScheduleToCloseTimeout and ScheduleToStartTimeout to 5 minutes). I guess I got a bit too creative with my leaps of logic :sweat_smile:.

await self.temporal_client.start_workflow(
    "MyWorkflow",
    JobEvent(body, job_id),
    id=job_id.workflow_id,
    task_queue=self.settings.TEMPORAL_QUEUE,
    run_timeout=timedelta(minutes=5),
)

We recommend not setting a workflow timeout at all. Would you really want your workflow terminated, without any ability to clean up, if your worker was down for 5 minutes?
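
For concreteness, the earlier start call without the workflow-level timeout would just drop run_timeout (same identifiers as in the snippet above):

await self.temporal_client.start_workflow(
    "MyWorkflow",
    JobEvent(body, job_id),
    id=job_id.workflow_id,
    task_queue=self.settings.TEMPORAL_QUEUE,
    # no run_timeout: bound the work with activity timeouts instead
)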

I don’t have control over the user input for background operations. While they should generally take less than 5 minutes, complex inputs could cause activities to run for hours. Therefore, I need to enforce an execution timeout to ensure the computation time stays within limits. Is there a better solution?

Who implements the activities? Can they fail if the input is not the one you want to support?

The input is user-defined, similar to HTML, with a lot of variety, and I don’t currently have effective heuristics to determine if the job will fail or time out before processing it.

Use activity heartbeating. This way, you can have an activity that runs for a long time but times out if the process crashes or hangs.
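
A hedged sketch of how that could fit the blocking third-party call discussed above: heartbeats are sent from a helper thread (via a copied contextvars context, since the activity context is thread-local), and the caller sets heartbeat_timeout so Temporal notices a crashed or hung process. third_party_process and the timing values are assumptions, not part of the original code.

import contextvars
import threading
import time

from temporalio import activity


@activity.defn
def my_cpu_intensive_activity(data) -> str:
    stop = threading.Event()
    ctx = contextvars.copy_context()  # carry the activity context into the thread

    def beat() -> None:
        while not stop.is_set():
            ctx.run(activity.heartbeat)
            time.sleep(10)  # keep this well under the heartbeat_timeout

    heartbeater = threading.Thread(target=beat, daemon=True)
    heartbeater.start()
    try:
        # Hypothetical blocking, CPU-intensive third-party call.
        return third_party_process(data)
    finally:
        stop.set()
        heartbeater.join()

On the workflow side you would then pass, for example, heartbeat_timeout=timedelta(seconds=30) to workflow.execute_activity and set start_to_close_timeout to something much larger than 5 minutes.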
