Looking for some guidance on how we invoke several activities in parallel. I followed the HelloParallelAcitivity.java but i do see from the timestamp in the logs they are not calling in parallel
2024-04-09T14:40:53.614-05:00 INFO 18272 — [ce=“test”: 1] o.example.Activity.GreetingActivityImpl : Hello Mary1!
2024-04-09T14:40:54.456-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Mary!
2024-04-09T14:40:57.978-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Jennet1!
2024-04-09T14:41:06.902-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Michael!
2024-04-09T14:41:14.412-05:00 INFO 18272 — [ce=“test”: 2] o.example.Activity.GreetingActivityImpl : Hello Jennet!
2024-04-09T14:41:30.984-05:00 INFO 18272 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : Hello Michael1!
2024-04-09T14:41:40.840-05:00 INFO 18272 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : Hello John!
2024-04-09T14:41:45.374-05:00 INFO 18272 — [ce=“test”: 3] o.example.Activity.GreetingActivityImpl : Hello John1!
/** Sample Temporal workflow that executes multiple Activity methods in parallel. */
@Slf4j
public class HelloParallelActivity {
// Define the task queue name
static final String TASK_QUEUE = "greetings-queue";
// Define our workflow unique id
static final String WORKFLOW_ID = "HelloParallelActivityWorkflow";
/**
* The Workflow Definition's Interface must contain one method annotated with @WorkflowMethod.
*
* <p>Workflow Definitions should not contain any heavyweight computations, non-deterministic
* code, network calls, database operations, etc. Those things should be handled by the
* Activities.
*
* @see WorkflowInterface
* @see WorkflowMethod
*/
@WorkflowInterface
public interface MultiGreetingWorkflow {
/**
* This is the method that is executed when the Workflow Execution is started. The Workflow
* Execution completes when this method finishes execution.
*/
@WorkflowMethod
List<String> getGreetings(List<String> names);
}
/**
* This is the Activity Definition's Interface. Activities are building blocks of any Temporal
* Workflow and contain any business logic that could perform long running computation, network
* calls, etc.
*
* <p>Annotating Activity Definition methods with @ActivityMethod is optional.
*
* @see ActivityInterface
* @see io.temporal.activity.ActivityMethod
*/
@ActivityInterface
public interface GreetingActivities {
// Define your activity method which can be called during workflow execution
String composeGreeting(String greeting, String name);
}
/** Simple activity implementation, that concatenates two strings. */
static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public String composeGreeting(String greeting, String name) {
log.info("composeGreeting begin: " + greeting + ", " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("composeGreeting end: " + greeting + ", " + name);
return greeting + " " + name + "!";
}
}
// Define the workflow implementation which implements our getGreeting workflow method.
public static class MultiGreetingWorkflowImpl implements MultiGreetingWorkflow {
/**
* Define the GreetingActivities stub. Activity stubs are proxies for activity invocations that
* are executed outside of the workflow thread on the activity worker, that can be on a
* different host. Temporal is going to dispatch the activity results back to the workflow and
* unblock the stub as soon as activity is completed on the activity worker.
*
* <p>In the {@link ActivityOptions} definition the "setStartToCloseTimeout" option sets the
* overall timeout that our workflow is willing to wait for activity to complete. For this
* example it is set to 2 seconds.
*/
private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
@Override
public List<String> getGreetings(List<String> names) {
List<String> results = new ArrayList();
List<Promise<String>> promiseList = new ArrayList<>();
if (names != null) {
for (String name : names) {
promiseList.add(Async.function(activities::composeGreeting, "Hello", name));
}
// Invoke all activities in parallel. Wait for all to complete
Promise.allOf(promiseList).get();
// Loop through promises and get results
for (Promise<String> promise : promiseList) {
if (promise.getFailure() == null) {
results.add(promise.get());
}
}
}
return results;
}
}
/**
* With our Workflow and Activities defined, we can now start execution. The main method starts
* the worker and then the workflow.
*/
private static WorkflowServiceStubsOptions getWorkflowServiceStubsOptions() {
WorkflowServiceStubsOptions.Builder serviceStubsOptions = WorkflowServiceStubsOptions.newBuilder().setTarget("xxxxxxxx");
return serviceStubsOptions.build();
}
public static void main(String[] args) {
WorkflowServiceStubsOptions serviceStubsOptions = getWorkflowServiceStubsOptions();
WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(serviceStubsOptions);
/*
* Get a Workflow service client which can be used to start, Signal, and Query Workflow Executions.
*
*
*/
WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder().setNamespace("xxxxxx")
.setIdentity("hello")
.build();
WorkflowClient client = WorkflowClient.newInstance(service, clientOptions);
/*
* Define the workflow factory. It is used to create workflow workers for a specific task queue.
*/
WorkerFactory factory = WorkerFactory.newInstance(client);
/*
* Define the workflow worker. Workflow workers listen to a defined task queue and process
* workflows and activities.
*/
WorkerOptions options=WorkerOptions.newBuilder().setBuildId("buildId").setMaxConcurrentActivityExecutionSize(1).build();
Worker worker = factory.newWorker(TASK_QUEUE,options);
/*
* Register our workflow implementation with the worker.
* Workflow implementations must be known to the worker at runtime in
* order to dispatch workflow tasks.
*/
worker.registerWorkflowImplementationTypes(MultiGreetingWorkflowImpl.class);
/*
* Register our Activity Types with the Worker. Since Activities are stateless and thread-safe,
* the Activity Type is a shared instance.
*/
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
/*
* Start all the workers registered for a specific task queue.
* The started workers then start polling for workflows and activities.
*/
factory.start();
// Create the workflow client stub. It is used to start our workflow execution.
MultiGreetingWorkflow workflow =
client.newWorkflowStub(
MultiGreetingWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(TASK_QUEUE)
.setWorkflowId("test")
.build());
/*
* Execute our workflow and wait for it to complete. The call to our getGreetings method is
* synchronous.
*
*/
List<String> results =
workflow.getGreetings(Arrays.asList("John", "Mary", "Michael", "Jennet"));
// Display workflow execution results
for (String result : results) {
System.out.println(result);
}
System.exit(0);
}
}
Response
2024-04-09T16:53:10.242-05:00 INFO 28532 --- [ce="test": 1] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, Mary
2024-04-09T16:53:11.244-05:00 INFO 28532 --- [ce="test": 1] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, Mary
2024-04-09T16:53:11.680-05:00 INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, Jennet
2024-04-09T16:53:12.682-05:00 INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, Jennet
2024-04-09T16:53:21.252-05:00 INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, Michael
2024-04-09T16:53:22.255-05:00 INFO 28532 --- [ce="test": 2] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, Michael
2024-04-09T16:53:22.843-05:00 INFO 28532 --- [ce="test": 3] o.example.Activity.GreetingActivityImpl : composeGreeting begin: Hello, John
2024-04-09T16:53:23.846-05:00 INFO 28532 --- [ce="test": 3] o.example.Activity.GreetingActivityImpl : composeGreeting end: Hello, John
say they were 3 activities called in async way
activity1, activity2,activity 3
if activity1 alone failed on retry we are expecting only activity 1 to execute. But we see other 2 activities are running again. how do we handle this ?