Hello, I will attach the context below
- I have a cron job that's scheduled to run every minute, defined as follows:
@workflow.defn(name="TaskExecutor")
class TaskExecutor:
    @workflow.run
    async def run(self) -> None:
        return await workflow.execute_activity(
            Activities.PollExecuteTasks,
            start_to_close_timeout=timedelta(days=365),
        )
caller:
def initTaskExecutor():
    print('INITTASKEXECUTOR called init_tast_executor')
    result = asyncio.run(settings.TEMPORAL_CLIENT.start_workflow(
        Workflows.TaskExecutor.run,
        id="workflow-process-task-queue-new",
        task_queue="task-queue-1",
        id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
        cron_schedule="* * * * *",
    ))
    return result
- Whenever the worker that called this dies and another is spawned, the new worker also tries to start the workflow but gets this error:
Workflow execution already started
- Here is my worker code:
async def main():
    client = settings.TEMPORAL_CLIENT
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="task-queue-1",
            workflows=[
                ...
                Workflows.TaskExecutor,
            ],
            activities=[
                ...
            ],
            activity_executor=activity_executor,
        )
        print("Starting the worker....")
        await worker.run()

if __name__ == "__main__":
    while True:
        try:
            asyncio.run(main())
        except Exception as e:
            # Log the exception
            print(f"Exception occurred: {e}")
            print("Retrying in 5 seconds...")
The issue is that, although the workflow is still running on Temporal Cloud, it never gets scheduled to the new worker. (It is scheduled properly if I terminate the cron job and let the new worker start the workflow again.)
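For the "Workflow execution already started" error mentioned above, here is a minimal sketch of how the caller could treat that error as "nothing to do" instead of failing. It only covers the error on startup, not the scheduling problem. The helper `start_if_absent` and the stand-in `FakeAlreadyStarted` are hypothetical names used for illustration so the snippet runs without a Temporal server. With the Temporal Python SDK, I believe the exception to pass in would be `temporalio.exceptions.WorkflowAlreadyStartedError`, but that should be checked against the installed SDK version.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


async def start_if_absent(
    start: Callable[[], Awaitable[T]],
    already_started: Tuple[Type[BaseException], ...],
) -> Optional[T]:
    """Await start(); return None if it fails because the workflow already exists."""
    try:
        return await start()
    except already_started:
        # Another process already registered the cron workflow, so skip the start.
        return None


class FakeAlreadyStarted(Exception):
    """Hypothetical stand-in for the SDK's 'already started' error."""


async def demo() -> list:
    started = set()

    async def fake_start() -> str:
        # Mimics a start call that rejects a duplicate workflow ID.
        workflow_id = "workflow-process-task-queue-new"
        if workflow_id in started:
            raise FakeAlreadyStarted("Workflow execution already started")
        started.add(workflow_id)
        return "handle"

    first = await start_if_absent(fake_start, (FakeAlreadyStarted,))
    second = await start_if_absent(fake_start, (FakeAlreadyStarted,))
    return [first, second]


results = asyncio.run(demo())
```

In my setup, `start` would presumably be a lambda wrapping the `settings.TEMPORAL_CLIENT.start_workflow(...)` call from `initTaskExecutor`, so that a second worker starting up does not crash on the duplicate workflow ID.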
Thank you in advance.