We have a DSL for DAG workflows that is defined from the UI. Users can create workflows similar to the one in the image below.
Here is a sample JSON definition of Task 5 from the DAG above. It references both its parent and child tasks.
TaskDefinition
{
  "task": "Task 5",
  "parentTasks": [
    "Task 1",
    "Task 2"
  ],
  "childrenTasks": [
    "Task 7"
  ]
}
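For reference, a minimal sketch of the Java type this JSON could map to. The class body is an assumption: only the name TaskDefinition and the getters getParentTasks() and getChildrenTasks() appear in the workflow code, so the field names, the constructor, and getTask() are hypothetical.

```java
import java.util.List;

// Hypothetical shape of TaskDefinition, inferred from the JSON keys above.
// Only getParentTasks() and getChildrenTasks() are used by the workflow code.
class TaskDefinition {
    private final String task;
    private final List<String> parentTasks;
    private final List<String> childrenTasks;

    TaskDefinition(String task, List<String> parentTasks, List<String> childrenTasks) {
        this.task = task;
        // Defensive copies so a definition cannot change after it is built.
        this.parentTasks = List.copyOf(parentTasks);
        this.childrenTasks = List.copyOf(childrenTasks);
    }

    String getTask() {
        return task;
    }

    List<String> getParentTasks() {
        return parentTasks;
    }

    List<String> getChildrenTasks() {
        return childrenTasks;
    }
}
```

With this shape, Task 5 would be built as new TaskDefinition("Task 5", List.of("Task 1", "Task 2"), List.of("Task 7")).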
Workflow implementation to run the DSL workflow.
import io.temporal.workflow.Async;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Promise;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class DslWorkflowImpl implements DslWorkflow {
    private TaskActivity taskActivity = getActivityStub();
    private Map<String, Object> resultMap; // maintains the return object of an activity
    private Map<String, TaskDefinition> taskMap; // DSL of the workflow

    @Override
    public String startDslWorkflow(String startTask, Map<String, TaskDefinition> taskMap) {
        resultMap = new HashMap<>();
        this.taskMap = taskMap;
        Promise<Object> promise =
            Async.function(taskActivity::executeTask, taskMap.get(startTask));
        promise.handle(getHandleFn(startTask));
        promise.get();
        return "SUCCESS";
    }

    // Builds the callback that runs when the activity for `task` finishes.
    private Functions.Func2<Object, RuntimeException, Object> getHandleFn(String task) {
        return (taskResult, e) -> {
            // Propagate an activity failure instead of recording the task as completed.
            if (e != null) {
                throw e;
            }
            resultMap.put(task, taskResult);
            List<Promise<Object>> promiseList = triggerChildTasks(task);
            Promise.allOf(promiseList).get();
            return taskResult;
        };
    }

    // Starts every child of `task` whose parents have all completed.
    private List<Promise<Object>> triggerChildTasks(String task) {
        List<String> childrenTasks = taskMap.get(task).getChildrenTasks();
        List<Promise<Object>> promiseList = childrenTasks.stream().map(childTask -> {
            TaskDefinition childTaskDefinition = taskMap.get(childTask);
            // Check if all the parent tasks are completed. Only then start this task.
            if (childTaskDefinition.getParentTasks().stream().allMatch(parent -> resultMap.containsKey(parent))) {
                Promise<Object> promise =
                    Async.function(taskActivity::executeTask, childTaskDefinition);
                // Calling getHandleFn recursively here.
                promise = promise.handle(getHandleFn(childTask));
                return promise;
            } else {
                log.info("Parent tasks are not yet done. task:" + childTask);
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
        return promiseList;
    }
}
My main concern is the case where a task has multiple parent tasks and must be started only after all of them have completed. If two parent tasks complete at the same time, the Temporal workflow threads will try to invoke the function returned by getHandleFn at the same time. What if both threads then fail to start the child task? Is there any way to synchronize workflow code execution using locks?
So far, the code above seems to work fine. I have never run into this issue after testing the scenario hundreds of times, but I am still worried because it looks like a race condition.
Please also advise whether this is the correct approach or whether it needs any improvement.
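To make the scenario concrete, here is a minimal plain-Java sketch of the join check with no Temporal APIs involved. It assumes the two parent callbacks run one after the other rather than truly in parallel, which is the case I am unsure about. The class JoinCheckSketch and its members are hypothetical stand-ins for taskMap, resultMap and getHandleFn.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the join logic in DslWorkflowImpl; not Temporal code.
class JoinCheckSketch {
    private final Map<String, List<String>> parents;   // child -> its parent tasks
    private final Map<String, List<String>> children;  // parent -> its child tasks
    private final Map<String, Object> resultMap = new HashMap<>();
    private final List<String> started = new ArrayList<>();

    JoinCheckSketch(Map<String, List<String>> parents, Map<String, List<String>> children) {
        this.parents = parents;
        this.children = children;
    }

    // Mirrors getHandleFn: record the result, then start children that are ready.
    void onTaskCompleted(String task, Object result) {
        resultMap.put(task, result);
        for (String child : children.getOrDefault(task, List.of())) {
            boolean allParentsDone =
                parents.get(child).stream().allMatch(resultMap::containsKey);
            if (allParentsDone) {
                started.add(child);
            }
        }
    }

    List<String> started() {
        return started;
    }

    public static void main(String[] args) {
        JoinCheckSketch sketch = new JoinCheckSketch(
            Map.of("Task 5", List.of("Task 1", "Task 2")),
            Map.of("Task 1", List.of("Task 5"), "Task 2", List.of("Task 5")));
        sketch.onTaskCompleted("Task 1", "r1"); // Task 2 is missing, so nothing starts
        sketch.onTaskCompleted("Task 2", "r2"); // both parents recorded, Task 5 starts
        System.out.println(sketch.started());   // prints [Task 5]
    }
}
```

When the callbacks are serialized like this, whichever parent finishes last sees both results and starts the child exactly once. The open question is whether the same holds for the workflow threads.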
Thanks,
Swaroop