I’m trying to write a Temporal workflow that is triggered by CLI events and takes a list of nodes as input, where each node is represented as a dict with name, command, and dependencies, where dependencies is the list of other node names to wait for before executing.
For example:
[
    {"name": "node_a", "command": "sh some_script.sh", "dependencies": []},
    {"name": "node_b", "command": "sh some_other_script.sh", "dependencies": ["node_a"]},
    ...
]
So it’s a dynamic DAG whose shape depends on the input from the CLI.
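To make the input format concrete, here is a minimal sketch (plain Python, no Temporal involved) that checks a node list for unknown dependencies and cycles and groups the names into dependency-respecting batches. The helper name `execution_batches` is hypothetical, and using the standard library's `graphlib` is an assumption of this sketch, not something the workflow itself relies on.

```python
from graphlib import TopologicalSorter


def execution_batches(nodes: list[dict]) -> list[list[str]]:
    """Group node names into batches whose dependencies are already satisfied."""
    names = {node["name"] for node in nodes}
    for node in nodes:
        missing = set(node["dependencies"]) - names
        if missing:
            raise ValueError(f"{node['name']} depends on unknown nodes: {sorted(missing)}")

    # TopologicalSorter expects a mapping from each node to its predecessors.
    sorter = TopologicalSorter({n["name"]: set(n["dependencies"]) for n in nodes})
    sorter.prepare()  # raises graphlib.CycleError (a ValueError) on a cycle

    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes with no unfinished dependencies
        batches.append(ready)
        sorter.done(*ready)
    return batches


nodes = [
    {"name": "node_a", "command": "sh some_script.sh", "dependencies": []},
    {"name": "node_b", "command": "sh some_other_script.sh", "dependencies": ["node_a"]},
]
print(execution_batches(nodes))  # [['node_a'], ['node_b']]
```

Running a check like this before starting the workflow would surface a malformed DAG early, where it is cheap to report back to the CLI.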
Here’s my first attempt at implementing this in Temporal:
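import asyncio
from datetime import timedelta
from temporalio import activity, workflow

# Activity for executing a command
@activity.defn
async def execute_command(command: str) -> str:
    print(f"Starting command: {command}")
    # Run the command string through the shell and wait for it to exit;
    # subprocess.Popen(command) would neither split the string nor wait.
    process = await asyncio.create_subprocess_shell(command)
    return_code = await process.wait()
    if return_code != 0:
        raise RuntimeError(f"{command!r} exited with code {return_code}")
    return f"Executed {command}"

# Define the workflow
@workflow.defn
class TemporalWorkflow:
    @workflow.run
    async def run(self, nodes):
        completed_nodes = []
        results = []
        while len(completed_nodes) < len(nodes):
            # Nodes that have not run yet and whose dependencies have all completed
            next_batch = [
                node for node in nodes
                if set(node["dependencies"]) <= set(completed_nodes) and node["name"] not in completed_nodes
            ]
            actions = [
                workflow.execute_activity(execute_command, node["command"], start_to_close_timeout=timedelta(days=1))
                for node in next_batch
            ]
            # Wait for the whole batch before computing the next one
            batch_results = await asyncio.gather(*actions)
            results.append(batch_results)
            for node in next_batch:
                completed_nodes.append(node["name"])
        return results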
This is pretty clunky compared to the implementation with Airflow’s API:
node_to_task = {node.name: self.schedule_task(node) for node in flow.nodes}
for node in flow.nodes:
    task = node_to_task[node.name]
    for dep in node.dependencies:
        dep_task = node_to_task[dep.name]
        self.set_dependencies(task, dep_task)
return self.dag
And even then, it’s not actually a “correct” orchestration of the DAG (i.e. it doesn’t parallelize optimally: each node waits for its whole batch to finish, not just for its own dependencies). But when I tried to get fancier with awaits, I kept running into RuntimeError: cannot reuse already awaited coroutine in cases with multiple dependencies, so I ended up going with this simpler approach.
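For reference, that error is standard asyncio behaviour and not specific to Temporal: a coroutine object can be awaited only once, whereas an asyncio.Task can be awaited by any number of dependents. Below is a minimal sketch of per-node scheduling in plain asyncio, assuming an acyclic input. The names `run_node`, `run_dag` and `run_after_deps` are hypothetical; `run_node` is a stand-in for the real activity call, and whether the same pattern carries over unchanged to a Temporal workflow (with `workflow.execute_activity` in its place) is an assumption that has not been verified here.

```python
import asyncio


async def run_node(name: str, order: list[str]) -> str:
    # Hypothetical stand-in for the real activity call.
    await asyncio.sleep(0.01)
    order.append(name)
    return f"Executed {name}"


async def run_dag(nodes: list[dict]) -> tuple[dict[str, str], list[str]]:
    tasks: dict[str, asyncio.Task] = {}
    order: list[str] = []  # completion order, for inspection only

    async def run_after_deps(node: dict) -> str:
        # Wait only for this node's own dependencies, not for a whole batch.
        # A Task can be awaited by several dependents; a bare coroutine cannot.
        await asyncio.gather(*(tasks[dep] for dep in node["dependencies"]))
        return await run_node(node["name"], order)

    # Create every task before yielding control, so the lookups in
    # run_after_deps succeed regardless of the order of the input list.
    for node in nodes:
        tasks[node["name"]] = asyncio.create_task(run_after_deps(node))

    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks, results)), order


diamond = [
    {"name": "node_a", "command": "sh a.sh", "dependencies": []},
    {"name": "node_b", "command": "sh b.sh", "dependencies": ["node_a"]},
    {"name": "node_c", "command": "sh c.sh", "dependencies": ["node_a"]},
    {"name": "node_d", "command": "sh d.sh", "dependencies": ["node_b", "node_c"]},
]
results, order = asyncio.run(run_dag(diamond))
print(order)  # node_a finishes first and node_d last
```

Here node_a's task is awaited by both node_b and node_c, which is exactly the multiple-dependency case that fails with bare coroutines. A cyclic input would make this sketch wait forever, so the node list would need to be validated first.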
All of this got me thinking that maybe I’m not approaching this the right way in Temporal (this is my first time using it), so I wanted to reach out and see if anyone could guide me in the right direction.