Verifying error handling and fixing buggy code within Async.procedure promises

Context: I have a batch processing workflow that uses Async.procedure to execute a series of activities and non-activity methods against each item in the batch. I want to write a test that confirms my error handling is working as intended, so I need to understand how the workflow will behave in different error scenarios.

I want to test whether my workflow code correctly handles exceptions thrown inside of Async.procedure. More specifically, I want to verify that if some of the non-activity code is buggy, I can fix the code without causing the workflow and its activities to retry (replaying is fine). Here’s a very simplified version of the workflow. Please forgive any syntax errors in the code samples.

class MyWorkflow {
	
	public void doProcess(List<MyObject> input) {
        List<Promise<Void>> processingPromises = new ArrayList<>();

        for (MyObject obj : input) {
            // Using Temporal's Async makes replay occur in a consistent order.
            Promise<Void> promise = Async.procedure(this::processMyObject, obj);
            processingPromises.add(promise);
        }

        // Get each promise result one by one.
        for (Promise<Void> promise : processingPromises) {
            try {
                promise.get();
            } catch (TemporalFailure temporalFailure) {
                logger.error("processMyObject threw a TemporalFailure exception. Swallowing the exception to allow"
                    + " Workflow to continue processing other MyObject", temporalFailure);
            } catch (Exception exception) {
                logger.error("processMyObject threw a nonTemporalFailure exception. Propagating the exception to allow"
                    + " MyWorkflow to retry processing this MyObject by replaying the workflow.", exception);
                throw exception;
            }
        }
	}

	private void processMyObject(MyObject obj) {
		String apiResult = activity.callApis(obj);
		String doWorkResult = dependency.doWork(apiResult);
		activity.doActivity(obj, doWorkResult);
	}
}

I want to make sure my try/catch block around promise.get() works as expected. I’m trying to simulate the case where dependency.doWork(...) is buggy. Here’s a simplified version of the test I wrote. It’s supposed to show that dependency.doWork(...) is buggy at first, and then is fixed. I’ve read the Versioning reference of the SDK docs, but I am not sure whether versioning is required here. Even if it is, I do not know how to do workflow versioning in a JUnit test.

@Test
public void test() {
	MyDependency dependency = mock(MyDependency.class);
	// throw an exception on ONLY the first execution to cause a replay
	doThrow(new NullPointerException()).doReturn("result")
		.when(dependency).doWork();

	// Register workflow implementation with mocked dependency

	// Assertions...
}

The test fails intermittently due to the workflow not completing. In reality, the workflow is longer than the version I have above, so event_id=29 makes sense. The error I see is:

java.lang.RuntimeException: Failure processing workflow task. WorkflowId=3464846248, RunId=992d59e5-d9e2-4e50-b3b8-cce2f35fab9c, Attempt=2
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:349) ~[temporal-sdk-1.1.0.jar:?]
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:279) ~[temporal-sdk-1.1.0.jar:?]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:79) ~[temporal-sdk-1.1.0.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 29 of 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED' type. IsReplaying=true, PreviousStartedEventId=27, workflowTaskStartedEventId=36, Currently Processing StartedEventId=27
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193) ~[temporal-sdk-1.1.0.jar:?]
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140) ~[temporal-sdk-1.1.0.jar:?]
	....
Caused by: java.lang.IllegalStateException: COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK doesn't match EVENT_TYPE_ACTIVITY_TASK_SCHEDULED with EventId=29
	at io.temporal.internal.statemachines.WorkflowStateMachines.assertMatch(WorkflowStateMachines.java:786) ~[temporal-sdk-1.1.0.jar:?]
	at io.temporal.internal.statemachines.WorkflowStateMachines.validateCommand(WorkflowStateMachines.java:737) ~[temporal-sdk-1.1.0.jar:?]

Using the debugger, I found that a mismatch in activityId when scheduling doActivity is causing the replay to fail. I don’t know what exactly is causing this mismatch. Below are the recorded event and the command that failed to match it:

event_id: 29
event_time {
  seconds: 1636647841
  nanos: 720000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "2025c65e-6dbb-3a30-b702-089d75b32c0b"
  activity_type {
    name: "doActivity"
  }

command_type: COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK
schedule_activity_task_command_attributes {
  activity_id: "e9860613-8c28-311d-a218-607668da59b6"
  activity_type {
    name: "doActivity"
  }

Given all this, I have a few questions.

  1. In reality, dependency.doWork() would never be fixed without deploying a new version of the workflow. Does that mean that this test I’ve written is not valid, as it isn’t testing a real scenario?
  2. If the test I’ve written is testing a valid scenario, how can I fix either the test or workflow code to make the test pass reliably?
  3. If the test is not testing a valid scenario, what is the correct way to verify that I’ve written my error handling in a way that fixing non-activity code will not force the workflow to retry? Can this be accomplished in a JUnit test?

As far as the error goes, it looks like a non-determinism issue in your workflow code.
See the list of workflow implementation constraints here. Another thing to look into is the possible use of non-Temporal libraries in your workflow code that might depend on system time.
If the call to your dependency.doWork method is not deterministic (can produce random results), you might want to wrap it inside a SideEffect, or consider calling it inside an activity. Note that if you use SideEffect, the code called inside should not throw unchecked exceptions.
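To illustrate why wrapping a non-deterministic call helps, here is a toy model of the record-then-replay idea behind SideEffect. It is plain Java and not the SDK's implementation; the class name, the `recorded` list standing in for workflow history, and the `sideEffect` method are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SideEffectToyModel {
    // Stands in for the recorded workflow history (an assumption of this toy model).
    private final List<String> recorded;
    // Position of the next side effect within this execution.
    private int cursor = 0;

    public SideEffectToyModel(List<String> recorded) {
        this.recorded = recorded;
    }

    /** First execution: call func and record its value. Replay: return the recorded value. */
    public String sideEffect(Supplier<String> func) {
        if (cursor < recorded.size()) {
            return recorded.get(cursor++); // replay path: func is never called
        }
        String value = func.get();
        recorded.add(value);
        cursor++;
        return value;
    }

    public static void main(String[] args) {
        List<String> history = new ArrayList<>();
        // First execution computes and records the value.
        String first = new SideEffectToyModel(history).sideEffect(() -> "computed-on-first-run");
        // A replay against the same history ignores the new supplier.
        String replayed = new SideEffectToyModel(history).sideEffect(() -> "different-on-replay");
        System.out.println(first);
        System.out.println(replayed);
    }
}
```

In this model the second supplier is never invoked, so code that would return a different value on replay still yields the value recorded the first time.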

For versioning, you can change the activity implementation without worrying about breaking determinism. You should be able to make changes to your dependency code without versioning it in the workflow, if it’s executed inside the mentioned side effect block or inside an activity.
For testing version changes (determinism), the best thing would be to use WorkflowReplayer. The idea is to get the JSON execution history of the unversioned workflow (via tctl, for example) and replay it with WorkflowReplayer against a workflow implementation that includes the versioned changes.

To get the json exec history you can do for example:
tctl wf show -w <workflow_id> -r <workflow_run_id> --output_filename myrun.json

Regarding the thrown exceptions in workflow code, unknown exceptions thrown from workflow code are always retryable, meaning your workflow execution will not fail, but keep trying to replay the failed workflow task, waiting for a fix.
If your intention is to fail workflow execution in this case, you can make an exception (that does not extend TemporalFailure) non-retryable via WorkflowImplementationOptions.failWorkflowExceptionTypes, for example:

WorkflowImplementationOptions workflowImplementationOptions =
    WorkflowImplementationOptions.newBuilder()
        .setFailWorkflowExceptionTypes(IllegalArgumentException.class)
        .build();
workflowWorker.registerWorkflowImplementationTypes(
        workflowImplementationOptions, MyWorkflowImpl.class);

If you want to fail on any exception (rare case) set failWorkflowExceptionTypes to Throwable.class.

promise.get() throws a RuntimeException. In the case of an activity failure, after all the activity retries have been exhausted, that runtime exception includes an ApplicationFailure as its cause, which carries the original exception type and message.

Note that you can also, in case of activity errors, catch ActivityFailure directly in your Async.procedure/function block, for example:

Promise<String> myPromise =
          Async.function(
              () -> {
                try {
                  return activities.doSomething();
                } catch (ActivityFailure e) {
                 // handle exception...

                 return "someDefault";
                }
              });

Another thing you can do is check if the promise has a failure before you call .get() on it, for example:

if (myPromise.getFailure() != null) {
       myPromise.getFailure();
...
}

where the value returned by getFailure(), again in the case of an activity failure, should be of type ActivityFailure.

As far as the error goes, it looks like a non-determinism issue in your workflow code.

I agree, but I haven’t been able to track it down. That’s why I want to check if the test I wrote is “valid,” or if I have written a test for a situation that Temporal doesn’t allow. This mocking was supposed to simulate fixing buggy code, but I am thinking that perhaps it’s actually just adding non-deterministic behavior to the workflow. Can you confirm whether I’m successfully simulating fixing buggy code, or if I’m just adding non-determinism?

	MyDependency dependency = mock(MyDependency.class);
	// throw an exception on ONLY the first execution to cause a replay
	doThrow(new NullPointerException()).doReturn("result")
		.when(dependency).doWork();
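To make the question concrete, here is a sketch of what that stubbing does when the same code path is executed twice. It uses a hand-written stand-in instead of Mockito so it runs on its own; `FlakyDependency` and `runOnce` are hypothetical names for this illustration, and the sketch does not model Temporal's replay or command matching.

```java
public class FlakyStubDemo {
    /** Stand-in for the Mockito stub: throws on the first call, returns "result" afterwards. */
    public static class FlakyDependency {
        // The call count lives in this JVM object, not in the workflow history.
        private int calls = 0;

        public String doWork(String apiResult) {
            calls++;
            if (calls == 1) {
                throw new NullPointerException("simulated bug");
            }
            return "result";
        }
    }

    /** Runs the non-activity step once and reports which path it took. */
    public static String runOnce(FlakyDependency dependency) {
        try {
            return "returned:" + dependency.doWork("apiResult");
        } catch (NullPointerException e) {
            return "threw:" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        FlakyDependency dependency = new FlakyDependency();
        System.out.println(runOnce(dependency)); // first execution takes the failing path
        System.out.println(runOnce(dependency)); // re-execution takes the success path
    }
}
```

The same input takes a different path on the second pass because the stub carries state between executions. Whether that counts as "fixing the bug" or as non-determinism depends on whether the commands produced on replay still match the recorded history, which is exactly what I can't tell.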

Regarding the thrown exceptions in workflow code, unknown exceptions thrown from workflow code are always retryable, meaning your workflow execution will not fail, but keep trying to replay the failed workflow task, waiting for a fix.

This is the scenario I’m trying to figure out. I want to prove that I can fix buggy code in my workflow and that everything behaves as expected. In the examples I’ve seen in the docs for fixing buggy workflows, we need to restart the worker after the code is fixed. Can I do this somehow in a Junit test, or do I need to use tctl?

I’ve seen in the docs for fixing buggy workflows, we need to restart the worker after the code is fixed. Can I do this somehow in a Junit test, or do I need to use tctl?

Currently you cannot test versioning changes directly via TestWorkflowEnvironment until this enhancement has been added.
For now, the best way to test versioning is to use the mentioned WorkflowReplayer in your tests.

Can you confirm whether I’m successfully simulating fixing buggy code, or if I’m just adding non-determinism?

I’m writing a local test based on the code you provided and will get back to you on this as soon as I can.

@nathan_shields Hey Nathan, it does look like it may be a bug in the state machines or our threading model. But it’s hard to understand and confirm that just from reading the explanation; to make full sense of it I need a reproduction.
I can go and do my best to put it together myself based on the code snippets you provided. But is there any chance you can help me and make a simplified reproduction from your code and share with me ideally with a unit test that fails with this exception that you report?

BTW, if you had an updated java-sdk, you would see the mismatch in ActivityIds without debugging. You would get this info right in the exception message.

It does not seem that the dependency is causing non-deterministic behavior. The assumption is that “doWork” does not use any workflow APIs, such as invoking activities or child workflows.
Maybe it’s worth going through your whole code to see what might be causing the IllegalStateException.

I can go and do my best to put it together myself based on the code snippets you provided. But is there any chance you can help me and make a simplified reproduction from your code and share with me ideally with a unit test that fails with this exception that you report?

After a bit more research, I’ve learned that this test will only fail when another test class in my repo is run before it. The classes test different Workflows, which use some of the same activities. They are both using TestWorkflowEnvironment. I’ll try to narrow down the issue. Until then, I don’t think I’ll be able to provide a useful code sample.

@spikhalskiy @tihomir I managed to make a sample that reproduces the issue. The repository is absolutely hideous, but it’s all I had time for: GitHub - neshield/temporal-break-replay-history

See the README.md instructions: in IntelliJ, run all the tests in the "retries" directory. The run should fail around 5% of the time. I tried removing some of the tests in the retries directory, but that either prevented the issue from appearing or made it happen less frequently. So the best I could do is a set of 4 tests that need to be run over and over until they eventually fail.

Thanks for the reproducer! Went through your code and here are some initial things:

  1. I think injecting your dependencies into a workflow using WorkflowImplementationFactory is dangerous and should be avoided if possible (you can inject anything you want into your activity code). The reason is that you have to worry about whether your injected code has shared mutable state, and you have to make sure that any changes to your dependency code are backwards compatible (since it is called from workflow methods).

  2. You are actually breaking determinism inside your test with for example:


when(subDependency.doSubWork(any())).thenThrow(new NullPointerException()).thenReturn("val");

If you make changes to these dependencies, you need to version them. This comes back to point 1 about being backwards compatible. It probably makes more sense not to inject them into your workflow, but just to instantiate them inside the workflow code.

Overall, I think the suggestion is not to inject dependencies. It can work (and in your case I think the MyDependency and MySubDependency “doWork” and “doSubWork” methods are fine to inject), but it introduces some possible weird edge-case issues, and you have to worry about many things that can be avoided if you write all of this inside the workflow code if possible (or inside activities).

We will keep looking and provide more info when possible, but for now I think it would be better not to use injection via addWorkflowImplementationFactory if possible.

I was finally able to dig out what’s going on here, and it looks like a bug: a problem in our abstractions or state machines. I filed it, with an explanation of what’s going on, here: Exception in one of several workflow async function leads to non-deterministic execution · Issue #902 · temporalio/sdk-java · GitHub

Sorry for the delay. It took a lot of effort to reduce the reproduction to something I could comprehend and turn into a stable test case.

Thanks @nathan_shields for your report!

Awesome! A question about the workaround:

Workaround

  1. Using Promise.allOf to wait on all the promises doesn’t allow the exception to be “ignored” because we wait only on the one promise at a time. Has to be implemented correctly in this way before the problem actually happened and the incorrect history is already produced.
  2. Promoting all exceptions happening in Async functions/procedures to Errors solves this by immediately failing the workflow task. This has to be done before the workflow execution and before the incompatible history is already produced.

The only solution if the problem already manifested itself is history reset + one of the workarounds to prevent it from happening again.

Method 1: This causes any exception in one of the promises to halt execution for all of the promises, right?

Method 2: What does this mean? Is there some sort of Error class to use?
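On Method 2, one possible reading, assuming "Errors" refers to subclasses of java.lang.Error: wrap the body of each async procedure so that any Exception is rethrown as an Error instead of being captured in the promise. The sketch below is plain Java; `runPromotingExceptions` is a hypothetical helper, not an SDK method, and the claim that an Error immediately fails the workflow task comes from the workaround text above rather than from this code.

```java
public class PromoteToError {
    /** Hypothetical wrapper: runs the body and rethrows any Exception as a java.lang.Error. */
    public static void runPromotingExceptions(Runnable body) {
        try {
            body.run();
        } catch (Exception e) {
            // Keep the original exception as the cause so it is still visible in logs.
            throw new Error("Exception inside async body promoted to Error", e);
        }
    }

    public static void main(String[] args) {
        try {
            runPromotingExceptions(() -> {
                throw new NullPointerException("simulated bug");
            });
        } catch (Error e) {
            // The original NullPointerException is preserved as the cause.
            System.out.println(e.getCause());
        }
    }
}
```

In the workflow from the original post, the wrapper would presumably surround the call made inside the procedure handed to Async.procedure, for example a lambda that calls runPromotingExceptions(() -> processMyObject(obj)).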