Hello Temporal community
We’re new to Temporal and have encountered an issue under high load conditions.
We’ve observed child workflows executing activities that either don’t exist in that workflow’s code or don’t belong to the initiating workflow. A workflow appears to pick up an activity, together with its payload, from other workflows, seemingly at random. Some cases:
- (Event log and code attached.) Our child workflow unexpectedly runs a ‘remove temporary directory’ activity two or more times, even though that activity does not appear in the child workflow’s code at all; it is only used in the mother workflow. It is worth mentioning that the payload definitely comes from another workflow, and that workflow is still running.
- We also observed the mother workflow running the ‘remove temporary directory’ activity multiple times, with different payloads that belong to other running workflows!
- In the attached case there is no non-determinism error because no replay took place. When a replay does take place (which is most cases), we get a non-determinism error, which is expected because the activity is not defined in the workflow code!
One more thing: when this happens, the Temporal UI shows multiple ‘remove temporary directory’ activities running, but without the corresponding workflow task scheduled/completed events…
Here is an example from the mother workflow:
The error is hard to reproduce. I tried many times, but it happens randomly and infrequently, and only when there is a high load of workflows in the queue. I am out of ideas, and to me it looks like a bug, because AFAIK even changing the code shouldn’t make something like this possible.
Has anyone experienced similar behavior or can provide insights into potential causes and solutions for this issue?
PS: I removed the company-related code and data, but the structure is the same as in the original code. The event log is also attached, and you can import it.
Temporal version:
Python SDK: 1.5.0
Server: 1.24.2
Worker config:
max_concurrent_workflow_tasks=8,
max_cached_workflows=8,
activity_executor=ThreadPoolExecutor(
8
),
max_concurrent_activities=8
#running on k8s
Event history: event logs: you can see the remove dir activity running multiple times even though it wasn't in the code. · GitHub
Child workflow code (the workflow doesn't contain the remove dir activity, but it runs it anyway):
import logging
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
from temporalio.exceptions import ApplicationError
with workflow.unsafe.imports_passed_through():
from worker.activities.create_file_activities import (
CreateFilesActivities,
CreateFilesArguments,
UploadFileArguments,
RemoveSchemaFromDBArguments,
PrepareSchemaArguments,
)
from worker.utils import compute_schedule_to_close_timeout, WorkflowConstants
logger = logging.getLogger(__name__)
@dataclass
class CreateFilesInDBParams:
    """Input parameters of ``CreateFilesInDBWorkflow``."""

    # Directory the files are read from; the parent workflow passes its
    # temporary directory here.
    source_directory: str
    # Placeholder kept from the original post: the remaining fields were
    # elided when the code was sanitized.
    ...
# This is a child workflow (started by MotherWorkflow).
@workflow.defn
class CreateFilesInDBWorkflow:
    """Child workflow that creates a file record, uploads it and stores its schema.

    Activities run in this order: ``create_file_id_in_db``, ``upload_files``,
    ``prepare_schema``, ``create_schema`` and ``save_file_schema_to_db``.
    If any activity after ``create_file_id_in_db`` fails,
    ``remove_schema_from_db`` is executed as compensation and the workflow
    fails with a non-retryable ``ApplicationError``.

    This workflow never schedules ``remove_temporary_directory``.
    """

    params_object = CreateFilesInDBParams
    TASK_QUEUE = "modification"

    @workflow.run
    async def run(self, params: CreateFilesInDBParams):
        """Run the file-creation pipeline.

        Args:
            params: Workflow input (see ``CreateFilesInDBParams``).

        Raises:
            ApplicationError: Non-retryable; raised after the compensation
                activity has run when any step following the creation of the
                file id fails.
        """
        # A failure here is not compensated: there is no file id to clean up.
        file_id = await workflow.execute_activity(
            CreateFilesActivities.create_file_id_in_db,
            # NOTE(review): ``CreateFilesArguments`` is imported but never
            # used; confirm whether it was meant here instead of the
            # workflow's own parameter class.
            CreateFilesInDBParams(
                ...  # arguments elided in the original post
            ),
            start_to_close_timeout=timedelta(
                seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
            ),
            schedule_to_close_timeout=timedelta(
                seconds=compute_schedule_to_close_timeout(
                    start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                )
            ),
            retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
        )
        try:
            # The upload is the only step that also sets a heartbeat timeout.
            await workflow.execute_activity(
                CreateFilesActivities.upload_files,
                UploadFileArguments(
                    file_id,
                    ...,  # further arguments elided in the original post
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.FILE_UPLOAD_DOWNLOAD_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.FILE_UPLOAD_DOWNLOAD_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
                heartbeat_timeout=timedelta(
                    seconds=WorkflowConstants.FILE_UPLOAD_DOWNLOAD_HEARTBEAT_TIMEOUT
                ),
            )
            files_schema = await workflow.execute_activity(
                CreateFilesActivities.prepare_schema,
                PrepareSchemaArguments(
                    file_id,
                    ...,  # further arguments elided in the original post
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
            # NOTE(review): ``CreateSchemaArguments`` and
            # ``SaveFileSchemaToDbArguments`` are not among the visible
            # imports; presumably lost during sanitization -- confirm.
            await workflow.execute_activity(
                CreateFilesActivities.create_schema,
                CreateSchemaArguments(
                    file_id,
                    files_schema,
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
            await workflow.execute_activity(
                CreateFilesActivities.save_file_schema_to_db,
                SaveFileSchemaToDbArguments(
                    file_id,
                    files_schema,
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
        except Exception as exc:
            # Use the SDK's workflow logger rather than a module-level stdlib
            # logger: it is intended for use inside workflow code and carries
            # workflow context.
            workflow.logger.error(f"File creation failed with error {exc}")
            # Compensation lives in ``except Exception`` (not ``finally``), so
            # it only runs for real failures and never for ``GeneratorExit``
            # raised while a discarded coroutine is being closed.
            await workflow.execute_activity(
                CreateFilesActivities.remove_schema_from_db,
                RemoveSchemaFromDBArguments(
                    file_id
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
            raise ApplicationError(message=str(exc), non_retryable=True)
Mother workflow code that runs the child workflow:
import asyncio
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from temporalio import workflow
from temporalio.exceptions import ApplicationError, FailureError
from temporalio.workflow import ParentClosePolicy
from exceptions import get_exception_message
with workflow.unsafe.imports_passed_through():
from worker.activities.shared_execution_activities import (
RemoveTempDirectoryArguments,
SharedExecutionActivities,
)
from worker.utils import compute_schedule_to_close_timeout, WorkflowConstants
from worker.workflows.create_files_workflow import (
CreateFilesInDBParams,
CreateFilesInDBWorkflow,
)
@dataclass
class MotherParams:
    """Input parameters of ``MotherWorkflow``."""

    # Used, together with ``execution_id``, to build the child workflow id.
    file_name: str
    # Placeholder kept from the original post: the remaining fields were
    # elided when the code was sanitized. ``MotherWorkflow.run`` also reads
    # ``params.execution_id``, which is presumably one of them.
    ...
@workflow.defn
class MotherWorkflow:
    """
    Parent workflow that runs the child workflow CreateFilesInDBWorkflow.

    It is the only workflow responsible for cleaning up the temporary
    directory.
    """

    params_object = MotherParams
    TASK_QUEUE = "modification"

    @workflow.run
    async def run(self, params: MotherParams) -> None:
        """Create a temporary directory, run the child workflow, then clean up.

        Failures of the directory creation or of the child workflow are
        recorded in ``execution_status`` / ``execution_error_message`` and are
        not re-raised. A cancellation is re-raised after cleanup has run.

        Args:
            params: Workflow input (see ``MotherParams``).
        """
        execution_status = None
        # Initialized up front so later cleanup code can read it on the
        # success path too (it used to be bound only inside ``except``).
        execution_error_message = None
        latest_data_tmp_dir = None
        cancellation = None
        try:
            latest_data_tmp_dir = await workflow.execute_activity(
                SharedExecutionActivities.create_temporary_directory,
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
            # Do some processing: run child workflows that create the files
            # in the temporary directory.
            await workflow.execute_child_workflow(
                CreateFilesInDBWorkflow.run,
                CreateFilesInDBParams(
                    ...,  # further arguments elided in the original post
                    source_directory=latest_data_tmp_dir,
                ),
                id=f"{params.execution_id}_{params.file_name}",
                parent_close_policy=ParentClosePolicy.REQUEST_CANCEL,
                execution_timeout=timedelta(
                    seconds=WorkflowConstants.CHILD_WORKFLOW_EXECUTION_TIMEOUT
                ),
            )
            execution_status = ExecutionStatus.SUCCEEDED
        except asyncio.CancelledError as exc:
            # Remember the cancellation so cleanup can run first; it is
            # re-raised at the end, as it would have been with ``finally``.
            cancellation = exc
        except Exception as exc:
            execution_status = ExecutionStatus.FAILED
            if isinstance(exc, FailureError):
                execution_error_message = str(exc.cause or exc.message)
            else:
                execution_error_message = get_exception_message(exc)

        # Cleanup is deliberately NOT placed in a ``finally`` block.
        # ``finally`` also runs when Python closes a discarded coroutine
        # (``GeneratorExit``), for example when a cached workflow is evicted
        # and garbage-collected. Scheduling an activity from there can attach
        # the command to whichever workflow is active at that moment.
        # NOTE(review): this is the likely cause of the stray
        # ``remove_temporary_directory`` activities with foreign payloads seen
        # under load with ``max_cached_workflows=8``; confirm against the
        # eviction behaviour of the SDK version in use.
        if latest_data_tmp_dir:
            await workflow.execute_activity(
                SharedExecutionActivities.remove_temporary_directory,
                RemoveTempDirectoryArguments(directory=latest_data_tmp_dir),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
        # some other clean up here

        if cancellation is not None:
            raise cancellation