Testing Framework for Java SDK?

I noticed that there is a doc on testing for Go, https://docs.temporal.io/docs/go-workflow-testing/#setup, but I’m not seeing one for Java. I’m wondering if I just missed it.

Thanks,
Derek

We are working on rewriting SDK documentation to ensure parity.

In the meantime, check out unit tests from the Java Samples.

I’m having an issue where a call times out when the worker tries to poll for activities. Any suggestions for troubleshooting? I’ve debugged the code up to the point where the DslWorkflow executes the activity via:

private Void execute(ActivityInvocation activity, Map<String, ActivityPayload> bindings) {
    ActivityPayload[] args = makeInput(activity.getArguments(), bindings);
    ActivityStub stub =
        Workflow.newUntypedActivityStub(
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .setTaskQueue(activity.getName())
                .build());

    String methodNameDescriptor =
        activity.getName()
            + activity.getMethod().substring(0, 1).toUpperCase()
            + activity.getMethod().substring(1);
    activityStatusMap.put(activity.getName(), "Starting");
    ActivityPayload results =
        stub.execute(methodNameDescriptor, ActivityPayload.class, new Object[] {args});

    activityStatusMap.put(activity.getName(), "Completed");

    if (!Strings.isNullOrEmpty(activity.getResult())) {
      bindings.put(activity.getResult(), results);
    }

    return null;
  }
10:26:39.473 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=2, target=directaddress:///451b4a6a-be67-4645-8efd-e462b56cc3c2}}
10:26:39.481 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///baef9014-eea1-4d6f-9d26-7e6cb64b58a1}}
10:26:39.591 [Test worker] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=2, pollThreadNamePrefix='Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=41415@C02C71F6MD6V}
10:26:39.597 [Test worker] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Activity Poller taskQueue="SampleActivity1", namespace="default"'}, identity=41415@C02C71F6MD6V}
10:26:39.598 [Test worker] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Host Local Workflow Poller'}, identity=1335cabc-fc75-40f8-b68c-d12d30a612b4}
10:26:40.332 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=3, WorkflowId=7cbd4302-bc65-47d1-beb0-5071be151c3a, RunId=d3a01127-47e7-41e5-aa82-4599102d7567 completed with 1 new commands
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 120.960848191s. [buffered_nanos=49374405]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142)
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.pollActivityTaskQueue(WorkflowServiceGrpc.java:2702)
	at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:95)
	at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:38)
	at io.temporal.internal.worker.Poller$PollExecutionTask.run(Poller.java:273)
	at io.temporal.internal.worker.Poller$PollLoopTask.run(Poller.java:242)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
10:28:40.636 [Host Local Workflow Poller: 4] WARN io.temporal.internal.worker.Poller - Failure in thread Host Local Workflow Poller: 4

  @Before
  public void setUp() throws IOException {
    testEnv = TestWorkflowEnvironment.newInstance();
    worker = testEnv.newWorker(DslWorkflowImpl.TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);
    client = testEnv.getWorkflowClient();
  }

  @Test
  public void testWorkflow() {
    // The activity is served by a second worker on its own task queue.
    Worker activity1Worker = testEnv.newWorker("SampleActivity1");
    activity1Worker.registerActivitiesImplementations(new SampleActivities.SampleActivitiesImpl1());

    testEnv.start();

    WorkflowOptions workflowOptions =
        WorkflowOptions.newBuilder().setTaskQueue(DslWorkflowImpl.TASK_QUEUE).build();

    DslWorkflow workflow = client.newWorkflowStub(DslWorkflow.class, workflowOptions);
    // Execute a workflow waiting for it to complete.
    workflow.execute(sequentialConfig);

    System.out.println("success");
  }

Thanks,
Derek

Do you have any docs that go into detail on how the test framework is set up? For example, does it start another gRPC process that the test communicates with, or does it just mock out everything?

The logged error is benign. I filed an issue to return an empty poll after a minute to avoid it.

It implements a fake version of the service. The fake is a gRPC service.

I recommend adding the following @Rule to your unit test to print workflow event history on test failure/timeout:

  /** Prints a history of the workflow under test in case of a test failure. */
  @Rule
  public TestWatcher watchman =
      new TestWatcher() {
        @Override
        protected void failed(Throwable e, Description description) {
          if (testEnv != null) {
            System.err.println(testEnv.getDiagnostics());
            testEnv.close();
          }
        }
      };

Taken from this sample.

If I use the same worker for the activity and the workflow, it works.

However, when I use different workers, it looks like the activity worker isn’t able to poll.

  testEnv = TestWorkflowEnvironment.newInstance();
  worker = testEnv.newWorker(DslWorkflowImpl.TASK_QUEUE);
  worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);
  Worker activity1Worker = testEnv.newWorker("SampleActivity1");
  activity1Worker.registerActivitiesImplementations(new SampleActivities.SampleActivitiesImpl1());
  testEnv.start();   

From the logs, it looks like it starts polling both CI_CD_PIPELINE_TASK_QUEUE and SampleActivity1:

  ActivityStub stub =
      Workflow.newUntypedActivityStub(
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofMinutes(5))
              .setTaskQueue("SampleActivity1")
              .build());
  ActivityPayload results =
      stub.execute(methodNameDescriptor, ActivityPayload.class, new Object[] {args});

but nothing ever happens.

However, if I just change the activity’s task queue to CI_CD_PIPELINE_TASK_QUEUE and use one worker, it works.

22:15:42.935 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=2, target=directaddress:///70b961f9-f020-45df-9325-b08a859c67f7}}
22:15:42.943 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///66ded101-79bd-46b3-8409-9d9875a007f8}}
22:15:43.089 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=2, pollThreadNamePrefix='Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=49528@C02C71F6MD6V}
22:15:43.096 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Activity Poller taskQueue="SampleActivity1", namespace="default"'}, identity=49528@C02C71F6MD6V}
22:15:43.097 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Host Local Workflow Poller'}, identity=f6a56160-0873-44ce-8a76-10dae31402c3}
22:15:47.817 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=3, WorkflowId=dfdee61d-7101-458e-929a-e772cd4c1378, RunId=77bf0795-5e05-49ba-8a88-3db74cc23134 completed with 1 new commands
22:15:48.067 [Test worker] INFO io.temporal.worker.WorkerFactory - shutdownNow
22:15:48.067 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Host Local Workflow Poller
22:15:48.068 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.068 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.068 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.068 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.068 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.072 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
22:15:48.073 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Activity Poller taskQueue="SampleActivity1", namespace="default"
22:15:48.074 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination begin
22:15:48.074 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.074 [Activity Poller taskQueue="SampleActivity1", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.074 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.074 [Activity Poller taskQueue="SampleActivity1", namespace="default": 4] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.074 [Activity Poller taskQueue="SampleActivity1", namespace="default": 3] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.074 [Activity Poller taskQueue="SampleActivity1", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.074 [Activity Poller taskQueue="SampleActivity1", namespace="default": 5] INFO io.temporal.internal.worker.Poller - poll loop done
22:15:48.077 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination done
22:15:48.082 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - shutdown
Stored Workflows:
ExecutionId{namespace='default', execution=workflow_id: "dfdee61d-7101-458e-929a-e772cd4c1378"
run_id: "77bf0795-5e05-49ba-8a88-3db74cc23134"
}
event_id: 1
event_time {
  seconds: 1604902546
  nanos: 123000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
workflow_execution_started_event_attributes {
  workflow_type {
    name: "DslWorkflow"
  }
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "{\"variables\":{\"arg1\":{\"pipelineExecutionId\":\"f02043ea-21f8-11eb-adc1-0242ac120002\",\"buildInput\":null,\"buildOutput\":null}},\"root\":{\"activity\":null,\"sequence\":{\"elements\":[{\"activity\":{\"name\":\"SampleActivities1\",\"method\":\"start\",\"arguments\":[\"arg1\"],\"result\":\"result1\"},\"sequence\":null,\"parallel\":null}]},\"parallel\":null}}"
    }
  }
  workflow_execution_timeout {
    seconds: 315360000
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  original_execution_run_id: "77bf0795-5e05-49ba-8a88-3db74cc23134"
  identity: "49528@C02C71F6MD6V"
  attempt: 1
}
event_id: 2
event_time {
  seconds: 1604902546
  nanos: 123000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 3
event_time {
  seconds: 1604902546
  nanos: 141000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 2
  identity: "49528@C02C71F6MD6V"
}
event_id: 4
event_time {
  seconds: 1604902547
  nanos: 842000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 2
  identity: "49528@C02C71F6MD6V"
}
event_id: 5
event_time {
  seconds: 1604902547
  nanos: 842000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "863e80c9-f52b-3525-8319-581e54bc2b2a"
  activity_type {
    name: "SampleActivities1Start"
  }
  namespace: "default"
  task_queue {
    name: "SampleActivities1"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[{\"pipelineExecutionId\":\"f02043ea-21f8-11eb-adc1-0242ac120002\",\"buildInput\":null,\"buildOutput\":null}]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 3
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
  }
}


22:15:48.188 [Test worker] INFO io.temporal.worker.WorkerFactory - shutdownNow
22:15:48.188 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Host Local Workflow Poller
22:15:48.188 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
22:15:48.188 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Activity Poller taskQueue="SampleActivity1", namespace="default"
22:15:48.188 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination begin
22:15:48.188 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination done
22:15:48.188 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - shutdown

Based on this history printout, it looks like the activity gets scheduled and is put into the correct SampleActivity1 task queue. However, the worker isn’t picking it up.

Here is the printout when the workflow and the activity share the same worker.

22:22:33.267 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=2, target=directaddress:///8e89b1f0-bf72-4518-8556-91bd51603941}}
22:22:33.275 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///bddbe59b-220c-4fd2-9e1b-4112bc5fb17b}}
22:22:33.379 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=2, pollThreadNamePrefix='Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=49711@C02C71F6MD6V}
22:22:33.383 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=1, pollThreadNamePrefix='Local Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=49711@C02C71F6MD6V}
22:22:33.386 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=49711@C02C71F6MD6V}
22:22:33.387 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Host Local Workflow Poller'}, identity=4b97f67e-4433-4daf-a3c2-e22868e5d8cd}
22:22:33.967 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=3, WorkflowId=d174ef7f-d843-4bb8-8e10-8aff9e34c1dc, RunId=2e9cac63-61c9-49fa-84d8-380ca1eabcc9 completed with 1 new commands
22:22:34.021 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=9, WorkflowId=d174ef7f-d843-4bb8-8e10-8aff9e34c1dc, RunId=2e9cac63-61c9-49fa-84d8-380ca1eabcc9 completed with 1 new commands
success
22:22:34.025 [Test worker] INFO io.temporal.worker.WorkerFactory - shutdownNow
22:22:34.025 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Host Local Workflow Poller
22:22:34.026 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.026 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.026 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.026 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.026 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.027 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
22:22:34.027 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Local Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
22:22:34.027 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
22:22:34.027 [Local Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination begin
22:22:34.028 [Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 3] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 5] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 4] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Activity Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.028 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
22:22:34.030 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination done
22:22:34.033 [Test worker] INFO io.temporal.internal.grpc.WorkflowServiceStubsImpl - shutdown

Strangely, though, when I modify the helloWorld example to use different workers, it works. It’s not clear what the difference is, other than that I’m creating the workflow from a DSL file.

The history shows that the activity is scheduled on the “SampleActivities1” task queue, but the task queue used to initialize the worker is “SampleActivity1”:

.setTaskQueue("SampleActivity1")

  task_queue {
    name: "SampleActivities1"
  }
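
A one-character-class mismatch like this is easy to miss by eye. As a minimal sketch (not part of the Temporal SDK), a test could compare the task queue names the DSL will schedule on against the names the workers were created with before calling testEnv.start(). The class and method names below are hypothetical; only the queue names come from the history and setup shown in this thread.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical test helper: finds task queues that are scheduled on but have no worker. */
final class TaskQueueCheck {

  /** Returns the scheduled queue names that no worker polls, in first-seen order. */
  static Set<String> unpolledQueues(List<String> scheduledQueues, Set<String> workerQueues) {
    Set<String> missing = new LinkedHashSet<>(scheduledQueues);
    missing.removeAll(workerQueues);
    return missing;
  }

  public static void main(String[] args) {
    // "SampleActivities1" is the queue in the ActivityTaskScheduled event;
    // the other two are the queues the test workers were created on.
    List<String> scheduled = List.of("SampleActivities1");
    Set<String> polled = Set.of("CI_CD_PIPELINE_TASK_QUEUE", "SampleActivity1");
    // prints: Queues with no worker: [SampleActivities1]
    System.out.println("Queues with no worker: " + unpolledQueues(scheduled, polled));
  }
}
```

In a real test, the scheduled names would presumably be collected from the DSL definition (activity.getName() in the workflow code above), so the check fails fast instead of waiting for a timeout.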

Thanks @maxim

One more question. I tried to follow the below example in my own project:

  @Test
  public void testMockedActivity() {
    GreetingActivities activities = mock(GreetingActivities.class);
    when(activities.composeGreeting("Hello", "World")).thenReturn("Hello World!");
    worker.registerActivitiesImplementations(activities);
    testEnv.start();

    // Get a workflow stub using the same task queue the worker uses.
    GreetingWorkflow workflow =
        client.newWorkflowStub(
            GreetingWorkflow.class,
            WorkflowOptions.newBuilder().setTaskQueue("GreetingActivities").build());
    // Execute a workflow waiting for it to complete.
    String greeting = workflow.getGreeting("World");
    assertEquals("Hello World!", greeting);
  }

But I’m getting this error:

Found @ActivityMethod annotation on "public java.lang.String GreetingActivity$GreetingActivities$MockitoMock$846845575.composeGreeting(java.lang.String,java.lang.String)" This annotation can be used only on the interface method it implements.
java.lang.IllegalArgumentException: Found @ActivityMethod annotation on "public java.lang.String GreetingActivity$GreetingActivities$MockitoMock$846845575.composeGreeting(java.lang.String,java.lang.String)" This annotation can be used only on the interface method it implements.

I’m guessing the error happens because the interface, rather than the actual activity implementation, is what gets mocked. I’d like to understand why this works in the Java sample but not in my version, since it seems to make sense that you would have to mock the implementation rather than the interface.

The following works for me:

@ActivityInterface
public interface Account {

  @ActivityMethod
  void deposit(String accountId, String referenceId, int amountCents);

  @ActivityMethod(name = "WithdrawActivity")
  int withdraw(String accountId, String referenceId, int amountCents);
}

  @Test
  public void testTransfer() {
    Account activities = mock(Account.class);
    when(activities.withdraw("account1", "reference1", 123)).thenReturn(100);
    worker.registerActivitiesImplementations(activities);
    testEnv.start();
    WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
    AccountTransferWorkflow workflow =
        workflowClient.newWorkflowStub(AccountTransferWorkflow.class, options);
    long starty = testEnv.currentTimeMillis();
    workflow.transfer("account1", "account2", "reference1", 123);
    verify(activities).withdraw(eq("account1"), eq("reference1"), eq(123));
    verify(activities).deposit(eq("account2"), eq("reference1"), eq(123));
    long duration = testEnv.currentTimeMillis() - starty;
    System.out.println("Duration: " + duration);
  }

I don’t understand why the name of the class in the error you posted is GreetingActivity$GreetingActivities. Is it an inner class?

It’s because the activity interface is nested inside another class:

public class GreetingActivity {
    static final String TASK_QUEUE = "HelloActivity";

    /** Workflow interface has to have at least one method annotated with @WorkflowMethod. */
    @WorkflowInterface
    public interface GreetingWorkflow {
        @WorkflowMethod
        String getGreeting(String name);
    }

    /** Activity interface is just a POJI. */
    @ActivityInterface
    public interface GreetingActivities {
        @ActivityMethod
        String composeGreeting(String greeting, String name);
    }

    /** GreetingWorkflow implementation that calls GreetingsActivities#composeGreeting. */
    public static class GreetingWorkflowImpl implements GreetingWorkflow {

        /**
         * Activity stub implements activity interface and proxies calls to it to Temporal activity
         * invocations. Because activities are reentrant, only a single stub can be used for multiple
         * activity invocations.
         */
        private final GreetingActivities activities =
                Workflow.newActivityStub(
                        GreetingActivities.class,
                        ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(2)).build());

        @Override
        public String getGreeting(String name) {
            // This is a blocking call that returns only after the activity has completed.
            return activities.composeGreeting("Hello", name);
        }
    }

    static class GreetingActivitiesImpl implements GreetingActivities {
        @Override
        public String composeGreeting(String greeting, String name) {
            return greeting + " " + name + "!";
        }
    }
}

Does it work if @ActivityMethod is removed? It is not needed unless you want to give the activity a different name. Also try to move the activity interface to the top level.

Yes, that at least gets past the error.

In your example, put a breakpoint in POJOActivityImplMetadata.java. When you call worker.registerActivitiesImplementations(activities); and hit the breakpoint, you will see that the getAnnotation call doesn’t find anything, even though the method Account::deposit has the annotation defined. I’m not sure why it isn’t picked up.

But in my code, the annotation actually picks up @ActivityMethod and throws the exception.

But without the @ActivityMethod I don’t believe workflow would work since it won’t know how to pick it up (from my understanding). So in my case, the activity gets scheduled, but it seems like the workflow doesn’t know how to trigger the activity.

One thing I notice when I debug the sample Java project is that the methods of the mocked interface all belong to an EnhancerByMockitoWithCGLIB class, which I don’t have in mine.

Mine has MockitoMock.

It looks like you’re using

  testImplementation group: 'org.powermock', name: 'powermock-api-mockito', version: '1.7.4'

It looks like the different Mockito version, together with your use of PowerMock, may let your mock bypass the check for annotations on the mocked interface method, which is what triggers the exception in my case.
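
To illustrate why the generated mock class matters, here is a plain-Java sketch with no Mockito or Temporal dependency. It assumes, based on the error message above, that registration rejects a class whose own methods carry the annotation. The @Marker annotation is a hypothetical stand-in for @ActivityMethod, and the two implementation classes stand in for a generated mock that does not copy annotations (PlainImpl) and one that does (CopyingImpl).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class AnnotationCopyDemo {

  /** Hypothetical stand-in for @ActivityMethod. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Marker {}

  interface Greeting {
    @Marker
    String composeGreeting(String greeting, String name);
  }

  /** Stands in for a generated class that does NOT copy method annotations. */
  static class PlainImpl implements Greeting {
    @Override
    public String composeGreeting(String greeting, String name) {
      return null;
    }
  }

  /** Stands in for a generated class that copies annotations onto the override. */
  static class CopyingImpl implements Greeting {
    @Override
    @Marker
    public String composeGreeting(String greeting, String name) {
      return null;
    }
  }

  /** True if any method declared by the class itself carries @Marker. */
  static boolean hasMarkerOnDeclaredMethod(Class<?> implClass) {
    for (Method m : implClass.getDeclaredMethods()) {
      // Method annotations are not inherited from the interface method,
      // so this only sees annotations placed on the class's own methods.
      if (m.isAnnotationPresent(Marker.class)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasMarkerOnDeclaredMethod(PlainImpl.class)); // prints: false
    System.out.println(hasMarkerOnDeclaredMethod(CopyingImpl.class)); // prints: true
  }
}
```

To my knowledge, Mockito 2 and later copy annotations onto generated mocks by default, which would match the MockitoMock case here. If your Mockito version provides it, creating the mock with withSettings().withoutAnnotations() may be worth trying.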

Interestingly, if you try to mock the activities implementation instead, you get the following error:

java.lang.IllegalArgumentException: Class doesn't implement any non empty interface annotated with @ActivityInterface: io.temporal.samples.hello.HelloActivity$GreetingActivitiesImpl$$EnhancerByMockitoWithCGLIB$$fb34bcd0
  @Test
  public void testMockedActivity() {
    HelloActivity.GreetingActivitiesImpl activities =
        mock(HelloActivity.GreetingActivitiesImpl.class);
    when(activities.composeGreeting("Hello", "World")).thenReturn("Hello World!");
    worker.registerActivitiesImplementations(activities);
    testEnv.start();

    // Get a workflow stub using the same task queue the worker uses.
    GreetingWorkflow workflow =
        client.newWorkflowStub(
            GreetingWorkflow.class,
            WorkflowOptions.newBuilder().setTaskQueue("GreetingActivities").build());
    // Execute a workflow waiting for it to complete.
    String greeting = workflow.getGreeting("World");
    assertEquals("Hello World!", greeting);
  }
  /** Activity interface is just a POJI. */
  @ActivityInterface
  public interface GreetingActivities {
    @ActivityMethod
    String composeGreeting(String greeting, String name);
  }

  static class GreetingActivitiesImpl implements GreetingActivities {
    @Override
    public String composeGreeting(String greeting, String name) {
      return greeting + " " + name + "!";
    }
  }

It looks like the @ActivityInterface lookup doesn’t see the interface through the mocked implementation class, so this approach doesn’t appear to work.

@ActivityMethod is not needed. Only @ActivityInterface is required.

But without the @ActivityMethod I don’t believe workflow would work since it won’t know how to pick it up (from my understanding). So in my case, the activity gets scheduled, but it seems like the workflow doesn’t know how to trigger the activity.

Would you print the workflow history (using @Rule mentioned above) and see what activity name is found in the ActivityTaskScheduled event?

Stored Workflows:
ExecutionId{namespace='default', execution=workflow_id: "78830867-5d7e-41c3-9e67-36a268a4c1f4"
run_id: "274581af-6bf5-4a99-97b2-8b5f43b8ecc8"
}
event_id: 1
event_time {
  seconds: 1605032197
  nanos: 686000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
workflow_execution_started_event_attributes {
  workflow_type {
    name: "DslWorkflow"
  }
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"913a3420-03f7-4521-80f4-181f4987d3f9\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "{\"variables\":{\"arg1\":null},\"root\":{\"activity\":null,\"sequence\":{\"elements\":[{\"activity\":{\"name\":\"BaseActivity\",\"method\":\"start\",\"arguments\":[\"arg1\"],\"result\":\"result1\"},\"sequence\":null,\"parallel\":null}]},\"parallel\":null}}"
    }
  }
  workflow_execution_timeout {
    seconds: 315360000
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  original_execution_run_id: "274581af-6bf5-4a99-97b2-8b5f43b8ecc8"
  identity: "88151@C02C71F6MD6V"
  attempt: 1
}
event_id: 2
event_time {
  seconds: 1605032197
  nanos: 686000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 3
event_time {
  seconds: 1605032197
  nanos: 724000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 2
  identity: "88151@C02C71F6MD6V"
}
event_id: 4
event_time {
  seconds: 1605032198
  nanos: 262000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 2
  identity: "88151@C02C71F6MD6V"
}
event_id: 5
event_time {
  seconds: 1605032198
  nanos: 262000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "3b32bf82-1129-3f3d-9fff-301eec6e55e1"
  activity_type {
    name: "BaseActivityStart"
  }
  namespace: "default"
  task_queue {
    name: "BaseActivity"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"913a3420-03f7-4521-80f4-181f4987d3f9\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[null]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 3
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
  }
}
    @Test(timeout = 5000)
    public void testBaseMockedActivity() {
        worker = testEnv.newWorker(DslWorkflowImpl.TASK_QUEUE);
        worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);

        BaseActivity activities = mock(BaseActivity.class);
        when(activities.start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class))).thenReturn(null);
        Worker newWorker = testEnv.newWorker("BaseActivity");
        newWorker.registerActivitiesImplementations(activities);

        testEnv.start();

        // Get a workflow stub using the same task queue the worker uses.
        DslWorkflow workflow =
                client.newWorkflowStub(
                        DslWorkflow.class,
                        WorkflowOptions.newBuilder().setTaskQueue(DslWorkflowImpl.TASK_QUEUE).build());


        workflow.execute(UUID.randomUUID(), sequentialConfig);
    }

@ActivityInterface
public interface BaseActivity {
  ActivityPayload start(UUID pipelineId, ActivityPayload[] input);
}

I do execute the activity in the workflow via

    ActivityPayload results =
        stub.execute(methodNameDescriptor, ActivityPayload.class, new Object[]{pipelineId, args});

Not sure if that matters, since it looks like the activity does get scheduled, just not picked up. I think it might somehow be related to the fact that the mock isn’t being triggered.

Specifically, how does the Temporal SDK take

  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"e2cc8600-e76b-4240-9e6e-64cb00abc31f\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[]"
    }
  }

and map it to the arguments of the BaseActivity start method?

ActivityPayload start(UUID pipelineId, ActivityPayload[] input)

The name of the scheduled activity (from the ActivityTaskScheduled event) is “BaseActivityStart”. The name of the activity defined by the interface

@ActivityInterface
public interface BaseActivity {
  ActivityPayload start(UUID pipelineId, ActivityPayload[] input);
}

is just “Start”. If you want to make it “BaseActivityStart”, change the interface to:

@ActivityInterface(namePrefix="BaseActivity")
public interface BaseActivity {
  ActivityPayload start(UUID pipelineId, ActivityPayload[] input);
}
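
As a rough illustration of the naming rule described above (the method name with its first letter capitalized, preceded by the optional namePrefix), here is a minimal sketch. The class and method names are hypothetical and this is not SDK code; it only mirrors the rule to show why the worker had no activity registered under “BaseActivityStart”.

```java
// Hypothetical helper that mirrors the activity type naming rule; not part of the Temporal SDK.
final class ActivityTypeNameSketch {

    /** Builds the activity type name from an optional prefix and the interface method name. */
    static String activityTypeName(String namePrefix, String methodName) {
        // Capitalize the first letter of the method name: "start" becomes "Start".
        String capitalized =
            Character.toUpperCase(methodName.charAt(0)) + methodName.substring(1);
        // An empty prefix corresponds to a plain @ActivityInterface annotation.
        return namePrefix + capitalized;
    }

    public static void main(String[] args) {
        // Name the worker registers for the interface without a prefix.
        System.out.println(activityTypeName("", "start"));
        // Name the workflow schedules, and the name registered once namePrefix is set.
        System.out.println(activityTypeName("BaseActivity", "start"));
    }
}
```

The workflow builds the name it schedules by concatenating activity.getName() with the capitalized method, so the two sides only agree once the prefix is declared on the interface.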

I do have one more question. It relates to validating what happens when an activity fails.

The code I have is the following:

    public void testSequentialWorkflow_failedActivity() {
        worker = testEnv.newWorker(DslWorkflowImpl.TASK_QUEUE);
        worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);

        BaseActivity activities = mock(BaseActivity.class);
        when(activities.start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class))).thenThrow(new IllegalArgumentException());
        Worker newWorker = testEnv.newWorker("BaseActivity");
        newWorker.registerActivitiesImplementations(activities);

        testEnv.start();

        // Get a workflow stub using the same task queue the worker uses.
        DslWorkflow workflow =
                client.newWorkflowStub(
                        DslWorkflow.class,
                        WorkflowOptions.newBuilder().setRetryOptions(RetryOptions.newBuilder().setDoNotRetry().build()).setTaskQueue(DslWorkflowImpl.TASK_QUEUE).build());

        workflow.execute(UUID.randomUUID(), invalidParallelConfig);

        // config has two activities so two should be processed
        verify(activities, Mockito.times(2)).start(Mockito.any(UUID.class), Mockito.any(ActivityPayload[].class));

    }

What I expect to happen is that when the activity gets triggered, the mocked version will throw an exception. I can see from the stack trace that the exception is thrown.

00:10:42.120 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=2, target=directaddress:///2b755a1a-b191-4fe3-ba31-59f9d7a638f3}}
00:10:42.131 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///b7fb6de9-405f-48b4-b4e4-3ae6675540b8}}
00:10:43.724 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=2, pollThreadNamePrefix='Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"'}, identity=10051@C02C71F6MD6V}
00:10:43.733 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Activity Poller taskQueue="BaseActivity", namespace="default"'}, identity=10051@C02C71F6MD6V}
00:10:43.734 [Time-limited test] INFO io.temporal.internal.worker.Poller - start(): Poller{options=PollerOptions{maximumPollRateIntervalMilliseconds=1000, maximumPollRatePerSecond=0.0, pollBackoffCoefficient=2.0, pollBackoffInitialInterval=PT0.1S, pollBackoffMaximumInterval=PT1M, pollThreadCount=5, pollThreadNamePrefix='Host Local Workflow Poller'}, identity=7fcbce44-a976-4788-aa56-3f32b9a6bb65}
00:10:44.840 [workflow[dc089c4e-1726-4228-bf9b-b5fb92d83cb3]-1] INFO com.snapchat.ci_cd_orchestrator.workflow.DslWorkflowImpl - Scheduling activity ActivityInvocation(name=BaseActivity, method=start, arguments=null, result=result2) for pipeline 40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70 to task queue BaseActivity
00:10:44.907 [Workflow Executor taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] DEBUG io.temporal.internal.replay.ReplayWorkflowTaskHandler - WorkflowTask startedEventId=3, WorkflowId=dc089c4e-1726-4228-bf9b-b5fb92d83cb3, RunId=13a193be-2907-4a07-9f5b-db2a21baa342 completed with 1 new commands
00:10:44.977 [Activity Executor taskQueue="BaseActivity", namespace="default": 1] WARN io.temporal.internal.sync.POJOActivityTaskHandler - Activity failure. ActivityId=5bfeca07-d19b-3d85-aad5-1a2c626c821a, activityType=BaseActivityStart, attempt=1
java.lang.IllegalArgumentException: null
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:279)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:218)
	at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:183)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:194)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:154)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
00:10:45.000 [Test worker] INFO io.temporal.worker.WorkerFactory - shutdownNow
00:10:45.000 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Host Local Workflow Poller
00:10:45.001 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.001 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.003 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default"
00:10:45.004 [Test worker] INFO io.temporal.internal.worker.Poller - shutdownNow poller=Activity Poller taskQueue="BaseActivity", namespace="default"
00:10:45.004 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination begin
00:10:45.004 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 1] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Workflow Poller taskQueue="CI_CD_PIPELINE_TASK_QUEUE", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 5] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.005 [Activity Poller taskQueue="BaseActivity", namespace="default": 4] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.004 [Activity Poller taskQueue="BaseActivity", namespace="default": 2] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.005 [Activity Poller taskQueue="BaseActivity", namespace="default": 3] INFO io.temporal.internal.worker.Poller - poll loop done
00:10:45.017 [Test worker] INFO io.temporal.worker.WorkerFactory - awaitTermination done
00:10:45.020 [Test worker] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - shutdown
Stored Workflows:
ExecutionId{namespace='default', execution=workflow_id: "dc089c4e-1726-4228-bf9b-b5fb92d83cb3"
run_id: "13a193be-2907-4a07-9f5b-db2a21baa342"
}
event_id: 1
event_time {
  seconds: 1605082244
  nanos: 651000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
workflow_execution_started_event_attributes {
  workflow_type {
    name: "DslWorkflow"
  }
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "{\"variables\":{\"arg1\":null,\"arg2\":null,\"arg3\":null},\"root\":{\"activity\":null,\"sequence\":null,\"parallel\":{\"branches\":[{\"activity\":{\"name\":\"BaseActivity\",\"method\":\"start\",\"arguments\":null,\"result\":\"result2\"},\"sequence\":null,\"parallel\":null}]}}}"
    }
  }
  workflow_execution_timeout {
    seconds: 315360000
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  original_execution_run_id: "13a193be-2907-4a07-9f5b-db2a21baa342"
  identity: "10051@C02C71F6MD6V"
  attempt: 1
}
event_id: 2
event_time {
  seconds: 1605082244
  nanos: 651000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "CI_CD_PIPELINE_TASK_QUEUE"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 3
event_time {
  seconds: 1605082244
  nanos: 672000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 2
  identity: "10051@C02C71F6MD6V"
}
event_id: 4
event_time {
  seconds: 1605082244
  nanos: 945000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 2
  identity: "10051@C02C71F6MD6V"
}
event_id: 5
event_time {
  seconds: 1605082244
  nanos: 945000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "5bfeca07-d19b-3d85-aad5-1a2c626c821a"
  activity_type {
    name: "BaseActivityStart"
  }
  namespace: "default"
  task_queue {
    name: "BaseActivity"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"40ac7dbf-1f3c-4a91-9a27-e7ce88d30a70\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 3
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
  }
}

However, in the code that triggers the activity:

public void execute(UUID pipelineId, Parallel parallel, Map<String, ActivityPayload> bindings) {
    if(parallel == null || parallel.getBranches().length == 0) {
      return;
    }

    // In the parallel block, we want to execute all of them in parallel and wait for all of them.
    // if one activity fails then we want to cancel all the rest of them as well.
    List<Promise<Void>> results = new ArrayList<>(bindings.size());
    CancellationScope scope =
        Workflow.newCancellationScope(
            () -> {
              for (Statement statement : parallel.getBranches()) {
                results.add(Async.function(this::execute, pipelineId, statement, bindings));
              }
            });

    // As code inside the scope is non blocking the run doesn't block.
    scope.run();

    try {
      // If one activity fails then all the rest will fail
      Promise.allOf(results).get();
    } catch (Exception ex) {
      // Cancel uncompleted activities
      scope.cancel();
      log.error("One of the Activities failed.  Canceling the rest.", ex);
      throw ex;
    }
  }

Since the exception gets thrown, I would expect Promise.allOf to fail, an exception to be thrown, and the catch block to execute. But the catch block is never reached.

I feel like I might have a misunderstanding of how to properly handle this.

My guess is that the activity is being retried.
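
A rough sketch of why the failure might never reach the workflow, assuming the retry_policy from the ActivityTaskScheduled event above (initial interval 1 s, backoff coefficient 2.0, maximum interval 100 s) and assuming that a missing maximum-attempts value means unlimited retries. The class and method names are illustrative only, not SDK code.

```java
// Illustrative model of the retry policy seen in the event history; not part of the Temporal SDK.
final class ActivityRetrySketch {
    // Values copied from the retry_policy block of the ActivityTaskScheduled event.
    static final long INITIAL_INTERVAL_SECONDS = 1;
    static final double BACKOFF_COEFFICIENT = 2.0;
    static final long MAXIMUM_INTERVAL_SECONDS = 100;

    /** Delay in seconds before the retry that follows the given failed attempt (1-based). */
    static long delayAfterAttempt(int failedAttempt) {
        double delay =
            INITIAL_INTERVAL_SECONDS * Math.pow(BACKOFF_COEFFICIENT, failedAttempt - 1);
        // The delay grows exponentially but is capped at the maximum interval.
        return (long) Math.min(delay, MAXIMUM_INTERVAL_SECONDS);
    }

    /**
     * Whether a failure on the given attempt would be reported to the workflow.
     * Assumption: maximumAttempts == 0 means "retry without limit".
     */
    static boolean failureReachesWorkflow(int failedAttempt, int maximumAttempts) {
        return maximumAttempts > 0 && failedAttempt >= maximumAttempts;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println(
                "attempt " + attempt + " failed, next retry in " + delayAfterAttempt(attempt) + "s");
        }
        // With no attempt limit the failure is never delivered, however many attempts fail.
        System.out.println("unlimited: " + failureReachesWorkflow(1000, 0));
        // With a limit of one attempt the first failure is delivered.
        System.out.println("limit of 1: " + failureReachesWorkflow(1, 1));
    }
}
```

If that reading is right, the RetryOptions set on WorkflowOptions in the test apply to the workflow, not to the activity. Limiting activity retries would mean passing RetryOptions (for example with setMaximumAttempts(1)) to ActivityOptions.newBuilder().setRetryOptions(...) where the activity stub is created.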

https://github.com/darewreck54/temporal-java-samples/tree/test_issue I took the DSL example and added a unit test to reproduce the issue I’m seeing. (Branch is test_issue)

UnitTest: https://github.com/darewreck54/temporal-java-samples/blob/test_issue/src/test/java/io/temporal/samples/dsl/DslActivityTest.java

Code that contains the cancelable action: https://github.com/darewreck54/temporal-java-samples/blob/test_issue/src/main/java/io/temporal/samples/dsl/models/Parallel.java

In this flow, it takes a YAML DSL that specifies a single activity to be run in parallel. The log shows the activity failing repeatedly:

1:05.663 [Activity Executor taskQueue="SampleActivities1", namespace="default": 2] WARN  i.t.i.sync.POJOActivityTaskHandler - Activity failure. ActivityId=1363cb1e-889a-3138-902d-bec52d0c7218, activityType=SampleActivities1GetInfo, attempt=5029
java.lang.IllegalArgumentException: null
	at jdk.internal.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:279)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:218)
	at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:183)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:194)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:154)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
	at io.temporal.internal.worker.PollTaskExecutor$$Lambda$160/0000000000000000.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:847)

However, I think the Promise swallows up the exception.

Is there another way you’re supposed to handle this?

Also, it wasn’t clear why there were any retries. I didn’t specify any retries, so it seems strange that it keeps retrying when the activity throws an exception.

Thanks a lot for the reproduction. I’ll look at it later today.