Workflow with cron schedule not picked up by new workers on the same task queue

Hello, I will attach the context below

  1. I have a cron job that’s scheduled to run every minute, defined as follows:
@workflow.defn(name="TaskExecutor")
class TaskExecutor:
    """Cron-driven workflow whose single step is the PollExecuteTasks activity.

    NOTE(review): the activity is given a 365-day start_to_close_timeout and
    no heartbeat_timeout. If the worker executing it dies mid-run, the server
    presumably won't notice until that timeout elapses, and the cron run (and
    the next scheduled run) stays blocked. Confirm against Temporal's
    activity-timeout semantics and consider a heartbeat_timeout if
    PollExecuteTasks heartbeats.
    """

    @workflow.run
    async def run(self) -> None:
        # Annotated None, but whatever PollExecuteTasks returns is passed back.
        poll_timeout = timedelta(days=365)
        outcome = await workflow.execute_activity(
            Activities.PollExecuteTasks,
            start_to_close_timeout=poll_timeout,
        )
        return outcome

caller:
def initTaskExecutor():
    """Start the TaskExecutor cron workflow, tolerating an already-running one.

    A cron execution stays open between runs, so a freshly spawned process
    calling this while the previous one's execution is still running gets
    ``WorkflowAlreadyStartedError``. Instead of letting that abort the new
    process, reuse the existing execution.

    Returns:
        A workflow handle for ``workflow-process-task-queue-new``. This is
        either the run started here or the execution that was already running.
    """
    # Local import keeps this fix self-contained in the function.
    from temporalio.exceptions import WorkflowAlreadyStartedError

    print('INITTASKEXECUTOR called init_tast_executor')
    workflow_id = "workflow-process-task-queue-new"
    client = settings.TEMPORAL_CLIENT
    try:
        return asyncio.run(client.start_workflow(
            Workflows.TaskExecutor.run,
            id=workflow_id,
            task_queue="task-queue-1",
            id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
            cron_schedule="* * * * *",
        ))
    except WorkflowAlreadyStartedError:
        # Expected after a restart: the cron workflow is still open. Hand back
        # a handle to it rather than failing worker startup.
        return client.get_workflow_handle(workflow_id)
  2. Whenever the worker that called this dies and another one is spawned, the new worker also tries to start it but gets the error: Workflow execution already started
  3. Here is my worker code:
async def main(): 
    """Build and run a Temporal worker polling ``task-queue-1``.

    Excerpt: the ``...`` entries stand for registrations omitted from this
    snippet; they are placeholders, not real list elements.
    """
    client = settings.TEMPORAL_CLIENT

    # The pool is passed as activity_executor; leaving the `with` shuts it
    # down once worker.run() returns or raises.
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="task-queue-1",
            workflows=[
                # NOTE(review): as pasted, this `...` has no trailing comma,
                # so the list is not valid Python; it marks omitted workflows.
                ... 
                Workflows.TaskExecutor, 
            ],
            activities=[
                # Activities omitted from this excerpt.
                ...
            ],
            activity_executor=activity_executor,
        )
        print("Starting the worker....")
        # Blocks until the worker shuts down or fails.
        await worker.run()

if __name__ == "__main__":
    import time  # only the standalone entry point needs it

    # Supervise the worker: if it crashes, report the error and restart it.
    while True:
        try:
            asyncio.run(main())
        except Exception as e:
            # Log the exception
            print(f"Exception occurred: {e}")
            print("Retrying in 5 seconds...")
            # Honour the advertised delay; without it a persistent failure
            # (e.g. an unreachable server) becomes a tight busy-loop.
            time.sleep(5)

The issue is that, although the workflow is still running on Temporal Cloud, it never gets scheduled onto the new worker. (It is scheduled properly if I terminate the cron job and let the new worker start the workflow again.)

Thank you in advance.

@maxim Any input would be appreciated, thank you!

Do you handle the “Workflow execution already started” exception? Could it be that the worker was not appropriately started when this exception was thrown?

I don’t think so: when I go to the cloud.temporal.io dashboard, the number of workers shown matches the workers that are running.

Here is my call stack. I run it as a Django management command as follows: python3 manage.py run_worker

import asyncio
from django.core.management.base import BaseCommand
from plato_app.workflows import worker as Worker

class Command(BaseCommand):
    """Django management command that runs the Temporal worker in the foreground.

    Invoked as ``python manage.py run_worker``. It calls the worker module's
    ``main()`` directly, so the retry loop under that module's
    ``if __name__ == "__main__":`` guard does not apply here. Any exception
    escaping ``main()`` ends the command.

    NOTE(review): the module alias ``Worker`` is easy to confuse with
    ``temporalio.worker.Worker``.
    """

    help = 'Runs the Temporal worker'

    def add_arguments(self, parser):
        # No command-line options yet; this is the hook for adding them.
        pass

    def handle(self, *args, **options):
        print('RUN_WORKER HANDLE about to run worker.main()')
        worker_coroutine = Worker.main()
        # Blocks for as long as the worker runs.
        asyncio.run(worker_coroutine)
        done_message = self.style.SUCCESS('Successfully ran script')
        self.stdout.write(done_message)

Worker:

async def main():
    """Run a Temporal worker for ``task-queue-1`` until it stops.

    Registers every project workflow and activity on one Worker. The worker
    gets a 100-thread ThreadPoolExecutor as its ``activity_executor``, and the
    pool is shut down when ``worker.run()`` returns or raises.
    """
    # TODO(steven): this client may be affected by earlier changes to the
    # Temporal client defined in settings.py.
    client = settings.TEMPORAL_CLIENT

    registered_workflows = [
        Workflows.YourWorkflow,
        Workflows.TaskExecutor,
        Workflows.AddAudioClassFile,
        Workflows.AddPDFClassFile,
        Workflows.WorkflowCaller,
    ]
    registered_activities = [
        Activities.your_activity,
        Activities.PollExecuteTasks,
        Activities.FileSpeechToText,
        Activities.ChunkVectorizeSrtFile,
        Activities.FilePDFToText,
        Activities.ChunkVectorizeTextFile,
        Activities.MarkProcessedWF,
    ]

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as pool:
        worker = Worker(
            client,
            task_queue="task-queue-1",
            workflows=registered_workflows,
            activities=registered_activities,
            activity_executor=pool,
        )
        print("Starting the worker....")
        await worker.run()

if __name__ == "__main__":
    import time  # only the standalone entry point needs it

    # Supervise the worker: if it crashes, report the error and restart it.
    while True:
        try:
            asyncio.run(main())
        except Exception as e:
            # Log the exception
            print(f"Exception occurred: {e}")
            print("Retrying in 5 seconds...")
            # Honour the advertised delay; without it a persistent failure
            # (e.g. an unreachable server) becomes a tight busy-loop.
            time.sleep(5)

I don’t know enough about Django to say whether a management command is the right approach here.

What is the workflow history in Temporal UI/CLI?

@maxim Here is the history.
Also, I noticed that if I redeploy the same code as is, it works fine and picks up the scheduled workflow; otherwise it doesn’t.

{
  "eventId": "1",
  "eventTime": "2024-09-12T03:52:01.353488198Z",
  "eventType": "WorkflowExecutionStarted",
  "version": "1301",
  "taskId": "104876508",
  "workflowExecutionStartedEventAttributes": {
    "workflowType": {
      "name": "TaskExecutor"
    },
    "taskQueue": {
      "name": "task-queue-1",
      "kind": "TASK_QUEUE_KIND_NORMAL"
    },
    "workflowTaskTimeout": "10s",
    "continuedExecutionRunId": "765cfa3e-3a7f-4635-8a1b-8dfcc3a56970",
    "initiator": "CONTINUE_AS_NEW_INITIATOR_CRON_SCHEDULE",
    "lastCompletionResult": {
      "payloads": [
        null
      ]
    },
    "originalExecutionRunId": "c3564772-61fe-40e5-a98d-fdab1a02fe8c",
    "firstExecutionRunId": "ad8456cb-9937-42ee-9049-7206c9a0def7",
    "attempt": 1,
    "cronSchedule": "* * * * *",
    "firstWorkflowTaskBackoff": "59s",
    "prevAutoResetPoints": {
      "points": [
        {
          "binaryChecksum": "3ac9fa34ebcbe2c4bc2313589bf438cd",
          "runId": "ad8456cb-9937-42ee-9049-7206c9a0def7",
          "firstWorkflowTaskCompletedId": "4",
          "createTime": "2024-09-07T02:48:01.070633067Z",
          "expireTime": "2024-10-07T02:48:04.691561946Z",
          "resettable": true
        }
      ]
    },
    "workflowId": "workflow-process-task-queue-new"
  },
  "links": []
}

Is this the complete history consisting of a single event?

That’s the history of the cron-scheduled job in the Temporal UI. Yes, that’s all of it (at least all that I can see in the History section of the UI).

Could you create a support ticket (support.temporal.io) that includes your namespace name (so you don’t have to share it publicly here), and then share the ticket number in this thread? We will take a look.

Does this happen all the time or just for that workflow execution?
Can you try to signalWithStart the running cron execution from a client and see if that triggers scheduling of the first workflow task?

Ticket number: 8329

I tried running it as follows:

    # Intended as a signal-with-start against the running cron execution
    # "workflow-process-task-queue-new": start_signal / start_signal_args ask
    # start_workflow to deliver "submit_greeting" together with the start.
    # NOTE(review): unlike the original start call, no cron_schedule or
    # id_reuse_policy is passed here.
    # NOTE(review): the TaskExecutor definition shown has no visible
    # "submit_greeting" signal handler; confirm one exists.
    await client.start_workflow(
        Workflows.TaskExecutor.run, 
        id="workflow-process-task-queue-new",
        task_queue="task-queue-1",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )

Still, no worker runs it. However, other workflows get scheduled normally when the worker is updated.

Here is how I define this workflow alongside the other workflows, and here is how I run the worker:

from datetime import timedelta
import pickle
from temporalio import workflow
from temporalio.common import WorkflowIDReusePolicy
from temporalio.workflow import ParentClosePolicy

# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
    from plato_app.workflows import wf_params as WfParams, activities as Activities, activity_params as ActivityParams
    

# ---------------------------------- Test Activities ---------------------------------- 
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    """Sample workflow that delegates straight to the ``your_activity`` activity."""

    @workflow.run
    async def run(self, params: WfParams.YourParams) -> str:
        # The workflow input is forwarded unchanged; the activity has 10 s.
        activity_timeout = timedelta(seconds=10)
        activity_result = await workflow.execute_activity(
            Activities.your_activity,
            params,
            start_to_close_timeout=activity_timeout,
        )
        return activity_result

# ---------------------------------- Task Executor Workflows ---------------------------------- 
@workflow.defn(name="TaskExecutor")
class TaskExecutor:
    """Workflow (started with a cron schedule) that runs PollExecuteTasks once.

    NOTE(review): the activity is given a 365-day start_to_close_timeout and
    no heartbeat_timeout. If the worker executing it dies mid-run, the server
    presumably won't notice until that timeout elapses, and the cron run (and
    the next scheduled run) stays blocked. Confirm against Temporal's
    activity-timeout semantics and consider a heartbeat_timeout if
    PollExecuteTasks heartbeats.
    """

    @workflow.run
    async def run(self) -> None:
        # Annotated None, but whatever PollExecuteTasks returns is passed back.
        poll_timeout = timedelta(days=365)
        outcome = await workflow.execute_activity(
            Activities.PollExecuteTasks,
            start_to_close_timeout=poll_timeout,
        )
        return outcome
    
@workflow.defn(name="WorkFlowCaller")
class WorkflowCaller:
    """Run the child workflow mapped to an event type, then mark the event processed.

    Registered under the workflow type name "WorkFlowCaller" (capital F). That
    name is what starters refer to, so it is intentionally left unchanged.
    """

    @workflow.run
    async def run(self, params: WfParams.WorkFlowCallerParams) -> None:
        """Dispatch ``params`` to its child workflow and record completion.

        Args:
            params: provides ``workflow_id``, ``event_type`` (a key into the
                EVENTS_TO_WORKFLOWS / EVENTS_TO_WORKFLOW_ID_PREFIX maps) and a
                pickled ``payload`` that becomes the child workflow's argument.

        Returns:
            The result of the MarkProcessedWF activity (annotated None).
        """
        from .events import EVENTS_TO_WORKFLOWS, EVENTS_TO_WORKFLOW_ID_PREFIX

        # workflow.logger instead of print(): the SDK's workflow logger is
        # replay-aware, so messages are not re-emitted every time this
        # workflow's history is replayed. Configure logging at INFO to see them.
        log = workflow.logger
        log.info(
            "Executing and waiting on this wf: %s event_type: %s",
            params.workflow_id,
            params.event_type,
        )
        # SECURITY(review): pickle.loads can execute arbitrary code. This is
        # only safe if every producer of WorkFlowCallerParams.payload is trusted.
        wf_params = pickle.loads(params.payload)
        log.info("WF_PARAMS is %s", wf_params)

        # Wait for the child to complete before marking the event processed.
        await workflow.execute_child_workflow(
            EVENTS_TO_WORKFLOWS[params.event_type].run,
            wf_params,
            id=EVENTS_TO_WORKFLOW_ID_PREFIX[params.event_type] + params.workflow_id,
            task_queue="task-queue-1",
            id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
            parent_close_policy=ParentClosePolicy.ABANDON,
        )

        log.info(
            "wf executed successfully, marking as processed: %s event_type: %s",
            params.workflow_id,
            params.event_type,
        )
        return await workflow.execute_activity(
            Activities.MarkProcessedWF,
            ActivityParams.MarkProcessedWFParams(
                workflow_id=params.workflow_id
            ),
            start_to_close_timeout=timedelta(seconds=100),
        )

That’s how I run the worker:

import asyncio
import concurrent.futures
from temporalio.worker import Worker
from . import workflows as Workflows
from . import activities as Activities
from django.conf import settings
from temporalio.worker import Worker
from plato_app.workflows import workflows as Workflows
from plato_app.workflows import activities as Activities

async def main():
    """Run the project's Temporal worker on ``task-queue-1`` until it stops.

    Every workflow and activity is registered on a single Worker. Activities
    get a 100-thread ThreadPoolExecutor as ``activity_executor``; the pool is
    shut down when ``worker.run()`` returns or raises.
    """
    temporal_client = settings.TEMPORAL_CLIENT

    workflow_types = [
        Workflows.YourWorkflow,
        Workflows.TaskExecutor,
        Workflows.AddAudioClassFile,
        Workflows.AddPDFClassFile,
        Workflows.WorkflowCaller,
    ]
    activity_fns = [
        Activities.your_activity,
        Activities.PollExecuteTasks,
        Activities.FileSpeechToText,
        Activities.ChunkVectorizeSrtFile,
        Activities.FilePDFToText,
        Activities.ChunkVectorizeTextFile,
        Activities.MarkProcessedWF,
    ]

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as threads:
        task_worker = Worker(
            temporal_client,
            task_queue="task-queue-1",
            workflows=workflow_types,
            activities=activity_fns,
            activity_executor=threads,
        )
        print("Starting the worker....")
        # Blocks until the worker shuts down or fails.
        await task_worker.run()

if __name__ == "__main__":
    import time  # only the standalone entry point needs it

    # Supervise the worker: if it crashes, report the error and restart it.
    while True:
        try:
            asyncio.run(main())
        except Exception as e:
            # Log the exception
            print(f"Exception occurred: {e}")
            print("Retrying in 5 seconds...")
            # Honour the advertised delay; without it a persistent failure
            # (e.g. an unreachable server) becomes a tight busy-loop.
            time.sleep(5)