Extending the question in this post Async activities in workflow - #2 by maxim, I am working on what to do in the catch block to get my intended behavior:
- Never fail or retry the whole workflow.
- All of the promises should execute in their entirety exactly once, unless an activity executing in that promise throws the Exception type in RetryOptions.setDoNotRetry(), OR the activity execution time reaches setScheduleToCloseTimeout. In those cases, the exception should be logged and swallowed.
- All activities should execute exactly once, as they call APIs that aren’t idempotent. Edit: All activities should execute at most once.
Code samples:
Workflow Code Snippet
List<Promise<Void>> transactionStatusPromises = new ArrayList<>();
for (TransactionStatus transactionStatus : transactionStatuses) {
    Promise<Void> transactionStatusPromise = Async.procedure(this::processTransactionStatus, updateTimestamp,
            inboundUpdateWorkflowId, transactionStatus);
    transactionStatusPromises.add(transactionStatusPromise);
}
for (Promise<Void> promise : transactionStatusPromises) {
    try {
        promise.get();
    } catch (Exception exception) {
        if (exception instanceof TemporalFailure) {
            logger.error("processTransactionStatus threw a TemporalFailure exception. Swallowing the exception to allow"
                    + " InboundUpdateWorkflow to continue processing other TransactionStatuses", exception);
        } else {
            logger.error("processTransactionStatus threw a nonTemporalFailure exception. Propagating the exception"
                    + " so Temporal retries the previous workflow task.", exception);
            throw exception;
        }
    }
}
processTransactionStatus Pseudocode
foo()
activityA()
bar()
activityB()
baz()
activityC()
qux()
return;
I’ve run a test on the code above against TestEnvironment. The input is a list of 2 TransactionStatuses, which creates two promises. I also mocked baz() to throw an NPE on the first execution, and then return success on all future executions. activityB and activityC are mocked so I can verify the number of calls they receive. My intent is to confirm that if there is a bug in baz(), activityA and activityB won’t run additional times. My understanding is that when a non-TemporalFailure exception is thrown and not caught, only the most recent Workflow task will retry, which should mean that NPEs won’t cause activities to rerun (Throwing Exception vs Failure in workflow - #3 by nathan_shields).
Test: Run the workflow with RetryOptions.setMaximumAttempts(2)
Expected behavior
- ActivityA executes twice (once per TransactionStatus, not retrying after baz() throws an NPE)
- ActivityB executes twice (once per TransactionStatus, not retrying after baz() throws an NPE)
- ActivityC executes twice (once per TransactionStatus)
Observed behavior
- ActivityA executes four times (once per TransactionStatus, retries after baz() throws an NPE)
- ActivityB executes four times (once per TransactionStatus, retries after baz() throws an NPE)
- ActivityC executes twice (once per TransactionStatus)
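To make the difference between the two counts concrete, here is a toy model in plain Java. It is not the Temporal SDK, and every name in it (ReplayModel, simulate, keepHistory) is made up for illustration. It assumes that a retried workflow task replays against the same event history, so completed activities are not executed again, while a retry of the whole workflow starts a new run with an empty history. It also simplifies the two async branches into lockstep phases (A for both items, then B, then baz() and C), which is an assumption about the interleaving, not something taken from the test.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model only; none of these names come from the Temporal SDK. */
final class ReplayModel {
    /** Number of real (non-replayed) executions per activity name. */
    final Map<String, Integer> executions = new TreeMap<>();
    private boolean bazHasFailed = false;

    /** Executes the activity unless this run's history already records its completion. */
    private void activity(Set<String> history, String name, int item) {
        String callId = name + "#" + item;
        if (!history.add(callId)) {
            return; // replay: the recorded result is reused, nothing runs again
        }
        executions.merge(name, 1, Integer::sum);
    }

    /** Mimics the mocked baz(): throws on the first call only. */
    private void baz() {
        if (!bazHasFailed) {
            bazHasFailed = true;
            throw new NullPointerException("simulated bug in baz()");
        }
    }

    /** Branches advanced in lockstep: A for all items, then B, then baz() and C. */
    private void workflowCode(Set<String> history, int items) {
        for (int i = 0; i < items; i++) activity(history, "A", i);
        for (int i = 0; i < items; i++) activity(history, "B", i);
        for (int i = 0; i < items; i++) {
            baz();
            activity(history, "C", i);
        }
    }

    /** keepHistory = true models a workflow task retry; false models a brand-new run. */
    static Map<String, Integer> simulate(boolean keepHistory) {
        ReplayModel model = new ReplayModel();
        Set<String> history = new HashSet<>();
        while (true) {
            try {
                model.workflowCode(history, 2);
                return model.executions;
            } catch (NullPointerException e) {
                if (!keepHistory) {
                    history = new HashSet<>(); // new run: starts from an empty history
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("workflow task retry:  " + simulate(true));
        System.out.println("whole-workflow retry: " + simulate(false));
    }
}
```

In this model, simulate(true) gives A=2, B=2, C=2 (the expected behavior) and simulate(false) gives A=4, B=4, C=2 (the observed behavior). That would be consistent with the whole workflow being retried rather than only the last workflow task, though the model by itself does not prove that this is what happened in the test.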
Error message:
15:04:12.850 [workflow-method] ERROR io.temporal.internal.sync.POJOWorkflowImplementationFactory - Workflow execution failure WorkflowId=164b1724-fd6c-422e-a094-a0326654a829, RunId=3e2e9185-76d8-4b68-9456-6bcace3e67ad, WorkflowType=InboundUpdateWorkflow
Final event in event history:
event_id: 24
event_time {
  seconds: 1629399852
  nanos: 902000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
workflow_execution_continued_as_new_event_attributes {
  new_execution_run_id: "ddfe8132-e016-42f6-a158-7137cb160ff9"
  workflow_type {
    name: "<REDACTED>"
  }
  task_queue {
    name: "<REDACTED>"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: <REDACTED>
    }
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  workflow_task_completed_event_id: 21
  backoff_start_interval {
    seconds: 1
  }
  last_completion_result {
  }
}
Additional error message, potentially because of a non-graceful shutdown?
Aug 19, 2021 3:04:14 PM io.grpc.internal.SerializeReentrantCallsDirectExecutor execute
SEVERE: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6863b6d5
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:507)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:340)
at io.temporal.internal.testservice.TestWorkflowService.pollActivityTaskQueue(TestWorkflowService.java:463)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$MethodHandlers.invoke(WorkflowServiceGrpc.java:3662)
at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:180)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.halfClosed(ServerImpl.java:825)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.halfClose(InProcessTransport.java:791)
at io.grpc.internal.ForwardingClientStream.halfClose(ForwardingClientStream.java:72)
at io.grpc.internal.DelayedStream$9.run(DelayedStream.java:320)
at io.grpc.internal.DelayedStream.drainPendingCalls(DelayedStream.java:163)
at io.grpc.internal.DelayedStream.setStream(DelayedStream.java:132)
at io.grpc.internal.DelayedClientTransport$PendingStream.createRealStream(DelayedClientTransport.java:358)
at io.grpc.internal.DelayedClientTransport$PendingStream.access$300(DelayedClientTransport.java:341)
at io.grpc.internal.DelayedClientTransport$5.run(DelayedClientTransport.java:300)
at io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain(ClientCalls.java:740)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:149)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.pollActivityTaskQueue(WorkflowServiceGrpc.java:2702)
at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:95)
at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:38)
at io.temporal.internal.worker.Poller$PollExecutionTask.run(Poller.java:273)
at io.temporal.internal.worker.Poller$PollLoopTask.run(Poller.java:242)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
I did not think the NPE would cause ActivityA and ActivityB to retry. Is there some concept I’m not understanding? What do I need to change in order to get the expected behavior?