Hi all,
I have a workflow which runs a child workflow to process entries. And it will do a continue_as_new workflow for the next item that is linked to the current item. I have a fake example as listed, will this break the determinism rule for workflows? Thanks for any help in advance
import asyncio
import logging
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from typing import List
data = {
"a": { "entries": [i for i in range(3)], "next": "b" },
"b": { "entries": [i for i in range(6)], "next": "c" },
"c": { "entries": [i for i in range(5)], "next": "d" },
"d": { "entries": None, "next": "e" },
"e": { "entries": [i for i in range(7)], "next": "f" },
"f": { "entries": [i for i in range(4)], "next": None }
}
@activity.defn
async def process_entry(entry: int) -> str:
return f"Entry: {entry}"
@workflow.defn
class EntriesWorkflow:
@workflow.run
async def run(self, input: str) -> List[str]:
entries = data[input]["entries"]
if not entries: return
results = []
for entry in entries:
results.append(
await workflow.execute_activity(
process_entry,
entry,
start_to_close_timeout=timedelta(seconds=5),
)
)
return results
@workflow.defn
class LoopingWorkflow:
@workflow.run
async def run(self, iteration: str) -> None:
if not iteration:
return
workflow.logger.info("Running workflow iteration %s", iteration)
results = await workflow.execute_child_workflow(
EntriesWorkflow.run,
iteration,
id=f"entries-workflow-workflow-child-{iteration}",
)
workflow.logger.info("Processed: %s", results)
await asyncio.sleep(1)
workflow.continue_as_new(data[iteration]["next"])
async def main():
# Enable logging for this sample
logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-continue-as-new-task-queue",
workflows=[LoopingWorkflow, EntriesWorkflow],
activities=[process_entry]
):
# While the worker is running, use the client to run the workflow. Note,
# in many production setups, the client would be in a completely
# separate process from the worker.
await client.execute_workflow(
LoopingWorkflow.run,
"a",
id="hello-continue-as-new-workflow-id",
task_queue="hello-continue-as-new-task-queue",
)
if __name__ == "__main__":
asyncio.run(main())