Testing an activity implementation when using ActivityExecutionContext to asynchronously execute activities

Hello!

We’ve started using the ManualActivityCompletionClient as a way to execute the code in our activities asynchronously . The question we have is on how to test the activities after this change, as calling the activities in our test code now gives the following IllegalStateException:

ActivityExecutionContext can be used only inside of activity implementation methods and in the same thread that invoked an activity.
java.lang.IllegalStateException: ActivityExecutionContext can be used only inside of activity implementation methods and in the same thread that invoked an activity.

Any examples on how to test code that makes use of the ManualActivityCompletionClient would be greatly appreciated!

In the java sdk samples repo you can look at

  1. HelloAsyncActivityCompletion
  2. HelloAsyncActivityCompletionTest

is this what you are looking for?

Hey!
Thanks for the reply!
I think the example you gave is for testing a full workflow. In my case it’s just the activity I want to be testing. Before using the async logic, I was able to simply instantiate the activities implementation, mocking out the backend, so I can define exactly what to return when the method on the activity is called. Since I was awaiting the result in the activity, I could then compare the output of the call with the expected outcome.
I hope that makes some sense. Please tell me if you need me to clarify a bit more

I don’t think it is currently supported. Here is the associated issue.

@tihomir it seems the temporal test is based on junit-4, “public class TestWorkflowRule implements TestRule {” which there is no TestRule in junit-5. How can we use temporal test if all our projects are junit-5?

Java SDK provides support for both JUnit 4 and 5, more info here.
For JUnit 5 use TestWorkflowExtension and TestActivityExtension for isolated activity tests.
See samples repo tests here and here.

I also want to junit test a method having Activity.getExecutionContext().heartbeat()
However, it throws the same error.
java.lang.IllegalStateException: ActivityExecutionContext can be used only inside of activity implementation methods and in the same thread that invoked an activity.

Is this stlll not supported ? Please advice how to test when we have activity methods using the Activity.getExecutionContext() using junit / mockito ?

Use TestActivityEnvironment.

Thanks for your prompt replies. Greatly helpful. Can you please provide me some sample (Just like @tihomir provided earlier using TestworkflowEnvironment) using this TestActivityEnvironment to test an Activity which has Activity.getExecutionContext(). Existing tests written with Mockito / junit5 for this Activity breaks once I add the HeartBeat code to an Activity. I would like to not only test the HeartBeat, but to refactor the testcode for an activity which uses this Activity.getExecutionContext()

This code does not seem to have an example using the TestActivityEnvironment. However, I referrred to your previous link and implemented a test as mentioned in the comments.
testEnvironment.registerActivitiesImplementations(new ActivityImpl());

  • TestActivity activity = testEnvironment.newActivityStub(TestActivity.class);
    
  • String result = activity.activity1("input1");
    
  • assertEquals("TestActivity::activity1-input1", result);
    

But test fails with below exception. Can you please point me to an example using TestActivityEnvironment to test an Activity containing ActivityExecutionContext

org.junit.jupiter.api.extension.ParameterResolutionException: No ParameterResolver registered for parameter [io.temporal.testing.TestActivityEnvironment arg0]

Sorry, posted the wrong link.

Thanks. However, I found the below file very useful. Posting for the benefit of others if they also want to test their Activites with AcitivtyExecutionContext in their Activities.

I was testing the Heartbeat timeout scenario using TestWorkflowEnvironment in my junit and another way(case2) below.
Observed that in case1, I could not catch the AcitivityNotExistsException whereas case2 I could see this exception. Can you please see the below stack trace and advise why I dont see the Exception although both seem to have message in the stacktrace…

Case 1: (Using TestWorkflowEnvironment)

19:49:01.147 [Activity Executor taskQueue=“WorkflowTest-testActivityImpl(TestWorkflowEnvironment, Worker, GreetingWorkflow)-[engine:junit-jupiter]//[$GreetingWorkflow)]”, namespace=“UnitTest”: 1] DEBUG io.temporal.internal.retryer.GrpcRetryer - Final exception, throwing
io.grpc.StatusRuntimeException: NOT_FOUND: invalid activityID or activity already timed out or invoking workflow is completed
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondActivityTaskCompleted(WorkflowServiceGrpc.java:3840)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.lambda$sendReply$0(ActivityWorker.java:304)
at io.temporal.internal.retryer.GrpcRetryer.lambda$retry$0(GrpcRetryer.java:52)
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:69)
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60)
at io.temporal.internal.retryer.GrpcRetryer.retry(GrpcRetryer.java:50)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.sendReply(ActivityWorker.java:299)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:253)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:207)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:180)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:103)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
19:49:01.149 [Activity Executor taskQueue=“WorkflowTest-testActivityImpl(TestWorkflowEnvironment, Worker, GreetingWorkflow)-[engine:junit-jupiter]//[$GreetingWorkflow)]”, namespace=“UnitTest”: 1] DEBUG io.temporal.internal.worker.ActivityWorker - Failure during reporting of activity result to the server. ActivityId = 38f8cee6-51a5-3f42-98cd-82924b142618, ActivityType = greet, WorkflowId=7632902d-13e9-4cb6-9db3-1709e04e08c9, WorkflowType=GreetingWorkflow, RunId=c38be44a-58b8-4c8c-be3a-e0cfd2039834, ActivityResult=Result{activityId=‘38f8cee6-51a5-3f42-98cd-82924b142618’, taskCompleted=result {
payloads {
metadata {
key: “encoding”
value: “json/plain”
}
data: “"Hello World!"”
}
}
, taskFailed=null, taskCanceled=null}
io.grpc.StatusRuntimeException: NOT_FOUND: invalid activityID or activity already timed out or invoking workflow is completed
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondActivityTaskCompleted(WorkflowServiceGrpc.java:3840)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.lambda$sendReply$0(ActivityWorker.java:304)
at io.temporal.internal.retryer.GrpcRetryer.lambda$retry$0(GrpcRetryer.java:52)
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:69)
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60)
at io.temporal.internal.retryer.GrpcRetryer.retry(GrpcRetryer.java:50)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.sendReply(ActivityWorker.java:299)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:253)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:207)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:180)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:103)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)


Case 2: (Actual workflow)

2023-05-19 17:09:15.377 ERROR 584 — [ce=“payment”: 1] c.d.c.p.t.m.a.c.NormalizationImpl : **** ActivityNotExistsException on Heartbeat HeartBeatDetail{currentCount=4} while executing task number 4 Exception is : {}

io.temporal.client.ActivityNotExistsException: WorkflowId=f8d5536c-ee14-48f4-9fa7-354bb6c86d3e, RunId=b68b069a-1923-433c-b6ad-04feeef963fa, ActivityType=NormalizeToSDP, ActivityId=3e6e7409-aa1c-372f-9289-1a307532bba2
at io.temporal.internal.activity.HeartbeatContextImpl.sendHeartbeatRequest(HeartbeatContextImpl.java:200) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.activity.HeartbeatContextImpl.doHeartBeatLocked(HeartbeatContextImpl.java:144) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.activity.HeartbeatContextImpl.lambda$scheduleNextHeartbeatLocked$0(HeartbeatContextImpl.java:167) ~[temporal-sdk-1.18.1.jar:na]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Activity is in INITIATED state
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.recordActivityTaskHeartbeat(WorkflowServiceGrpc.java:3813) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.client.ActivityClientHelper.sendHeartbeatRequest(ActivityClientHelper.java:67) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.activity.HeartbeatContextImpl.sendHeartbeatRequest(HeartbeatContextImpl.java:185) ~[temporal-sdk-1.18.1.jar:na]
… 8 common frames omitted

2023-05-19 17:09:15.379 WARN 584 — [ce=“payment”: 1] tivityTaskExecutors$ActivityTaskExecutor : Activity failure. ActivityId=3e6e7409-aa1c-372f-9289-1a307532bba2, activityType=NormalizeToSDP, attempt=2

io.temporal.client.ActivityNotExistsException: WorkflowId=f8d5536c-ee14-48f4-9fa7-354bb6c86d3e, RunId=b68b069a-1923-433c-b6ad-04feeef963fa, ActivityType=NormalizeToSDP, ActivityId=3e6e7409-aa1c-372f-9289-1a307532bba2
at io.temporal.internal.activity.HeartbeatContextImpl.sendHeartbeatRequest(HeartbeatContextImpl.java:200) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.activity.HeartbeatContextImpl.doHeartBeatLocked(HeartbeatContextImpl.java:144) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.activity.HeartbeatContextImpl.lambda$scheduleNextHeartbeatLocked$0(HeartbeatContextImpl.java:167) ~[temporal-sdk-1.18.1.jar:na]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Activity is in INITIATED state
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.recordActivityTaskHeartbeat(WorkflowServiceGrpc.java:3813) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.client.ActivityClientHelper.sendHeartbeatRequest(ActivityClientHelper.java:67) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.activity.HeartbeatContextImpl.sendHeartbeatRequest(HeartbeatContextImpl.java:185) ~[temporal-sdk-1.18.1.jar:na]
… 8 common frames omitted

2023-05-19 17:09:15.384  WARN 584 --- [ce="payment": 1] i.t.internal.worker.ActivityWorker       : Failure during reporting of activity result to the server. ActivityId = 3e6e7409-aa1c-372f-9289-1a307532bba2, ActivityType = NormalizeToSDP, WorkflowId=f8d5536c-ee14-48f4-9fa7-354bb6c86d3e, WorkflowType=ClientFileWorkflow, RunId=b68b069a-1923-433c-b6ad-04feeef963fa

io.grpc.StatusRuntimeException: NOT_FOUND: invalid activityID or activity already timed out or invoking workflow is completed
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.respondActivityTaskFailed(WorkflowServiceGrpc.java:3866) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.lambda$sendReply$1(ActivityWorker.java:321) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.retryer.GrpcRetryer.lambda$retry$0(GrpcRetryer.java:52) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:69) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.retryer.GrpcRetryer.retry(GrpcRetryer.java:50) ~[temporal-serviceclient-1.18.1.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.sendReply(ActivityWorker.java:316) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:253) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:207) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:180) ~[temporal-sdk-1.18.1.jar:na]
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:103) ~[temporal-sdk-1.18.1.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]