Hello everyone,
I have an issue which occurs from time to time with a workflow execution.
Here is the workflow code:
@workflow.defn
class DiscoveryProcessTriggerWorkflow:
    """Polling workflow that fans pending discovery processes out to children.

    Each run performs a single poll cycle and then continues as new with the
    same batch size and poll interval it was started with.
    """

    @workflow.run
    async def run(self, input: DiscoveryProcessTriggerInput) -> None:
        """Run one poll cycle: fetch, spawn children, sleep, continue as new.

        Args:
            input: Supplies ``batch_size`` (forwarded to the fetch activity)
                and ``poll_interval_seconds`` (how long to sleep before the
                next cycle).
        """
        workflow.logger.info("Fetching pending processes from the database...")
        pending = await workflow.execute_activity_method(
            DiscoveryProcessFetcherActivity.fetch_discovery_processes,
            DiscoveryProcessFetcherInput(batch_size=input.batch_size),
            start_to_close_timeout=default_start_to_close_timeout,
            retry_policy=default_retry_policy,
        )

        if len(pending) != 0:
            workflow.logger.info(f"Spawning {len(pending)} child workflows...")
            # One child per fetched process; the child id is derived from the
            # process id, and children use the ABANDON parent-close policy.
            for process in pending:
                await workflow.start_child_workflow(
                    DiscoveryProcessWorkflow.run,
                    args=[process],
                    task_queue=DISCOVERY_TASK_QUEUE,
                    parent_close_policy=workflow.ParentClosePolicy.ABANDON,
                    retry_policy=default_retry_policy,
                    id=f"discovery-child-{process.id}",
                )
        else:
            workflow.logger.info("No pending processes found. Sleeping...")

        # The sleep happens on both branches, before handing over to a new run.
        poll_interval = timedelta(seconds=input.poll_interval_seconds)
        await workflow.sleep(poll_interval)

        # Hand over to a new run carrying the same settings as a single
        # input-model argument.
        workflow.continue_as_new(
            DiscoveryProcessTriggerInput(
                batch_size=input.batch_size,
                poll_interval_seconds=input.poll_interval_seconds,
            )
        )
Here is the code for running the workflow:
....
# Connect to the Temporal server with the tracing interceptor and the
# Pydantic data converter, so Pydantic models can be used as payloads.
client = await Client.connect(
settings.TEMPORAL_SERVER_URL,
interceptors=[TracingInterceptor()],
runtime=runtime,
data_converter=pydantic_data_converter,
)
# Build the single input model for the trigger workflow from the CLI args.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = DiscoveryProcessTriggerInput(
batch_size=args.batch_size, poll_interval_seconds=args.poll_interval_seconds
)
# Run the parent workflow under a fresh UUID-based id, passing the input
# model as the only workflow argument.
await client.execute_workflow(
DiscoveryProcessTriggerWorkflow.run,
input,
id=f"discovery-parent-{uuid.uuid4()}",
task_queue=DISCOVERY_TASK_QUEUE,
)
....
I am using Pydantic models for passing data around workflows and activities.
Temporal: temporalio/auto-setup:1.28.0
Temporal SDK: temporalio:1.14.0
Python: 3.13
Here is the activation error traceback:
Traceback (most recent call last):
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 412, in activate
self._run_once(check_conditions=index == 1 or index == 2)
~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 2130, in _run_once
raise self._current_activation_error
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 2148, in _run_top_level_workflow_function
await coro
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 974, in run_workflow
result = await self._inbound.execute_workflow(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/contrib/opentelemetry.py", line 373, in execute_workflow
return await super().execute_workflow(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/worker/_interceptor.py", line 385, in execute_workflow
return await self.next.execute_workflow(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/temporal/.venv/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 2529, in execute_workflow
return await input.run_fn(*args)
~~~~~~~~~~~~^^^^^^^
TypeError: DiscoveryProcessTriggerWorkflow.run() takes 2 positional arguments but 3 were given
I am assuming that when this error is raised, Temporal tries to run the workflow with three parameters (self, batch_size, poll_interval_seconds) instead of two (self and an instance of the input model).
Any idea on why this is happening?
Got this error once out of 320 runs.
Thank you!
