Hi there,
I’m seeing an intermittent test failure in a Temporal workflow that performs OTP (One-Time Password) verification. The workflow, IVerifyOTPWorkflow, contains a loop that lets the user re-enter the OTP code. It is defined as follows:
@WorkflowInterface
interface IVerifyOTPWorkflow {
    @WorkflowMethod
    fun execute(operationDto: OperationDto)

    @SignalMethod(name = HANDLE_CONFIRMATION_SIGNAL_NAME)
    fun handleConfirmation(confirmationDto: ConfirmationDto)

    @UpdateMethod
    fun getOtpNotification(): OTPStatus
}
@WorkflowImpl(taskQueues = [EXAMPLE_TASK_QUEUE])
class VerifyOTPWorkflow : IVerifyOTPWorkflow {
    @Value("\${otp.max-retries}")
    private var maxRetries: Int = 3

    @Value("\${otp.timeout-seconds}")
    private var timeoutSeconds: Long = 40L

    private val defaultActivityOptions = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(1000))
        .build()

    private val confirmationActivity by lazy {
        Workflow.newActivityStub(IConfirmationActivity::class.java, defaultActivityOptions)
    }

    private var otpNotification: CompletablePromise<OTPStatus> = Workflow.newPromise()
    private var confirmationDtoPromise: CompletablePromise<ConfirmationDto> = Workflow.newPromise()

    override fun execute(operationDto: OperationDto) {
        var retriesLeft = maxRetries
        while (retriesLeft > 0) {
            val confirmationDto = confirmationDtoPromise.cancellableGet(timeoutSeconds, TimeUnit.SECONDS)
            if (confirmationActivity.verifyConfirmation(operationDto, confirmationDto)) {
                completeOtpNotification(OTPStatus.SUCCESS)
                break
            } else {
                retriesLeft--
                completeOtpNotification(OTPStatus.FAILURE)
                if (retriesLeft <= 0) {
                    throw applicationNonRetryableFailure(InvalidConfirmationException(operationDto.oprId))
                }
            }
        }
    }

    override fun handleConfirmation(confirmationDto: ConfirmationDto) {
        confirmationDtoPromise.complete(confirmationDto)
        confirmationDtoPromise = Workflow.newPromise()
    }

    override fun getOtpNotification(): OTPStatus {
        return this.otpNotification.cancellableGet(10, TimeUnit.SECONDS)
    }

    private fun completeOtpNotification(status: OTPStatus) {
        otpNotification.complete(status)
        otpNotification = Workflow.newPromise()
    }
}
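To make the behaviour I expect from the retry loop explicit, here is a minimal plain-Kotlin sketch of its logic with all Temporal parts removed. The names OtpStatus, OtpOutcome, and runOtpLoop are hypothetical and exist only for this illustration. A simple string comparison stands in for the verifyConfirmation activity, and the signal timeout is not modelled, so this shows only the intended sequence of notifications, not the real workflow's timing.

```kotlin
// Plain-Kotlin model of the retry loop; no Temporal types are involved.
// OtpStatus, OtpOutcome and runOtpLoop are hypothetical names for illustration only.
enum class OtpStatus { SUCCESS, FAILURE }

data class OtpOutcome(val notifications: List<OtpStatus>, val failed: Boolean)

fun runOtpLoop(
    submittedCodes: List<String>,
    expectedCode: String,
    maxRetries: Int = 3
): OtpOutcome {
    val notifications = mutableListOf<OtpStatus>()
    var retriesLeft = maxRetries
    for (code in submittedCodes) {
        if (retriesLeft <= 0) break
        // Assumption: string equality replaces the verifyConfirmation activity call.
        if (code == expectedCode) {
            notifications += OtpStatus.SUCCESS
            return OtpOutcome(notifications, failed = false)
        }
        retriesLeft--
        notifications += OtpStatus.FAILURE
        if (retriesLeft <= 0) {
            // Corresponds to throwing the non-retryable failure in the workflow.
            return OtpOutcome(notifications, failed = true)
        }
    }
    // No more codes submitted: the real workflow would keep waiting for a signal here.
    return OtpOutcome(notifications, failed = false)
}

fun main() {
    // Three wrong codes, mirroring the failing test scenario.
    val outcome = runOtpLoop(listOf("2341", "1231", "1231"), expectedCode = "0000")
    println(outcome)
}
```

With three wrong codes, this model produces three FAILURE notifications and ends with failed = true, which is the sequence I expect the workflow to go through in the test.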
I’m calling this IVerifyOTPWorkflow as a child workflow from other workflows, like so:
val verifyOTPWorkflow = Workflow.newChildWorkflowStub(
    IVerifyOTPWorkflow::class.java,
    ChildWorkflowOptions {
        this.setWorkflowId("$workflowId:${IVerifyOTPWorkflow::class.simpleName}")
        this.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
    })
verifyOTPWorkflow.execute(op)
otpConfirmation.cancellableGet(180, TimeUnit.SECONDS)
When the OTP is entered correctly, the otpConfirmation.complete(Unit) method is called, and the parent workflow continues its execution.
However, the test for the scenario where the OTP code is entered incorrectly three times fails intermittently with a timeout. The unit test and the resulting error follow.
Unit test:
@Test
@Timeout(20)
fun `should fail DCBuyingWorkflow on invalid confirmation after 3 attempts`(): Unit = runBlocking {
    val (wf, oprId) = processTillConfirm()
    checkCode(wiremock, "+909181111111", smsConfig)
    val otpWorkflowId = "${wf.workflowId}:${IVerifyOTPWorkflow::class.java.simpleName}"

    client.newWorkflowStub<IVerifyOTPWorkflow>(otpWorkflowId).handleConfirmation(ConfirmationDto(ConfirmationType.OTP, "2341", oprId))
    Assertions.assertEquals(client.newWorkflowStub<IVerifyOTPWorkflow>(otpWorkflowId).getOtpNotification(), OTPStatus.FAILURE)

    client.newWorkflowStub<IVerifyOTPWorkflow>(otpWorkflowId).handleConfirmation(ConfirmationDto(ConfirmationType.OTP, "1231", oprId))
    Assertions.assertEquals(client.newWorkflowStub<IVerifyOTPWorkflow>(otpWorkflowId).getOtpNotification(), OTPStatus.FAILURE)

    client.newWorkflowStub<IVerifyOTPWorkflow>(otpWorkflowId).handleConfirmation(ConfirmationDto(ConfirmationType.OTP, "1231", oprId))
    assertThrows<WorkflowFailedException> {
        wf.getResult()
    }
}
Error:
java.util.concurrent.TimeoutException: should fail DCBuyingWorkflow on invalid confirmation after 3 attempts() timed out after 20 seconds
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Suppressed: io.temporal.client.WorkflowServiceException: workflowId='befd8e52-4a6f-4502-9a1f-ec7a4c9409a4:IVerifyOTPWorkflow', runId='', workflowType='IVerifyOTPWorkflow'}
at app//io.temporal.client.WorkflowStubImpl.startUpdate(WorkflowStubImpl.java:362)
at app//io.temporal.client.WorkflowStubImpl.update(WorkflowStubImpl.java:303)
at app//io.temporal.testing.TimeLockingInterceptor$TimeLockingWorkflowStub.update(TimeLockingInterceptor.java:238)
at app//io.temporal.client.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.updateWorkflow(WorkflowInvocationHandler.java:317)
at app//io.temporal.client.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:274)
at app//io.temporal.client.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:175)
at app/jdk.proxy3/jdk.proxy3.$Proxy316.getOtpNotification(Unknown Source)
at app//temporal.CustomerDCBuying3Test$should fail DCBuyingWorkflow on invalid confirmation after 3 attempts$1.invokeSuspend(CustomerDCBuying3Test.kt:137)
at app//kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at app//kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
at app//kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at app//kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at app//kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at app//kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at app//kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at app//temporal.CustomerDCBuying3Test.should fail DCBuyingWorkflow on invalid confirmation after 3 attempts(CustomerDCBuying3Test.kt:130)
at java.base@17.0.10/java.lang.reflect.Method.invoke(Method.java:568)
… 2 more
Caused by: java.util.concurrent.CancellationException: The gRPC request was cancelled
at io.temporal.internal.retryer.GrpcRetryerUtils.createFinalExceptionIfNotRetryable(GrpcRetryerUtils.java:59)
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:77)
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60)
at io.temporal.internal.client.external.GenericWorkflowClientImpl.update(GenericWorkflowClientImpl.java:329)
at io.temporal.internal.client.RootWorkflowClientInvoker.startUpdate(RootWorkflowClientInvoker.java:338)
at io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase.startUpdate(WorkflowClientCallsInterceptorBase.java:66)
at io.temporal.opentracing.internal.OpenTracingWorkflowClientCallsInterceptor.startUpdate(OpenTracingWorkflowClientCallsInterceptor.java:136)
at io.temporal.client.WorkflowStubImpl.startUpdate(WorkflowStubImpl.java:331)
… 19 more
I’m seeking assistance in understanding the root cause of this intermittent test failure and potential solutions to ensure the test’s reliability. I’m also open to suggestions on how to better structure my workflow or test scenario to avoid this issue.
Any insights or suggestions would be greatly appreciated. Thank you for your help!