Waiting for child workflows

My requirement,

  • The main workflow receives input.
  • Splits the input and for each part executes child workflow.
  • Child workflows are executed in parallel.
  • The main workflow waits for a specific duration and returns the result of the child workflows that have been completed.
  • The remaining child workflows continue to execute and eventually completes.

What is the recommended way of doing it? I am using the code:

public List<String> execute(String flowType, String feedPayload) {
    List<String> itemPayloads = parseFeedPayload(feedPayload);
    List<Promise<String>> promises = new ArrayList<>(itemPayloads.size());
    List<String> responses = new ArrayList<>(itemPayloads.size());

    ChildWorkflowOptions itemSetupWorkflowOptions = ChildWorkflowOptions.newBuilder()
            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
            .setWorkflowExecutionTimeout(Duration.ofSeconds(10))
            .setRetryOptions(retryoptions)
            .setTaskQueue(TASK_QUEUE)
            .build();

    for (String itemPayload : itemPayloads) {
        ItemSetupWorkflow itemSetupWorkflow = Workflow.newChildWorkflowStub(ItemSetupWorkflow.class, itemSetupWorkflowOptions);
        Promise<String> promise = Async.function(itemSetupWorkflow::execute, flowType, itemPayload);
        promises.add(promise);
    }

    try {
        Promise.allOf(promises).get(2000, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        e.printStackTrace();
    }

    for (Promise<String> promise : promises) {
        if (promise.isCompleted()) {
            responses.add(promise.get());
        } else {
            responses.add("timeout");
        }
    }

    return responses;
}

get() is a blocking call. What will be the impact on the overall system performance and throughput if I need to execute many such workflows concurrently?

Looks like Promise.get(long var1, TimeUnit var3) does not use var3. The code uses Duration.ofMillis(timeout). Not sure if I missed something here.

Looks good to me. One way to avoid relying on the timeout exception is to use Promise.anyOf:

    Promise<Void> timer = Workflow.newTimer(Duration.ofSeconds(2));
    Promise.anyOf(timer, Promise.allOf(promises)).get();

Looks like Promise.get(long var1, TimeUnit var3) does not use var3. The code uses Duration.ofMillis(timeout). Not sure if I missed something here.

What version of the SDK are you using? The code looks fine to me:

  Duration.ofMillis(unit.toMillis(timeout)),

Hi @maxim, thanks for sharing the code.

I also had a query regarding the usage of threads. For our requirement mentioned above, the main workflow will wait up to 2 minutes for the child workflows. That means the thread will be blocked for 2 minutes in the worst case when we call Promise.get(). Please correct me if I am wrong here. I found that the workflow code is getting executed in “workflow-method” thread. How many threads are created by default to execute workflows? Is there any non-blocking way of achieving what I am trying to do?

I am using version 0.29.1 of java sdk and looks like the issue for Promise.get(long var1, TimeUnit var3) has been fixed now. The old code is:
WorkflowInternal.await(Duration.ofMillis(timeout),

We have plans to support fully async workflow execution. But at this point, at least one thread is always used. This doesn’t limit the total number of parallel workflows as they can be removed from a worker and reconstructed back at any time. This limits the number of workflows that can be cached per worker.