Workflow Executing Unexpected Activities Not Defined in Its Code [Under High Load]

Hello Temporal community,

We’re new to Temporal and have encountered an issue under high load conditions.
We’ve observed child workflows executing activities that either don’t exist in their workflow code or don’t belong to the initiating workflow. It looks completely random: a workflow picks up an activity and its payload from other workflows. Some cases:

  • (Event log and code attached.) Our child workflow unexpectedly picks up a ‘remove temporary directory’ task two or more times, even though that activity isn’t in its workflow code at all; it is defined in the mother workflow. It’s worth mentioning that the payload definitely comes from another workflow, and the activity actually runs.
  • We also observed the mother workflow running the remove temporary directory activity multiple times with payloads from other running workflows!
  • In the attached case there is no non-determinism error because there was no replay, but when a replay happens (most of the cases) we get a non-determinism error, which is expected since the activity isn’t defined in the workflow code!
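The non-determinism error in the last point follows from how replay works. Below is a simplified model, assuming replay compares the activity types recorded in history against the ones the workflow code schedules, in order. The real SDK checks more than this, and `replay` and `ReplayMismatch` are hypothetical names, not SDK APIs:

```python
class ReplayMismatch(Exception):
    """Hypothetical stand-in for the SDK's non-determinism error."""


def replay(recorded: list[str], scheduled_by_code: list[str]) -> None:
    """Walk history in order and check that the code schedules the same activities.

    recorded: activity types from the ActivityTaskScheduled events in history.
    scheduled_by_code: activity types the current workflow code schedules when re-run.
    Activities the code schedules beyond the end of history are new work, not a mismatch.
    """
    for seq, activity_type in enumerate(recorded):
        if seq >= len(scheduled_by_code):
            raise ReplayMismatch(f"history has extra activity #{seq}: {activity_type!r}")
        if scheduled_by_code[seq] != activity_type:
            raise ReplayMismatch(
                f"activity #{seq}: history has {activity_type!r}, "
                f"code scheduled {scheduled_by_code[seq]!r}"
            )


# Activity types the child workflow code schedules on its success path.
CHILD_CODE = [
    "create_file_id_in_db",
    "upload_files",
    "prepare_schema",
    "create_schema",
    "save_file_schema_to_db",
]
```

Replaying a child history that contains a stray entry, e.g. `replay(["create_file_id_in_db", "upload_files", "remove_temporary_directory"], CHILD_CODE)`, raises at activity #2, while any clean prefix of `CHILD_CODE` replays without error.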

One more thing: when this happens, the Temporal UI shows multiple remove temp directory activities running without corresponding workflow task scheduled/completed events…
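Both workflows below await each activity before scheduling the next one, so in a healthy history each `WorkflowTaskCompleted` event should produce at most one `ActivityTaskScheduled`. Here is a sketch for finding workflow tasks that scheduled several activities at once. It assumes the JSON layout of a Temporal history export: an `events` list whose `ActivityTaskScheduled` entries carry `activityTaskScheduledEventAttributes.activityType.name` and `workflowTaskCompletedEventId`. Please verify those field names against your own export. The helper names are hypothetical:

```python
from collections import defaultdict


def _norm(event_type: str) -> str:
    """Accept both "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED" and "ActivityTaskScheduled" spellings."""
    return event_type.removeprefix("EVENT_TYPE_").replace("_", "").lower()


def activities_per_workflow_task(history: dict) -> dict[int, list[str]]:
    """Group scheduled activity types by the WorkflowTaskCompleted event that produced them."""
    groups: dict[int, list[str]] = defaultdict(list)
    for event in history["events"]:
        if _norm(event["eventType"]) != "activitytaskscheduled":
            continue
        attrs = event["activityTaskScheduledEventAttributes"]
        # Event IDs are int64 in the proto, so JSON exports may encode them as strings.
        wft_id = int(attrs["workflowTaskCompletedEventId"])
        groups[wft_id].append(attrs["activityType"]["name"])
    return dict(groups)


def suspicious_workflow_tasks(history: dict) -> dict[int, list[str]]:
    """Workflow tasks that scheduled more than one activity (unexpected for sequential code)."""
    return {
        wft_id: names
        for wft_id, names in activities_per_workflow_task(history).items()
        if len(names) > 1
    }
```

Run it as `suspicious_workflow_tasks(json.load(f))` on the attached event log. An empty result means every activity had its own workflow task.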

Here is an example from the mother workflow:

It’s hard to reproduce the error. I tried many times, but it happens randomly and rarely, only when there is a high load of workflows in the queue. I’m out of ideas, and to me it looks like a bug, because AFAIK even changing the code shouldn’t let a workflow do such a thing.

Has anyone experienced similar behavior or can provide insights into potential causes and solutions for this issue?

PS: I cleaned up the company-related code/data, but the structure is the same as the original code. The event logs are also attached, and you can import them.

Temporal version:

Python SDK: 1.5.0 
Server: 1.24.2

Worker config:

max_concurrent_workflow_tasks=8,
max_cached_workflows=8,
activity_executor=ThreadPoolExecutor(8),
max_concurrent_activities=8,
# running on k8s
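Why only under high load? Our assumption, not confirmed in this thread: with `max_cached_workflows=8`, more than eight workflows interleaving on one worker force the workflow cache to evict constantly, and eviction is when an unfinished workflow can be dropped with its `finally` block still pending. A toy LRU model of the cache illustrates how sharply evictions jump once active workflows exceed the cache size. `count_evictions` and `round_robin` are hypothetical helpers, and the SDK's real cache policy may differ:

```python
from collections import OrderedDict


def count_evictions(capacity: int, task_sequence: list[str]) -> int:
    """Count LRU evictions when workflow tasks arrive in the given order (toy cache model)."""
    cache: OrderedDict[str, None] = OrderedDict()
    evictions = 0
    for wf_id in task_sequence:
        if wf_id in cache:
            cache.move_to_end(wf_id)  # cache hit: mark as most recently used
            continue
        if len(cache) >= capacity:
            cache.popitem(last=False)  # evict the least recently used workflow
            evictions += 1
        cache[wf_id] = None
    return evictions


def round_robin(n_workflows: int, rounds: int) -> list[str]:
    """Workflow tasks from n_workflows active workflows taking turns, `rounds` times each."""
    return [f"wf-{i}" for _ in range(rounds) for i in range(n_workflows)]
```

With eight workflows taking turns, `count_evictions(8, round_robin(8, 5))` is 0 after warm-up. With twenty, `count_evictions(8, round_robin(20, 5))` is 92: every task after the first eight misses the cache and evicts another workflow.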

Event history (GitHub): event logs showing the remove dir activity running multiple times even though it isn’t in the code.
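To quantify this in the attached event log, here is a sketch that counts scheduled activity types and reports any the workflow code never schedules. It assumes the JSON layout of Temporal’s history export and that activity type names default to the Python method names (the SDK default unless `@activity.defn(name=...)` overrides them). `EXPECTED_CHILD_ACTIVITIES` and `unexpected_activities` are hypothetical names:

```python
from collections import Counter


def _norm(event_type: str) -> str:
    """Accept both "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED" and "ActivityTaskScheduled" spellings."""
    return event_type.removeprefix("EVENT_TYPE_").replace("_", "").lower()


# Activity types the child workflow code can schedule (success path plus its except block).
EXPECTED_CHILD_ACTIVITIES = {
    "create_file_id_in_db",
    "upload_files",
    "prepare_schema",
    "create_schema",
    "save_file_schema_to_db",
    "remove_schema_from_db",
}


def unexpected_activities(history: dict, expected: set[str]) -> Counter:
    """Count ActivityTaskScheduled events whose activity type is not in `expected`."""
    return Counter(
        event["activityTaskScheduledEventAttributes"]["activityType"]["name"]
        for event in history["events"]
        if _norm(event["eventType"]) == "activitytaskscheduled"
        and event["activityTaskScheduledEventAttributes"]["activityType"]["name"] not in expected
    )
```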

Child workflow code (the workflow doesn’t contain the remove dir activity, yet it runs it):


import logging
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import ApplicationError

with workflow.unsafe.imports_passed_through():
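    from worker.activities.create_file_activities import (
        CreateFilesActivities,
        CreateFilesArguments,
        UploadFileArguments,
        RemoveSchemaFromDBArguments,
        PrepareSchemaArguments,
        # Used below but missing from the original import list; assumed to live in this module
        CreateSchemaArguments,
        SaveFileSchemaToDbArguments,
    )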
    from worker.utils import compute_schedule_to_close_timeout, WorkflowConstants

logger = logging.getLogger(__name__)


@dataclass
class CreateFilesInDBParams:
    source_directory: str
    ...

# It's a child workflow
@workflow.defn
class CreateFilesInDBWorkflow:
    params_object = CreateFilesInDBParams
    TASK_QUEUE = "modification"

    @workflow.run
    async def run(self, params: CreateFilesInDBParams):
        file_id = await workflow.execute_activity(
            CreateFilesActivities.create_file_id_in_db,
            CreateFilesInDBParams(
                ...

            ),
            start_to_close_timeout=timedelta(
                seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
            ),
            schedule_to_close_timeout=timedelta(
                seconds=compute_schedule_to_close_timeout(
                    start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                )
            ),
            retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
        )

        try:
            await workflow.execute_activity(
                CreateFilesActivities.upload_files,
                UploadFileArguments(
                    file_id,
                    ...
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.FILE_UPLOAD_DOWNLOAD_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.FILE_UPLOAD_DOWNLOAD_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
                heartbeat_timeout=timedelta(
                    seconds=WorkflowConstants.FILE_UPLOAD_DOWNLOAD_HEARTBEAT_TIMEOUT
                ),
            )

            files_schema = await workflow.execute_activity(
                CreateFilesActivities.prepare_schema,
                PrepareSchemaArguments(
                    file_id,
                    ...
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )

            await workflow.execute_activity(
                CreateFilesActivities.create_schema,
                CreateSchemaArguments(
                    file_id,
                    files_schema
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )

            await workflow.execute_activity(
                CreateFilesActivities.save_file_schema_to_db,
                SaveFileSchemaToDbArguments(
                    file_id,
                    files_schema,
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
        except Exception as exc:
            # workflow.logger skips logging during replay by default and adds workflow context
            workflow.logger.error(f"File creation failed with error {exc}")
            await workflow.execute_activity(
                CreateFilesActivities.remove_schema_from_db,
                RemoveSchemaFromDBArguments(
                    file_id
                ),
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
            raise ApplicationError(message=str(exc), non_retryable=True)

Mother workflow code that runs the child workflow:

import asyncio
from dataclasses import dataclass, fields
from datetime import datetime, timedelta

from temporalio import workflow
from temporalio.exceptions import ApplicationError, FailureError
from temporalio.workflow import ParentClosePolicy

from exceptions import get_exception_message

with workflow.unsafe.imports_passed_through():
    from worker.activities.shared_execution_activities import (
        RemoveTempDirectoryArguments,
        SharedExecutionActivities,
    )
    from worker.utils import compute_schedule_to_close_timeout, WorkflowConstants
    from worker.workflows.create_files_workflow import (
        CreateFilesInDBParams,
        CreateFilesInDBWorkflow,
    )

@dataclass
class MotherParams:
    file_name: str
    ...



@workflow.defn
class MotherWorkflow:
    """
    This is the parent workflow that runs the child workflow CreateFilesInDBWorkflow.
    It is the only workflow responsible for cleaning up the temporary directory.
    """

    params_object = MotherParams
    TASK_QUEUE = "modification"

    @workflow.run
    async def run(self, params: MotherParams) -> None:
        execution_status = None
        latest_data_tmp_dir = None

        try:

            latest_data_tmp_dir = await workflow.execute_activity(
                SharedExecutionActivities.create_temporary_directory,
                start_to_close_timeout=timedelta(
                    seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                ),
                schedule_to_close_timeout=timedelta(
                    seconds=compute_schedule_to_close_timeout(
                        start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    )
                ),
                retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
            )
            # Do some processing: run child workflows that create the files in the temporary directory

            await workflow.execute_child_workflow(
                CreateFilesInDBWorkflow.run,
                CreateFilesInDBParams(
                    ...,
                    source_directory=latest_data_tmp_dir,
                ),
                id=f"{params.execution_id}_{params.file_name}",
                parent_close_policy=ParentClosePolicy.REQUEST_CANCEL,
                execution_timeout=timedelta(
                    seconds=WorkflowConstants.CHILD_WORKFLOW_EXECUTION_TIMEOUT
                ),
            )

            execution_status = ExecutionStatus.SUCCEEDED

        except Exception as exc:
            execution_status = ExecutionStatus.FAILED
            if isinstance(exc, FailureError):
                execution_error_message = str(exc.cause or exc.message)
            else:
                execution_error_message = get_exception_message(exc)


        finally:
            if latest_data_tmp_dir:
                await workflow.execute_activity(
                    SharedExecutionActivities.remove_temporary_directory,
                    RemoveTempDirectoryArguments(directory=latest_data_tmp_dir),
                    start_to_close_timeout=timedelta(
                        seconds=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                    ),
                    schedule_to_close_timeout=timedelta(
                        seconds=compute_schedule_to_close_timeout(
                            start_to_close_timeout=WorkflowConstants.DEFAULT_START_TO_CLOSE_TIMEOUT
                        )
                    ),
                    retry_policy=WorkflowConstants.DEFAULT_RETRY_POLICY,
                )
            # some other clean up here

The root cause was that the finally block was waking up in another thread that was running a different workflow, as described here and here. It was fixed by upgrading the Python SDK to 1.6, but I recommend :smiley: upgrading to the latest version, or at least 1.7.
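Below is a simplified, hypothetical model of that failure mode, using plain coroutines rather than the SDK's internals. `_current`, `schedule_activity`, `_Suspend` and `mother_run` are invented for illustration. It shows that a workflow coroutine dropped while suspended still runs its `finally` block when it is eventually closed. Whatever command that block issues then lands on the workflow the thread happens to be running at that moment:

```python
import threading

# Hypothetical stand-in for the SDK's per-thread "workflow currently being run" state.
_current = threading.local()

# Commands as (workflow that receives the command, activity type, argument).
commands: list[tuple[str, str, str]] = []


def schedule_activity(activity_type: str, arg: str) -> None:
    """Record a command against whichever workflow is current on this thread."""
    commands.append((_current.workflow_id, activity_type, arg))


class _Suspend:
    """Awaitable that suspends once and is never resumed (like an evicted workflow)."""

    def __await__(self):
        yield


async def mother_run(tmp_dir: str) -> None:
    try:
        await _Suspend()  # e.g. still waiting on the child workflow
    finally:
        # Cleanup modeled on MotherWorkflow's finally block
        schedule_activity("remove_temporary_directory", tmp_dir)


def demo() -> list[tuple[str, str, str]]:
    commands.clear()
    _current.workflow_id = "wf-A"
    coro = mother_run("/tmp/wf-A")
    coro.send(None)  # run wf-A until it suspends
    # wf-A is evicted; its suspended coroutine is dropped but not yet closed.
    _current.workflow_id = "wf-B"  # the same thread now runs wf-B
    # Closing (which the garbage collector also does when it finalizes the
    # coroutine) throws GeneratorExit into wf-A, so its finally block runs now.
    coro.close()
    return list(commands)
```

`demo()` returns `[("wf-B", "remove_temporary_directory", "/tmp/wf-A")]`: wf-B receives a cleanup activity it never defined, carrying wf-A's payload, which matches the symptoms described in the question.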