I am experiencing an issue with Temporal workflows. Here’s the scenario:
I have a parent workflow that spawns 25 child workflows in parallel. Each child workflow runs an activity and, after the activity completes, uses ContinueAsNew to create a new instance of itself. I have 4 worker pods processing these workflows, but I noticed that:
- After calling ContinueAsNew, the child workflow often stays bound to the same worker pod instead of being picked up by another pod.
- Many workflows get stuck at the activity execution stage, specifically at the ActivityTaskScheduled step, seemingly waiting for their turn, and eventually fail with a timeout (StartToCloseTimeout).
My expectation is that after ContinueAsNew, the new workflow instance would be distributed to any available worker pod. However, this doesn’t seem to be happening. It appears that Temporal is not balancing workflows effectively across the pods.
Here are my questions:
- Is it expected behavior for workflows to remain on the same worker pod after calling ContinueAsNew?
- How can I configure Temporal or the worker options to ensure better load distribution across worker pods, especially after ContinueAsNew?
- Could this behavior be related to Sticky Execution or task queue configuration, and how should I address it?
Parent Workflow
use Carbon\CarbonInterval;
use Temporal\Promise;
use Temporal\Workflow;

#[Workflow\WorkflowInterface]
class BatchProcessWorkflow
{
    use ActivityOptionTrait;

    public const WORKFLOW_NAME = 'BatchProcess';

    #[Workflow\WorkflowMethod(self::WORKFLOW_NAME)]
    public function run(): \Generator
    {
        $activity = Workflow::newActivityStub(
            FetchIdsActivity::class,
            $this->activityOptions(
                'default',
                timeout: CarbonInterval::year(),
                retryAttempts: 1
            ),
        );

        // Fetch the IDs to process (25 in my case).
        $ids = yield $activity->fetch();

        // Start one child workflow per ID; all of them run in parallel.
        $childWorkflows = [];
        foreach ($ids as $id) {
            $childWorkflows[] = Workflow::executeChildWorkflow(
                SingleProcessWorkflow::WORKFLOW_NAME,
                [
                    $id,
                ],
                Workflow\ChildWorkflowOptions::new()
            );
        }

        // Wait for every child workflow to finish.
        yield Promise::all($childWorkflows);
    }
}
Child Workflow
use Carbon\CarbonInterval;
use Temporal\Workflow;

#[Workflow\WorkflowInterface]
class SingleProcessWorkflow
{
    use ActivityOptionTrait;

    public const WORKFLOW_NAME = 'SingleProcess';

    #[Workflow\WorkflowMethod(self::WORKFLOW_NAME)]
    public function run(int $id): \Generator
    {
        $activity = Workflow::newActivityStub(
            ProcessActivity::class,
            $this->activityOptions(
                'default',
                timeout: CarbonInterval::minute(5),
                retryAttempts: 1,
            ),
        );

        $isProcessed = yield $activity->process($id);

        // If nothing was processed, wait an hour before the next run.
        if ($isProcessed === false) {
            /** @psalm-suppress TooManyTemplateParams */
            yield Workflow::timer(CarbonInterval::hour());
        }

        // Restart this workflow as a new run with the same ID.
        yield Workflow::newContinueAsNewStub(self::class)->run($id);
    }
}
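
Both workflows use ActivityOptionTrait, which is not included in this post. The following is only a minimal sketch of what such a helper might look like. It assumes that the first argument ('default') is the task queue name and that `timeout` is applied as the activity's StartToCloseTimeout, which would be consistent with the timeout reported above. The trait body, parameter types, and defaults are hypothetical, not the actual implementation.

```php
<?php

declare(strict_types=1);

use Temporal\Activity\ActivityOptions;
use Temporal\Common\RetryOptions;

// Hypothetical reconstruction: the real ActivityOptionTrait is not shown in this post.
trait ActivityOptionTrait
{
    // Assumption: $taskQueue is the first positional argument ('default' above)
    // and $timeout is used as the activity's StartToCloseTimeout.
    private function activityOptions(
        string $taskQueue,
        \DateInterval $timeout,
        int $retryAttempts = 1,
    ): ActivityOptions {
        return ActivityOptions::new()
            ->withTaskQueue($taskQueue)
            ->withStartToCloseTimeout($timeout)
            ->withRetryOptions(
                // Limit the total number of attempts for the activity.
                RetryOptions::new()->withMaximumAttempts($retryAttempts)
            );
    }
}
```

Under this assumed mapping, only StartToCloseTimeout is set explicitly; ScheduleToStartTimeout and ScheduleToCloseTimeout are left at their defaults.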