Executing a DAG in a workflow

I want to execute a DAG (Directed Acyclic Graph) where multiple “nodes” can run in parallel. Depending on when various nodes complete, new nodes can kick off. Because the nodes are running as activities, they need to be deterministic. My thought on handling that is to call an activity that queries the workflow to determine what should be run next. That way, on replay, the next node to run is deterministic.

My question is basically: am I missing something and overcomplicating my solution? I’m very new to Temporal, so any advice is much appreciated.

To give a concrete example, say there is a graph that looks like:

1  2
|  |
3  4
\ /
 5

1 and 2 run at the same time but can finish in any order. Let’s say 2 finishes first and we kick 4 off and then 1 finishes and we kick off 3. At this point, something crashes and we have to replay the workflow. Because an action is run to determine what to run next, we will deterministically return, in order, 1, 2, 3, and 4.

This is exactly what Temporal workflows are built for. For example (just hand-typed here, may have typos):

using System;
using System.Threading.Tasks;
using Temporalio.Workflows;

[Workflow]
public class MyWorkflow
{
    [WorkflowRun]
    public async Task RunAsync()
    {
        // Wait for both path 1 and path 2 to complete concurrently
        await Task.WhenAll(DoPath1Async(), DoPath2Async());

        // Do thing 5
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing5(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
    }

    private async Task DoPath1Async()
    {
        // Run 1 and 3
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing1(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing3(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
    }

    private async Task DoPath2Async()
    {
        // Run 2 and 4
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing2(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
        await Workflow.ExecuteActivityAsync(
            (MyActivities act) => act.DoThing4(),
            new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
    }
}

Workflows handle crashes automatically because the code above is deterministic: on replay, it is guaranteed to run in the same order every time. How you write the activities to deal with retries after a crash may be worth a separate discussion, however.

Thanks for the reply, Chad. Sorry, one thing that I did not make clear in my original post was that these DAGs are being read in at runtime so we do not have the ability to code out the methods like the example you have here. Does that make sense?

Sure! We call these “DSL workflows” a lot of times. We have examples in Go, Java, and TypeScript. I have now opened a request for one for .NET.

You can easily just accept input that tells you what to run and how. Many workflows do this. Our tests have a big example of this that we call the “kitchen sink workflow” (here is the interface and types, and here is the implementation).

Feel free to dynamically take whatever paths are needed based on input. I hope that helps.
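
As a rough illustration of taking paths dynamically from input, here is a minimal sketch in plain C#. It assumes a hypothetical input shape (a dictionary from node name to the names of the nodes it depends on, where every dependency is itself a key) and a hypothetical `DagRunner` helper; neither comes from the Temporal SDK. The `runNode` delegate stands in for whatever runs a node. Inside a workflow that would be the activity call, for example `Workflow.ExecuteActivityAsync` as in the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical runtime input: node name -> names of the nodes it depends on.
// This is the 1/2/3/4/5 graph from the original question.
var dependsOn = new Dictionary<string, string[]>
{
    ["1"] = Array.Empty<string>(),
    ["2"] = Array.Empty<string>(),
    ["3"] = new[] { "1" },
    ["4"] = new[] { "2" },
    ["5"] = new[] { "3", "4" },
};

// Stand-in for the real work; it only records which nodes were started.
var started = new List<string>();
await DagRunner.RunAsync(dependsOn, node =>
{
    started.Add(node);
    return Task.CompletedTask;
});
Console.WriteLine(string.Join(", ", started));

public static class DagRunner
{
    // Runs every node once, after all of its dependencies have completed.
    // Assumes the graph is acyclic and every dependency is also a key.
    public static Task RunAsync(
        IReadOnlyDictionary<string, string[]> dependsOn,
        Func<string, Task> runNode)
    {
        // One Task per node; dependents await the same Task instead of
        // starting the node again.
        var tasks = new Dictionary<string, Task>();

        Task GetOrStart(string node)
        {
            if (!tasks.TryGetValue(node, out var task))
            {
                task = RunAfterDependenciesAsync(node);
                tasks[node] = task;
            }
            return task;
        }

        async Task RunAfterDependenciesAsync(string node)
        {
            // Independent branches proceed concurrently; this node waits
            // only for its own dependencies.
            await Task.WhenAll(dependsOn[node].Select(GetOrStart));
            await runNode(node);
        }

        return Task.WhenAll(dependsOn.Keys.Select(GetOrStart));
    }
}
```

Because each node has exactly one `Task`, a node that several other nodes depend on still runs only once. The sketch does not guard against cycles, so the input would need to be validated before it is run.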

Can we have an example for “DSL workflows” in Python too?

The issue has been open for a while; we just haven’t gotten around to it. But it’s fairly simple to take some input, convert it to dataclass structures, send that to the workflow, and have the workflow code execute based on it.
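
The same idea carries over to C#, the language of the example earlier in the thread, where records can play the role that dataclasses play in Python. The following is only a sketch under assumptions: the JSON shape, the `NodeSpec` and `DagSpec` records, and the activity names are all hypothetical and are not part of any Temporal API.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical DAG definition as it might arrive at runtime.
const string json = @"{
  ""nodes"": [
    { ""name"": ""A"", ""activity"": ""DoThingA"", ""dependsOn"": [] },
    { ""name"": ""B"", ""activity"": ""DoThingB"", ""dependsOn"": [""A""] },
    { ""name"": ""E"", ""activity"": ""DoThingE"", ""dependsOn"": [""A""] }
  ]
}";

// Match camelCase JSON names to PascalCase record properties.
var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
var spec = JsonSerializer.Deserialize<DagSpec>(json, options)
    ?? throw new InvalidOperationException("Empty DAG definition");

foreach (var node in spec.Nodes)
{
    Console.WriteLine(
        $"{node.Name} runs {node.Activity} after [{string.Join(", ", node.DependsOn)}]");
}

// Hypothetical typed structures that could be passed to a workflow as input.
public record NodeSpec(string Name, string Activity, List<string> DependsOn);
public record DagSpec(List<NodeSpec> Nodes);
```

Typed structures like these give the workflow code a single, validated description of the graph to iterate over, rather than raw text.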

@tgrieger What did you eventually do? I have somewhat the same use case.

Assume this scenario: Task B is dependent on Task A. Task E is also dependent on Task A. In this case, will Task A not get executed twice?

Not if you code it correctly.

I understand. In other words, the code must track whether a task has already been executed, so that a second attempt to run it is skipped.
Something like marking a node as already visited.

I have further questions-

  1. Will Temporal be able to identify cycles and tell the developer/user that the DAG isn’t modelled correctly, or is it up to the developer to take care of that?
  2. Consider the above example again: Task B is dependent on Task A. Task E is also dependent on Task A. So, after Task A is executed, Task B and Task E can be executed.
    First, I believe the code has to ensure that Task A is triggered first. Please correct me if I’m wrong.
    Second, Task B and Task E have to be modelled so that they run in parallel once Task A completes. Please correct me if I’m wrong.
    Third, when the flow moves from Task A to Task B (as an example), Task B also requires that Task A be executed before it can proceed. That could become a stalemate. We can certainly handle it programmatically, but I guess a lot of conditions have to be thought through in advance.

  1. Temporal workflows are coded in a general-purpose programming language. Such languages don’t contain built-in support for identifying cycles in user-provided classes/objects. It is up to your implementation to detect this.
  2. These types of requirements are pretty easy to implement using the async programming abstractions (Tasks, Promises, etc.) that a programming language provides.
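
Since cycle detection is left to the implementation, one common option is to validate the graph before running anything. The sketch below uses Kahn's algorithm in plain C#. The `DagValidator` name and the input shape (node name mapped to the names of its dependencies, with every dependency also present as a key) are assumptions made for illustration, not something the Temporal SDK provides.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Task B and Task E both depend on Task A: a valid DAG.
var valid = new Dictionary<string, string[]>
{
    ["A"] = Array.Empty<string>(),
    ["B"] = new[] { "A" },
    ["E"] = new[] { "A" },
};

// A and B wait on each other: the "stalemate" case.
var cyclic = new Dictionary<string, string[]>
{
    ["A"] = new[] { "B" },
    ["B"] = new[] { "A" },
};

Console.WriteLine(DagValidator.HasCycle(valid));   // False
Console.WriteLine(DagValidator.HasCycle(cyclic));  // True

public static class DagValidator
{
    // Kahn's algorithm: repeatedly take nodes whose dependencies are all
    // satisfied. If some nodes can never be taken, the graph has a cycle.
    public static bool HasCycle(IReadOnlyDictionary<string, string[]> dependsOn)
    {
        // Number of unmet dependencies per node.
        var remaining = dependsOn.ToDictionary(kv => kv.Key, kv => kv.Value.Length);

        // Reverse edges: node -> nodes that depend on it.
        var dependents = dependsOn.Keys.ToDictionary(k => k, _ => new List<string>());
        foreach (var (node, deps) in dependsOn)
        {
            foreach (var dep in deps)
            {
                dependents[dep].Add(node);
            }
        }

        var ready = new Queue<string>(
            remaining.Where(kv => kv.Value == 0).Select(kv => kv.Key));
        var visited = 0;
        while (ready.Count > 0)
        {
            var node = ready.Dequeue();
            visited++;
            foreach (var next in dependents[node])
            {
                if (--remaining[next] == 0)
                {
                    ready.Enqueue(next);
                }
            }
        }

        // Nodes that were never reached are on, or downstream of, a cycle.
        return visited != dependsOn.Count;
    }
}
```

Rejecting a cyclic definition up front means the stalemate described in the question never reaches the code that actually runs the tasks.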