Hi Temporal guys!
I don’t fully understand how the `max_concurrent_activities` / `max_concurrent_workflow_tasks` settings interact with `start_to_close_timeout`.
In my scenario, I need to limit each worker instance to processing only a fixed number of activities, based on `settings.TEMPORAL_NUM_WORKERS`, because my machines have limited resources and the operations are CPU-intensive. Workflows and activities are currently 1-to-1.
The problem is that, for example, when a client with one worker instance starts 8 workflows and `settings.TEMPORAL_NUM_WORKERS` is set to 2, all 8 workflows start their timeout countdown immediately. I want only 2 workflows to run at a time, and the timeout for each of the others to start only after a previous one finishes.
Since each workflow can take anywhere from 0 to 5 minutes, if the timeout countdown starts before a workflow is actually processed, the waiting activities end up accumulating the execution time of the ones ahead of them.
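To make the accumulation concrete, here is a small pure-Python simulation (not Temporal code; `simulate` and `JobTiming` are hypothetical names for illustration). It assumes all 8 jobs are submitted at the same moment, at most 2 run at once in submission order, and each takes the worst-case 5 minutes, then compares how long each job actually runs with how long has passed since it was submitted.

```python
import heapq
from dataclasses import dataclass


@dataclass
class JobTiming:
    queued: float   # minutes spent waiting for a free slot
    running: float  # minutes from actual start to finish
    total: float    # minutes from submission to finish


def simulate(durations: list[float], slots: int) -> list[JobTiming]:
    """All jobs are submitted at t=0; at most `slots` run at once, in order."""
    free_at = [0.0] * slots  # time at which each slot becomes free
    heapq.heapify(free_at)
    timings = []
    for d in durations:
        start = heapq.heappop(free_at)  # earliest available slot
        finish = start + d
        heapq.heappush(free_at, finish)
        timings.append(JobTiming(queued=start, running=d, total=finish))
    return timings


if __name__ == "__main__":
    # 8 jobs, each taking the worst case of 5 minutes, 2 slots.
    for i, t in enumerate(simulate([5.0] * 8, slots=2), start=1):
        print(f"job {i}: queued={t.queued:.0f} running={t.running:.0f} total={t.total:.0f}")
```

Under these assumptions every job runs for at most 5 minutes once started, but only jobs 1 and 2 finish within 5 minutes of submission (jobs 7 and 8 finish after 20). A 5-minute budget counted from submission would fail jobs 3 to 8; one counted from the actual start would not.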
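My code is:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.worker import SharedStateManager, Worker

# settings, WorkflowData, my_cpu_intensive_activity and sdk_client
# are defined elsewhere in my project.


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, data: WorkflowData) -> str:
        # One workflow runs exactly one CPU-intensive activity.
        result = await workflow.execute_activity(
            my_cpu_intensive_activity,
            data,
            start_to_close_timeout=timedelta(minutes=5),
            task_queue=settings.TEMPORAL_QUEUE,
            retry_policy=RetryPolicy(maximum_attempts=1),
        )
        return result


# Worker setup (this runs inside an async main function).
worker = Worker(
    sdk_client,
    task_queue=settings.TEMPORAL_QUEUE,
    activities=[my_cpu_intensive_activity],
    workflows=[MyWorkflow],
    # Process pool sized to the number of activities I want in parallel.
    activity_executor=ProcessPoolExecutor(max_workers=settings.TEMPORAL_NUM_WORKERS),
    max_concurrent_workflow_tasks=settings.TEMPORAL_NUM_WORKERS,
    max_concurrent_workflow_task_polls=settings.TEMPORAL_NUM_WORKERS,
    max_concurrent_activities=settings.TEMPORAL_NUM_WORKERS,
    max_concurrent_activity_task_polls=settings.TEMPORAL_NUM_WORKERS,
    # Required when activities run in a ProcessPoolExecutor.
    shared_state_manager=SharedStateManager.create_from_multiprocessing(
        multiprocessing.Manager()
    ),
)
await worker.run()
```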
Based on the image above, my mental model was: if I set `max_concurrent_activities` and `max_concurrent_workflow_tasks` to 2 and have 8 potential workflows (started with `temporal_client.start_workflow(...)`), only 2 would start, with the remaining 6 scheduled but waiting for the current ones to finish before starting.
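As a local analogy for this mental model (plain `asyncio`, not Temporal; `run_job`, `JOB_SECONDS` and `TIMEOUT_SECONDS` are hypothetical names with scaled-down timings), the sketch caps concurrency at 2 with a semaphore and applies the per-job timeout only after a job has acquired a slot. A snapshot taken right after submitting 8 jobs matches the expected layout shown further down.

```python
import asyncio

JOB_SECONDS = 0.1      # stand-in for a CPU-bound activity (scaled down)
TIMEOUT_SECONDS = 0.3  # per-job budget, counted from when the job starts


async def run_job(i: int, slots: asyncio.Semaphore, status: list[str]) -> None:
    async with slots:  # queued jobs wait here, outside the timeout
        status[i] = "STARTED"
        # The timeout clock starts only once a slot has been acquired.
        await asyncio.wait_for(asyncio.sleep(JOB_SECONDS), timeout=TIMEOUT_SECONDS)
        status[i] = "COMPLETED"


async def main(n_jobs: int = 8, n_slots: int = 2) -> tuple[list[str], list[str]]:
    slots = asyncio.Semaphore(n_slots)
    status = ["SCHEDULED"] * n_jobs
    tasks = [asyncio.create_task(run_job(i, slots, status)) for i in range(n_jobs)]
    await asyncio.sleep(0.01)  # give the first jobs time to grab a slot
    snapshot = list(status)
    await asyncio.gather(*tasks)
    return snapshot, list(status)


if __name__ == "__main__":
    snapshot, final = asyncio.run(main())
    print(snapshot)
    print(final)
```

Each job runs well within its 0.3 s budget even though the last pair finishes about 0.4 s after submission, because waiting for a slot is not counted against the timeout.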
Am I misunderstanding how this works, or is there something wrong with my configuration?
Expected vs. current status of the 8 workflows:

| Workflow | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
|---|---|---|---|---|---|---|---|---|
| Expected | STARTED | STARTED | SCHEDULED | SCHEDULED | SCHEDULED | SCHEDULED | SCHEDULED | SCHEDULED |
| Current | STARTED | STARTED | STARTED | STARTED | STARTED | STARTED | STARTED | STARTED |