Running multiple subprocesses in parallel and cancelling the rest based on any one subprocess result

Hi,
I am new to Temporal and am using the Temporal Java SDK in my project. I have multiple child workflows running in parallel, and each one returns a string value. If any one of them returns a particular string, for example “rejected”, I need to cancel all the other child workflows that are still running. How can I achieve this?

See HelloCancellationScope which demonstrates this use case.

Thank you for your response. Let me try that.

Hi,

List<Promise<String>> approvalSubprocess = new ArrayList<>();
CancellationScope scope = Workflow.newCancellationScope(() ->
    mygroups.forEach(group -> {
        MyWorkflow myWorkflow = Workflow.newChildWorkflowStub(MyWorkflow.class,
            ChildWorkflowOptions.newBuilder()
                .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());
        // Start the child asynchronously and keep its promise for later inspection.
        Promise<String> approval = Async
            .function(() -> myWorkflow.invokemyWorkflow(group, requestDto.getRequestId()));
        approvalSubprocess.add(approval);
    })
);
scope.run();

In my code, I’m checking the return value of each workflow to determine if it’s ‘Rejected.’ If the first workflow is rejected, I want to cancel the others without waiting. Similarly, if the first returns ‘Approve’ and the second ‘Rejected,’ I want to cancel the third. Is there a recommended approach or method in Java to efficiently handle this workflow processing based on rejection status?

You can use Promise.anyOf to wait for a single promise to become ready and cancel the scope if it has the “Rejected” status.

Hi Maxim,
Correct me if I am wrong. As I understand it, Promise.anyOf returns a Promise that completes when any one of its arguments completes. In my case, if the first child workflow returns a value other than “Rejected” and the second child workflow then returns “Rejected”, Promise.anyOf only holds the result of whichever completed first, that is, the value other than “Rejected”. So Promise.anyOf alone is not enough for this, and I modified my code as follows:

while (!approvalSubprocess.isEmpty()) {
    Promise<String> anyOfPromise = Promise.anyOf(approvalSubprocess);
    String result = anyOfPromise.get();
    if (result.equalsIgnoreCase(CommonConstants.REJECTED)) {
        userTaskCompleteFlag = false;
        taskcompleteFlag = false;
        scope.cancel();
        return startRequest(request);
    } else {
        approvalSubprocess.remove(anyOfPromise);
    }
}

But with this code I get the following error:

[BUG] Workflow thread 'workflow-method-go2mainprocessWorkflowID_533-8b6a17f5-02f6-4a6e-9d59-7665f071c86f' of workflow 'go2mainprocessWorkflowID_533' can't be destroyed in time. This will lead to a workflow cache leak. This problem is usually caused by a workflow implementation swallowing java.lang.Error instead of rethrowing it. Thread dump of the stuck thread:
workflow-method-go2mainprocessWorkflowID_533-8b6a17f5-02f6-4a6e-9d59-7665f071c86f

Please help me solve this.

I don’t think you want to rely on the value of the Promise returned by anyOf. Use it only for waiting. Then iterate over the collection to find out which promises are ready. Use Promise.IsReady() before calling Promise.get() to avoid blocking.
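To illustrate the suggestion above, here is a minimal sketch of "wait with anyOf, then scan the list". It is written against the JDK's CompletableFuture rather than the Temporal SDK so that it runs standalone: CompletableFuture.anyOf, isDone and join stand in for Promise.anyOf, Promise.isCompleted and Promise.get, and the class and method names are hypothetical. It also shows that removing the anyOf result from the list removes nothing, because anyOf returns a new object that was never in the list. The same would apply to approvalSubprocess.remove(anyOfPromise) in the loop above, which is a likely reason that loop never shrinks and keeps spinning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy, not Temporal code. All names here are illustrative.
final class AnyOfScanSketch {

    /**
     * Waits until at least one future is done, then returns the indexes of
     * every future that is done at that moment. The list must be non-empty,
     * otherwise the wait would never finish.
     */
    static List<Integer> completedIndexes(List<CompletableFuture<String>> futures) {
        // Use anyOf only as a wait; its value is deliberately ignored.
        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).join();

        List<Integer> ready = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            // isDone() never blocks, so it is safe to call on every element.
            if (futures.get(i).isDone()) {
                ready.add(i);
            }
        }
        return ready;
    }

    /**
     * Tries to remove the anyOf result from the list and reports whether the
     * list changed. It does not: the anyOf future is a separate object.
     */
    static boolean removingAnyOfShrinksList(List<CompletableFuture<String>> futures) {
        CompletableFuture<Object> any =
            CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
        return futures.remove(any);
    }
}
```

In workflow code the scan would read each completed promise with get() and compare it to the rejected status, as in the original loop.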

Hi,
I’m sorry, but I didn’t grasp it. Could you please explain further? I will restate my use case.
I have multiple child workflows running in parallel, and each is actively awaiting a signal. If a particular workflow receives a signal with the status “Rejected,” I want to promptly identify that specific workflow and cancel all other parallel workflows.

I’ve successfully implemented a solution using CancellationScope for cancelling all workflows. However, I’m currently grappling with efficiently identifying the first child workflow that returns the value “Rejected.” Also, regarding your previous reply, promise.isReady() is not available in Java.

I appreciate the assistance and suggestions. If there’s anything else needed for a clearer understanding, please let me know.

Thanks for your time, and I appreciate your help!

Sorry, I always confuse that function name with the Go version. It is called Promise.isCompleted.

for (Promise<String> promise : promiseList) {
    if (promise.isCompleted()) {
        if (promise.get().equalsIgnoreCase(CommonConstants.REJECTED)) {
            ...

Okay, thank you for your time. It’s working.

while (!approvalSubprocess.isEmpty()) {
    List<Promise<String>> completedPromises = approvalSubprocess.stream().filter(Promise::isCompleted).toList();
    for (Promise<String> promise : completedPromises) {
        if (promise.get().equalsIgnoreCase(CommonConstants.REJECTED)) {
            initiateTaskcompleteFlag = false;
            creditCheckTaskCompleteFlag = false;
            scope.cancel();
            return CommonConstants.REJECTED;
        }
    }
    approvalSubprocess.removeAll(completedPromises);
    Workflow.sleep(Duration.ofMillis(100));
}

This is my code based on the discussion above. When I use Workflow.sleep() inside the while loop, many timers get started. If I remove Workflow.sleep(), the potential deadlock issue occurs. Is there any way to avoid the potential deadlock other than Workflow.sleep()?

If completedPromises is empty then this loop spins, so the control is never returned to the SDK.
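One way to keep the loop from spinning, following the earlier advice in this thread, is to block on anyOf over the remaining promises at the top of each iteration instead of sleeping. The sketch below shows that loop shape with the JDK's CompletableFuture so it can run on its own. It is an analogy, not Temporal code: in a workflow, Promise.anyOf(approvalSubprocess).get() would presumably take the place of the join() wait and of Workflow.sleep, Promise::isCompleted the place of isDone, and scope.cancel() the place of cancelling each future. The class name, method name and the "Approved" return value are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy of the workflow loop. All names here are illustrative.
final class RejectOnFirstSketch {
    static final String REJECTED = "Rejected";

    /**
     * Returns "Rejected" as soon as any approval yields that value
     * (cancelling whatever is still pending), or "Approved" once every
     * approval has completed with some other value.
     */
    static String awaitDecision(List<CompletableFuture<String>> approvals) {
        List<CompletableFuture<String>> pending = new ArrayList<>(approvals);

        while (!pending.isEmpty()) {
            // Block until at least one pending future is done. This replaces
            // the polling sleep: the loop body only runs when there is
            // something new to look at.
            CompletableFuture.anyOf(pending.toArray(new CompletableFuture[0])).join();

            // Collect everything that is done right now.
            List<CompletableFuture<String>> done = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                if (f.isDone()) {
                    done.add(f);
                }
            }

            for (CompletableFuture<String> f : done) {
                if (REJECTED.equalsIgnoreCase(f.join())) {
                    // Stand-in for scope.cancel(): cancel what is still running.
                    pending.forEach(p -> p.cancel(true));
                    return REJECTED;
                }
            }

            // Drop the finished futures so the next wait only sees unfinished ones.
            pending.removeAll(done);
        }
        return "Approved"; // hypothetical result when nothing was rejected
    }
}
```

Because each iteration removes at least one completed entry before waiting again, the wait at the top always blocks on unfinished work only, so no timer is needed.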