Exception in Saga compensation and multiple retries even though retry count is 1

Hello,

I am using Spring Boot to write a Temporal workflow as follows.

@SpringBootApplication
public class TemporalRestDemoApp {
    public static void main(String[] args) {

        ConfigurableApplicationContext appContext = SpringApplication.run(TemporalRestDemoApp.class, args);
        WorkerFactory factory = appContext.getBean(WorkerFactory.class);
        TransferActivity transferActivity = appContext.getBean(TransferActivity.class);
        Worker worker = factory.newWorker(TransactionWorkflow.QUEUE_NAME);
        worker.registerWorkflowImplementationTypes(TransactionWorkflowImpl.class);
        worker.registerActivitiesImplementations(transferActivity);
        factory.start();


        /* WITHOUT SPRINGBOOT
       // WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
       // WorkflowClient client = WorkflowClient.newInstance(service);
        // Worker factory is used to create Workers that poll specific Task Queues.
        WorkerFactory factory = WorkerFactory.newInstance(client);
        Worker worker = factory.newWorker(Shared.MONEY_TRANSFER_TASK_QUEUE);
        // This Worker hosts both Workflow and Activity implementations.
        // Workflows are stateful so a type is needed to create instances.
        worker.registerWorkflowImplementationTypes(MoneyTransferWorkflowImpl.class);
        // Activities are stateless and thread safe so a shared instance is used.
        worker.registerActivitiesImplementations(new AccountActivityImpl());
        // Start listening to the Task Queue.
        factory.start();

         */
    }
}

My Config class is as follows.

@Component
@Configuration
public class TemporalConfig {

    @Value("${temporal.serviceAddress}")
    private String temporalServiceAddress;

    @Value("${temporal.namespace}")
    private String temporalNamespace;

    @Bean
    public WorkflowServiceStubs workflowServiceStubs() {
        return WorkflowServiceStubs
                .newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(temporalServiceAddress).build());
    }

    @Bean
    public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
        return WorkflowClient.newInstance(workflowServiceStubs,
                WorkflowClientOptions.newBuilder().setNamespace(temporalNamespace).build());
    }

    @Bean
    public WorkerFactory workerFactory(WorkflowClient workflowClient) {
        return WorkerFactory.newInstance(workflowClient);
    }

    @Bean
    public TransferActivityImpl MoneyTransferActivity() {

        return new TransferActivityImpl();
    }

}

I start the workflow through REST.

@RestController
public class AccountController {

    @Autowired
    TransferService transferService;


    @PostMapping("/startWorkflow")
    public void startWorkflow(@RequestBody TransferRequest request) {

        transferService.startMoneyTransfer(request.getSenderAccountId(), request.getReceiverAccountId());
    }
}

Inside the workflow method, I block execution so that the workflow advances to the next step only when it receives a signal.

public class TransactionWorkflowImpl implements TransactionWorkflow {

    private final Account retAcct = new Account();
    private final RetryOptions retryoptions = RetryOptions.newBuilder()
          //  .setInitialInterval(Duration.ofSeconds(5))
          //  .setMaximumInterval(Duration.ofSeconds(20))
          //  .setBackoffCoefficient(2)
            .setMaximumAttempts(1)
            .build();
    private final ActivityOptions defaultActivityOptions = ActivityOptions.newBuilder()
            // Timeout options specify when to automatically timeout Activities if the process is taking too long.
            .setStartToCloseTimeout(Duration.ofSeconds(300))
            // Optionally provide customized RetryOptions.
            // Temporal retries failures by default, this is simply an example.
            .setRetryOptions(retryoptions)
            .build();



    private final TransferActivity transferActivity = Workflow.newActivityStub(TransferActivity.class, defaultActivityOptions);

    private final Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(false).setContinueWithError(true).build();
    private final Saga saga = new Saga(sagaOptions);

    public boolean isTransferCompleted = false;

    public boolean isAccountInfoRetrieved = false;

    public boolean isTransactionCompleted = false;
    private boolean iscustomerActivityRegistered = false;
    private Boolean isBackupCompleted = false;

   
    @Override
    public void startAccountTransferWorkflow(long idSender, long receiverAccountId) {

        try {

            while(!this.isBackupCompleted) {
                Workflow.await(() -> this.isBackupCompleted);
            }

           while(!this.isTransferCompleted) {
                Workflow.await(() -> this.isTransferCompleted);
            }

          
            while(!this.iscustomerActivityRegistered) {
                Workflow.await(() -> this.iscustomerActivityRegistered);
            }

            while(!this.isTransactionCompleted) {
                Workflow.await(() -> this.isTransactionCompleted);
            }
     
        }
        catch (Exception e) {
            throw e;
        }
    }

Question # 1: Do I need a while loop around Workflow.await()? As per the documentation, it is supposed to block the current thread, but in the debugger (after setting TEMPORAL_DEBUG=1), the workflow start method gets called multiple times.

I am writing the customer’s activity to Cassandra, but before writing, I intentionally drop the table to trigger saga compensation.


 @Override
    public void signalCustomerActivityRegistered(long idSender, long idReceiver, BigDecimal amount) {
        try {
             transferActivity.registerActivity(idSender, idReceiver, amount);
            saga.addCompensation(transferActivity::registerFailedActivity, idSender, idReceiver, amount);
            this.iscustomerActivityRegistered = true;
            System.out.println("Signal - Customer activity registered.");
        }
        catch (ActivityFailure e) {
            System.out.println("Failed to register customer activity. Execute saga compensation");
            saga.compensate();
            throw e;
        }
        catch(ApplicationFailure ex) {
            System.out.println("Application Failure");
            throw ex;
        }
        catch(WorkflowException we) {
            System.out.println("\n Stack Trace:\n" + Throwables.getStackTraceAsString(we));
            Throwable cause = Throwables.getRootCause(we);
            System.out.println("\n Root cause: " + cause.getMessage());
            throw we;
        }
        catch(Exception ge) {
            ge.printStackTrace();
            throw ge;
        }

    }

The saga compensation gets called here.

@Override
    public void registerActivity(long idSender, long idReceiver, BigDecimal amount) {
        try {
            Account sender = accountRepository.findAccountById(idSender);
            Account receiver = accountRepository.findAccountById(idReceiver);

            String cActivity = "Sent " + amount.doubleValue() + " to " + receiver.getId() + " - " + receiver.getName();
            customerRepository.save(new Customer((int) sender.getId(), sender.getName(), sender.getAmount().doubleValue(), Instant.now(), cActivity));

            cActivity = "Received " + amount.doubleValue() + " from " + sender.getId() + " - " + sender.getName();
            customerRepository.save(new Customer((int) receiver.getId(), receiver.getName(), receiver.getAmount().doubleValue(), Instant.now(), cActivity));
        }
        catch(Exception e) {
        
            throw Activity.wrap(e);  // Triggers saga compensation.
             }
    }

Question # 2: Even though the maximum attempts for the activity is set to 1 and the Saga option continueWithError is set to true, the workflow is still retrying. Do I need to explicitly stop the workflow? Is this the desired behavior?

Q1. No, you don’t need the while loop around Workflow.await; it’s a blocking call that returns once the unblock condition evaluates to true.

What calls the signalCustomerActivityRegistered method? It seems it’s not part of your workflow code.

It gets called through REST.

@PostMapping("/transfer")
    public void transferMoney(@RequestBody TransferRequest request) {
        transferService.transferMoney(request.getSenderAccountId(), request.getReceiverAccountId(), request.getAmount());
    }

The transferService method is as follows.

public void transferMoney(long idSender,
                              long idReceiver,
                              BigDecimal amount) {

        TransactionWorkflow bkworkflow = workflowClient.newWorkflowStub(TransactionWorkflow.class, "MoneyTransfer_" + idSender + "_" + idReceiver);
        try {
            bkworkflow.signalBackupCompleted(idSender, idReceiver);
        } catch (ApplicationFailure ex) {
            System.out.println("Application Failure");
            throw ex;
        } catch (WorkflowException we) {
            System.out.println("\n Stack Trace:\n" + Throwables.getStackTraceAsString(we));
            Throwable cause = Throwables.getRootCause(we);
            System.out.println("\n Root cause: " + cause.getMessage());
            throw we;
        } catch (Exception ge) {
            ge.printStackTrace();
            throw ge;
        }

        TransactionWorkflow workflow = workflowClient.newWorkflowStub(TransactionWorkflow.class, "MoneyTransfer_" + idSender + "_" + idReceiver);
        try {
            workflow.signalTransferCompleted(idSender, idReceiver, amount);
        }
        catch(ApplicationFailure ex) {
            System.out.println("Application Failure");
            throw ex;
        }
        catch(WorkflowException we) {
            System.out.println("\n Stack Trace:\n" + Throwables.getStackTraceAsString(we));
            Throwable cause = Throwables.getRootCause(we);
            System.out.println("\n Root cause: " + cause.getMessage());
            throw we;
        }
        catch(Exception ge) {
            ge.printStackTrace();
            throw ge;
        }

        TransactionWorkflow newWorkflow = workflowClient.newWorkflowStub(TransactionWorkflow.class, "MoneyTransfer_" + idSender + "_" + idReceiver);
        try {
            newWorkflow.signalCustomerActivityRegistered(idSender, idReceiver, amount);
        }
        catch(ApplicationFailure ex) {
            System.out.println("Application Failure");
            throw ex;
        }
        catch(WorkflowException we) {
            System.out.println("\n Stack Trace:\n" + Throwables.getStackTraceAsString(we));
            Throwable cause = Throwables.getRootCause(we);
            System.out.println("\n Root cause: " + cause.getMessage());
            throw we;
        }
        catch(Exception ge) {
            ge.printStackTrace();
            throw ge;
        }
    }

Ok, so /startWorkflow starts the workflow invocation, and /transfer sends the signals.

Even though you can call activities in your signal method, it’s not really recommended; it is best to keep all your logic (and error handling logic) in the workflow method code.

You could rewrite your signalCustomerActivityRegistered (and the other signal methods) to just set the data:

public void signalCustomerActivityRegistered(long idSender, long idReceiver, BigDecimal amount) {
  this.idSender = idSender;
  this.idReceiver = idReceiver;
  this.amount = amount;
  this.registerCustomerActivity = true;
}

and the associated workflow method code could be:

Workflow.await(() -> registerCustomerActivity);
saga.addCompensation(transferActivity::registerFailedActivity, idSender, idReceiver, amount);
try {
    transferActivity.registerActivity(idSender, idReceiver, amount);
} catch (ActivityFailure failure) {
    saga.compensate();
}

Let’s start with that and then see what your workflow code looks like.

The following code doesn’t make sense, as addCompensation will not be called if registerActivity throws.

        try {
             transferActivity.registerActivity(idSender, idReceiver, amount);
            saga.addCompensation(transferActivity::registerFailedActivity, idSender, idReceiver, amount);
            this.iscustomerActivityRegistered = true;
            System.out.println("Signal - Customer activity registered.");
        }
        catch (ActivityFailure e) {
            System.out.println("Failed to register customer activity. Execute saga compensation");
            saga.compensate();
            throw e;
        }

I agree. I had changed that in my demo code. The saga documentation also shows saga.addCompensation being called after the activity method.

https://www.javadoc.io/static/io.temporal/temporal-sdk/1.0.0/io/temporal/workflow/Saga.html

If activity throws an exception then saga.addCompensation will never get called and on calling saga.compensate, the list of functions may be empty.
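
To make the ordering issue concrete, here is a minimal sketch in plain Java. It is only an illustration under assumptions: `MiniSaga` is a hypothetical stand-in written for this example, not Temporal's io.temporal.workflow.Saga, and `failingActivity` simply simulates an activity that always throws.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CompensationOrderSketch {

    // Hypothetical stand-in for a saga; NOT Temporal's Saga class.
    static class MiniSaga {
        private final Deque<Runnable> compensations = new ArrayDeque<>();

        void addCompensation(Runnable undo) {
            compensations.push(undo); // last registered runs first
        }

        void compensate() {
            while (!compensations.isEmpty()) {
                compensations.pop().run();
            }
        }
    }

    // Simulates an activity that always fails (for example, the table was dropped).
    static void failingActivity() {
        throw new IllegalStateException("table does not exist");
    }

    // Returns the names of the compensation steps that actually ran.
    static List<String> run(boolean registerBeforeActivity) {
        List<String> log = new ArrayList<>();
        MiniSaga saga = new MiniSaga();
        try {
            if (registerBeforeActivity) {
                saga.addCompensation(() -> log.add("registerFailedActivity"));
            }
            failingActivity();
            if (!registerBeforeActivity) {
                // Never reached when the activity throws.
                saga.addCompensation(() -> log.add("registerFailedActivity"));
            }
        } catch (IllegalStateException e) {
            saga.compensate();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("registered after the activity:  " + run(false));
        System.out.println("registered before the activity: " + run(true));
    }
}
```

When the compensation is registered after the failing call, `compensate()` has nothing to run; registering it first means it runs even though the activity threw. Which order is right depends on whether the compensation is safe to run when the forward step never took effect.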

I think the code should be as follows.


public void signalCustomerActivityRegistered() {
    this.registerCustomerActivity = true;
}


Workflow.await(() -> registerCustomerActivity);
saga.addCompensation(transferActivity::registerFailedActivity, idSender, idReceiver, amount);
try {
    transferActivity.registerActivity(idSender, idReceiver, amount);
    signalCustomerActivityRegistered();
} catch (ActivityFailure failure) {
    saga.compensate();
}

The problem is if I make code changes like that then on just one REST call, entire workflow will get executed.

I start workflow as follows.

curl -X POST -H "content-type:application/json" -d '{"senderAccountId": 2, "receiverAccountId": 1}' http://localhost:9070/startWorkflow

Later I initiate transfer as follows.

curl -X POST -H "content-type:application/json" -d '{"senderAccountId": 2, "receiverAccountId": 1, "amount": 400}' http://localhost:9070/transfer

Please note that the amount is supplied in the transfer call, not when starting the workflow. Because the activity calls are separated like this, I have to call activities within the signal methods.

The problem is if I make code changes like that then on just one REST call, entire workflow will get executed.

You can wait for multiple conditions in Workflow.await, for example:

Workflow.await(() -> a || b || c);

If your workflow is expected to handle multiple transfers, you could wrap your signal handling logic in a while loop. Just note the docs (What is a Temporal Workflow? | Temporal Documentation) regarding long-running workflows and event history size.

Sure, I can do that. The saga compensation triggers, but the activity never recovers. If the workflow is running and I intentionally shut down MySQL or Cassandra to test the saga compensation activity, the connection never gets re-established after the database restarts. It works only when I terminate the workflow and start it again. Also, if I make code changes while the workflow is running and execute the jar again, the new changes never get picked up. It seems the previous jar information gets stored by Temporal. I assume the only way this can work is if I introduce workflow versioning. Is my understanding correct? Please let me know how to recover a failed activity while the workflow is running. I assume Spring retry is not working properly, but I need confirmation from the Temporal side too.

Can you give more info on this? Do you mean the compensation activity does not execute or finish execution? Is it failing due to some errors?

Also, if I make code changes while the workflow is running and execute the jar again, the new changes never get picked up. It seems the previous jar information gets stored by Temporal. I assume the only way this can work is if I introduce workflow versioning.

Whether you need versioning depends on your workflow code changes. For example, anything that alters the workflow history (such as adding or removing activity invocations) would require you to version those changes. You can change the activity implementation to fix an error without versioning. Note that any update, versioned or not, requires a worker restart.

The entire demo code is on GitHub: mehul40/TemporalSagaWithRESTDemo (temporal.io support for Saga along with REST demo).

The problem is that while testing negative scenarios to trigger saga.compensate(), if I stop and restart the database, or just drop a table and re-create it, the running workflow never recognizes the change. I have to terminate the workflow; only then does it reconnect to the database or recognize the new table.

My guess is that your Spring datasource is not handling the reconnect, and that this has nothing to do with Temporal.
Also, even if that reconnect works or you get it to work, you seem to set your activity retry maximum to 1, which means that the activities using the datasource are not retried, so maybe that explains what you see in your tests.

There seem to be more things that you could improve, especially around error handling. Note that a workflow does not fail on intermittent errors such as an NPE; instead it replays that workflow task, waiting for a fix.

If you want it to fail on, let’s say, an NPE, you can set

WorkflowImplementationOptions workflowImplementationOptions =
                WorkflowImplementationOptions.newBuilder()
                        .setFailWorkflowExceptionTypes(NullPointerException.class)
                        .build();

and pass that in when you register your workflow implementation class with the worker.
If you want your workflow to fail on all intermittent exceptions (those not extending TemporalFailure), which should be a really rare case, you can pass Throwable.class to setFailWorkflowExceptionTypes.

Also, the provided code calls activities inside signal methods, which I think could be moved to your main workflow method code.

I wanted to trigger saga compensation, so I kept the retry count at 1, just like in your saga example code. After changing the retry count to 5 or 10, I noticed that retry works. If the activity still fails after the maximum number of attempts, saga compensation triggers and reverts the data changes. I want execution to be synchronous, so I removed the code from the signal methods. The updated code is on GitHub: mehul40/TemporalSagaWithRESTDemo (temporal.io support for Saga along with REST demo).
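
The behavior described above (retries first, compensation only once the attempts are exhausted) can be modeled in plain Java. This is a sketch under assumptions, not Temporal code: the loop stands in for the retries that Temporal performs according to RetryOptions.setMaximumAttempts, and `simulate`, `Step`, and the event strings are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RetryThenCompensateSketch {

    // Hypothetical functional interface for a step that may fail.
    interface Step {
        void run() throws Exception;
    }

    // Simulates one activity that fails `failuresBeforeSuccess` times before succeeding.
    // Returns the sequence of events: one entry per attempt, plus "compensate"
    // if every allowed attempt failed.
    static List<String> simulate(int maxAttempts, int failuresBeforeSuccess) {
        List<String> events = new ArrayList<>();
        int[] calls = {0};
        Step activity = () -> {
            calls[0]++;
            events.add("attempt " + calls[0]);
            if (calls[0] <= failuresBeforeSuccess) {
                throw new Exception("database unavailable");
            }
        };

        boolean succeeded = false;
        for (int attempt = 1; attempt <= maxAttempts && !succeeded; attempt++) {
            try {
                activity.run();
                succeeded = true;
            } catch (Exception e) {
                // Swallow and retry; a real retry policy would also wait between attempts.
            }
        }
        if (!succeeded) {
            events.add("compensate");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(1, 1)); // one attempt allowed, it fails
        System.out.println(simulate(5, 2)); // two failures, then success
    }
}
```

With a maximum of one attempt, the first failure goes straight to compensation, which matches the original test setup. With a higher maximum, a transient outage can be absorbed by retries and compensation never runs.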

I have a few questions.

  1. Saga compensation can also fail, so I think the saga.compensate() call in the catch block should be wrapped in another try/catch block.

  2. If we get a Saga.CompensationException, should we terminate the workflow with Failed status?

  3. If there are new changes, such as an increased retry count, and the jar has been rebuilt, does Temporal ignore the new changes if the workflow is already running?

(1, 2) Yes, an activity you register with the saga could fail. For any exception that does not extend TemporalFailure, the workflow gets blocked and does not fail.

If you do want to fail on Saga$CompensationException (which does not extend TemporalFailure), you can either catch it and re-throw it as ApplicationFailure, or add it to setFailWorkflowExceptionTypes as shown in the previous response.

(3) You would need to restart your workers to pick up any new changes to workflow and activity impls registered with it. You should be able to make changes to ActivityOptions without having to version them. Note that workflow executions that are already past the point where you made a change will not pick it up, but any executions that aren’t will.

I noticed that after saga compensation, workflow fails if saga compensation activity fails. For production, a failed workflow is not a good option. There should be a way to set retry option for happy path activity and different retry option for saga compensation activity so that saga compensation activity can be retried either forever or for a very long time. If database changes cannot be rolled back, then reports will contain errors.
Also noticed that below code had no effect.

WorkflowImplementationOptions workflowImplementationOptions =
        WorkflowImplementationOptions.newBuilder()
                .setFailWorkflowExceptionTypes(Saga.CompensationException.class)
                .build();
worker.registerWorkflowImplementationTypes(workflowImplementationOptions, MoneyTransferWorkflowImpl.class);

Also there is no way to set multiple workflow exception types.

(3) I didn’t understand how to restart workflow? By downloading history and using it with worker?

I assume there is a way to download history in JSON format from web UI and can be replayed using worker.replayHistory(…).

I noticed that after saga compensation, workflow fails if saga compensation activity fails.

That should not be the case. Activity retries based on its retry policy, regardless if it’s part of compensation or not. Can you show your code where you see that happening?

There should be a way to set retry option for happy path activity and different retry option for saga compensation activity so that saga compensation activity can be retried either forever or for a very long time.

You can use setActivityOptions on WorkflowImplementationOptions, which lets you set individual activity options per activity type, for example:

Map<String, ActivityOptions> activityOptionsMap = new HashMap<>();
activityOptionsMap.put("myActivity", ActivityOptions.newBuilder().build());
activityOptionsMap.put("myActivityCompensation", ActivityOptions.newBuilder().build());

WorkflowImplementationOptions options =
                WorkflowImplementationOptions.newBuilder()
                        .setActivityOptions(activityOptionsMap)
                        .build();

worker.registerWorkflowImplementationTypes(options, MyWorkflowImpl.class);

This way you don’t have to set single ActivityOptions in your workflow code.

Also noticed that below code had no effect…

It should fail the workflow on Saga.CompensationException if you set setParallelCompensation(true) in your Saga options.
For sequential compensation, you would need to pass the actual exception type (for example NullPointerException) to setFailWorkflowExceptionTypes.

Also there is no way to set multiple workflow exception types.

setFailWorkflowExceptionTypes has a variable number of arguments, so you can pass in more than one.

I didn’t understand how to restart workflow? By downloading history and using it with worker?

You can terminate the previous run and start it again.
Another option is to use tctl (cli) to reset the workflow to its first workflow task.
You can also use ResetWorkflowExecutionRequest as shown in this forum post to reset to a particular event id.

I assume there is a way to download history in JSON format from web UI and can be replayed using worker.replayHistory(…).

Yes, use tctl, for example:

tctl wf show -w <workflow_id> -r <run_id> --output_filename myhistory.json

(Note: you should use Java SDK 1.4.0 or greater for the tctl JSON output to be replayable with WorkflowReplayer.)

I am passing a map when creating the activity stub as follows, but the default activity options still get used instead of the merged options.

private final RetryOptions defaultRetryOptions = RetryOptions.newBuilder().setMaximumAttempts(3).build();
private final RetryOptions happyPathRetryOptions = RetryOptions.newBuilder().setMaximumAttempts(5).build();
private final RetryOptions sagaRetryOptions = RetryOptions.newBuilder().setMaximumAttempts(1000).build();

private final ActivityOptions defaultActivityOptions = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(120))
        .setRetryOptions(defaultRetryOptions).build();

private final ActivityOptions happyPathActivityOptions = ActivityOptions.newBuilder()
                                                       .setStartToCloseTimeout(Duration.ofSeconds(120))
                                                       .setRetryOptions(happyPathRetryOptions).build();
private final ActivityOptions sagaCompActivityOptions = ActivityOptions.newBuilder()
                                                        .setStartToCloseTimeout(Duration.ofSeconds(120))
                                                        .setRetryOptions(sagaRetryOptions).build();

// private final MoneyTransferActivity moneyTransferActivity = Workflow.newActivityStub(
// MoneyTransferActivity.class, defaultActivityOptions);

private final MoneyTransferActivity moneyTransferActivity = BuildActivity();

private MoneyTransferActivity BuildActivity() {
    Map<String, ActivityOptions> activityOptionsMap = new HashMap<>();
    activityOptionsMap.put("cancelTransfer", sagaCompActivityOptions);
    activityOptionsMap.put("registerFailedTransaction", sagaCompActivityOptions);
    activityOptionsMap.put("registerTransactionActivity", happyPathActivityOptions);
    activityOptionsMap.put("getCustomerAccountDetails", happyPathActivityOptions);
    activityOptionsMap.put("initiateTransfer", happyPathActivityOptions);

    return Workflow.newActivityStub(
        MoneyTransferActivity.class,
        defaultActivityOptions,
        activityOptionsMap);
}
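
One thing worth checking here, offered as an assumption rather than a confirmed diagnosis: the per-activity options map is keyed by activity type name, and in the Java SDK the default activity type name is the method name with its first letter capitalized (unless overridden with @ActivityMethod(name = ...)). If that applies to this interface, keys such as "cancelTransfer" would not match and the default options would be used. The plain-Java sketch below only shows how the keys would be derived under that assumption; `defaultActivityTypeName` and `keyByActivityType` are hypothetical helpers, not SDK methods, and the string values stand in for the ActivityOptions objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ActivityTypeKeySketch {

    // Assumption: default activity type name = method name with the first letter upper-cased.
    static String defaultActivityTypeName(String methodName) {
        if (methodName.isEmpty()) {
            return methodName;
        }
        return Character.toUpperCase(methodName.charAt(0)) + methodName.substring(1);
    }

    // Re-keys a map from Java method names to the assumed activity type names.
    static <V> Map<String, V> keyByActivityType(Map<String, V> byMethodName) {
        Map<String, V> result = new LinkedHashMap<>();
        byMethodName.forEach(
                (method, value) -> result.put(defaultActivityTypeName(method), value));
        return result;
    }

    public static void main(String[] args) {
        // Strings stand in for the ActivityOptions instances from the post above.
        Map<String, String> byMethod = new LinkedHashMap<>();
        byMethod.put("cancelTransfer", "sagaCompActivityOptions");
        byMethod.put("initiateTransfer", "happyPathActivityOptions");
        System.out.println(keyByActivityType(byMethod));
    }
}
```

If this is the cause, changing the keys in BuildActivity() to the capitalized form (for example "CancelTransfer") would be the fix to try first.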