Hello,
I am using Spring Boot to write a Temporal workflow, as follows.
@SpringBootApplication
public class TemporalRestDemoApp {
    public static void main(String[] args) {
        ConfigurableApplicationContext appContext = SpringApplication.run(TemporalRestDemoApp.class, args);
        WorkerFactory factory = appContext.getBean(WorkerFactory.class);
        TransferActivity transferActivity = appContext.getBean(TransferActivity.class);
        Worker worker = factory.newWorker(TransactionWorkflow.QUEUE_NAME);
        worker.registerWorkflowImplementationTypes(TransactionWorkflowImpl.class);
        worker.registerActivitiesImplementations(transferActivity);
        factory.start();
        /* WITHOUT SPRINGBOOT
        // WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
        // WorkflowClient client = WorkflowClient.newInstance(service);
        // Worker factory is used to create Workers that poll specific Task Queues.
        WorkerFactory factory = WorkerFactory.newInstance(client);
        Worker worker = factory.newWorker(Shared.MONEY_TRANSFER_TASK_QUEUE);
        // This Worker hosts both Workflow and Activity implementations.
        // Workflows are stateful so a type is needed to create instances.
        worker.registerWorkflowImplementationTypes(MoneyTransferWorkflowImpl.class);
        // Activities are stateless and thread safe so a shared instance is used.
        worker.registerActivitiesImplementations(new AccountActivityImpl());
        // Start listening to the Task Queue.
        factory.start();
        */
    }
}
My Config class is as follows.
@Component
@Configuration
public class TemporalConfig {
    @Value("${temporal.serviceAddress}")
    private String temporalServiceAddress;

    @Value("${temporal.namespace}")
    private String temporalNamespace;

    @Bean
    public WorkflowServiceStubs workflowServiceStubs() {
        return WorkflowServiceStubs
                .newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(temporalServiceAddress).build());
    }

    @Bean
    public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
        return WorkflowClient.newInstance(workflowServiceStubs,
                WorkflowClientOptions.newBuilder().setNamespace(temporalNamespace).build());
    }

    @Bean
    public WorkerFactory workerFactory(WorkflowClient workflowClient) {
        return WorkerFactory.newInstance(workflowClient);
    }

    @Bean
    public TransferActivityImpl MoneyTransferActivity() {
        return new TransferActivityImpl();
    }
}
I start the workflow through a REST endpoint.
@RestController
public class AccountController {
    @Autowired
    TransferService transferService;

    @PostMapping("/startWorkflow")
    public void startWorkflow(@RequestBody TransferRequest request) {
        transferService.startMoneyTransfer(request.getSenderAccountId(), request.getReceiverAccountId());
    }
}
Inside the workflow method, I block execution so that the workflow advances to the next step only when it receives a signal.
public class TransactionWorkflowImpl implements TransactionWorkflow {
    private final Account retAcct = new Account();
    private final RetryOptions retryoptions = RetryOptions.newBuilder()
            // .setInitialInterval(Duration.ofSeconds(5))
            // .setMaximumInterval(Duration.ofSeconds(20))
            // .setBackoffCoefficient(2)
            .setMaximumAttempts(1)
            .build();
    private final ActivityOptions defaultActivityOptions = ActivityOptions.newBuilder()
            // Timeout options specify when to automatically timeout Activities if the process is taking too long.
            .setStartToCloseTimeout(Duration.ofSeconds(300))
            // Optionally provide customized RetryOptions.
            // Temporal retries failures by default, this is simply an example.
            .setRetryOptions(retryoptions)
            .build();
    private final TransferActivity transferActivity = Workflow.newActivityStub(TransferActivity.class, defaultActivityOptions);
    private final Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(false).setContinueWithError(true).build();
    private final Saga saga = new Saga(sagaOptions);
    public boolean isTransferCompleted = false;
    public boolean isAccountInfoRetrieved = false;
    public boolean isTransactionCompleted = false;
    private boolean iscustomerActivityRegistered = false;
    private Boolean isBackupCompleted = false;

    @Override
    public void startAccountTransferWorkflow(long idSender, long receiverAccountId) {
        try {
            while(!this.isBackupCompleted) {
                Workflow.await(() -> this.isBackupCompleted);
            }
            while(!this.isTransferCompleted) {
                Workflow.await(() -> this.isTransferCompleted);
            }
            while(!this.iscustomerActivityRegistered) {
                Workflow.await(() -> this.iscustomerActivityRegistered);
            }
            while(!this.isTransactionCompleted) {
                Workflow.await(() -> this.isTransactionCompleted);
            }
        }
        catch (Exception e) {
            throw e;
        }
    }
Question #1: Do I need a while loop around Workflow.await()? According to the documentation, it is supposed to block the current thread, but in the debugger (after setting TEMPORAL_DEBUG=1) the workflow start method gets called multiple times.
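To make the second half of this question concrete, here is a toy model in plain Java (no Temporal SDK involved) of what the debugger might be showing. It assumes the worker re-runs the workflow method from the top against the signals recorded so far, so the method is entered several times even though each gate is checked once, with no loop around it. The class name ReplaySketch, the signal names, and the assumption that a re-run happens after every signal are all illustrative; whether the SDK really replays in this situation (for example because a breakpoint let a workflow task time out) is part of what I am unsure about.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: imitates "re-run from the start" with plain Java, not the Temporal SDK.
final class ReplaySketch {
    // Hypothetical signal names standing in for the four flags in the workflow above.
    static final List<String> GATES = List.of("backup", "transfer", "customerActivity", "transaction");

    // Counts how often the modelled workflow method is entered.
    static int methodEntries = 0;

    // Re-runs the modelled workflow method from the top against the signals recorded so far.
    // Returns how many gates were passed before the code would block again.
    static int runFromStart(List<String> signalHistory) {
        methodEntries++;
        int passed = 0;
        for (String gate : GATES) {
            // One condition check per gate, like a single await call; no outer while loop.
            if (!signalHistory.contains(gate)) {
                break; // the real workflow would block here until the signal arrives
            }
            passed++;
        }
        return passed;
    }

    public static void main(String[] args) {
        List<String> history = new ArrayList<>();
        for (String signal : GATES) {
            history.add(signal);
            int passed = runFromStart(history);
            System.out.println("after " + signal + ": passed " + passed + " gate(s)");
        }
        System.out.println("method entries: " + methodEntries);
    }
}
```

In this model the method is entered four times for a single logical run, which is the same symptom I see in the debugger, and no gate needed a loop to hold its position.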
I am writing the customer's activity to Cassandra, but before writing I intentionally drop the table to trigger saga compensation.
@Override
public void signalCustomerActivityRegistered(long idSender, long idReceiver, BigDecimal amount) {
    try {
        transferActivity.registerActivity(idSender, idReceiver, amount);
        saga.addCompensation(transferActivity::registerFailedActivity, idSender, idReceiver, amount);
        this.iscustomerActivityRegistered = true;
        System.out.println("Signal - Customer activity registered.");
    }
    catch (ActivityFailure e) {
        System.out.println("Failed to register customer activity. Execute saga compensation");
        saga.compensate();
        throw e;
    }
    catch(ApplicationFailure ex) {
        System.out.println("Application Failure");
        throw ex;
    }
    catch(WorkflowException we) {
        System.out.println("\n Stack Trace:\n" + Throwables.getStackTraceAsString(we));
        Throwable cause = Throwables.getRootCause(we);
        System.out.println("\n Root cause: " + cause.getMessage());
        throw we;
    }
    catch(Exception ge) {
        ge.printStackTrace();
        throw ge;
    }
}
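For reference, this is how I understand the loop behind saga.compensate() when setParallelCompensation(false) is used: compensations run one at a time in reverse registration order, and continueWithError only decides whether the loop keeps going after one of them throws. The sketch below is plain Java. SagaSketch is a hypothetical stand-in, not Temporal's Saga class, and it returns the collected errors instead of throwing, which is a simplification on my part.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a sequential saga; not Temporal's Saga implementation.
final class SagaSketch {
    private final boolean continueWithError;
    private final Deque<Runnable> compensations = new ArrayDeque<>();

    SagaSketch(boolean continueWithError) {
        this.continueWithError = continueWithError;
    }

    // push() puts the newest compensation on top, so it runs first.
    void addCompensation(Runnable compensation) {
        compensations.push(compensation);
    }

    // Runs compensations in reverse registration order and returns the errors collected.
    List<RuntimeException> compensate() {
        List<RuntimeException> errors = new ArrayList<>();
        while (!compensations.isEmpty()) {
            try {
                compensations.pop().run();
            } catch (RuntimeException e) {
                errors.add(e);
                if (!continueWithError) {
                    break; // stop at the first failing compensation
                }
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SagaSketch saga = new SagaSketch(true);
        saga.addCompensation(() -> log.add("undo backup"));
        saga.addCompensation(() -> { throw new IllegalStateException("table missing"); });
        saga.addCompensation(() -> log.add("undo transfer"));
        List<RuntimeException> errors = saga.compensate();
        System.out.println(log + " errors=" + errors.size());
    }
}
```

Nothing in this loop retries anything, so if my reading is right, continueWithError would neither cause nor prevent a retry on its own.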
The failure that triggers the saga compensation is thrown from this activity.
@Override
public void registerActivity(long idSender, long idReceiver, BigDecimal amount) {
    try {
        Account sender = accountRepository.findAccountById(idSender);
        Account receiver = accountRepository.findAccountById(idReceiver);
        String cActivity = "Sent " + amount.doubleValue() + " to " + receiver.getId() + " - " + receiver.getName();
        customerRepository.save(new Customer((int) sender.getId(), sender.getName(), sender.getAmount().doubleValue(), Instant.now(), cActivity));
        cActivity = "Received " + amount.doubleValue() + " from " + sender.getId() + " - " + sender.getName();
        customerRepository.save(new Customer((int) receiver.getId(), receiver.getName(), receiver.getAmount().doubleValue(), Instant.now(), cActivity));
    }
    catch(Exception e) {
        throw Activity.wrap(e); // Triggers saga compensation.
    }
}
Question #2: Even though the activity's maximum attempts is set to 1 and the saga option continueWithError is set to true, the workflow is still retrying. Do I need to stop the workflow explicitly? Is this the expected behavior?
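My reading of setMaximumAttempts(1) is that the count includes the first try, so the activity should run exactly once. A minimal plain-Java sketch of that counting rule follows. RetrySketch and callWithAttempts are hypothetical names, and the loop is not Temporal's retry implementation (it has no backoff or timeouts).

```java
import java.util.concurrent.Callable;

// Hypothetical helper illustrating "maximum attempts" counting; not Temporal code.
final class RetrySketch {
    // Calls the task at most maxAttempts times in total; maxAttempts = 1 means no retry.
    static <T> T callWithAttempts(Callable<T> task, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        try {
            callWithAttempts(() -> {
                calls[0]++;
                throw new IllegalStateException("table missing");
            }, 1);
        } catch (Exception e) {
            System.out.println("failed after " + calls[0] + " call(s): " + e.getMessage());
        }
    }
}
```

If the activity really does run only once, the retrying I see would have to come from a layer other than the activity retry policy, which this sketch does not model.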