Async Activity - Not invoking in parallel

Looking for some guidance on how to invoke several activities in parallel. I followed HelloParallelActivity.java, but the timestamps in the logs show that the activities are not being called in parallel.

2024-04-09T14:40:53.614-05:00 INFO 18272 — [ce=“test”: 1] o.example.Activity.GreetingActivityImpl : Hello Mary1!
2024-04-09T14:40:54.456-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Mary!
2024-04-09T14:40:57.978-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Jennet1!
2024-04-09T14:41:06.902-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Michael!
2024-04-09T14:41:14.412-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Jennet!
2024-04-09T14:41:30.984-05:00 INFO 18272 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : Hello Michael1!
2024-04-09T14:41:40.840-05:00 INFO 18272 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : Hello John!
2024-04-09T14:41:45.374-05:00 INFO 18272 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : Hello John1!

They are just too fast. Here is the output:

22:31:35.149 { } [main] INFO  i.t.s.WorkflowServiceStubsImpl - Created WorkflowServiceStubs for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=1, target=127.0.0.1:7233}} 
22:31:35.852 { } [main] INFO  io.temporal.internal.worker.Poller - start: Poller{name=Workflow Poller taskQueue="HelloParallelActivityTaskQueue", namespace="default", identity=10097@Maxims-MacBook-Pro.local} 
22:31:35.862 { } [main] INFO  io.temporal.internal.worker.Poller - start: Poller{name=Activity Poller taskQueue="HelloParallelActivityTaskQueue", namespace="default", identity=10097@Maxims-MacBook-Pro.local} 
composeGreeting begin: Hello, John
composeGreeting begin: Hello, Jennet
composeGreeting begin: Hello, Michael
composeGreeting begin: Hello, Mary
composeGreeting end: Hello, Michael
composeGreeting end: Hello, John
composeGreeting end: Hello, Jennet
composeGreeting end: Hello, Mary
Hello John!
Hello Mary!
Hello Michael!
Hello Jennet!

Here is the updated code.

    public String composeGreeting(String greeting, String name) {
      System.out.println("composeGreeting begin: " + greeting + ", " + name);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      System.out.println("composeGreeting end: " + greeting + ", " + name);
      return greeting + " " + name + "!";
    }
  }

Hi Maxim,
Here is my output with your updated code. It is still sequential.

2024-04-09T16:53:10.242-05:00 INFO 28532 — [ce=“test”: 1] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, Mary
2024-04-09T16:53:11.244-05:00 INFO 28532 — [ce=“test”: 1] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, Mary
2024-04-09T16:53:11.680-05:00 INFO 28532 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, Jennet
2024-04-09T16:53:12.682-05:00 INFO 28532 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, Jennet
2024-04-09T16:53:21.252-05:00 INFO 28532 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, Michael
2024-04-09T16:53:22.255-05:00 INFO 28532 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, Michael
2024-04-09T16:53:22.843-05:00 INFO 28532 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, John
2024-04-09T16:53:23.846-05:00 INFO 28532 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, John

I have pasted my Impl classes below. Is there any issue with the way I am invoking the activities?

@Slf4j
public class MultiGreetingWorkflowImpl implements MultiGreetingWorkflow {

    private final GreetingActivity activities =
            Workflow.newActivityStub(
                    GreetingActivity.class,
                    ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());


    @Override
    public List<String> getGreetings(List<String> names) {

        List<String> results = new ArrayList<>();

        List<Promise<String>> promiseList = new ArrayList<>();
        if (names != null) {
            for (String name : names) {
                promiseList.add(Async.function(activities::composeGreeting, "Hello", name));
            }
            // Invoke all activities in parallel. Wait for all to complete
            Promise.allOf(promiseList).get();

            // Loop through promises and get results
            for (Promise<String> promise : promiseList) {
                if (promise.getFailure() == null) {
                    results.add(promise.get());
                }
            }
        }
        return results;
    }
}


@Component
@Slf4j
public class GreetingActivityImpl implements GreetingActivity {
    @Override
    public String composeGreeting(String greeting, String name) {
        log.info("composeGreeting begin: " + greeting + ", " + name);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("composeGreeting end: " + greeting + ", " + name);
        return greeting + " " + name + "!";
    }
}

Could it be that the @Component annotation puts some sort of lock on the instance?

Another possibility is that you changed the activity execution thread pool size to 1.

Could you post the workflow execution history for this workflow?

/** Sample Temporal workflow that executes multiple Activity methods in parallel. */
@Slf4j
public class HelloParallelActivity {

  // Define the task queue name
  static final String TASK_QUEUE = "greetings-queue";

  // Define our workflow unique id
  static final String WORKFLOW_ID = "HelloParallelActivityWorkflow";

  /**
   * The Workflow Definition's Interface must contain one method annotated with @WorkflowMethod.
   *
   * <p>Workflow Definitions should not contain any heavyweight computations, non-deterministic
   * code, network calls, database operations, etc. Those things should be handled by the
   * Activities.
   *
   * @see WorkflowInterface
   * @see WorkflowMethod
   */
  @WorkflowInterface
  public interface MultiGreetingWorkflow {

    /**
     * This is the method that is executed when the Workflow Execution is started. The Workflow
     * Execution completes when this method finishes execution.
     */
    @WorkflowMethod
    List<String> getGreetings(List<String> names);
  }

  /**
   * This is the Activity Definition's Interface. Activities are building blocks of any Temporal
   * Workflow and contain any business logic that could perform long running computation, network
   * calls, etc.
   *
   * <p>Annotating Activity Definition methods with @ActivityMethod is optional.
   *
   * @see ActivityInterface
   * @see io.temporal.activity.ActivityMethod
   */
  @ActivityInterface
  public interface GreetingActivities {

    // Define your activity method which can be called during workflow execution
    String composeGreeting(String greeting, String name);
  }

  /** Simple activity implementation, that concatenates two strings. */
  static class GreetingActivitiesImpl implements GreetingActivities {
    @Override
    public String composeGreeting(String greeting, String name) {
      log.info("composeGreeting begin: " + greeting + ", " + name);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      log.info("composeGreeting end: " + greeting + ", " + name);
      return greeting + " " + name + "!";

    }
  }

  // Define the workflow implementation which implements our getGreetings workflow method.
  public static class MultiGreetingWorkflowImpl implements MultiGreetingWorkflow {

    /**
     * Define the GreetingActivities stub. Activity stubs are proxies for activity invocations that
     * are executed outside of the workflow thread on the activity worker, that can be on a
     * different host. Temporal is going to dispatch the activity results back to the workflow and
     * unblock the stub as soon as activity is completed on the activity worker.
     *
     * <p>In the {@link ActivityOptions} definition the "setStartToCloseTimeout" option sets the
     * overall timeout that our workflow is willing to wait for activity to complete. For this
     * example it is set to 2 seconds.
     */
    private final GreetingActivities activities =
        Workflow.newActivityStub(
            GreetingActivities.class,
            ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());

    @Override
    public List<String> getGreetings(List<String> names) {
      List<String> results = new ArrayList<>();

      List<Promise<String>> promiseList = new ArrayList<>();
      if (names != null) {
        for (String name : names) {
          promiseList.add(Async.function(activities::composeGreeting, "Hello", name));
        }

        // Invoke all activities in parallel. Wait for all to complete
        Promise.allOf(promiseList).get();

        // Loop through promises and get results
        for (Promise<String> promise : promiseList) {
          if (promise.getFailure() == null) {
            results.add(promise.get());
          }
        }
      }
      return results;
    }
  }

  /**
   * With our Workflow and Activities defined, we can now start execution. The main method starts
   * the worker and then the workflow.
   */

  private  static WorkflowServiceStubsOptions getWorkflowServiceStubsOptions() {
    WorkflowServiceStubsOptions.Builder serviceStubsOptions = WorkflowServiceStubsOptions.newBuilder().setTarget("xxxxxxxx");
    return serviceStubsOptions.build();
  }


  public static void main(String[] args) {
    WorkflowServiceStubsOptions serviceStubsOptions = getWorkflowServiceStubsOptions();
    WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(serviceStubsOptions);
    /*
     * Get a Workflow service client which can be used to start, Signal, and Query Workflow Executions.
     *
     *
     */

    WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder().setNamespace("xxxxxx")
            .setIdentity("hello")
            .build();


    WorkflowClient client = WorkflowClient.newInstance(service, clientOptions);

    /*
     * Define the workflow factory. It is used to create workflow workers for a specific task queue.
     */
    WorkerFactory factory = WorkerFactory.newInstance(client);

    /*
     * Define the workflow worker. Workflow workers listen to a defined task queue and process
     * workflows and activities.
     */


    WorkerOptions options=WorkerOptions.newBuilder().setBuildId("buildId").setMaxConcurrentActivityExecutionSize(1).build();

    Worker worker = factory.newWorker(TASK_QUEUE,options);

    /*
     * Register our workflow implementation with the worker.
     * Workflow implementations must be known to the worker at runtime in
     * order to dispatch workflow tasks.
     */
    worker.registerWorkflowImplementationTypes(MultiGreetingWorkflowImpl.class);

    /*
     * Register our Activity Types with the Worker. Since Activities are stateless and thread-safe,
     * the Activity Type is a shared instance.
     */
    worker.registerActivitiesImplementations(new GreetingActivitiesImpl());

    /*
     * Start all the workers registered for a specific task queue.
     * The started workers then start polling for workflows and activities.
     */
    factory.start();

    // Create the workflow client stub. It is used to start our workflow execution.
    MultiGreetingWorkflow workflow =
        client.newWorkflowStub(
            MultiGreetingWorkflow.class,
            WorkflowOptions.newBuilder()
                    .setTaskQueue(TASK_QUEUE)
                    .setWorkflowId("test")
                .build());


    /*
     * Execute our workflow and wait for it to complete. The call to our getGreetings method is
     * synchronous.
     *
     */
    List<String> results =
        workflow.getGreetings(Arrays.asList("John", "Mary", "Michael", "Jennet"));

    // Display workflow execution results
    for (String result : results) {
      System.out.println(result);
    }

    System.exit(0);
  }

}

Response


2024-04-09T16:53:10.242-05:00  INFO 28532 --- [ce="test": 1] o.example.Activity.GreetingActivityImpl  : composeGreeting begin: Hello, Mary
2024-04-09T16:53:11.244-05:00  INFO 28532 --- [ce="test": 1] o.example.Activity.GreetingActivityImpl  : composeGreeting end: Hello, Mary
2024-04-09T16:53:11.680-05:00  INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl  : composeGreeting begin: Hello, Jennet
2024-04-09T16:53:12.682-05:00  INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl  : composeGreeting end: Hello, Jennet
2024-04-09T16:53:21.252-05:00  INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl  : composeGreeting begin: Hello, Michael
2024-04-09T16:53:22.255-05:00  INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl  : composeGreeting end: Hello, Michael
2024-04-09T16:53:22.843-05:00  INFO 28532 --- [ce="test": 3] o.example.Activity.GreetingActivityImpl  : composeGreeting begin: Hello, John
2024-04-09T16:53:23.846-05:00  INFO 28532 --- [ce="test": 3] o.example.Activity.GreetingActivityImpl  : composeGreeting end: Hello, John

You explicitly prohibit parallel activity execution by this worker: the WorkerOptions in your main method call setMaxConcurrentActivityExecutionSize(1).
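To see why a concurrency limit of 1 produces the begin/end, begin/end pattern in the logs above, here is a rough analogy in plain JDK code. It is not Temporal SDK code and says nothing about how the worker is implemented internally; it only shows that a pool limited to one slot runs sleeping tasks one after another, while a larger pool overlaps them. The class name PoolSizeDemo, the helper runAll, and the 200 ms sleep are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolSizeDemo {

  // Hypothetical helper: submits one 200 ms task per name to a fixed pool of the
  // given size, waits for all of them, and returns the elapsed wall-clock millis.
  public static long runAll(int poolSize, List<String> names) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      long start = System.nanoTime();
      List<Future<String>> futures = new ArrayList<>();
      for (String name : names) {
        // All tasks are submitted up front, like the Async.function calls in the workflow.
        futures.add(pool.submit(() -> {
          Thread.sleep(200); // stands in for the activity's Thread.sleep(1000)
          return "Hello " + name + "!";
        }));
      }
      // Wait for every task, similar in spirit to Promise.allOf(promiseList).get().
      for (Future<String> future : futures) {
        future.get();
      }
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<String> names = List.of("John", "Mary", "Michael", "Jennet");
    System.out.println("pool size 1: " + runAll(1, names) + " ms");
    System.out.println("pool size 4: " + runAll(4, names) + " ms");
  }
}
```

With one slot the four tasks take roughly four times as long as a single task; with four slots they finish in about the time of one. In the posted main method, the corresponding change would be to drop the setMaxConcurrentActivityExecutionSize(1) call or pass a larger value.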

Say there are 3 activities called asynchronously:
activity1, activity2, activity3

If activity1 alone fails, we expect only activity1 to execute on retry. But we see the other 2 activities running again. How do we handle this?

What do you mean by run again? Only failed activities are retried. If you post your workflow code and the event history, we can look.