@ActivityInterface(namePrefix = "InterfaceA")
public interface InterfaceA extends BaseInterface<List<Float>> {
public List<Float> process();
}
and
@ActivityInterface(namePrefix = "InterfaceB")
public interface InterfaceB extends BaseInterface<List<String>> {
public List<String> process();
}
When we initialize the activity interface for A and B and call process() on BaseInterface, it will throw Duplicated methods (overloads are not allowed in activity interfaces), is there anyway to solve this?
Hello Maxim, allow me to elaborate on the problem.
The problem to solve is to have a generic piece of orchestration code that can execute these “process” activities.
The child interfaces have have dependencies on other interfaces such as requiring the output of another interface before the child interface can be run.
We can have a Map of child-interface to parent-interface(s) or vice-versa and use a for loop over a list of parent-interface activity promises to know which parent has completed so we can run the child-interface activitiy.
Below is a super simple example but the actual graph is complex and contains more than 30-40 Interfaces with dependency on other interfaces.
So this is why being able to extend a single BaseInterface with generic types would be useful.
Example:
List<Promise<?>> promises = new LinkedList<>(); // contains some promises of some "process" calls. We can use a DoublyLinkedList here to improve remove performance.
// Parent BaseInterface to Child BaseInterface map
Map<BaseInterface, BaseInterface> parentToChildMap = new HashMap<>();
parentToChildMap.put(InterfaceA, InterfaceB);
Promise<Object> promiseA = Async.function(InterfaceA::process);
// Map a Promise back to it's parent when created.
Map<Promise<?>, BaseInterface> promiseInterfaceMap = new HashMap<>();
promiseInterfaceMap.put(promiseA, InterfaceA);
while(!promises.isEmpty()) {
// Get the first completed promise
Promise<Object> completed = Promise.anyOf(promises);
promises.remove(completed);
// Get the BaseInterface the promise maps to
BaseInterface completedPromiseParent = promiseInterfaceMap.get(completed);
// Run the activity method of children BaseInterfaces
for(BaseInterface child: parentToChildMap.get(completedPromiseParent)) {
Promise<?> promiseChild = Async.function(child::process);
promises.add(promiseChild);
promiseInterfaceMap.put(child, promiseChild);
}
}
I don’t understand why BaseInterface has to be generic as the whole task tree has to share the same result type.
BTW. Promise.anyOf returns a promise that becomes ready when any of the promises in the list is ready. This promise is not one of the promises in the list. You have to go over the list checking Promise.isReady() to find out which promises are ready.
Thanks for pointing out the Promise.anyOf working. I was actually meaning that we don’t care which Promise is returned from the list as long as we get the one thats completed. Because I can use the maps to run respective child interface activities accordingly.
About the other question.
The problem is that each “process” activity is complex and makes some API call and produces an output.
Technically, yes, we can rid of the BaseInterface needing a generic type. But that then makes us lose the ability to get and store an output from 1 activity and pass it onto some other activity for another class.
And if we have class level variables in these Activity classes for storing the output we then lose the ability to have a generic orchestration code than can take the output from one activity and call another activity with that output because now all classes have their own interface.
The example code and our requirement needs this generalization/base interface to avoid manually plugging in activity outputs and inputs (which means modifying the orchestrator every time a new class is added to this graph) when we have more than 30-40 of these classes.
Unless I am missing some other concept of Activities and Workflows I do not see how else I can have a some common orchestration code that can apply to a graph of activity classes.
For more context. The way I have solved this without Temporal is run each “process” method in a new thread and the orchestrator class would be able to pass outputs of some activity to another even if we lose type-safety. I was hoping to use Temporal as a backend in place of threads for these activities.
You can use the DynamicActivity implementation that delegates calls to your own task interfaces that use generics. Then in the workflow you can use an untyped activity stub (Workflow.newUntypedActivityStub) to call into activities generically.