Executing a DAG in a workflow

I want to execute a DAG (directed acyclic graph) in which multiple “nodes” can run in parallel, and new nodes can kick off depending on when various nodes complete. Because the nodes run as activities, the order in which they get scheduled needs to be deterministic. My idea for handling that is to call an activity that queries the workflow to determine what should run next. That way, on replay, the next node to run is deterministic.

My question is basically: am I missing something and overcomplicating my solution? I’m very new to Temporal, so any advice is much appreciated.

To give a concrete example, say there is a graph that looks like:

1  2
|  |
3  4
\ /
 5

Nodes 1 and 2 run at the same time but can finish in any order. Let’s say 2 finishes first and we kick off 4, and then 1 finishes and we kick off 3. At this point, something crashes and we have to replay the workflow. Because an activity is run to determine what to run next, we will deterministically return, in order, 1, 2, 4, and 3.

This is exactly what Temporal workflows are built for. For example (just hand-typed here, may have typos):

using System;
using System.Threading.Tasks;
using Temporalio.Workflows;

[Workflow]
public class MyWorkflow
{
    [WorkflowRun]
    public async Task RunAsync()
    {
        // Wait for both path 1 and path 2 to complete concurrently
        await Task.WhenAll(DoPath1Async(), DoPath2Async());

        // Do thing 5
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing5(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
    }

    private async Task DoPath1Async()
    {
        // Run 1 and 3
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing1(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing3(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
    }

    private async Task DoPath2Async()
    {
        // Run 2 and 4
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing2(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing4(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
    }
}

Workflows handle crashes automatically because the code above is deterministic: on replay, the same activities are scheduled in the same order every time, guaranteed. How you write the activities so they cope with being retried after a crash may be worth discussing, however.
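As a rough sketch of what that discussion usually covers: activities are retried according to their retry policy, so an activity that calls an external system should be safe to run more than once. The snippet below assumes the Temporal .NET SDK (`Temporalio` package). It sets an explicit retry policy and builds an idempotency key from the workflow ID and activity ID, which stay the same across retry attempts of one activity execution. `IExternalService` and `NodeActivityOptions` are hypothetical names, and the retry values are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using Temporalio.Activities;
using Temporalio.Common;
using Temporalio.Workflows;

// Hypothetical external dependency the activity calls; it is assumed to
// ignore repeated requests that carry the same idempotency key.
public interface IExternalService
{
    Task DoWorkAsync(string idempotencyKey);
}

public class MyActivities
{
    private readonly IExternalService service;

    public MyActivities(IExternalService service) => this.service = service;

    [Activity]
    public async Task DoThing1()
    {
        var info = ActivityExecutionContext.Current.Info;
        // Workflow ID + activity ID does not change between retry attempts,
        // so a retried call reuses the same key instead of doing the work twice.
        var idempotencyKey = $"{info.WorkflowId}/{info.ActivityId}";
        await service.DoWorkAsync(idempotencyKey);
    }
}

public static class NodeActivityOptions
{
    // Explicit retry policy; these numbers are examples, not recommendations.
    public static ActivityOptions Create() => new()
    {
        StartToCloseTimeout = TimeSpan.FromMinutes(5),
        RetryPolicy = new RetryPolicy
        {
            InitialInterval = TimeSpan.FromSeconds(1),
            BackoffCoefficient = 2,
            MaximumInterval = TimeSpan.FromMinutes(1),
            MaximumAttempts = 5,
        },
    };
}
```

The workflow would then pass `NodeActivityOptions.Create()` wherever the example above passes its inline options.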

Thanks for the reply, Chad. Sorry, one thing I did not make clear in my original post is that these DAGs are read in at runtime, so we can't hard-code the methods the way your example does. Does that make sense?

Sure! We often call these “DSL workflows”. We have examples in Go, Java, and TypeScript. I have now opened a request for one for .NET.

You can simply accept input that tells you what to run and how. Many workflows do this. Our tests have a big example of this that we call the “kitchen sink workflow” (here is the interface and types, and here is the implementation).

Feel free to dynamically take whatever paths are needed based on input. I hope that helps.
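For the DAG in the original question, a minimal DSL-style sketch could look like the following. It assumes the Temporal .NET SDK and a hypothetical input format in which each `DagNode` names an activity and lists the IDs of the nodes it depends on; `DagNode`, `DagRunner`, and `DagWorkflow` are illustrative names. `DagRunner` starts every node whose dependencies have finished, in ordinal ID order, then waits for whichever running node completes first. Because that logic is a pure function of the input and the completion order Temporal records in history, no extra "what runs next" activity is needed: on replay the same nodes are started again. The string-name `Workflow.ExecuteActivityAsync` overload and `Workflow.WhenAnyAsync` are used as I understand the SDK; please check them against the SDK version you run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Temporalio.Workflows;

// Hypothetical input format: a node runs the named activity once every node in DependsOn is done.
public record DagNode(string Id, string Activity, List<string> DependsOn);

public static class DagRunner
{
    // Starts nodes as soon as their dependencies complete; returns the order nodes were started in.
    // runNode starts a node's work; whenAny waits for the first of the given tasks to finish.
    public static async Task<List<string>> RunAsync(
        IReadOnlyList<DagNode> nodes,
        Func<DagNode, Task> runNode,
        Func<IEnumerable<Task>, Task<Task>> whenAny)
    {
        var completed = new HashSet<string>();
        var started = new HashSet<string>();
        var running = new List<(Task Task, DagNode Node)>();
        var startOrder = new List<string>();

        while (completed.Count < nodes.Count)
        {
            // Start every ready node in a stable order so scheduling depends only on state.
            var ready = nodes
                .Where(n => !started.Contains(n.Id) && n.DependsOn.All(completed.Contains))
                .OrderBy(n => n.Id, StringComparer.Ordinal)
                .ToList();
            foreach (var node in ready)
            {
                started.Add(node.Id);
                startOrder.Add(node.Id);
                running.Add((runNode(node), node));
            }

            // Nothing running and nothing startable means a cycle or an unknown dependency ID.
            if (running.Count == 0)
            {
                throw new InvalidOperationException("DAG cannot make progress (cycle or unknown dependency)");
            }

            // Wait for the first running node to finish; awaiting it rethrows its failure, if any.
            var finished = await whenAny(running.Select(r => r.Task).ToList());
            await finished;
            var index = running.FindIndex(r => r.Task == finished);
            completed.Add(running[index].Node.Id);
            running.RemoveAt(index);
        }
        return startOrder;
    }
}

[Workflow]
public class DagWorkflow
{
    [WorkflowRun]
    public async Task<List<string>> RunAsync(List<DagNode> nodes)
    {
        var options = new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(5) };
        // Activities are invoked by name because the graph is only known at runtime.
        // Workflow.WhenAnyAsync is used instead of Task.WhenAny to keep waiting deterministic.
        return await DagRunner.RunAsync(
            nodes,
            node => Workflow.ExecuteActivityAsync(node.Activity, Array.Empty<object?>(), options),
            tasks => Workflow.WhenAnyAsync(tasks));
    }
}
```

Keeping the scheduling in `DagRunner` with injected `runNode`/`whenAny` delegates also lets you unit test the graph logic with plain tasks, outside a worker.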

Can we have an example for “DSL workflows” in Python too?

The issue has been open for a while; we just haven’t gotten around to it. But it’s fairly simple to take some input, convert it to dataclass structures, send that to the workflow, and have the workflow code execute based on it.
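The same idea in .NET terms, to match the C# used earlier in this thread: deserialize the runtime definition into plain record types and pass those as the workflow argument. This is a sketch under assumptions: the JSON shape, `DagNode`, and `DagInput.ParseAndValidate` are hypothetical, and every node is assumed to list `dependsOn` (an empty array for root nodes). Validation uses Kahn's algorithm to reject duplicate IDs, unknown dependencies, and cycles on the client side, before any workflow is started.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical input format: one record per node, deserialized straight from JSON.
public record DagNode(string Id, string Activity, List<string> DependsOn);

public static class DagInput
{
    private static readonly JsonSerializerOptions Options = new() { PropertyNameCaseInsensitive = true };

    // Parses the JSON definition and returns the nodes plus one valid topological order of their IDs.
    public static (List<DagNode> Nodes, List<string> Order) ParseAndValidate(string json)
    {
        var nodes = JsonSerializer.Deserialize<List<DagNode>>(json, Options)
            ?? throw new ArgumentException("Empty DAG definition");

        var ids = new HashSet<string>();
        foreach (var node in nodes)
        {
            if (!ids.Add(node.Id)) throw new ArgumentException($"Duplicate node ID '{node.Id}'");
        }

        // Kahn's algorithm: count unmet dependencies, then repeatedly release nodes that have none left.
        var inDegree = nodes.ToDictionary(n => n.Id, n => n.DependsOn.Count);
        var dependents = nodes.ToDictionary(n => n.Id, _ => new List<string>());
        foreach (var node in nodes)
        {
            foreach (var dep in node.DependsOn)
            {
                if (!ids.Contains(dep)) throw new ArgumentException($"Node '{node.Id}' depends on unknown node '{dep}'");
                dependents[dep].Add(node.Id);
            }
        }

        var queue = new Queue<string>(nodes.Where(n => inDegree[n.Id] == 0).Select(n => n.Id));
        var order = new List<string>();
        while (queue.Count > 0)
        {
            var id = queue.Dequeue();
            order.Add(id);
            foreach (var next in dependents[id])
            {
                if (--inDegree[next] == 0) queue.Enqueue(next);
            }
        }

        // Any node never released sits on a cycle.
        if (order.Count != nodes.Count) throw new ArgumentException("DAG definition contains a cycle");
        return (nodes, order);
    }
}
```

The returned `Nodes` list is what the client would pass as the workflow argument; the topological `Order` is only a validation by-product, since the workflow itself decides what to run as nodes complete.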