Hi, I am new to Temporal and am trying to understand how to model a workflow that waits for a signal indefinitely and exits only when a received signal meets a condition.
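import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.Objects;
public class Sample {
    // Static nested class so it can be deserialized without an enclosing Sample instance.
    public static class Application {
        private String status;
    }
    @WorkflowInterface
    public interface SampleWorkflow {
        @WorkflowMethod
        void submit(Application application);
        @SignalMethod
        void update(Application application);
    }
    public static class SampleWorkflowImpl implements SampleWorkflow {
        private Application application;
        @Override
        public void submit(Application application) {
            this.application = application;
            while (true) {
                var oldApp = this.application;
                // Read the field via this.application: the bare name refers to the
                // method parameter, which the signal handler never changes.
                Workflow.await(() -> !Objects.equals(this.application.status, oldApp.status));
                if ("1".equals(this.application.status)) {
                    System.out.println("Success");
                    return;
                } else {
                    System.out.println("Condition not met yet, continue");
                }
            }
        }
        @Override
        public void update(Application application) {
            // Signal handler: swap in the latest application state.
            this.application = application;
        }
    }
}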
Is the above code the correct way to model this problem?
Also, any suggestions for how to unit test it?
Yes, this code should work fine. One caveat: if you expect the number of iterations to be unbounded, you have to call continue-as-new periodically to reset the history size back to zero.
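One detail worth double-checking in a loop like this: inside submit, the unqualified name application refers to the method parameter, while the signal handler reassigns the field this.application. A wait condition that reads the parameter would never observe an update delivered by a signal. Below is a minimal plain-Java sketch of that difference. It has no Temporal dependency, and ShadowingDemo and its method names are hypothetical, chosen only for illustration.

```java
import java.util.Objects;
import java.util.function.BooleanSupplier;

/** Plain-Java illustration only; none of these names are Temporal APIs. */
final class ShadowingDemo {
    static final class Application {
        final String status;
        Application(String status) { this.status = status; }
    }

    private Application application;

    /** Mimics a signal handler that swaps in a new object. */
    void update(Application application) {
        this.application = application;
    }

    /** Condition written with the bare name: the lambda captures the parameter. */
    BooleanSupplier changedViaParameter(Application application) {
        this.application = application;
        Application oldApp = this.application;
        return () -> !Objects.equals(application.status, oldApp.status);
    }

    /** Condition written with this.application: the field is re-read on every evaluation. */
    BooleanSupplier changedViaField(Application application) {
        this.application = application;
        Application oldApp = this.application;
        return () -> !Objects.equals(this.application.status, oldApp.status);
    }

    public static void main(String[] args) {
        ShadowingDemo a = new ShadowingDemo();
        BooleanSupplier viaParameter = a.changedViaParameter(new Application("initial"));
        a.update(new Application("1"));
        System.out.println(viaParameter.getAsBoolean()); // false: still compares the original object with itself

        ShadowingDemo b = new ShadowingDemo();
        BooleanSupplier viaField = b.changedViaField(new Application("initial"));
        b.update(new Application("1"));
        System.out.println(viaField.getAsBoolean()); // true: sees the object set by update()
    }
}
```

In the workflow, the same reasoning suggests reading this.application.status both in the Workflow.await condition and in the check that follows it.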
This workflow does not expect many signals. The signal method is a “human-in-the-loop” step that I expect to be invoked fewer than 100 times. Does it still make sense to use “continue as new”?
I looked at the HelloPeriodic test example, and one thing I want to clarify is the usage of sleep. In my test, it looks like the activity method is actually invoked only when I call sleep. So is it required to call sleep every time, or is that specific to my use case? If it is required, what duration should I use?
I was also trying to verify the method invocation that happens after the required signal is received, but was not able to. Could you suggest the best way to test that? See the “How to test this ???” part.
Please see the attached, slightly updated sample workflow and its unit test.
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Objects;
public class Sample {
    public static class Application {
        private String status;
        public Application() {
            status = "initial";
        }
    }
    @WorkflowInterface
    public interface SampleWorkflow {
        @WorkflowMethod
        void submit(Application application);
        @SignalMethod
        void update(Application application);
    }
    @ActivityInterface
    public interface Activity {
        @ActivityMethod
        void persist(Application application);
        @ActivityMethod
        void notifyOperator();
    }
    public static class ActivityImpl implements Activity {
        private final ServiceImpl service;
        public ActivityImpl(ServiceImpl service) {
            this.service = service;
        }
        @Override
        public void persist(Application application) {
            service.dbPersist(application);
        }
        @Override
        public void notifyOperator() {
            service.notifyOperator();
        }
    }
    public static class ServiceImpl {
        public void dbPersist(Application application) {}
        public void notifyOperator() {}
    }
    public static class SampleWorkflowImpl implements SampleWorkflow {
        private Application application;
        private final Activity activity = Workflow.newActivityStub(
                Activity.class,
                ActivityOptions.newBuilder()
                        .setStartToCloseTimeout(Duration.ofSeconds(10))
                        .build());
        @Override
        public void submit(Application application) {
            this.application = application;
            while (true) {
                var oldApp = this.application;
                // Read the field via this.application: the bare name refers to the
                // method parameter, which the signal handler never changes.
                Workflow.await(() -> !Objects.equals(this.application.status, oldApp.status));
                if ("1".equals(this.application.status)) {
                    activity.notifyOperator();
                    System.out.println("Success");
                    return;
                } else {
                    System.out.println("Condition not met yet, continue");
                }
            }
        }
        @Override
        public void update(Application application) {
            // Persist first, then expose the new state to the waiting loop.
            activity.persist(application);
            this.application = application;
        }
    }
}
Test
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
// TaskQueue is a project-specific class that is not shown in this post.
public class SampleTest {
    private TestWorkflowEnvironment testEnv;
    private Worker worker;
    private WorkflowClient client;
    // Set up the test workflow environment
    @BeforeEach
    public void setUp() {
        testEnv = TestWorkflowEnvironment.newInstance();
        worker = testEnv.newWorker(TaskQueue.DEFAULT_TASK_QUEUE);
        // Register your workflow implementations
        worker.registerWorkflowImplementationTypes(Sample.SampleWorkflowImpl.class);
        client = testEnv.getWorkflowClient();
    }
    @AfterEach
    public void tearDown() {
        testEnv.close();
    }
    @Test
    public void testWorkflow() {
        var mock = Mockito.mock(Sample.ServiceImpl.class);
        worker.registerActivitiesImplementations(new Sample.ActivityImpl(mock));
        testEnv.start();
        Sample.SampleWorkflow workflow = client.newWorkflowStub(
                Sample.SampleWorkflow.class,
                WorkflowOptions.newBuilder()
                        .setWorkflowId("1")
                        .setWorkflowExecutionTimeout(Duration.ofSeconds(1000))
                        .setTaskQueue(TaskQueue.DEFAULT_TASK_QUEUE)
                        .build());
        WorkflowExecution execution = WorkflowClient.start(workflow::submit, new Sample.Application());
        workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, "1");
        workflow.update(new Sample.Application());
        testEnv.sleep(Duration.ofSeconds(1));
        Mockito.verify(mock).dbPersist(Mockito.any());
        // How to test this ???
        Mockito.verify(mock).notifyOperator();
    }
}
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
// TaskQueue and DefaultWorkflowOptions are project-specific classes that are not shown in this post.
public class SampleTest {
    private TestWorkflowEnvironment testEnv;
    private Worker worker;
    private WorkflowClient client;
    // Set up the test workflow environment
    @BeforeEach
    public void setUp() {
        testEnv = TestWorkflowEnvironment.newInstance();
        worker = testEnv.newWorker(TaskQueue.DEFAULT_TASK_QUEUE);
        // Register your workflow implementations
        worker.registerWorkflowImplementationTypes(Sample.SampleWorkflowImpl.class);
        client = testEnv.getWorkflowClient();
    }
    @AfterEach
    public void tearDown() {
        testEnv.close();
    }
    @Test
    public void testWorkflow() {
        var mock = Mockito.mock(Sample.ServiceImpl.class);
        worker.registerActivitiesImplementations(new Sample.ActivityImpl(mock));
        testEnv.start();
        Sample.SampleWorkflow workflow = client.newWorkflowStub(
                Sample.SampleWorkflow.class,
                WorkflowOptions.newBuilder()
                        .setWorkflowId("1")
                        .setWorkflowExecutionTimeout(Duration.ofSeconds(1000))
                        .setRetryOptions(DefaultWorkflowOptions.DEFAULT_RETRY_OPTIONS)
                        .setWorkflowIdReusePolicy(
                                WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
                        .setTaskQueue(TaskQueue.DEFAULT_TASK_QUEUE)
                        .build());
        WorkflowExecution execution = WorkflowClient.start(workflow::submit, new Sample.Application());
        // workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, "1");
        workflow.update(new Sample.Application());
        // testEnv.sleep(Duration.ofSeconds(1));
        // sync call to reconnect to the workflow?
        workflow.submit(new Sample.Application());
        // How to test this ???
        Mockito.verify(mock).dbPersist(Mockito.any());
        Mockito.verify(mock).notifyOperator();
    }
}
But I got the following exception:
io.temporal.client.WorkflowFailedException: workflowId='1', runId='8a6a0832-9a5f-41a2-8fa9-3123668cdb62', workflowType='SampleWorkflow', retryState=RETRY_STATE_TIMEOUT, workflowTaskCompletedEventId=0
at io.temporal.internal.common.WorkflowExecutionUtils.getResultFromCloseEvent(WorkflowExecutionUtils.java:201)
at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:135)
at io.temporal.internal.client.RootWorkflowClientInvoker.getResult(RootWorkflowClientInvoker.java:94)
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:243)
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:225)
at io.temporal.testing.TestWorkflowEnvironmentInternal$TimeLockingInterceptor$TimeLockingWorkflowStub.getResult(TestWorkflowEnvironmentInternal.java:296)
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315)
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270)
at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178)
at jdk.proxy2/jdk.proxy2.$Proxy29.submit(Unknown Source)
at io.quickpower.hoa.hoaserver.workflow.SampleTest.testWorkflow(SampleTest.java:59)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: io.temporal.failure.TimeoutFailure: timeoutType=TIMEOUT_TYPE_START_TO_CLOSE
... 76 more
This workflow does not expect many signals. The signal method is a “human-in-the-loop” step that I expect to be invoked fewer than 100 times. Does it still make sense to use “continue as new”?
No, in this case continue-as-new is not needed unless processing each signal invokes dozens of activities.
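To put rough numbers on that, here is a back-of-the-envelope sketch. The per-step event counts are assumptions on my part (one event per signal, about three events per workflow task, about three per activity, and one workflow task after each sequential activity), and HistoryEstimate is a hypothetical helper, not part of the SDK. Compare the totals against the history limits documented for your server version.

```java
/** Rough history-size estimate; every per-step event count here is an assumption. */
final class HistoryEstimate {
    // Assumed: one "signaled" event is recorded per signal.
    static final int SIGNAL_EVENTS = 1;
    // Assumed: a workflow task records Scheduled, Started and Completed events.
    static final int WORKFLOW_TASK_EVENTS = 3;
    // Assumed: an activity records Scheduled, Started and Completed events.
    static final int ACTIVITY_EVENTS = 3;

    /** Events added by one signal whose handling runs the given number of sequential activities. */
    static int eventsPerSignal(int activitiesPerSignal) {
        return SIGNAL_EVENTS + WORKFLOW_TASK_EVENTS
                + activitiesPerSignal * (ACTIVITY_EVENTS + WORKFLOW_TASK_EVENTS);
    }

    /** Total events added by the given number of signals. */
    static int totalEvents(int signals, int activitiesPerSignal) {
        return signals * eventsPerSignal(activitiesPerSignal);
    }

    public static void main(String[] args) {
        System.out.println(totalEvents(100, 1));  // 1000
        System.out.println(totalEvents(100, 30)); // 18400
    }
}
```

Under these assumptions, 100 signals with one activity each add about a thousand events, while 100 signals with 30 activities each add well over ten thousand, which is roughly where continue-as-new starts to be worth considering.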
I looked at the HelloPeriodic test example, and one thing I want to clarify is the usage of sleep. In my test, it looks like the activity method is actually invoked only when I call sleep. So is it required to call sleep every time, or is that specific to my use case? If it is required, what duration should I use?
You are using an asynchronous workflow start. In this case you need to call testEnv.sleep to give the workflow time to execute the activity. If you invoked the workflow synchronously as workflow.submit, then no testEnv.sleep would be required, as the workflow would have completed by the time the call returns.
I was also trying to verify the method invocation that happens after the required signal is received, but was not able to. Could you suggest the best way to test that? See the “How to test this ???” part.
To reconnect to a running workflow, you have to use a stub of the kind used to start a workflow (one created with WorkflowOptions), not the one that takes the workflow ID as a parameter. I realized that this is not an ideal experience and filed an issue to get it fixed.
Try changing your code to:
WorkflowOptions options = WorkflowOptions.newBuilder()
        .setWorkflowId("1")
        .setWorkflowExecutionTimeout(Duration.ofSeconds(1000))
        .setRetryOptions(DefaultWorkflowOptions.DEFAULT_RETRY_OPTIONS)
        .setWorkflowIdReusePolicy(
                WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
        .setTaskQueue(TaskQueue.DEFAULT_TASK_QUEUE)
        .build();
Sample.SampleWorkflow workflow = client.newWorkflowStub(Sample.SampleWorkflow.class, options);
// async start
WorkflowExecution execution = WorkflowClient.start(workflow::submit, new Sample.Application());
// Note: the workflow only finishes once a signalled Application has status "1";
// the default constructor sets "initial".
workflow.update(new Sample.Application());
// sync call to reconnect to the workflow (separate variable, since "workflow" is already declared)
Sample.SampleWorkflow reconnected = client.newWorkflowStub(Sample.SampleWorkflow.class, options);
reconnected.submit(new Sample.Application()); // arg is ignored if reconnected
Mockito.verify(mock).dbPersist(Mockito.any());
Mockito.verify(mock).notifyOperator();
Another way to wait for a workflow completion is using WorkflowStub: