As mentioned in this thread, I want to run 10 activities, but with only 4 of them running asynchronously at any time T.
As you suggested there, I’ll start with 4 activities and start another one whenever an activity completes.
How can I achieve this in Java code? Do I need to use Promise.anyOf to know when any activity has completed? If I do, my workflow code finishes as soon as any one of the activities completes.
If I try to add a newly created promise object to promiseList, it throws a ConcurrentModificationException.
I tried the code below:
    int limit = 4, size = 10;
    for (int i = 0; i < limit; i++) {
        promiseList.add(Async.function(this::executeBindTask, task, taskIOIterator.next()));
    }
    Promise.anyOf(promiseList).get();
    List<Result> resultList = new ArrayList<>();
    for (Promise<Result> promise : promiseList) {
        if (promise.isCompleted()) {
            resultList.add(promise.get());
            if (limit < size) {
                limit++;
                Async.function(this::executeBindTask, task, taskIOIterator.next()); // calling activity if any activity completes
            }
        }
    }
Please help me achieve this in Java code. Thanks.
Make sure that your main workflow method doesn’t return before all the computation is done. The simplest way to block the thread is to use Workflow.await on some variable, such as a count of unprocessed items.
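The shape of that advice (start the first 4, start one more from each completion callback, and block until nothing is left unprocessed) can be sketched outside of a workflow. The block below is only a plain-Java analogy built on java.util.concurrent, not workflow code: threads, executors and synchronized must not be used inside a workflow, where Async.function, promise callbacks and Workflow.await from this thread would take their place. The class SlidingWindow and all of its members are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical helper: runs all inputs through a task, keeping at most `limit` in flight.
final class SlidingWindow<I, R> {
    private final Iterator<I> pending;
    private final int total;
    private final Function<I, R> task;
    // Unbounded pool on purpose: the limit is enforced by the scheduling logic, not the pool.
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final List<R> results = new ArrayList<>();
    private final CompletableFuture<List<R>> done = new CompletableFuture<>();
    private int inFlight = 0;
    private int maxInFlight = 0;

    SlidingWindow(List<I> inputs, Function<I, R> task) {
        this.pending = inputs.iterator();
        this.total = inputs.size();
        this.task = task;
    }

    // Starts the first `limit` tasks, then blocks until every result has arrived.
    // Results are collected in completion order, not input order.
    List<R> run(int limit) {
        synchronized (this) {
            for (int i = 0; i < limit; i++) {
                startNext();
            }
            if (total == 0) {
                done.complete(results);
            }
        }
        try {
            return done.join(); // plays the role of the blocking await
        } finally {
            pool.shutdown();
        }
    }

    // Starts one more task if any input is left.
    private synchronized void startNext() {
        if (!pending.hasNext()) {
            return;
        }
        I input = pending.next();
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
        CompletableFuture.supplyAsync(() -> task.apply(input), pool)
            .whenComplete((result, error) -> {
                if (error != null) {
                    done.completeExceptionally(error);
                } else {
                    onCompleted(result);
                }
            });
    }

    // Completion callback: record the result, refill the window, release the waiter when done.
    private synchronized void onCompleted(R result) {
        inFlight--;
        results.add(result);
        startNext();
        if (results.size() == total) {
            done.complete(results);
        }
    }

    // Highest number of tasks observed in flight at once (for demonstration only).
    synchronized int maxInFlight() {
        return maxInFlight;
    }
}
```

Calling new SlidingWindow<>(inputs, task).run(4) with 10 inputs returns only after all 10 results have been collected, and no list is modified while it is being iterated because new work is started from the completion callback.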
I doubt your code is going to work, as I believe you cannot add to a list while iterating over it.
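A minimal plain-Java sketch of that list behaviour, independent of any workflow API: appending to an ArrayList inside a for-each loop makes its iterator throw ConcurrentModificationException, while an index-based loop that re-reads size() tolerates appends. The class ListGrowthDemo and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class, not part of any SDK.
final class ListGrowthDemo {

    // Appends inside a for-each loop; the iterator detects the change on its next step.
    static boolean forEachAppendFails() {
        List<Integer> items = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            for (Integer item : items) {
                items.add(item + 10);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Index-based loop: size() is re-evaluated on every pass, so appended
    // elements are visited as well and no iterator is invalidated.
    static List<Integer> indexLoopAppend(int maxSize) {
        List<Integer> items = new ArrayList<>(Arrays.asList(1, 2, 3));
        for (int i = 0; i < items.size(); i++) {
            if (items.size() < maxSize) {
                items.add(items.get(i) + 10);
            }
        }
        return items;
    }
}
```

The same idea applies to a list of promises: either loop by index, or collect newly created items in a separate list and merge them after the loop.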
Yes, I did not update my list while iterating, because doing so throws a ConcurrentModificationException:
    if (limit < size) {
        limit++;
        Async.function(this::executeBindTask, task, taskIOIterator.next()); // calling activity if any activity completes
    }
In this code, I am not storing the promise object in a variable, but I have to add it to my promise list so that it is included in Promise.anyOf,
like below:
    if (limit < size) {
        limit++;
        promiseList.add(Async.function(this::executeBindTask, task, taskIOIterator.next())); // calling activity if any activity completes
    }
If I do not add the promise object to the list, how can I call .get on it?
Thanks @maxim, it is working, but not in a few cases, because my workflow code is not deterministic.
Q1) The Javadoc for Workflow.await(condition) says that the condition is called on every state transition. What does "state transition" mean?
As you said, I have a variable called count, which is decremented in each promise callback (promise.thenApply).
I use the await call below to block my code. count is initialized to listSize: Workflow.await(() -> count == 0);
If the worker restarts in the middle of the execution, count is re-initialized to listSize and never reaches zero, even though all my activities have completed.
Q2) What should I change to make my code deterministic?
Note: if the worker does not restart, the workflow executes as expected.
I add my results to resultList; once resultList reaches the expected size, the await is released.
Initially, it is declared as an empty list.
Before, I had a count variable that I increased on each callback and also used in the await. Since I need the resultList anyway, I removed the counter and now check the resultList instead.
According to the documentation, using Java synchronization is not allowed in workflow code. Don’t use synchronizedList inside workflow code, as it breaks this rule.
I think you might have a bug around index, as you check it against the boundary condition first and only then increment it in the thenApply method.
In general, the code should work once all the bugs are fixed.