Handling exceptions thrown in Promises

Extending the question in this post (Async activities in workflow - #2 by maxim), I am working out what to do in the catch block to get my intended behavior:

  1. Never fail or retry the whole workflow.
  2. All of the promises should execute in their entirety exactly once, unless an activity executing in that promise throws one of the exception types passed to RetryOptions.setDoNotRetry(), or the activity's execution time reaches its ScheduleToClose timeout (setScheduleToCloseTimeout). In those cases, the exception should be logged and swallowed.
  3. All activities should execute exactly once, as they call APIs that aren’t idempotent. Edit: All activities should execute at most once.

Code samples:

Workflow Code Snippet

List<Promise<Void>> transactionStatusPromises = new ArrayList<>();
for (TransactionStatus transactionStatus : transactionStatuses) {
    Promise<Void> transactionStatusPromise = Async.procedure(this::processTransactionStatus, updateTimestamp,
        inboundUpdateWorkflowId, transactionStatus);
    transactionStatusPromises.add(transactionStatusPromise);
}

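for (Promise<Void> promise : transactionStatusPromises) {
    try {
        // Blocks until processTransactionStatus has finished for this TransactionStatus.
        promise.get();
    } catch (TemporalFailure failure) {
        // Activity failures (e.g. a setDoNotRetry exception type or a ScheduleToClose timeout)
        // surface as TemporalFailure subclasses such as ActivityFailure: log and swallow them.
        logger.error("processTransactionStatus threw a TemporalFailure exception. Swallowing the exception to allow"
            + " InboundUpdateWorkflow to continue processing other TransactionStatuses", failure);
    } catch (RuntimeException exception) {
        // Anything else (e.g. an NPE from buggy workflow code) is rethrown so that, with default
        // WorkflowImplementationOptions, Temporal fails and retries the current workflow task.
        logger.error("processTransactionStatus threw a nonTemporalFailure exception. Propagating the exception"
            + " so Temporal retries the previous workflow task.", exception);
        throw exception;
    }
}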

processTransactionStatus pseudocode

foo()
activityA()
bar()
activityB()
baz()
activityC()
qux()
return;

I’ve run a test of the code above against TestWorkflowEnvironment. The input is a list of two TransactionStatuses, which creates two promises. I also mocked baz() to throw an NPE on the first execution and then succeed on all later executions. activityB and activityC are mocked so I can verify the number of calls they receive. My intent is to confirm that if there is a bug in baz(), activityA and activityB won’t run additional times. My understanding is that when a non-TemporalFailure exception is thrown and not caught, only the most recent workflow task is retried, which should mean that NPEs won’t cause activities to rerun (Throwing Exception vs Failure in workflow - #3 by nathan_shields).
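The replay behavior described above can be illustrated with a toy model in plain Java. This is not the Temporal SDK: the class and method names are hypothetical, and it simplifies by running activities inline instead of scheduling them through commands. It assumes that a failed workflow task is retried against the same event history, so activity results already recorded there are replayed rather than executed again, whereas a workflow-level retry (RetryOptions on WorkflowOptions) starts a new run with an empty history.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Toy model of Temporal-style replay (NOT the real SDK). Activity results are recorded in an
 * event history; when workflow code re-runs against that history, recorded activities are
 * replayed instead of executed again.
 */
public class ReplaySketch {

    /** How many times each activity really executed, i.e. actually hit the external API. */
    static final class Counts {
        int activityA;
        int activityB;
        int activityC;
    }

    /** One workflow run. Its history survives workflow-task retries, not workflow retries. */
    static final class Run {
        final List<String> history = new ArrayList<>();
        int cursor; // index of the next activity result while the code (re)executes

        String callActivity(Supplier<String> activityBody) {
            if (cursor < history.size()) {
                return history.get(cursor++); // replay: result already recorded, body is skipped
            }
            String result = activityBody.get(); // first execution: really run the activity
            history.add(result);
            cursor++;
            return result;
        }
    }

    /** The workflow code: activityA, activityB, buggy baz(), activityC. Always starts at the top. */
    static void workflowCode(Run run, Counts counts, boolean bazThrows) {
        run.cursor = 0;
        run.callActivity(() -> { counts.activityA++; return "A"; });
        run.callActivity(() -> { counts.activityB++; return "B"; });
        if (bazThrows) {
            throw new NullPointerException("oh no!"); // bug in non-activity code
        }
        run.callActivity(() -> { counts.activityC++; return "C"; });
    }

    /**
     * First attempt hits the bug; the retry succeeds.
     * retryWholeWorkflow=false: retry the workflow task against the same history.
     * retryWholeWorkflow=true: start a new run with an empty history.
     */
    static Counts simulate(boolean retryWholeWorkflow) {
        Counts counts = new Counts();
        Run run = new Run();
        try {
            workflowCode(run, counts, true);
        } catch (NullPointerException bug) {
            Run retryRun = retryWholeWorkflow ? new Run() : run;
            workflowCode(retryRun, counts, false);
        }
        return counts;
    }

    public static void main(String[] args) {
        for (boolean wholeWorkflow : new boolean[] {false, true}) {
            Counts c = simulate(wholeWorkflow);
            System.out.printf("%s: A=%d B=%d C=%d%n",
                wholeWorkflow ? "workflow retry" : "workflow-task retry",
                c.activityA, c.activityB, c.activityC);
        }
    }
}
```

In this model a workflow-task retry yields A=1, B=1, C=1, while a whole-workflow retry yields A=2, B=2, C=1, which is the same pattern (per TransactionStatus) as the observed behavior reported below.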

Test: Run the workflow with RetryOptions.setMaximumAttempts(2)
Expected behavior

  • ActivityA executes twice (once per TransactionStatus, not retrying after baz() throws an NPE)
  • ActivityB executes twice (once per TransactionStatus, not retrying after baz() throws an NPE)
  • ActivityC executes twice (once per TransactionStatus)

Observed behavior

  • ActivityA executes four times (once per TransactionStatus, retries after baz() throws an NPE)
  • ActivityB executes four times (once per TransactionStatus, retries after baz() throws an NPE)
  • ActivityC executes twice (once per TransactionStatus)
    Error message: 15:04:12.850 [workflow-method] ERROR io.temporal.internal.sync.POJOWorkflowImplementationFactory - Workflow execution failure WorkflowId=164b1724-fd6c-422e-a094-a0326654a829, RunId=3e2e9185-76d8-4b68-9456-6bcace3e67ad, WorkflowType=InboundUpdateWorkflow

Final event in event history:

event_id: 24
event_time {
  seconds: 1629399852
  nanos: 902000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
workflow_execution_continued_as_new_event_attributes {
  new_execution_run_id: "ddfe8132-e016-42f6-a158-7137cb160ff9"
  workflow_type {
    name: "<REDACTED>"
  }
  task_queue {
    name: "<REDACTED>"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: <REDACTED>
    }
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  workflow_task_completed_event_id: 21
  backoff_start_interval {
    seconds: 1
  }
  last_completion_result {
  }
}

Additional error message, potentially because of a non-graceful shutdown?

Aug 19, 2021 3:04:14 PM io.grpc.internal.SerializeReentrantCallsDirectExecutor execute
SEVERE: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6863b6d5
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
	at com.google.common.base.Preconditions.checkState(Preconditions.java:507)
	at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:340)
	at io.temporal.internal.testservice.TestWorkflowService.pollActivityTaskQueue(TestWorkflowService.java:463)
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$MethodHandlers.invoke(WorkflowServiceGrpc.java:3662)
	at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:180)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.halfClosed(ServerImpl.java:825)
	at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.halfClose(InProcessTransport.java:791)
	at io.grpc.internal.ForwardingClientStream.halfClose(ForwardingClientStream.java:72)
	at io.grpc.internal.DelayedStream$9.run(DelayedStream.java:320)
	at io.grpc.internal.DelayedStream.drainPendingCalls(DelayedStream.java:163)
	at io.grpc.internal.DelayedStream.setStream(DelayedStream.java:132)
	at io.grpc.internal.DelayedClientTransport$PendingStream.createRealStream(DelayedClientTransport.java:358)
	at io.grpc.internal.DelayedClientTransport$PendingStream.access$300(DelayedClientTransport.java:341)
	at io.grpc.internal.DelayedClientTransport$5.run(DelayedClientTransport.java:300)
	at io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain(ClientCalls.java:740)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:149)
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.pollActivityTaskQueue(WorkflowServiceGrpc.java:2702)
	at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:95)
	at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:38)
	at io.temporal.internal.worker.Poller$PollExecutionTask.run(Poller.java:273)
	at io.temporal.internal.worker.Poller$PollLoopTask.run(Poller.java:242)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

I did not think the NPE would cause ActivityA and ActivityB to retry. Is there some concept I’m not understanding? What do I need to change in order to get the expected behavior?

This is not possible. Temporal can guarantee that an activity is executed at most once, and the workflow is going to get a timeout exception if the activity didn’t execute. The problem is that it is possible for an activity to execute and the workflow to still get the exception. What are you going to do in this case?
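The trade-off in the reply above can be made concrete with another toy model in plain Java (hypothetical names, not the Temporal SDK). It assumes the failure mode where the API call succeeds but its completion never reaches the service, for example because the worker crashes right after the call: with one attempt the side effect happens once and the workflow still sees a failure, and with two attempts the workflow sees success but the API was hit twice.

```java
/**
 * Toy model (NOT the Temporal SDK) of why "exactly once" is unattainable for an activity that
 * calls a non-idempotent API: the call can succeed while its completion report is lost.
 */
public class AtMostOnceSketch {
    int apiCalls; // how many times the non-idempotent API was actually hit

    /** One attempt: the API call always happens; returns false if its completion is lost. */
    boolean attempt(boolean completionLost) {
        apiCalls++;
        return !completionLost;
    }

    /**
     * Invokes the activity with up to maxAttempts attempts. completionLost[i] says whether
     * attempt i's result is lost (the workflow then only sees a timeout for that attempt).
     * Returns true if the workflow ends up observing success.
     */
    boolean invoke(int maxAttempts, boolean... completionLost) {
        for (int i = 0; i < maxAttempts; i++) {
            boolean lost = i < completionLost.length && completionLost[i];
            if (attempt(lost)) {
                return true;
            }
        }
        return false; // workflow gets a timeout/failure even though the API was called
    }

    public static void main(String[] args) {
        AtMostOnceSketch oneAttempt = new AtMostOnceSketch();
        boolean ok1 = oneAttempt.invoke(1, true);
        System.out.println("maxAttempts=1: success=" + ok1 + ", apiCalls=" + oneAttempt.apiCalls);

        AtMostOnceSketch twoAttempts = new AtMostOnceSketch();
        boolean ok2 = twoAttempts.invoke(2, true);
        System.out.println("maxAttempts=2: success=" + ok2 + ", apiCalls=" + twoAttempts.apiCalls);
    }
}
```

Either the workflow must tolerate "failed, but maybe executed" (one attempt), or the API must tolerate duplicates (retries); no setting gives both.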

Sorry, you are correct, I meant at most once. The issue I’m seeing is the activities running more than once.

Update: I wanted to check if activityA() was being called on the retry after the NPE, but Temporal would skip the execution of the activity. So instead of mocking activityA, I mocked one of its dependencies. I still saw that this dependency was being called 4 times (expected 2), suggesting that activityA() is running again after the NPE thrown by baz(). However, when I view the event history, it shows the activity was only run twice, once per TransactionStatus. This is what I expect to see, but it doesn’t match the behavior recorded by the mocks. Is something causing activityA() to execute again, but preventing the execution from being added to the event history?

processTransactionStatus pseudocode (copied from the first post, with added comments)

foo()
activityA() // the dependency of activityA is interacted with 4 times
bar()
activityB()
baz() // NPE thrown here on first execution
activityC()
qux()
return;

If you want activities to run at most once, you have to set ActivityOptions.RetryOptions.MaximumAttempts to 1.
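In the Java SDK that setting lives on the RetryOptions attached to ActivityOptions. A minimal sketch, reusing the task queue name and timeout from the test class further down; the class and method names are hypothetical. With a single attempt, a failure or timeout surfaces in the workflow as an ActivityFailure (a TemporalFailure) instead of triggering another attempt, so the workflow code still has to decide what to do when the outcome is unknown.

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

/** Hypothetical helper: options for activities that call non-idempotent APIs. */
final class AtMostOnceActivityOptions {
    private AtMostOnceActivityOptions() {}

    static ActivityOptions create() {
        return ActivityOptions.newBuilder()
            .setTaskQueue("queue_name")
            .setStartToCloseTimeout(Duration.ofMinutes(1))
            .setRetryOptions(RetryOptions.newBuilder()
                .setMaximumAttempts(1) // a single attempt: the activity is never retried
                .build())
            .build();
    }
}
```

In SampleWorkflowImpl, options like these would take the place of requestActivityOptions when calling Workflow.newActivityStub.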

I feel like I’m not understanding something fundamental about Temporal, so sorry for asking this same question over and over. I’ve made a simple example that displays the behavior I’m confused about. I’m trying to understand exactly what happens when an error is thrown in a Workflow, outside an Activity, due to buggy code. I thought that Temporal would periodically retry only the most recent workflow task, but I haven’t been able to reproduce this behavior against the TestWorkflowEnvironment. Please help me figure out what I’m not understanding.

Intended behavior:
On an NPE, get Temporal to automatically retry the non-activity part of the code that threw the NPE, and then continue the Workflow execution if there is no error. Ideally, no activities executed before the NPE was thrown will be executed again. Question: is this behavior possible?

Observed behavior:
Case 1 (without Workflow RetryOptions set): After the NPE is thrown, the workflow closes with EVENT_TYPE_WORKFLOW_EXECUTION_FAILED and throws a WorkflowFailedException.

Case 2 (with Workflow RetryOptions.setMaximumAttempts(2)): After the NPE is thrown, the workflow restarts and completes successfully. However, MockRequestActivity::makeMockRequest runs twice, where I only expected it to run once. The activity output is:
Making request to non-idempotent API with UUID: ff9bb0cd-73a3-3fd3-9429-25d2ce9c9b58
Making request to non-idempotent API with UUID: 890fff92-7d80-33bd-b8ca-ace112b6c9c0
The executionHistory only shows the activity running once. The final event in the executionHistory is:

event_id: 11
event_time {
  seconds: 1629482275
  nanos: 42000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
workflow_execution_continued_as_new_event_attributes {
  new_execution_run_id: "a75493c4-c6a1-4635-895c-cbac8d0aedd0"
  workflow_type {
    name: "SampleWorkflow"
  }
  task_queue {
    name: "queue_name"
  }
  input {
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 60
  }
  workflow_task_completed_event_id: 9
  backoff_start_interval {
    seconds: 1
  }
  last_completion_result {
  }
}

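Self-contained test class:

import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SampleRetryTest {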
    private TestWorkflowEnvironment testEnv;
    private WorkflowClient client;
    private interface BuggyDependency {
        void doWork();
    }

    @ActivityInterface public interface MockRequestActivity {
        void makeMockRequest(UUID uuid);
    }

    private static class MockRequestActivityImpl implements MockRequestActivity {
        @Override public void makeMockRequest(UUID uuid) {
            // Pretend to make a request / do something not allowed in workflows
            // to see how many times this executes
            System.out.println("Making request to non-idempotent API with UUID: " + uuid);
        }
    }

    @WorkflowInterface public interface SampleWorkflow {
        @WorkflowMethod
        UUID execute();
    }

    private static class SampleWorkflowImpl implements SampleWorkflow {
        private final BuggyDependency buggyDependency;
        private final ActivityOptions requestActivityOptions;

        public SampleWorkflowImpl(BuggyDependency buggyDependency) {
            this.buggyDependency = buggyDependency;
            this.requestActivityOptions = ActivityOptions.newBuilder()
                .setTaskQueue("queue_name")
                .setStartToCloseTimeout(Duration.ofMinutes(1))
                .build();
        }

        @Override public UUID execute() {
            UUID uuid = Workflow.randomUUID();
            MockRequestActivity activity = Workflow.newActivityStub(MockRequestActivity.class, requestActivityOptions);
            activity.makeMockRequest(uuid);
            buggyDependency.doWork();
            return uuid;
        }
    }

    @BeforeEach
    public void setUp() {
        testEnv = TestWorkflowEnvironment.newInstance();
        client = testEnv.getWorkflowClient();
    }

    @AfterEach
    public void tearDown() {
        testEnv.close();
    }

    @Test public void testDependencyHasBug() {
        BuggyDependency dependency = mock(BuggyDependency.class);
        // Throws an exception the first time, works the second time, to simulate a bug being fixed
        doThrow(new NullPointerException("oh no!")).doNothing().when(dependency).doWork();

        Worker worker = testEnv.newWorker("queue_name");
        worker.addWorkflowImplementationFactory(SampleWorkflow.class,
            () -> new SampleWorkflowImpl(dependency));
        worker.registerActivitiesImplementations(new MockRequestActivityImpl());

        testEnv.start();
        SampleWorkflow workflow = client.newWorkflowStub(SampleWorkflow.class,
            WorkflowOptions.newBuilder().setTaskQueue("queue_name")
                .setWorkflowTaskTimeout(Duration.ofMinutes(1))
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build())
                .build());
        WorkflowStub stub = WorkflowStub.fromTyped(workflow);
        WorkflowExecution execution = WorkflowClient.start(workflow::execute);

        try {
            stub.getResult(UUID.class);
        } catch (Exception e) {
            fail("An exception was thrown during getResult.", e);
        } finally {
            printWorkflowExecutionHistoryForVerification(testEnv, execution);
        }
    }

    private void printWorkflowExecutionHistoryForVerification(TestWorkflowEnvironment testEnv, WorkflowExecution execution) {
        GetWorkflowExecutionHistoryRequest request =
            GetWorkflowExecutionHistoryRequest.newBuilder()
                .setNamespace("default")
                .setExecution(execution)
                .build();

        GetWorkflowExecutionHistoryResponse result =
            testEnv.getWorkflowService().blockingStub().getWorkflowExecutionHistory(request);

        List<HistoryEvent> events = result.getHistory().getEventsList();
        System.out.println(events);
    }
}

Now I do understand the question. Thanks a lot for the good explanation.

I believe it happens because the unit testing framework, by default, doesn’t retry failures in the workflow code, as that is very confusing for first-time users. Are you able to reproduce this problem when running against the real service?

I’ll try to find out how exactly the unit testing framework disables the exception retries and post here.

Here is the option that disables retries of the exceptions thrown from the workflow code. Note that this only happens when you use the addWorkflowImplementationFactory method.

Thank you! This is exactly what I was looking for. Changing the addWorkflowImplementationFactory call to the following produced the intended behavior. I feel much more comfortable with error handling now.

        worker.addWorkflowImplementationFactory(WorkflowImplementationOptions.newBuilder().build(),
            SampleWorkflow.class,
            () -> new SampleWorkflowImpl(dependency));
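For reference, my understanding (worth checking against the SDK version you use) is that the two-argument overload registers the factory with WorkflowImplementationOptions whose failWorkflowExceptionTypes is set to Throwable.class. Any exception from workflow code then fails the whole workflow, and with workflow RetryOptions a new run starts with an empty history. With default options, only TemporalFailure subclasses fail the workflow, while other exceptions such as the NPE fail just the workflow task, which is retried against the existing history. A sketch of the two configurations side by side; the holder class and constant names are hypothetical:

```java
import io.temporal.worker.WorkflowImplementationOptions;

/** Hypothetical holder comparing the two registration options discussed above. */
final class WorkflowFailurePolicies {
    private WorkflowFailurePolicies() {}

    // Believed to match the two-argument addWorkflowImplementationFactory overload:
    // any Throwable thrown from workflow code fails the whole workflow execution.
    static final WorkflowImplementationOptions FAIL_WORKFLOW_ON_ANY_EXCEPTION =
        WorkflowImplementationOptions.newBuilder()
            .setFailWorkflowExceptionTypes(Throwable.class)
            .build();

    // Defaults: non-TemporalFailure exceptions fail only the current workflow task, which is
    // retried against the same history, so completed activities are replayed, not re-executed.
    static final WorkflowImplementationOptions RETRY_WORKFLOW_TASK =
        WorkflowImplementationOptions.newBuilder().build();
}
```

Either constant could be passed as the first argument of the three-argument addWorkflowImplementationFactory call shown above, depending on which behavior the test should exercise.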