I am looking for working examples of integration tests for Temporal workflows. The documentation says that the Temporal library supports integration tests, but I was not able to find any reference for them.
Can you please guide me on how to write integration tests for Temporal?
@antonio.perez - Thank you for sharing the details. I was hoping to write the tests in the following way, and to find some examples of it:
Start the Temporal server using a test container.
Run the workflow service.
Test the business scenarios by mocking the dependent events/messages. As part of the test, connect to Temporal and get the status and activity list of the workflow for validation. I was checking whether there is any API or example that could help us get the workflow and activity list for a particular workflow.
The Temporal SDK test framework does not require a server to be running. If you want to run your tests against a test server, you could do:
@Rule
public TestWorkflowRule testWorkflowRule =
    TestWorkflowRule.newBuilder()
        // ...
        .setUseExternalService(true)
        .setTarget("<service:port>") // set if target is not 127.0.0.1:7233 default
        .build();
Note that the automatic time skipping provided by the SDK test framework is disabled when running tests against a real service this way.
if there is any API/Example which could help us to get the workflow and activity list for a particular workflow.
If you use a real test server, you can use all of the List APIs in your test. The SDK test environment does not currently support all of them (for example, it does not support ListWorkflowExecutions with a visibility query), but you can use ListOpenWorkflowExecutions and ListClosedWorkflowExecutions. You could have, for example, a util method:
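A utility of that kind mostly comes down to following the next-page token until it runs out. Here is a minimal plain-Java sketch of that loop. It is not the Temporal API itself: `Page`, `ExecutionLister`, and `fetchPage` are hypothetical stand-ins for a List response, the helper class, and the call that loads one page, and treating an empty string as "no more pages" is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for one page of a List response. */
final class Page {
    final List<String> workflowIds;
    final String nextPageToken; // assumption: empty string means there are no more pages

    Page(List<String> workflowIds, String nextPageToken) {
        this.workflowIds = workflowIds;
        this.nextPageToken = nextPageToken;
    }
}

/** Hypothetical helper that drains every page into a single list. */
final class ExecutionLister {
    private ExecutionLister() {}

    /**
     * Calls fetchPage repeatedly, starting with an empty token, and collects the
     * workflow IDs of every page until a page comes back with an empty token.
     */
    static List<String> listAll(Function<String, Page> fetchPage) {
        List<String> all = new ArrayList<>();
        String token = "";
        do {
            Page page = fetchPage.apply(token);
            all.addAll(page.workflowIds);
            token = page.nextPageToken;
        } while (!token.isEmpty());
        return all;
    }
}
```

In a real test, `fetchPage` would presumably wrap one of the List calls named above and pass the token through the request's next-page-token field. A loop also avoids growing the call stack when there are many pages.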
This was really helpful. I was able to get the list of workflows.
I was trying to find a method to fetch the list of activities under each workflow based on the run ID, but I was not able to. Could you please guide me?
The simplest way to do it would be something like:
// Declare a variable in your workflow class
private List<String> completedActivities = new ArrayList<>();

// In your workflow method
activities.activity_1("Hello", name);
completedActivities.add("activity_1");
activities.activity_2("Hello", name);
completedActivities.add("activity_2");
activities.activity_3("Hello", name);
completedActivities.add("activity_3");

// And then implement the query method, like
@Override
public List<String> getCompletedActivities() {
    return this.completedActivities;
}
I think you can accomplish this with interceptors as well; I will take a look.
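To make the tracking idea a bit more reusable, the "call the activity, then record its name" pair can be folded into one helper. The following is only a plain-Java sketch with no Temporal types: `CompletedActivityLog`, `runAndRecord`, and `snapshot` are hypothetical names, and the `Runnable` stands in for a call on an activity stub.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that a workflow implementation could keep as a field. */
final class CompletedActivityLog {
    private final List<String> completed = new ArrayList<>();

    /**
     * Runs the given call and records the activity name only if the call
     * returns normally; if it throws, nothing is recorded and the exception
     * propagates to the caller.
     */
    void runAndRecord(String activityName, Runnable activityCall) {
        activityCall.run();
        completed.add(activityName);
    }

    /** Returns a read-only copy, suitable for a query method to return. */
    List<String> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(completed));
    }
}
```

With this, a workflow method could write something like `log.runAndRecord("activity_1", () -> activities.activity_1("Hello", name))`, and the query method would return `log.snapshot()`. Recording after the call means a failed activity never shows up as completed.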
In the use case we are working on, the list of activities is not the same for all workflows. When a request is received for a workflow, a rule is called via gRPC, and this rule defines the list of activities that will be part of the workflow. In this scenario, how can I fetch the activity list?
If I understand correctly, this rule is executed/called inside an activity. You can store the activity result in a variable and implement a QueryMethod that returns the value.
Correct me if I am wrong:
Before starting the workflow, you execute the rule that gives you the list of activities the workflow has to execute. Then you start the workflow. How does the workflow know which activities it has to execute? Are you passing the list of activities to the workflow when you start it?
Also, have you considered executing the rule that calculates the list of activities as part of the workflow execution instead of before starting it?
Then, from an external service/client (not inside the workflow code), you want to get the list of activities that are part of a workflow execution.
If so, you can store the workflow input (the list of activities) in a variable and implement a QueryMethod that returns the value.
// And then from your external service, create a WorkflowStub and query the workflow
{
    // You can create an UntypedWorkflowStub if you don't have access to the type
    List<String> result =
        client
            .newUntypedWorkflowStub(workflowId, Optional.of(runId), Optional.empty())
            .query("getActivities", List.class);
    System.out.println(result);
}
{
    // Or a workflowStub if you have access to the workflowType
    List<String> result =
        client
            .newWorkflowStub(HelloActivity.GreetingWorkflow.class, workflowId, Optional.of(runId))
            .getActivities();
    System.out.println(result);
}
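On the workflow side, storing the input could look roughly like the sketch below. It is plain Java with no Temporal annotations, so treat it as an outline of the state handling only: `PlannedActivitiesSketch`, `start`, `completeNext`, and `getPendingActivities` are hypothetical names, and `getActivities` mirrors the query name used in the client snippet above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a workflow that receives its activity list as input. */
final class PlannedActivitiesSketch {
    private final List<String> planned = new ArrayList<>();
    private int nextIndex = 0;

    /** Stand-in for the start of the workflow method: keep the rule's output as state. */
    void start(List<String> activitiesFromRule) {
        planned.addAll(activitiesFromRule);
    }

    /** Marks the next planned activity as done; call it after that activity returns. */
    void completeNext() {
        if (nextIndex >= planned.size()) {
            throw new IllegalStateException("no planned activity left");
        }
        nextIndex++;
    }

    /** Stand-in for the query method: the full list the rule produced. */
    List<String> getActivities() {
        return Collections.unmodifiableList(new ArrayList<>(planned));
    }

    /** Optional second query: the planned activities that have not completed yet. */
    List<String> getPendingActivities() {
        return Collections.unmodifiableList(
            new ArrayList<>(planned.subList(nextIndex, planned.size())));
    }
}
```

Because the list is kept as workflow state, the query can answer as soon as the workflow has started, before any activity has run.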
I gave it a try based on your suggestion, but I am still receiving the same error. Below is the code:
Workflow Interface
package org.example;

import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.WorkflowInterface;
import java.util.ArrayList;

@WorkflowInterface
public interface BookingWorkflow {
    String QUEUE = "Customer_Order";

    // Workflow query method. Used to return the list of activities as a query value
    @QueryMethod
    ArrayList<String> getActivities();
}
Workflow Interface Implementation
I have created an ArrayList variable holding the list of activities.
package org.example;

import java.util.ArrayList;

public class BookingWorkflowImpl implements BookingWorkflow {
    ArrayList<String> workflowActivties = new ArrayList<>();

    public BookingWorkflowImpl() {
        workflowActivties.add("Capture");
        workflowActivties.add("Validate");
        workflowActivties.add("Fulfillment");
        workflowActivties.add("Record Booking");
        workflowActivties.add("Print Booking");
        workflowActivties.add("Send notification");
    }

    // Our workflow query method returns the list of activities
    @Override
    public ArrayList<String> getActivities() {
        return workflowActivties;
    }
}
Test Code
The test code is currently a main method, as I am still exploring how to get the relevant details. I will move it to an appropriate place later.
package org.example;

import com.google.protobuf.ByteString;
import io.temporal.api.workflow.v1.PendingActivityInfo;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.TestWorkflowRule;
import java.util.ArrayList;
import java.util.Optional;

public class Main {
    public static TestWorkflowRule testWorkflowRule =
        TestWorkflowRule.newBuilder()
            // ...
            .setNamespace("default")
            .setUseExternalService(true)
            //.setTarget("<service:port>") // set if target is not 127.0.0.1:7233 default
            .build();

    public static void main(String[] args) {
        listAllWorkflowExecutions(
            testWorkflowRule.getWorkflowClient(), testWorkflowRule.getWorkflowServiceStubs(), null);
        GetWorkflowExecutionHistoryRequest request =
            GetWorkflowExecutionHistoryRequest.newBuilder()
                .setNamespace("default")
                .build();
    }

    private static void listAllWorkflowExecutions(
        WorkflowClient client, WorkflowServiceStubs service, ByteString token) {
        ListWorkflowExecutionsRequest request;
        if (token == null) {
            request =
                ListWorkflowExecutionsRequest.newBuilder()
                    .setNamespace(client.getOptions().getNamespace())
                    .build();
        } else {
            request =
                ListWorkflowExecutionsRequest.newBuilder()
                    .setNamespace(client.getOptions().getNamespace())
                    .setNextPageToken(token)
                    .build();
        }
        ListWorkflowExecutionsResponse response =
            service.blockingStub().listWorkflowExecutions(request);
        for (WorkflowExecutionInfo info : response.getExecutionsList()) {
            DescribeWorkflowExecutionRequest describeWorkflowExecutionRequest =
                DescribeWorkflowExecutionRequest.newBuilder()
                    .setNamespace("default")
                    .setExecution(info.getExecution())
                    .build();
            DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse =
                service.blockingStub().describeWorkflowExecution(describeWorkflowExecutionRequest);
            ArrayList<String> activities =
                client
                    .newUntypedWorkflowStub(
                        info.getExecution().getWorkflowId(),
                        Optional.of(info.getExecution().getRunId()),
                        Optional.empty())
                    .query("getActivities", ArrayList.class);
            System.out.println(activities);
            for (PendingActivityInfo pendingActivityInfo :
                describeWorkflowExecutionResponse.getPendingActivitiesList()) {
                System.out.println(pendingActivityInfo.getActivityType());
            }
        }
        if (response.getNextPageToken().size() > 0) {
            listAllWorkflowExecutions(client, service, response.getNextPageToken());
        }
    }
}
Exception:
io.temporal.client.WorkflowQueryException: workflowId='CUSTOM_DYNAMIC_FLOW_2', runId='ddef92e0-3707-43a1-ad32-d6d87600b74a}
at io.temporal.internal.sync.WorkflowStubImpl.throwAsWorkflowFailureExceptionForQuery(WorkflowStubImpl.java:371)
at io.temporal.internal.sync.WorkflowStubImpl.query(WorkflowStubImpl.java:274)
at io.temporal.internal.sync.WorkflowStubImpl.query(WorkflowStubImpl.java:260)
at org.example.Main.listClosedWorkflowExecutions(Main.java:65)
at org.example.Main.main(Main.java:32)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: java.lang.IllegalArgumentException: Unknown query type: getActivities, knownTypes=[]
at io.temporal.internal.sync.QueryDispatcher.handleQuery(QueryDispatcher.java:79)
at io.temporal.internal.sync.SyncWorkflowContext.handleQuery(SyncWorkflowContext.java:275)
at io.temporal.internal.sync.WorkflowExecuteRunnable.handleQuery(WorkflowExecuteRunnable.java:118)
at io.temporal.internal.sync.SyncWorkflow.query(SyncWorkflow.java:187)
at io.temporal.internal.replay.ReplayWorkflowExecutor.query(ReplayWorkflowExecutor.java:136)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleQueryWorkflowTask(ReplayWorkflowRunTaskHandler.java:241)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:117)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:277)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:231)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:173)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.queryWorkflow(WorkflowServiceGrpc.java:4099)
at io.temporal.internal.client.external.GenericWorkflowClientImpl.lambda$query$9(GenericWorkflowClientImpl.java:206)
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:67)
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60)
at io.temporal.internal.client.external.GenericWorkflowClientImpl.query(GenericWorkflowClientImpl.java:201)
at io.temporal.internal.client.RootWorkflowClientInvoker.query(RootWorkflowClientInvoker.java:140)
at io.temporal.internal.sync.WorkflowStubImpl.query(WorkflowStubImpl.java:270)
... 3 more
Disconnected from the target VM, address: '127.0.0.1:54460', transport: 'socket'
Process finished with exit code 1