Testing Framework for Java SDK?

I have one more question, this time about validating workflow behavior when an activity fails.

The code I have is the following:

    public void testSequentialWorkflow_failedActivity() {
        worker = testEnv.newWorker(DslWorkflowImpl.TASK_QUEUE);
        worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);

        BaseActivity activities = mock(BaseActivity.class);
        when(activities.start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class))).thenThrow(new IllegalArgumentException());
        Worker newWorker = testEnv.newWorker("BaseActivity");
        newWorker.registerActivitiesImplementations(activities);

        testEnv.start();

        // Get a workflow stub using the same task queue the worker uses.
        DslWorkflow workflow =
                client.newWorkflowStub(
                        DslWorkflow.class,
                        WorkflowOptions.newBuilder()
                                .setRetryOptions(RetryOptions.newBuilder().setDoNotRetry().build())
                                .setTaskQueue(DslWorkflowImpl.TASK_QUEUE)
                                .build());

        workflow.execute(UUID.randomUUID(), invalidParallelConfig);

        // config has two activities so two should be processed
        verify(activities, Mockito.times(2)).start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class));

    }

What I expect to happen is that when the activity is triggered, the mocked version throws an exception. I can see from the stack trace in the log below that the exception is thrown:

00:10:42.120 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=2, target=directaddress:///2b755a1a-b191-4fe3-ba31-59f9d7a638f3}}
00:10:42.131 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///b7fb6de9-405f-48b4-b4e4-3ae6675540b8}}
00:10:43.724 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=2, pollThreadNamePrefix='Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=10051@C02C71F6MD6V}
00:10:43.733 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Activity Poller taskQueue="BaseActivity", namespace="default"'}, identity=10051@C02C71F6MD6V}
00:10:43.734 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Host Local Workflow Poller'}, identity=7fcbce44-a976-4788-aa56-3f32b9a6bb65}
00:10:44.840 [workflow[dc089c4e-1726-4228-bf9b-b5fb92d83cb3]-1] INFO com.snapchat.ci_cd_orchestrator.workflow.DslWorkflowImpl - Scheduling activity ActivityInvocation(name=BaseActivity, method=start, arguments=null, result=result2) for pipeline 40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70 to task queue BaseActivity
00:10:44.907 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=3, WorkflowId=dc089c4e-1726-4228-bf9b-b5fb92d83cb3, RunId=13a193be-2907-4a07-9f5b-db2a21baa342 completed with 1 new commands
00:10:44.977 [Activity Executor taskQueue="BaseActivity", namespace="default": 1] WARN io.temporal.internal.sync.POJOActivityTaskHandler - Activity failure. ActivityId=5bfeca07-d19b-3d85-aad5-1a2c626c821a, activityType=BaseActivityStart, attempt=1
java.lang.IllegalArgumentException: null
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:279)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:218)
	at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:183)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:194)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:154)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
00:10:45.000 [Test worker] INFO io.temporal.worker.WorkerFactory - shutdownNow
00:10:45.000 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Host Local Workflow Poller
00:10:45.001 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.003 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
00:10:45.004 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Activity Poller taskQueue="BaseActivity", namespace="default"
00:10:45.004 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination begin
00:10:45.004 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 5] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.005 [Activity Poller taskQueue="BaseActivity", namespace="default": 4] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.005 [Activity Poller taskQueue="BaseActivity", namespace="default": 3] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.017 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination done
00:10:45.020 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - shutdown
Stored Workflows:
ExecutionId{namespace='default', execution=workflow_id: "dc089c4e-1726-4228-bf9b-b5fb92d83cb3"
run_id: "13a193be-2907-4a07-9f5b-db2a21baa342"
}
event_id: 1
event_time {
  seconds: 1605082244
  nanos: 651000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
workflow_execution_started_event_attributes {
  workflow_type {
    name: "DslWorkflow"
  }
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "{\"variables\":{\"arg1\":null,\"arg2\":null,\"arg3\":null},\"root\":{\"activity\":null,\"sequence\":null,\"parallel\":{\"branches\":[{\"activity\":{\"name\":\"BaseActivity\",\"method\":\"start\",\"arguments\":null,\"result\":\"result2\"},\"sequence\":null,\"parallel\":null}]}}}"
    }
  }
  workflow_execution_timeout {
    seconds: 315360000
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  original_execution_run_id: "13a193be-2907-4a07-9f5b-db2a21baa342"
  identity: "10051@C02C71F6MD6V"
  attempt: 1
}
event_id: 2
event_time {
  seconds: 1605082244
  nanos: 651000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 3
event_time {
  seconds: 1605082244
  nanos: 672000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 2
  identity: "10051@C02C71F6MD6V"
}
event_id: 4
event_time {
  seconds: 1605082244
  nanos: 945000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 2
  identity: "10051@C02C71F6MD6V"
}
event_id: 5
event_time {
  seconds: 1605082244
  nanos: 945000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "5bfeca07-d19b-3d85-aad5-1a2c626c821a"
  activity_type {
    name: "BaseActivityStart"
  }
  namespace: "default"
  task_queue {
    name: "BaseActivity"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 3
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
  }
}

However, in the code that triggers the activity:

    public void execute(UUID pipelineId, Parallel parallel, Map<String, ActivityPayload> bindings) {
        if (parallel == null || parallel.getBranches().length == 0) {
            return;
        }

        // In the parallel block, we want to execute all branches in parallel and wait for all of them.
        // If one activity fails, we want to cancel all the others as well.
        List<Promise<Void>> results = new ArrayList<>(bindings.size());
        CancellationScope scope =
                Workflow.newCancellationScope(
                        () -> {
                            for (Statement statement : parallel.getBranches()) {
                                results.add(Async.function(this::execute, pipelineId, statement, bindings));
                            }
                        });

        // The code inside the scope is non-blocking, so run() returns immediately.
        scope.run();

        try {
            // If one activity fails, this call is expected to throw.
            Promise.allOf(results).get();
        } catch (Exception ex) {
            // Cancel uncompleted activities
            scope.cancel();
            log.error("One of the Activities failed.  Canceling the rest.", ex);
            throw ex;
        }
    }

Since the activity throws, I would expect Promise.allOf(results).get() to fail with an exception and the catch block to execute. But the catch block is never reached.

I suspect I am misunderstanding how this should be handled properly.
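
A possibly relevant detail in the history dump above: the EVENT_TYPE_ACTIVITY_TASK_SCHEDULED event carries a retry_policy with initial_interval 1s, backoff_coefficient 2.0, and maximum_interval 100s, and no maximum attempts. The RetryOptions in the test are set on WorkflowOptions, not on the activity. The log also shows shutdownNow roughly 23 ms after the failure of attempt=1. If the activity is being retried with that policy, the failure would not be reported to the workflow after the first attempt, which could explain why the catch block is not reached. The following is a plain-Java sketch of that schedule, not Temporal SDK code. The class and method names are hypothetical, and it assumes the usual exponential backoff formula, min(initial * coefficient^(attempt - 1), maximum), with a maximum-attempts value of 0 standing for "no limit".

```java
import java.time.Duration;

/** Plain-Java illustration only; this is not Temporal SDK code. */
final class RetryBackoffSketch {
    // Values copied from the retry_policy in the history dump above.
    static final Duration INITIAL_INTERVAL = Duration.ofSeconds(1);
    static final double BACKOFF_COEFFICIENT = 2.0;
    static final Duration MAXIMUM_INTERVAL = Duration.ofSeconds(100);

    /**
     * Delay before the retry that follows failed attempt number failedAttempt (1-based),
     * assuming interval = min(initial * coefficient^(failedAttempt - 1), maximum).
     */
    static Duration delayAfterFailedAttempt(int failedAttempt) {
        if (failedAttempt < 1) {
            throw new IllegalArgumentException("attempts are 1-based");
        }
        double millis =
            INITIAL_INTERVAL.toMillis() * Math.pow(BACKOFF_COEFFICIENT, failedAttempt - 1);
        long capped = (long) Math.min(millis, MAXIMUM_INTERVAL.toMillis());
        return Duration.ofMillis(capped);
    }

    /**
     * Whether the failure would be handed back to the caller after the given attempt.
     * Assumption of this sketch: maximumAttempts == 0 means "no limit".
     */
    static boolean failureSurfaces(int failedAttempt, int maximumAttempts) {
        return maximumAttempts > 0 && failedAttempt >= maximumAttempts;
    }

    public static void main(String[] args) {
        // Print the delay after each of the first eight failed attempts.
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println(
                "after failed attempt " + attempt
                    + ": retry in " + delayAfterFailedAttempt(attempt).getSeconds() + "s"
                    + ", surfaces with no limit: " + failureSurfaces(attempt, 0));
        }
    }
}
```

Under these assumptions the first retry would come 1 second after the failed attempt, which is later than the shutdown seen in the log, and with no attempt limit the failure would never surface on its own. If that is what is happening, limiting attempts on the activity's own options (for example RetryOptions.newBuilder().setMaximumAttempts(1) passed through ActivityOptions) rather than on WorkflowOptions might let the exception reach the catch block. This has not been verified against the test above.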