Recursive workflows and determinism

Hi all,

I have a workflow that runs a child workflow to process entries, and then calls continue_as_new for the next item linked to the current one. A fake example is listed below; will this break the determinism rule for workflows? Thanks in advance for any help.

import asyncio
import logging
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from typing import List

data = {
    "a": { "entries": [i for i in range(3)],  "next": "b" },
    "b": { "entries": [i for i in range(6)],  "next": "c" },
    "c": { "entries": [i for i in range(5)],  "next": "d" },
    "d": { "entries": None,  "next": "e" },
    "e": { "entries": [i for i in range(7)],  "next": "f" },
    "f": { "entries": [i for i in range(4)],  "next": None }
}

@activity.defn
async def process_entry(entry: int) -> str:
    return f"Entry: {entry}"

@workflow.defn
class EntriesWorkflow:
    @workflow.run
    async def run(self, input: str) -> List[str]:
        entries = data[input]["entries"]
        if not entries:
            # Return an empty list so the result matches the List[str] return type
            return []
        results = []
        for entry in entries:
            results.append(
                await workflow.execute_activity(
                     process_entry,
                     entry,
                     start_to_close_timeout=timedelta(seconds=5),
                )
            )
        return results 

@workflow.defn
class LoopingWorkflow:
    @workflow.run
    async def run(self, iteration: str) -> None:
        if not iteration:
            return
        
        workflow.logger.info("Running workflow iteration %s", iteration)
        results = await workflow.execute_child_workflow(
            EntriesWorkflow.run,
            iteration,
            id=f"entries-workflow-workflow-child-{iteration}",
        )
        workflow.logger.info("Processed: %s", results)
        await asyncio.sleep(1)
        workflow.continue_as_new(data[iteration]["next"])


async def main():
    # Enable logging for this sample
    logging.basicConfig(level=logging.INFO)

    # Start client
    client = await Client.connect("localhost:7233")

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="hello-continue-as-new-task-queue",
        workflows=[LoopingWorkflow, EntriesWorkflow],
        activities=[process_entry]
    ):

        # While the worker is running, use the client to run the workflow. Note,
        # in many production setups, the client would be in a completely
        # separate process from the worker.
        await client.execute_workflow(
            LoopingWorkflow.run,
            "a",
            id="hello-continue-as-new-workflow-id",
            task_queue="hello-continue-as-new-task-queue",
        )


if __name__ == "__main__":
    asyncio.run(main())

This looks OK to me. You might want to check for next=None before you continue_as_new so you don't create an extra execution, but that's up to you.
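
For example, the tail of LoopingWorkflow.run could look something like this (a minimal sketch assuming the same module-level data dict; next_item is just an illustrative local name):

        next_item = data[iteration]["next"]
        if next_item is None:
            # Last item in the chain: complete the workflow here instead of
            # continuing as new into a run that would only hit the guard at
            # the top and return.
            return
        workflow.continue_as_new(next_item)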
