Each worker instance can serve a single task queue. In most situations, each such worker resides in its own service. Something like:
main1:

    Worker worker = factory.newWorker(TASK_QUEUE1);
    worker.registerActivitiesImplementations(
        new SampleActivities.SampleActivitiesImpl1());
    factory.start();

main2:

    Worker worker = factory.newWorker(TASK_QUEUE2);
    worker.registerActivitiesImplementations(
        new SampleActivities.SampleActivitiesImpl2());
    factory.start();
Then make sure that ActivityOptions.taskQueue is set to the correct queue name when calling activities from the workflow.
Sure, that makes sense with a queue for each workflow.
My original question was about the case where a workflow has registered multiple activities and each activity has its own task queue. How would that work?
I don’t think there is a concept of a workflow registering activities. A workflow calls activities through the corresponding activity stubs. Use ActivityOptions to specify the task queue per stub.
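For example, a minimal sketch of per-stub options inside the workflow implementation (the SampleActivities1/SampleActivities2 interfaces, the queue constants, and the timeout are placeholders, not something from this thread):

    // Activities called through this stub are dispatched to workers polling TASK_QUEUE1.
    private final SampleActivities1 activities1 =
        Workflow.newActivityStub(
            SampleActivities1.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(TASK_QUEUE1)
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .build());

    // Activities called through this stub are dispatched to workers polling TASK_QUEUE2.
    private final SampleActivities2 activities2 =
        Workflow.newActivityStub(
            SampleActivities2.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(TASK_QUEUE2)
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .build());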
@maxim I had a question relating to the DSL translation.
Below is code that runs statements in parallel: it creates a promise for each branch so all the tasks run in parallel, and wraps them in a cancellation scope that cancels the rest if one of them fails.
Can CancellationScopes be nested within other CancellationScopes? For example, in this case, execute could be processing more nested parallel statements, which would create new cancellation scopes. I guess if there was an issue, the exception would be rethrown and the caller would fail as well, resulting in the other cancellation scopes eventually being cancelled too?
public void execute(UUID pipelineId, Parallel parallel, Map<String, ActivityPayload> bindings) {
    if (parallel == null || parallel.getBranches().length == 0) {
        return;
    }
    // In the parallel block, execute all branches in parallel and wait for all of them.
    // If one branch fails, cancel all the remaining ones as well.
    List<Promise<Void>> results = new ArrayList<>(parallel.getBranches().length);
    CancellationScope scope =
        Workflow.newCancellationScope(
            () -> {
                for (Statement statement : parallel.getBranches()) {
                    // execute returns void, so Async.procedure (rather than Async.function)
                    // is used to get a Promise<Void> per branch.
                    results.add(Async.procedure(this::execute, pipelineId, statement, bindings));
                }
            });
    // The code inside the scope is non-blocking, so run doesn't block.
    scope.run();
    try {
        // Blocks until all branches complete; throws if any of them fails.
        Promise.allOf(results).get();
    } catch (RuntimeException ex) {
        // Cancel the uncompleted branches.
        scope.cancel();
        log.error("One of the Activities failed. Canceling the rest.", ex);
        throw ex;
    }
}
Yes, cancellation scopes are hierarchical. Each scope is automatically attached to its parent scope. If the parent is cancelled, all the child scopes are cancelled. The main workflow method is invoked in the context of a root scope, which is cancelled when the whole workflow is cancelled.
When you need to run some cleanup code in a cancelled scope, create a detached one using Workflow.newDetachedCancellationScope.
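For example, a minimal sketch of that pattern applied to the parallel code above (the cleanupActivities stub and its cleanup method are hypothetical, not part of the original code):

    try {
        Promise.allOf(results).get();
    } catch (CanceledFailure e) {
        // The scope (or the whole workflow) was cancelled while waiting. Run the
        // cleanup in a detached scope so that the cleanup activity itself is not
        // cancelled along with everything else.
        CancellationScope detached =
            Workflow.newDetachedCancellationScope(() -> cleanupActivities.cleanup(pipelineId));
        detached.run();
        throw e;
    }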
If possible, can you take a look at Testing Framework for Java SDK? The question relates to an exception being thrown within a Promise.allOf but not being caught in the catch statement. Not sure if Promise.allOf just swallows the exception?