I have one more question, this time about validating what happens when an activity fails.
Here is the test I have:
public void testSequentialWorkflow_failedActivity() {
    worker = testEnv.newWorker(DslWorkflowImpl.TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);
    BaseActivity activities = mock(BaseActivity.class);
    when(activities.start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class)))
            .thenThrow(new IllegalArgumentException());
    Worker newWorker = testEnv.newWorker("BaseActivity");
    newWorker.registerActivitiesImplementations(activities);
    testEnv.start();
    // Get a workflow stub using the same task queue the worker uses.
    DslWorkflow workflow =
            client.newWorkflowStub(
                    DslWorkflow.class,
                    WorkflowOptions.newBuilder()
                            .setRetryOptions(RetryOptions.newBuilder().setDoNotRetry().build())
                            .setTaskQueue(DslWorkflowImpl.TASK_QUEUE)
                            .build());
    workflow.execute(UUID.randomUUID(), invalidParallelConfig);
    // config has two activities so two should be processed
    verify(activities, Mockito.times(2))
            .start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class));
}
I expect the mocked activity to throw an exception when it is invoked, and the log output below, including the stack trace, confirms that it does:
00:10:42.120 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=2, target=directaddress:///2b755a1a-b191-4fe3-ba31-59f9d7a638f3}}
00:10:42.131 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///b7fb6de9-405f-48b4-b4e4-3ae6675540b8}}
00:10:43.724 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=2, pollThreadNamePrefix='Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=10051@C02C71F6MD6V}
00:10:43.733 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Activity Poller taskQueue="BaseActivity", namespace="default"'}, identity=10051@C02C71F6MD6V}
00:10:43.734 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Host Local Workflow Poller'}, identity=7fcbce44-a976-4788-aa56-3f32b9a6bb65}
00:10:44.840 [workflow[dc089c4e-1726-4228-bf9b-b5fb92d83cb3]-1] INFO com.snapchat.ci_cd_orchestrator.workflow.DslWorkflowImpl - Scheduling activity ActivityInvocation(name=BaseActivity, method=start, arguments=null, result=result2) for pipeline 40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70 to task queue BaseActivity
00:10:44.907 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=3, WorkflowId=dc089c4e-1726-4228-bf9b-b5fb92d83cb3, RunId=13a193be-2907-4a07-9f5b-db2a21baa342 completed with 1 new commands
00:10:44.977 [Activity Executor taskQueue="BaseActivity", namespace="default": 1] WARN io.temporal.internal.sync.POJOActivityTaskHandler - Activity failure. ActivityId=5bfeca07-d19b-3d85-aad5-1a2c626c821a, activityType=BaseActivityStart, attempt=1
java.lang.IllegalArgumentException: null
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:279)
at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:218)
at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:183)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:194)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:154)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
00:10:45.000 [Test worker] INFO io.temporal.worker.WorkerFactory - shutdownNow
00:10:45.000 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Host Local Workflow Poller
00:10:45.001 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.003 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
00:10:45.004 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Activity Poller taskQueue="BaseActivity", namespace="default"
00:10:45.004 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination begin
00:10:45.004 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 5] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.005 [Activity Poller taskQueue="BaseActivity", namespace="default": 4] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.005 [Activity Poller taskQueue="BaseActivity", namespace="default": 3] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.017 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination done
00:10:45.020 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - shutdown
Stored Workflows:
ExecutionId{namespace='default', execution=workflow_id: "dc089c4e-1726-4228-bf9b-b5fb92d83cb3"
run_id: "13a193be-2907-4a07-9f5b-db2a21baa342"
}
event_id: 1
event_time {
  seconds: 1605082244
  nanos: 651000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
workflow_execution_started_event_attributes {
  workflow_type {
    name: "DslWorkflow"
  }
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "{\"variables\":{\"arg1\":null,\"arg2\":null,\"arg3\":null},\"root\":{\"activity\":null,\"sequence\":null,\"parallel\":{\"branches\":[{\"activity\":{\"name\":\"BaseActivity\",\"method\":\"start\",\"arguments\":null,\"result\":\"result2\"},\"sequence\":null,\"parallel\":null}]}}}"
    }
  }
  workflow_execution_timeout {
    seconds: 315360000
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  original_execution_run_id: "13a193be-2907-4a07-9f5b-db2a21baa342"
  identity: "10051@C02C71F6MD6V"
  attempt: 1
}
event_id: 2
event_time {
  seconds: 1605082244
  nanos: 651000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 3
event_time {
  seconds: 1605082244
  nanos: 672000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 2
  identity: "10051@C02C71F6MD6V"
}
event_id: 4
event_time {
  seconds: 1605082244
  nanos: 945000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 2
  identity: "10051@C02C71F6MD6V"
}
event_id: 5
event_time {
  seconds: 1605082244
  nanos: 945000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "5bfeca07-d19b-3d85-aad5-1a2c626c821a"
  activity_type {
    name: "BaseActivityStart"
  }
  namespace: "default"
  task_queue {
    name: "BaseActivity"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 3
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
  }
}
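For reference: the ACTIVITY_TASK_SCHEDULED event above records a retry_policy with initial_interval 1s, backoff_coefficient 2.0 and maximum_interval 100s, and it has no maximum_attempts field. Assuming the usual Temporal meaning of these fields (each wait is the previous one multiplied by the coefficient, capped at maximum_interval, and an unset maximum_attempts means no limit), a failing activity is rescheduled again and again instead of failing its caller. The plain-Java model below only computes the wait before each retry from those three values; the class and method names are hypothetical and this is not Temporal's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the retry_policy recorded in the history above.
// It assumes: wait before retry n = initial * backoff^(n - 1), capped at max.
final class RetryScheduleModel {
    private RetryScheduleModel() {}

    /** Seconds to wait before retry number {@code retry} (1 = first retry). */
    static long delaySeconds(int retry, long initialSeconds, double backoff, long maxSeconds) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        double delay = initialSeconds * Math.pow(backoff, retry - 1);
        return (long) Math.min(delay, maxSeconds);
    }

    /** Waits before the first {@code count} retries. */
    static List<Long> schedule(int count, long initialSeconds, double backoff, long maxSeconds) {
        List<Long> delays = new ArrayList<>();
        for (int retry = 1; retry <= count; retry++) {
            delays.add(delaySeconds(retry, initialSeconds, backoff, maxSeconds));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values copied from the event: initial_interval 1s, backoff 2.0, maximum_interval 100s.
        System.out.println(schedule(10, 1, 2.0, 100)); // prints [1, 2, 4, 8, 16, 32, 64, 100, 100, 100]
    }
}
```

With no attempt limit, the schedule simply settles at one retry every 100 seconds.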
The workflow code that schedules the activities looks like this:
public void execute(UUID pipelineId, Parallel parallel, Map<String, ActivityPayload> bindings) {
    if (parallel == null || parallel.getBranches().length == 0) {
        return;
    }
    // Execute all branches in parallel and wait for all of them.
    // If one activity fails, cancel all the remaining ones as well.
    List<Promise<Void>> results = new ArrayList<>(parallel.getBranches().length);
    CancellationScope scope =
            Workflow.newCancellationScope(
                    () -> {
                        for (Statement statement : parallel.getBranches()) {
                            results.add(Async.function(this::execute, pipelineId, statement, bindings));
                        }
                    });
    // The code inside the scope only starts async branches, so run() does not block.
    scope.run();
    try {
        // If one activity fails, allOf(...).get() should throw.
        Promise.allOf(results).get();
    } catch (Exception ex) {
        // Cancel the activities that have not completed yet.
        scope.cancel();
        log.error("One of the Activities failed. Canceling the rest.", ex);
        throw ex;
    }
}
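The comments in execute describe the intended semantics: start every branch, wait for all of them, and on the first failure cancel the rest. As a standalone analogy outside Temporal, that fail-fast pattern can be sketched with java.util.concurrent.CompletableFuture (plain CompletableFuture.allOf waits for every input, hence the small combinator). The names FailFastAllOf and allOfFailFast are hypothetical; inside workflow code, Temporal's own Promise, Async and CancellationScope are what apply, since workflow code should not use native Java concurrency.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stdlib analogy (not Temporal's Promise API): wait for all branches, but fail as
// soon as any single branch fails and cancel the branches that are still running.
final class FailFastAllOf {
    private FailFastAllOf() {}

    static CompletableFuture<Void> allOfFailFast(List<? extends CompletableFuture<?>> branches) {
        CompletableFuture<Void> result = new CompletableFuture<>();

        // Success path: complete normally only once every branch completed normally.
        CompletableFuture.allOf(branches.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, error) -> {
                    if (error == null) {
                        result.complete(null);
                    }
                });

        // Failure path: the first branch to fail decides the outcome and cancels the rest.
        for (CompletableFuture<?> branch : branches) {
            branch.whenComplete((ignored, error) -> {
                if (error != null && result.completeExceptionally(error)) {
                    branches.forEach(other -> other.cancel(true));
                }
            });
        }
        return result;
    }
}
```

Either way, the error path is only reached once some branch actually completes exceptionally; a branch that never completes (for example, one that is still being retried) keeps the combined future pending.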
Since the activity throws, I would expect Promise.allOf(results).get() to throw as well, so that the catch block runs. But the catch block is never reached.
I suspect I am misunderstanding how activity failures are supposed to be handled here.
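One thing worth checking: the activity's retry_policy in the history above has no maximum_attempts field. If that means "unlimited" (the usual Temporal default), each failed attempt is absorbed and retried, so the activity's Promise never completes exceptionally and the catch block cannot run. The RetryOptions in the test are passed through WorkflowOptions, so they configure the workflow's own retries; an activity limit would instead go on the activity's options (for example RetryOptions.newBuilder().setMaximumAttempts(1) passed to ActivityOptions.Builder.setRetryOptions). The plain-Java model below (hypothetical names, no Temporal code) shows when a caller observes the failure under an unlimited policy versus a limit of one attempt.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of when a caller sees an activity failure.
// maximumAttempts <= 0 is treated as "unlimited", like a policy without maximum_attempts.
final class RetryVisibilityModel {
    private RetryVisibilityModel() {}

    /** Returned while the caller is still waiting and no failure has been delivered. */
    static final int STILL_RETRYING = -1;

    /**
     * Runs the activity until it succeeds (returns the attempt number) or the limit is
     * reached (rethrows the last failure). Stops after simulationBudget attempts so the
     * unlimited case terminates, returning STILL_RETRYING.
     */
    static int callWithRetries(Runnable activity, int maximumAttempts, int simulationBudget) {
        for (int attempt = 1; attempt <= simulationBudget; attempt++) {
            try {
                activity.run();
                return attempt;
            } catch (RuntimeException failure) {
                if (maximumAttempts > 0 && attempt >= maximumAttempts) {
                    throw failure; // only now would the caller's catch block run
                }
                // Otherwise the failure is absorbed and the activity is retried.
            }
        }
        return STILL_RETRYING;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Runnable alwaysFails = () -> {
            calls.incrementAndGet();
            throw new IllegalArgumentException();
        };
        int outcome = callWithRetries(alwaysFails, 0, 20);
        System.out.println("unlimited: outcome " + outcome + " after " + calls.get() + " calls");
        calls.set(0);
        try {
            callWithRetries(alwaysFails, 1, 20);
        } catch (IllegalArgumentException expected) {
            System.out.println("limit 1: failure surfaced after " + calls.get() + " call(s)");
        }
    }
}
```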