Infinite loop waiting for signal

Hi I am new to temporal, trying to understand how to model a workflow that is waiting for a signal forever, and only exits when the signal received meets condition.


public class Sample {
    class Application {
        private String status;
    }
    
    @WorkflowInterface
    public interface SampleWorkflow {

        @WorkflowMethod
        void submit(Application application);

        @SignalMethod
        void update(Application application);
    }
    
    public static class SampleWorkflowImpl implements SampleWorkflow {
        private Application application;

        @Override
        public void submit(Application application) {
            this.application = application;
            while (true) {
                var oldApp = this.application;
                Workflow.await(() -> !Objects.equals(application.status, oldApp.status));
                if ("1".equals(application.status)) {
                    System.out.println("Success");
                    return;
                } else {
                    System.out.println("Condition not met yet, continue");
                }
            }
        }

        @Override
        public void update(Application application) {
            this.application = application;
        }
    }
}

Is the above code the correct way to model such problem?

Also, any suggestions for how to unit test the code?

Yes, this code should work fine. One caveat if you expect the number of iterations to be unbounded you have to call continue as new periodically to reset the history size back to 0.

Use the Temporal SDK unit testing framework to test workflows. Here is the unit test for HelloPeriodic Sample.

Thanks @maxim. A couple of questions:

  1. This workflow does not expect too many signals. The signalMethod is a “human-in-the-loop” step, expecting to be less than 100 times. Does it still make sense to use “continue as new”
  2. I looked at the HelloPeriodic test example, one thing I want to clarify is the usage of sleep. In my test, looks like only when I invoke sleep, then I will get an actual invocation of the activity method. So is it required to invoke sleep every time? or is it just the specific case of my use case? if it is required to invoke, what duration should I put?
  3. I was also trying to verify the method invocation after the required signal is received, but was not able to. Could you suggest the best way to test that? See How to test this ??? part

Please see attached slightly updated sample workflow and its unit test

public class Sample {
    public static class Application {
        public Application() {
            status = "initial";
        };
        private String status;
    }

    @WorkflowInterface
    public interface SampleWorkflow {

        @WorkflowMethod
        void submit(Application application);

        @SignalMethod
        void update(Application application);
    }

    @ActivityInterface
    public interface Activity {
        @ActivityMethod
        void persist(Application application);

        @ActivityMethod
        void notifyOperator();
    }

    public static class ActivityImpl implements Activity {
        private ServiceImpl service;
        public ActivityImpl(ServiceImpl service) {
            this.service = service;
        }

        @Override
        public void persist(Application application) {
            service.dbPersist(application);
        }

        @Override
        public void notifyOperator() {
            service.notifyOperator();
        }
    }

    public static class ServiceImpl {
        public void dbPersist(Application application) {}

        public void notifyOperator() {}
    }

    public static class SampleWorkflowImpl implements SampleWorkflow {
        private Application application;

        private Activity activity = Workflow.newActivityStub(Activity.class, ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());

        @Override
        public void submit(Application application) {
            this.application = application;
            while (true) {
                var oldApp = this.application;
                Workflow.await(() -> !Objects.equals(application.status, oldApp.status));
                if ("1".equals(application.status)) {
                    activity.notifyOperator();
                    System.out.println("Success");
                    return;
                } else {
                    System.out.println("Condition not met yet, continue");
                }
            }
        }

        @Override
        public void update(Application application) {
            activity.persist(application);
            this.application = application;
        }
    }
}

Test


public class SampleTest {
    private TestWorkflowEnvironment testEnv;
    private Worker worker;
    private WorkflowClient client;

    // Set up the test workflow environment
    @BeforeEach
    public void setUp() {
        testEnv = TestWorkflowEnvironment.newInstance();
        worker = testEnv.newWorker(TaskQueue.DEFAULT_TASK_QUEUE);
        // Register your workflow implementations
        worker.registerWorkflowImplementationTypes(Sample.SampleWorkflowImpl.class);

        client = testEnv.getWorkflowClient();
    }

    @AfterEach
    public void tearDown() {
        testEnv.close();
    }

    @Test
    public void testWorkflow() {
        var mock = Mockito.mock(Sample.ServiceImpl.class);
        worker.registerActivitiesImplementations(new Sample.ActivityImpl(mock));
        testEnv.start();

        Sample.SampleWorkflow workflow = client.newWorkflowStub(
                Sample.SampleWorkflow.class,
                WorkflowOptions.newBuilder().setWorkflowId("1")
                        .setWorkflowExecutionTimeout(Duration.ofSeconds(1000))
                        .setTaskQueue(TaskQueue.DEFAULT_TASK_QUEUE).build());

        WorkflowExecution execution = WorkflowClient.start(workflow::submit, new Sample.Application());
        workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, "1");
        workflow.update(new Sample.Application());

        testEnv.sleep(Duration.ofSeconds(1));

        Mockito.verify(mock).dbPersist(Mockito.any());

        // How to test this ???
        Mockito.verify(mock).notifyOperator();
    }
}

I also tried to simply “invoke the workflow method again” hoping to complete workflow synchronously, based on samples-java/HelloSignalTest.java at master · temporalio/samples-java · GitHub

public class SampleTest {
    private TestWorkflowEnvironment testEnv;
    private Worker worker;
    private WorkflowClient client;

    // Set up the test workflow environment
    @BeforeEach
    public void setUp() {
        testEnv = TestWorkflowEnvironment.newInstance();
        worker = testEnv.newWorker(TaskQueue.DEFAULT_TASK_QUEUE);
        // Register your workflow implementations
        worker.registerWorkflowImplementationTypes(Sample.SampleWorkflowImpl.class);

        client = testEnv.getWorkflowClient();
    }

    @AfterEach
    public void tearDown() {
        testEnv.close();
    }

    @Test
    public void testWorkflow() {
        var mock = Mockito.mock(Sample.ServiceImpl.class);
        worker.registerActivitiesImplementations(new Sample.ActivityImpl(mock));
        testEnv.start();

        Sample.SampleWorkflow workflow = client.newWorkflowStub(
                Sample.SampleWorkflow.class,
                WorkflowOptions.newBuilder().setWorkflowId("1")
                        .setWorkflowExecutionTimeout(Duration.ofSeconds(1000))
                        .setRetryOptions(DefaultWorkflowOptions.DEFAULT_RETRY_OPTIONS)
                        .setWorkflowIdReusePolicy(
                                WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
                        .setTaskQueue(TaskQueue.DEFAULT_TASK_QUEUE).build());

        WorkflowExecution execution = WorkflowClient.start(workflow::submit, new Sample.Application());
//        workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, "1");
        workflow.update(new Sample.Application());

//        testEnv.sleep(Duration.ofSeconds(1));


        // sync call to reconnect to the workflow?
        workflow.submit(new Sample.Application());
        // How to test this ???
        Mockito.verify(mock).dbPersist(Mockito.any());
        Mockito.verify(mock).notifyOperator();
    }
}

but i got the following exception

io.temporal.client.WorkflowFailedException: workflowId='1', runId='8a6a0832-9a5f-41a2-8fa9-3123668cdb62', workflowType='SampleWorkflow', retryState=RETRY_STATE_TIMEOUT, workflowTaskCompletedEventId=0

	at io.temporal.internal.common.WorkflowExecutionUtils.getResultFromCloseEvent(WorkflowExecutionUtils.java:201)
	at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:135)
	at io.temporal.internal.client.RootWorkflowClientInvoker.getResult(RootWorkflowClientInvoker.java:94)
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:243)
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:225)
	at io.temporal.testing.TestWorkflowEnvironmentInternal$TimeLockingInterceptor$TimeLockingWorkflowStub.getResult(TestWorkflowEnvironmentInternal.java:296)
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315)
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270)
	at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178)
	at jdk.proxy2/jdk.proxy2.$Proxy29.submit(Unknown Source)
	at io.quickpower.hoa.hoaserver.workflow.SampleTest.testWorkflow(SampleTest.java:59)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: io.temporal.failure.TimeoutFailure: timeoutType=TIMEOUT_TYPE_START_TO_CLOSE
	... 76 more
  1. This workflow does not expect too many signals. The signalMethod is a “human-in-the-loop” step, expecting to be less than 100 times. Does it still make sense to use “continue as new”

No, in this case continue as new is not needed unless each signal processing invokes dozens of activities.

  1. I looked at the HelloPeriodic test example, one thing I want to clarify is the usage of sleep . In my test, looks like only when I invoke sleep , then I will get an actual invocation of the activity method. So is it required to invoke sleep every time? or is it just the specific case of my use case? if it is required to invoke, what duration should I put?

You are using asynchronous workflow start. In this case you need to call testEnv.sleep to give the workflow time to execute the activity. If you invoked the workflow synchronously as workflow.submit then no testEnv.sleep would be required as the workflow would complete by the time the call returns.

  1. I was also trying to verify the method invocation after the required signal is received, but was not able to. Could you suggest the best way to test that? See How to test this ??? part

To reconnect to a running workflow you have to use a stub that is used to start a workflow, not the one that takes the workflow id as a parameter. I realized that it is not an ideal experience and filed an issue to get this fixed.

Try changing your code to:

        WorkflowOptions options =   WorkflowOptions.newBuilder().setWorkflowId("1")
                        .setWorkflowExecutionTimeout(Duration.ofSeconds(1000))
                        .setRetryOptions(DefaultWorkflowOptions.DEFAULT_RETRY_OPTIONS)
                        .setWorkflowIdReusePolicy(
                                WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
                        .setTaskQueue(TaskQueue.DEFAULT_TASK_QUEUE).build();

        Sample.SampleWorkflow workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, options);

        // async start
        WorkflowExecution execution = WorkflowClient.start(workflow::submit, new Sample.Application());
        workflow.update(new Sample.Application());

        // sync call to reconnect to the workflow
        Sample.SampleWorkflow workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, options);
        workflow.submit(new SampleApplication()); // arg is ignored if reconnected

        Mockito.verify(mock).dbPersist(Mockito.any());
        Mockito.verify(mock).notifyOperator();
    

Another way to wait for a workflow completion is using WorkflowStub:

  WorkflowStub stub = Workflow.newUntypedWorkflowStub(workflowId, Optional.empty(), Optional.empty());
  stub.getResult(...);