Integration Testing For Java-SDK

Hi Team,

I have a few questions about integration testing my Temporal workflow.

My Temporal workflow method has 4 activities (all blocking): 2 of them get data from 2 different endpoints via RPC, and the remaining 2 store the data from these endpoints in the db.
The workflow instance is started using the Temporal client from a service class (this class resides in the same service that hosts the workflow).

I was able to unit test my workflow method successfully with TestWorkflowExtension by mocking all 4 activities of my activity implementation class.

Now for integration testing, I want to test the complete interaction starting from the service class that creates the workflow instance, i.e., I want to assert on the data stored in the db as part of this test. I have registered an actual instance of my activity implementation class (unlike in unit testing, where I registered a mock impl with the worker itself). I have made sure to mock the responses of the 2 RPC calls made from 2 of the activities.
I tried using TestWorkflowExtension for this, but I see that the worker from its TestWorkflowEnvironment only polls workflow tasks, and my activity tasks are not starting. I guess this is expected behavior when using TestWorkflowExtension?

I have gone through many of the discussions here and the GitHub Java samples, but I do not see anything like what I describe above. Most of the tests mock all of the activities in a workflow.

Is there a way to truly test the full interaction without mocking all the activities? Also when I look at the definition of integration testing here it does say mock activities and start workflow using the client.

Any help will be really appreciated.

Note that TestWorkflowExtension doesn’t know anything about mocking. From its point of view, you register an activity implementation. So the non-mocked activity should work fine with it.

Look at TripBookingWorkflowTest. It registers normal activity implementations here.

I looked at the example you pointed out. There is a subtle difference between what I am trying to do and what the example is doing. The example is creating the workflow instance using the client in the same task queue as the worker from the test environment.
In my case, the client creates the workflow instance from the service class on a specific task queue called expense-category, but the worker that gets created in the TestWorkflowEnvironment does not poll this queue, and hence I do not see any of my workflow tasks being polled.
I tried setting the test environment's worker to use this queue, but I got the same result, except that it retried the workflow task twice. Not sure what I am missing. I will paste snippets of what I am doing below.

I see. It looks like we need to extend TestWorkflowExtension to support specifying your task queue name.
For now, you have to use TestWorkflowEnvironment directly and create a worker for your task queue explicitly. Here is a test that uses this approach.

Please file an issue to get this added to the TestWorkflowExtension.
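
A minimal sketch of the approach described in the reply above: use TestWorkflowEnvironment directly and create a worker for a named task queue, registering a real (non-mocked) activity implementation. The GreetingWorkflow, GreetingActivities, and runGreeting names are hypothetical stand-ins and not code from this thread; only the task queue name comes from the question. It assumes the Temporal Java SDK testing module (io.temporal:temporal-testing) is on the classpath.

```kotlin
import io.temporal.activity.ActivityInterface
import io.temporal.activity.ActivityOptions
import io.temporal.client.WorkflowOptions
import io.temporal.testing.TestWorkflowEnvironment
import io.temporal.workflow.Workflow
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod
import java.time.Duration

// Hypothetical workflow and activity types, for illustration only.
@WorkflowInterface
interface GreetingWorkflow {
    @WorkflowMethod
    fun greet(name: String): String
}

@ActivityInterface
interface GreetingActivities {
    fun compose(name: String): String
}

// A real activity implementation (no mocking framework involved).
class GreetingActivitiesImpl : GreetingActivities {
    override fun compose(name: String): String = "Hello, $name"
}

class GreetingWorkflowImpl : GreetingWorkflow {
    private val activities = Workflow.newActivityStub(
        GreetingActivities::class.java,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(10))
            .build()
    )

    override fun greet(name: String): String = activities.compose(name)
}

// Runs the workflow on the given task queue inside an in-memory test environment.
fun runGreeting(taskQueue: String): String {
    val testEnv = TestWorkflowEnvironment.newInstance()
    try {
        // The worker must poll the same task queue the client starts the workflow on.
        val worker = testEnv.newWorker(taskQueue)
        worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl::class.java)
        worker.registerActivitiesImplementations(GreetingActivitiesImpl())
        testEnv.start()

        // The client must come from the test environment so it talks to the test service.
        val workflow = testEnv.workflowClient.newWorkflowStub(
            GreetingWorkflow::class.java,
            WorkflowOptions.newBuilder().setTaskQueue(taskQueue).build()
        )
        // Calling the workflow method on a typed stub blocks until the workflow completes.
        return workflow.greet("Temporal")
    } finally {
        testEnv.close()
    }
}
```

Calling runGreeting("expense-category") starts the workflow on that queue and returns the activity's result. In a setup like the one in the question, the service class that starts the workflow would presumably need to be constructed with testEnv.workflowClient, so that the workflow is started against the test environment rather than a real Temporal service.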

Workflow Method

override fun automateExpenseCategory(expense: ExpenseInputDTO) {
    logger.info("automating category for expense: ${expense.expenseId}")
    // RPC
    val mcc = activity.getMccForExpense(
        GetMccForExpenseInputDTO(expense.accountId, UUID.fromString(expense.transactionId))
    )
    activity.insertMccForExpense(
        InsertMccForExpenseInputDTO(
            expense.expenseId, expense.accountId, mcc,
            Instant.ofEpochMilli(Workflow.currentTimeMillis())
        )
    )
    val categoryId = activity.getCategoriesForExpense(
        GetCategoriesForExpenseInputDTO(expense.expenseId, expense.accountId, mcc)
    ).first().categoryId
    // RPC
    if (activity.setCategoryForExpense(
            SetCategoryForExpenseInputDTO(expense.accountId, expense.expenseId, categoryId)
        )
    ) {
        activity.insertExpenseAutomationEvent(
            ExpenseAutomationEventDTO(
                expense.expenseId, expense.accountId,
                FieldName.EXPENSE_CATEGORY, categoryId.toString(), Workflow.DEFAULT_VERSION.toString(),
                Instant.ofEpochMilli(Workflow.currentTimeMillis())
            )
        )
    }
}

Class creating the workflow Instance

class ExpenseCategorizationServiceImpl(
    private val expenseCategoryService: ExpenseCategoryService,
    private val workflowClient: WorkflowClient
) : ExpenseCategorizationService {
    override fun automateExpense(expense: ExpenseInputDTO) {
        val expenseCategoryWorkflowRequest = workflowClient.newWorkflowStub(
            ExpenseCategoryWorkflow::class.java,
            WorkflowOptions.newBuilder()
                .setTaskQueue("expense-category")
                .setWorkflowId("expense-category-${expense.expenseId}")
                .setWorkflowExecutionTimeout(Duration.ZERO) // execution doesn't time out, but fails on error
                .setWorkflowRunTimeout(Duration.ZERO)
                // prevent running the workflow for the same expense twice
                .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY)
                .build()
        )
        WorkflowClient.execute(expenseCategoryWorkflowRequest::automateExpenseCategory, expense)
    }
}

Test Snippet


@RegisterExtension
@JvmField
val testWorkflowExtension: TestWorkflowExtension = TestWorkflowExtension.newBuilder()
    .setWorkflowTypes(ExpenseCategoryWorkflowImpl::class.java)
    .setDoNotStart(true)
    .build()

@Test
fun `should automate expense category successfully`(
    testEnv: TestWorkflowEnvironment,
    worker: Worker,
    workflow: ExpenseCategoryWorkflow
) {
    // mocking the RPC calls from 2 of the activities
    whenever(transactionViewsServiceRpc.getTransaction(any(), any())).thenReturn(cardTransaction)
    whenever(setDefaultCategoryRpc.setExpenseCategory(any())).thenReturn(true)

    val worker = testEnv.newWorker("expense-category")
    worker.registerWorkflowImplementationTypes(ExpenseCategoryWorkflowImpl::class.java)
    worker.registerActivitiesImplementations(expenseCategoryActivityImpl) // instance of activity impl
    testEnv.start()

    assertDoesNotThrow { expenseCategorizationServiceImpl.automateExpense(expenseInputDTO) }
    // this assertion checks the data stored in the db
    assertThat(expenseCategoryMapper.getMccAndCategoryForExpense(TestFixtures.expenseId)?.mcc)
        .isEqualTo(TestFixtures.mcc)
}

Is it a question or a working example?

I wanted to post what I have. I just saw your comment about not being able to set my task queue in TestWorkflowExtension, which would explain why the workers aren't polling the right queue. In fact, I am creating a new worker with my task queue from the TestWorkflowEnvironment, as you can see in the code snippet, and it still doesn't work.

Just to confirm, are you saying I should not use TestWorkflowExtension at all and just use TestWorkflowEnvironment, like the sample you posted? The reason I ask is that, if you look at my test snippet, I am setting my task queue on the worker I get from TestWorkflowEnvironment; the difference is that my test environment is managed by the TestWorkflowExtension. Is that where the issue is?

I think your approach should work as well.

OK, I think I figured out the issue. I was using WorkflowClient.execute(expenseCategoryWorkflowRequest::automateExpenseCategory, expense), which is async, so my test code moved on and went straight to the assertion. If my workflow hadn't completed by then, the assertion failed. I put a Thread.sleep in my test code before the assertion (after assertDoesNotThrow { expenseCategorizationServiceImpl.automateExpense(expenseInputDTO) } in my test snippet), and then my assertions worked. I do not want to put Thread.sleep in my integration test before assertions. Is there a cleaner way to wait for my service's async workflow call to finish before asserting? For example, can I make a blocking call on the testEnv object before asserting?

testEnv.sleep is the way to go. It supports time skipping. So even if your workflow is long-running, you can still test it.
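
A minimal sketch of waiting before the assertion, assuming the test has access to the TestWorkflowEnvironment and its client. The helper names waitBySleeping and waitForCompletion are hypothetical. The first helper is the testEnv.sleep call suggested above. The second is an alternative that is not from this thread: it attaches an untyped stub to the already started workflow by ID and blocks on its result, which may suit cases where the workflow ID is known to the test (in the question, "expense-category-" followed by the expense ID).

```kotlin
import io.temporal.client.WorkflowClient
import io.temporal.testing.TestWorkflowEnvironment
import java.time.Duration
import java.util.Optional

// Option 1 (from the reply above): advance the test environment's clock.
// With time skipping, this returns once test-server time has moved forward
// by `duration`, without waiting that long in wall-clock time.
fun waitBySleeping(testEnv: TestWorkflowEnvironment, duration: Duration) {
    testEnv.sleep(duration)
}

// Option 2 (an assumption, not from the thread): block until the workflow
// identified by `workflowId` completes. Throws if the workflow failed.
fun waitForCompletion(client: WorkflowClient, workflowId: String) {
    client
        .newUntypedWorkflowStub(workflowId, Optional.empty(), Optional.empty())
        .getResult(Void::class.java) // the workflow method returns no value
}
```

In a test like the one in the question, either call would go between the service call that starts the workflow and the db assertion, for example waitBySleeping(testEnv, Duration.ofSeconds(10)). The sleep duration is a guess about how long the workflow needs in test time, whereas blocking on the result waits exactly until completion.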

Sweet. Thank you Maxim. I guess testEnv.sleep more or less becomes integral whenever we test a workflow that is async or is long running in nature.