Context: I have a batch processing workflow that uses Async.procedure to execute a series of activities and non-activity methods against each item in the batch. I want to write a test that confirms my error handling is working as intended, so I need to understand how the workflow will behave in different error scenarios.
I want to test whether my workflow code correctly handles exceptions thrown inside Async.procedure. More specifically, I want to verify that if some of the non-activity code is buggy, I can fix the code without causing the workflow and its activities to retry (replaying is fine). Here’s a very simplified version of the workflow. Please forgive any syntax errors in the code samples.
class MyWorkflow {
public void doProcess(List<MyObject> input) {
List<Promise<Void>> processingPromises = new ArrayList<>();
for (MyObject obj : input) {
// Using Temporal's Async makes replay occur in a consistent order.
Promise<Void> promise = Async.procedure(this::processMyObject, obj);
processingPromises.add(promise);
}
// Get each promise result one by one.
for (Promise<Void> promise : processingPromises) {
try {
promise.get();
} catch (TemporalFailure temporalFailure) {
logger.error("processMyObject threw a TemporalFailure exception. Swallowing the exception to allow"
+ " Workflow to continue processing other MyObject", temporalFailure);
} catch (Exception exception) {
logger.error("processMyObject threw a nonTemporalFailure exception. Propagating the exception to allow"
+ " MyWorkflow to retry processing this MyObject by replaying the workflow.", exception);
throw exception;
}
}
}
private void processMyObject(MyObject obj) {
String apiResult = activity.callApis(obj);
String doWorkResult = dependency.doWork(apiResult);
activity.doActivity(obj, doWorkResult);
}
}
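The catch logic above boils down to "swallow one failure type, let everything else propagate". Below is a minimal plain-Java sketch of just that classification, with no Temporal dependency. ItemFailure is a hypothetical stand-in for TemporalFailure, CatchClassifier and drain are made-up names, and the tasks run one after another here instead of being started with Async.procedure, so this only illustrates the try/catch behavior, not replay.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TemporalFailure in this plain-Java sketch.
class ItemFailure extends RuntimeException {
    ItemFailure(String message) {
        super(message);
    }
}

class CatchClassifier {
    // Runs every task in order and returns the messages of the swallowed ItemFailures.
    // Any other exception is not caught here, so it propagates to the caller.
    static List<String> drain(List<Runnable> tasks) {
        List<String> swallowed = new ArrayList<>();
        for (Runnable task : tasks) {
            try {
                task.run();
            } catch (ItemFailure failure) {
                // Swallow and keep going so the remaining items are still processed.
                swallowed.add(failure.getMessage());
            }
        }
        return swallowed;
    }

    public static void main(String[] args) {
        Runnable ok = () -> { };
        Runnable itemFails = () -> { throw new ItemFailure("item 2 failed"); };
        List<Runnable> tasks = new ArrayList<>();
        tasks.add(ok);
        tasks.add(itemFails);
        tasks.add(ok);
        System.out.println("swallowed: " + drain(tasks));
    }
}
```

In this sketch an ItemFailure from one item does not stop the loop, while something like a NullPointerException leaves drain immediately, which mirrors the two catch branches in doProcess.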
I want to make sure my try/catch block around promise.get() works as expected. I’m trying to simulate the case where dependency.doWork(...) is buggy. Here’s a simplified version of the test I wrote. It’s supposed to show that dependency.doWork(...) fails on its first invocation, and then is fixed. I’ve read the Versioning reference of the SDK docs, but I am not sure if the use of versioning is required here. Even if it is, I do not know how to do workflow versioning in a JUnit test.
@Test
public void test() {
MyDependency dependency = mock(MyDependency.class);
// throw an exception on ONLY the first execution to cause a replay
doThrow(new NullPointerException()).doReturn("result")
.when(dependency).doWork(anyString());
// Register workflow implementation with mocked dependency
// Assertions...
}
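To make the intended stubbing concrete: the chained doThrow(...).doReturn("result") is meant to throw on the first call to doWork and return "result" on every later call. A minimal sketch of the same behavior without Mockito follows. FailOnceDependency, FailOnceDemo and callCount are hypothetical names used only for illustration, and the real MyDependency interface is assumed to take a String and return a String.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-written stub: throws on the first call only, then returns "result".
class FailOnceDependency {
    private final AtomicInteger calls = new AtomicInteger();

    String doWork(String apiResult) {
        // The first invocation simulates the bug; later invocations simulate the fixed code.
        if (calls.incrementAndGet() == 1) {
            throw new NullPointerException("simulated bug in doWork");
        }
        return "result";
    }

    int callCount() {
        return calls.get();
    }
}

class FailOnceDemo {
    public static void main(String[] args) {
        FailOnceDependency dependency = new FailOnceDependency();
        try {
            dependency.doWork("apiResult");
        } catch (NullPointerException e) {
            System.out.println("first call failed: " + e.getMessage());
        }
        System.out.println("second call returned: " + dependency.doWork("apiResult"));
    }
}
```

Note that the failure is tied to call order across the whole test, not to a particular MyObject: whichever item's processMyObject reaches doWork first is the one that sees the exception.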
The test fails intermittently due to the workflow not completing. In reality, the workflow is longer than the version I have above, so event_id=29 makes sense. The error I see is:
java.lang.RuntimeException: Failure processing workflow task. WorkflowId=3464846248, RunId=992d59e5-d9e2-4e50-b3b8-cce2f35fab9c, Attempt=2
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:349) ~[temporal-sdk-1.1.0.jar:?]
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:279) ~[temporal-sdk-1.1.0.jar:?]
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:79) ~[temporal-sdk-1.1.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 29 of 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED' type. IsReplaying=true, PreviousStartedEventId=27, workflowTaskStartedEventId=36, Currently Processing StartedEventId=27
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193) ~[temporal-sdk-1.1.0.jar:?]
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140) ~[temporal-sdk-1.1.0.jar:?]
....
Caused by: java.lang.IllegalStateException: COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK doesn't match EVENT_TYPE_ACTIVITY_TASK_SCHEDULED with EventId=29
at io.temporal.internal.statemachines.WorkflowStateMachines.assertMatch(WorkflowStateMachines.java:786) ~[temporal-sdk-1.1.0.jar:?]
at io.temporal.internal.statemachines.WorkflowStateMachines.validateCommand(WorkflowStateMachines.java:737) ~[temporal-sdk-1.1.0.jar:?]
Using the debugger, I found that a mismatch in activityId when scheduling doActivity is causing the replay to fail. I don’t know what exactly is causing this mismatch. Below are the recorded history event, followed by the command produced during replay.
event_id: 29
event_time {
seconds: 1636647841
nanos: 720000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
activity_id: "2025c65e-6dbb-3a30-b702-089d75b32c0b"
activity_type {
name: "doActivity"
}
command_type: COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK
schedule_activity_task_command_attributes {
activity_id: "e9860613-8c28-311d-a218-607668da59b6"
activity_type {
name: "doActivity"
}
Given all this, I have a few questions.
- In reality, dependency.doWork() would never be fixed without deploying a new version of the workflow. Does that mean that this test I’ve written is not valid, as it isn’t testing a real scenario?
- If the test I’ve written is testing a valid scenario, how can I fix either the test or workflow code to make the test pass reliably?
- If the test is not testing a valid scenario, what is the correct way to verify that I’ve written my error handling in a way that fixing non-activity code will not force the workflow to retry? Can this be accomplished in a JUnit test?