Workflow Retry - Workflow should skip Activities that were successful in the previous run

Hello Gurus,

I have a situation where I want to re-trigger an entire workflow but have it skip the activities that already succeeded and run only those that failed or never ran.

Please suggest how to achieve this.

Have you considered not failing the workflow at all? Just keep retrying activities until they are fixed.

@maxim - please pardon me if the questions are dumb - I'm trying to understand Temporal workflow behavior.

Basically, I have a retry policy for activities in place. But if an activity still fails after all its retries, we obviously need to let the workflow fail. That is when I want a retry policy for the workflow, in which I can specify that the activities that succeeded in the previous run should be skipped.

But @maxim, can you please tell me whether this is possible at all?

I guess this is possible only if the workflow and activity are designed to be stateful, meaning they hold the state from the previous run.

Can I make the workflow and activity stateful? Is it good practice in the first place to make them stateful?

The simplest solution is to retry the activity forever, as you expect it to be fixed eventually, and the duration of retries in Temporal is not limited. Once it is fixed, the workflow continues without any additional intervention.
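
As a rough illustration of what "retry forever" can look like in the Java SDK, here is a minimal configuration sketch. The class name `UnlimitedRetryOptions` and all interval values are assumptions chosen for the example, not recommendations. The relevant detail is that the maximum number of attempts is left unset, which the SDK treats as unlimited, while `setMaximumInterval` caps how far apart the attempts can drift.

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

// Hypothetical helper class; only the builder calls come from the Temporal Java SDK.
public final class UnlimitedRetryOptions {

    private UnlimitedRetryOptions() {}

    public static ActivityOptions build() {
        RetryOptions retry = RetryOptions.newBuilder()
                // Wait 1 second before the first retry (illustrative value).
                .setInitialInterval(Duration.ofSeconds(1))
                // Double the wait after each failed attempt.
                .setBackoffCoefficient(2.0)
                // Never wait longer than 10 minutes between attempts (illustrative value).
                .setMaximumInterval(Duration.ofMinutes(10))
                // setMaximumAttempts is intentionally not called: 0 (the default) means unlimited.
                .build();

        return ActivityOptions.newBuilder()
                // Timeout for a single attempt; each retry gets a fresh one.
                .setStartToCloseTimeout(Duration.ofMinutes(1))
                .setRetryOptions(retry)
                .build();
    }
}
```

The resulting options would be passed to `Workflow.newActivityStub(...)` when the activity stub is created inside the workflow.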

If you still want to fail an activity, Temporal allows resetting a workflow to some past state, which means that all activities after the reset point will be re-executed. But it doesn't support re-executing some activities and then skipping any that follow them.

@maxim, I am facing a similar issue. I can't retry indefinitely, as each downstream call has an associated cost.
I am OK with resetting it manually. Let's say the workflow has 10 activities in total. The only issue is when I reset the workflow manually to step 4, which is always the initial step.
After the reset, the workflow failed on the 5th activity. I lose my checkpoint, because the reset workflow shows step 4 as failed (reset from the UI).
Now I want to retry it from the 5th activity to the 10th activity.

Reset checkpoint 4 is failed, so setting the reset point to any step after that is not going to work.
If I go to the initial workflow and reset it back to 4, it is going to trigger all the activities.

Is there any solution for this?


This is after the reset.

Where should I reset my already-reset workflow?

Also, if I try to reset it to, let's say, 13, it fails with the error given below:

{
"message": "Failure handling event 7 of type 'EVENT_TYPE_WORKFLOW_TASK_COMPLETED' during replay. {PreviousStartedEventId=18, WorkflowTaskStartedEventId=31, CurrentStartedEventId=6}",
"source": "JavaSDK",
"stackTrace": "io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:250)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:229)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:203)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:173)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:143)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:128)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:286)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:231)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:173)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:840)\n",
"cause": {
"message": "WorkflowTask: failure executing STARTED->WORKFLOW_TASK_COMPLETED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED, SCHEDULED->WORKFLOW_TASK_STARTED]",
"source": "JavaSDK",
"stackTrace": "io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:151)\nio.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:101)\nio.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:67)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:263)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:227)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:203)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:173)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:143)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:128)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:286)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:231)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:173)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:840)\n",
"cause": {
"message": "Operation allowed only while eventLoop is running",
"source": "JavaSDK",
"stackTrace": "io.temporal.internal.statemachines.WorkflowStateMachines.checkEventLoopExecuting(WorkflowStateMachines.java:1042)\nio.temporal.internal.statemachines.WorkflowStateMachines.randomUUID(WorkflowStateMachines.java:702)\nio.temporal.internal.replay.ReplayWorkflowContextImpl.scheduleActivityTask(ReplayWorkflowContextImpl.java:211)\nio.temporal.internal.sync.SyncWorkflowContext.executeActivityOnce(SyncWorkflowContext.java:250)\nio.temporal.internal.sync.SyncWorkflowContext.executeActivity(SyncWorkflowContext.java:235)\nio.temporal.internal.sync.ActivityStubImpl.executeAsync(ActivityStubImpl.java:49)\nio.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:38)\nio.temporal.internal.sync.ActivityInvocationHandler.lambda$getActivityFunc$0(ActivityInvocationHandler.java:77)\nio.temporal.internal.sync.ActivityInvocationHandlerBase.invoke(ActivityInvocationHandlerBase.java:70)\njdk.proxy2/jdk.proxy2.$Proxy152.activateCustomerDetailsInBcif(Unknown Source)\ncom.kotak.unified.currentAccount.temporal.workflow.workflowExecutors.Impls.INAssistedFullAccountPropagationImpl.startAccountPropagationWorkflow(INAssistedFullAccountPropagationImpl.java:65)\ncom.kotak.unified.currentAccount.temporal.workflow.workflowImpls.AccountPropagationWorkflowImpl.startAccountPropagationWorkflow(AccountPropagationWorkflowImpl.java:17)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:568)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:309)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:284)\nio.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:67)\nio.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:136)\nio.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)\nio.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)\nio.temporal.worker.WorkerFactory.lambda$newWorkflowThreadExecutor$7(WorkerFactory.java:411)\njava.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\njava.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:840)\n",
"applicationFailureInfo": {
"type": "java.lang.IllegalStateException"
}
},
"applicationFailureInfo": {
"type": "java.lang.RuntimeException"
}
},
"applicationFailureInfo": {
"type": "io.temporal.internal.replay.InternalWorkflowTaskException"
}
}

After this error, every workflow starts to fail with a similar error.
FYI: Temporal Java SDK 1.12.0, and not using common stubs for activities.

An alternative solution to this problem is not to retry the activity forever but, after a few retries, to pause its execution (possibly updating a relevant search attribute) and wait for a signal that indicates how to proceed.

The “Retry on Signal Interceptor” example demonstrates an interceptor that implements this pattern.
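
To make the control flow of that pattern concrete, here is a plain-Java model of it. This is not the Temporal sample and uses no Temporal APIs: `RetryOnSignalSketch`, `Decision`, and `runWithPause` are hypothetical names, and the queue of decisions merely stands in for signals sent by an operator. In a real workflow, the point marked "paused" is where the code would block (for example with `Workflow.await`) until a signal handler records the decision.

```java
import java.util.Queue;
import java.util.function.Supplier;

// Plain-Java model of "retry a few times, then pause and wait for a decision".
// All names here are hypothetical; nothing in this class is Temporal API.
public class RetryOnSignalSketch {

    /** What an operator's signal would tell the paused workflow to do. */
    public enum Decision { RETRY, FAIL }

    /**
     * Calls the step up to maxAttempts times. If every attempt fails, takes the
     * next decision from the queue: RETRY starts a fresh round of attempts,
     * anything else (including no decision) rethrows the last failure.
     */
    public static <T> T runWithPause(Supplier<T> step, int maxAttempts, Queue<Decision> decisions) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        while (true) {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return step.get();
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            // "Paused": a workflow would block here until a signal arrives.
            Decision decision = decisions.poll();
            if (decision != Decision.RETRY) {
                throw last;
            }
        }
    }
}
```

The design point is that steps that already returned are never called again: only the failing step is re-attempted, and only after someone explicitly asks for it, which bounds the number of costly downstream calls.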

Will this retry-on-signal approach work properly when you have multiple child workflows and a few activities?
Below is a snippet of the workflow:

    public String startAccountPropagationWorkflow(String leadTrackingNumber,
                                                  StartPropagationRequest.JourneyTypeEnum journeyTypeEnum,
                                                  Boolean isPreferred, String downstreamTrackingNumber) throws Exception {

        log.info("Starting IN Assisted Full Account Propagation workflow");

        Promise<Void> filenetChildWorkflow = Async.procedure(() -> {
            try {
                INFilenetChildWorkflow inFilenetChildWorkflowClient = Workflow.newChildWorkflowStub(INFilenetChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(FILENET_ACCOUNT_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber)
                                .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
                                .build());
                inFilenetChildWorkflowClient.runINFilenetChildWorkflow(leadTrackingNumber, downstreamTrackingNumber);
            } catch (Exception e) {
                log.error("Error during execution of FilenetChildWorkflow for leadId: {}, with exception: {} ", leadTrackingNumber, e.getMessage());
            }
        });

        crnActivationActivity.activateCustomerDetailsInBcif(leadTrackingNumber, downstreamTrackingNumber,journeyTypeEnum);
        log.info("crnActivationActivity  successfully completed for lead id: {}", leadTrackingNumber);

        finacleUpdateAccountOpeningDateActivity.updateAccountOpeningDate(leadTrackingNumber);
        log.info("FinacleUpdateAccountOpeningDate Activity successfully completed for lead id: {}", leadTrackingNumber);

        if (!isPreferred) {
            finacleUpdateBranchActivity.updateBranch(leadTrackingNumber);
            log.info("FinacleUpdateBranch Activity successfully completed for lead id: {}", leadTrackingNumber);
        }

        Workflow.sleep(Duration.ofMinutes(WF_SLEEP_TIME_SCHEME_CODE));
        finacleUpdateSchemeCodeActivity.updateSchemeCode(leadTrackingNumber);
        log.info("FinacleUpdateSchemeCodeActivity successfully completed for lead id: {}", leadTrackingNumber);

        finacleCAModificationActivity.makeCAModification(leadTrackingNumber, downstreamTrackingNumber);
        log.info("CAMODActivity successfully completed for lead id: {}", leadTrackingNumber);

        log.info("All Apis will be triggered in parallel from here onwards");
        Promise<Void> addSignatureChildWorkflow = Async.procedure(() -> {
            try {

                AddSignatureChildWorkflow addSignatureChildWorkflowClient = Workflow.newChildWorkflowStub(AddSignatureChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(ADD_SIGNATURE_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber).build());
                addSignatureChildWorkflowClient.runAddSignatureChildWorkflow(leadTrackingNumber,downstreamTrackingNumber);
            } catch (Exception e) {
                log.error("AddSignatureChildWorkflow failed for leadid : {} , with exception {} ",leadTrackingNumber,e.getMessage());
            }
        });

        Promise<Void> unFreezeAccountChildWorkflow = Async.procedure(() -> {
            try {

                UnfreezeAccountChildWorkflow unfreezeAccountChildWorkflowClient = Workflow.newChildWorkflowStub(UnfreezeAccountChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(UN_FREEZE_ACCOUNT_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber).build());
                unfreezeAccountChildWorkflowClient.runUnfreezeAccountChildWorkflow(leadTrackingNumber);
            } catch (Exception e) {
                log.error("Error during execution of UnfreezeAccountChildWorkflow for leadid : {} , with exception {} ",leadTrackingNumber,e.getMessage());
            }
        });

        Promise<Void> accountInquiryChildWorkflow = Async.procedure(() -> {
            try {
                AccountEnquiryPropagationChildWorkflow accountEnquiryPropagationChildWorkflow = Workflow.newChildWorkflowStub(AccountEnquiryPropagationChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(ACCOUNT_ENQUIRY_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber).build());
                accountEnquiryPropagationChildWorkflow.runAccountEnquiryPropagationChildWorkflow(leadTrackingNumber,downstreamTrackingNumber, journeyTypeEnum);
            } catch (Exception e) {
                log.error("Error during execution of AccountEnquiryPropagationWorkflow for leadid : {} , with exception {} ",leadTrackingNumber,e.getMessage());
            }
        });

        Promise<Void> updateFatcaConsentChildWorkflow = Async.procedure(() -> {
            try {
                UpdateFatcaConsentChildWorkflow updateFatcaConsentChildWorkflowClient = Workflow.newChildWorkflowStub(UpdateFatcaConsentChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(UPDATE_FATCA_CONSENT_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber).build());
                updateFatcaConsentChildWorkflowClient.runUpdateFatcaConsentChildWorkflow(leadTrackingNumber,journeyTypeEnum);
            } catch (Exception e) {
                log.error("UpdateFatcaConsentChildWorkflow failed for leadid : {} , with exception {} ",leadTrackingNumber,e.getMessage());
            }
        });

        Promise<Void> updateFlexiInfoChildWorkflow = Async.procedure(() -> {
            try {
                UpdateFlexiInfoChildWorkflow updateFlexiInfoChildWorkflowClient = Workflow.newChildWorkflowStub(UpdateFlexiInfoChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(UPDATE_FLEXI_INFO_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber).build());
                updateFlexiInfoChildWorkflowClient.runUpdateFlexiInfoChildWorkflow(leadTrackingNumber);
            } catch (Exception e) {
                log.error("UpdateFlexiInfoChildWorkflow failed for leadid : {} , with exception {} ",leadTrackingNumber,e.getMessage());
            }
        });

        Promise<Void> updateCersaiChildWorkFlow = Async.procedure(() -> {
            try {
                CersaiPropagationChildWorkflow cersaiPropagationChildWorkflow = Workflow.newChildWorkflowStub(CersaiPropagationChildWorkflow.class,
                        ChildWorkflowOptions.newBuilder().setWorkflowId(CERSAI_PROPAGATION_CHILD_WORKFLOW_PREFIX + downstreamTrackingNumber).build());
                cersaiPropagationChildWorkflow.runCersaiPropagationChildWorkflow(leadTrackingNumber, journeyTypeEnum, downstreamTrackingNumber);
            } catch (Exception e) {
                log.error("CersaiPropagationWorkflow failed for leadid : {} , with exception {} ",leadTrackingNumber,e.getMessage());
            }
        });


        List<Promise<Void>> promises = Arrays.asList(
                addSignatureChildWorkflow,
                unFreezeAccountChildWorkflow,
                updateFatcaConsentChildWorkflow,
                updateFlexiInfoChildWorkflow,
                updateCersaiChildWorkFlow,
                accountInquiryChildWorkflow,
                filenetChildWorkflow
        );

        completeAsyncSteps(promises);

        return "Completed IN Assisted Full Account Propagation workflow";
    }

    private void completeAsyncSteps(List<Promise<Void>> promiseList) {
        for (Promise<Void> promise : promiseList) {
            if (promise != null) {
                promise.get();
            }
        }
    }

I don’t understand the question. The sample interceptor would do it for all activities in flight. However, you can change the sample to support your specific requirements.