How to correctly orchestrate a dynamic DAG in the Python SDK

I’m trying to write a Temporal workflow that is triggered by CLI events and takes a list of nodes as input, where each node is represented as a dict with name, command, and dependencies, and dependencies is a list of other node names to wait for before the node executes.

For example:

[
    {"name": "node_a", "command": "sh some_script.sh", "dependencies": []},
    {"name": "node_b", "command": "sh some_other_script.sh", "dependencies": ["node_a"]},
    ...
]

So it’s a dynamic DAG that takes a different shape depending on the CLI input.

Here’s my first attempt at implementing this in Temporal:

import asyncio
from datetime import timedelta
from temporalio import activity, workflow

# Activity for executing a command: run it through the shell and wait for it
@activity.defn
async def execute_command(command: str) -> str:
    print(f"Starting command: {command}")
    # subprocess.Popen(command) neither splits the command string nor waits for it to finish
    proc = await asyncio.create_subprocess_shell(command)
    await proc.wait()
    return f"Executed {command}"

# Workflow: repeatedly find the nodes whose dependencies are all done and run them as a batch
@workflow.defn
class TemporalWorkflow:
    @workflow.run
    async def run(self, nodes):
        completed_nodes = []
        results = []
        while len(completed_nodes) < len(nodes):
            # Nodes that are ready: all dependencies completed and not yet run
            next_batch = [
                node
                for node in nodes
                if set(node["dependencies"]) <= set(completed_nodes)
                and node["name"] not in completed_nodes
            ]
            # Run the whole batch in parallel
            actions = [
                workflow.execute_activity(
                    execute_command,
                    node["command"],
                    start_to_close_timeout=timedelta(days=1),
                )
                for node in next_batch
            ]
            batch_results = await asyncio.gather(*actions)
            results.append(batch_results)
            for node in next_batch:
                completed_nodes.append(node["name"])

        return results

This is pretty clunky compared to the implementation with Airflow’s API:

node_to_task = {node.name: self.schedule_task(node) for node in flow.nodes}
for node in flow.nodes:
    task = node_to_task[node.name]
    for dep in node.dependencies:
        dep_task = node_to_task[dep.name]
        self.set_dependencies(task, dep_task)
return self.dag

And even then, it’s not actually a “correct” orchestration of the DAG: because it runs level by level, a node whose dependencies are all finished still has to wait for the rest of the current batch before it can start. But when I tried to get fancier with awaits, I kept running into RuntimeError: cannot reuse already awaited coroutine for nodes with multiple dependencies, so I fell back to this simpler approach.
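Roughly, the fancier version I was attempting looked something like this (a simplified sketch with illustrative names, not my exact code):

# Sketch of the fancier attempt (assumes the imports and execute_command activity above):
# cache one coroutine per node and have every dependent await its dependencies'
# coroutines directly. As soon as two nodes share a dependency, that dependency's
# coroutine gets awaited more than once, which is where the RuntimeError comes from.
@workflow.defn
class TemporalWorkflow:
    @workflow.run
    async def run(self, nodes):
        coros = {}

        async def run_node(node):
            for dep in node["dependencies"]:
                await coros[dep]  # a shared dependency is awaited a second time here -> RuntimeError
            return await workflow.execute_activity(
                execute_command,
                node["command"],
                start_to_close_timeout=timedelta(days=1),
            )

        for node in nodes:
            coros[node["name"]] = run_node(node)

        # only await the nodes that nothing else depends on
        depended_on = {dep for node in nodes for dep in node["dependencies"]}
        return await asyncio.gather(
            *(coro for name, coro in coros.items() if name not in depended_on)
        )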

All of this got me thinking that maybe I’m not approaching this the right way in Temporal (this is my first time using it), so I wanted to reach out and see if anyone could guide me in the right direction.

You’ll likely want more type safety, but that is about it. You can see this DSL sample for a similar situation.

Of course, the Temporal approach gives you a lot of freedom and you are not locked into a DAG structure.

You can parallelize as you wish. You could asyncio.create_task everything if you wanted. See the DSL sample linked above.
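For example, a rough sketch of the create_task approach (untested, illustrative names, and it assumes the execute_command activity from your post): every node becomes a task that first awaits its dependencies' tasks, and tasks, unlike bare coroutines, can be awaited by any number of dependents.

import asyncio
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class DagWorkflow:
    @workflow.run
    async def run(self, nodes: list) -> list:
        tasks = {}

        async def run_node(node):
            # A task can be awaited by every dependent without the reuse error
            await asyncio.gather(*(tasks[dep] for dep in node["dependencies"]))
            return await workflow.execute_activity(
                execute_command,  # the activity from the original post
                node["command"],
                start_to_close_timeout=timedelta(days=1),
            )

        # Create all the tasks first; none of them starts running until the await below
        for node in nodes:
            tasks[node["name"]] = asyncio.create_task(run_node(node))

        # Every node starts as soon as its own dependencies finish
        return await asyncio.gather(*tasks.values())

That gives you maximal parallelism: node_b can start the moment node_a finishes, regardless of what else is still running.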

I am not familiar with this error in relation to Temporal; it may be an incorrect usage of asyncio.
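For reference, that error is plain asyncio behavior rather than anything Temporal-specific: it is raised when the same coroutine object is awaited more than once, and wrapping the coroutine in a task avoids it. A minimal illustration:

import asyncio

async def work() -> int:
    return 42

async def main() -> None:
    coro = work()
    await coro              # first await is fine
    try:
        await coro          # RuntimeError: cannot reuse already awaited coroutine
    except RuntimeError as err:
        print(err)

    task = asyncio.create_task(work())
    print(await task)       # a task can be awaited again...
    print(await task)       # ...and simply returns its result

asyncio.run(main())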

Sure, this is a very common approach. Basically, if you have a DAG or a DSL or any other way of expressing the logic flow of your workflow via workflow input, it is very common to interpret that in the workflow and execute things in the manner you prefer. This gives you a lot of freedom around loops, try/except, choosing what to parallelize when, etc.


Thanks, @Chad_Retz! This is really helpful. The DSL example is exactly what I was looking for. Looks like I’m on the right path, just need to play around with the asyncio usage a bit more.
