How to bypass activity retries or schedule_to_close_timeout when testing workflows

Hello! I am trying to test the workflow below to verify that the compensation activity (implemented following the saga design pattern) is executed correctly. The problem is that both test_activity and compensation_activity have a retry policy with a maximum of 10 attempts and a schedule-to-close timeout of 60 minutes, so a failing activity does not reach a non-retryable state until one of those limits is hit, which causes the workflow tests to hang. I have tried various ways of bypassing the activity retries or skipping the time spent between retries, with no success.

class TestActivities:
    """Namespace holding the activity definitions executed by ``TestWorkflow``."""

    # Names the activities are registered under with ``activity.defn``.
    test_activity_name = 'test_activity'
    compensation_activity_name = 'compensation_activity'

    @activity.defn(name=test_activity_name)
    async def test_activity() -> Optional[dict]:
        """Main activity awaited by the workflow (implementation elided)."""
        ...

    @activity.defn(name=compensation_activity_name)
    async def compensation_activity() -> None:
        """Activity the workflow registers as its compensation (implementation elided)."""
        ...


@workflow.defn
class TestWorkflow:
    """Run ``test_activity`` and trigger the registered compensation on failure."""

    @workflow.run
    async def run(self, args: WorkflowArgs):
        """Execute the main activity; compensate if it raises.

        Args:
            args: Workflow input (not read by this method).

        Returns:
            None. Any exception raised by the activity is handled here and
            not re-raised, so the workflow completes normally after the
            compensation call.
        """
        workflow_compensations = Compensations()

        # The same options are applied to the main activity and to its
        # compensation.
        execute_activity_args = {
            "start_to_close_timeout": timedelta(seconds=10),
            "schedule_to_close_timeout": timedelta(minutes=60),
            "retry_policy": RetryPolicy(maximum_attempts=10),
        }

        # Register the compensation before running the activity so it is
        # already in place when the activity fails.
        workflow_compensations.add(
            TestActivities.compensation_activity, **execute_activity_args
        )

        try:
            await workflow.execute_activity(
                TestActivities.test_activity, **execute_activity_args
            )
        except Exception:
            # ActivityError is itself an Exception, and the original code
            # compensated in both arms of its retry_state check, so a single
            # handler is equivalent.
            # NOTE(review): compensate() is called without ``await`` as in
            # the original; if it is a coroutine it must be awaited - confirm
            # against the Compensations implementation.
            workflow_compensations.compensate()

Failing test:

# Registered under the real activity's name so the worker runs this stub in
# place of TestActivities.test_activity.
@activity.defn(name=TestActivities.test_activity_name)
async def stubbed_activity_failure() -> Optional[dict]:
    """Stand-in for ``test_activity`` that always fails.

    Raises:
        RuntimeError: Always, to drive the workflow into its failure path.
    """
    raise RuntimeError("Update failed.")


@activity.defn(name=TestActivities.compensation_activity_name)
async def stubbed_compensation_activity() -> None:
    """Stand-in for ``compensation_activity`` that succeeds without doing anything."""
    return None


@pytest.mark.asyncio
async def test_workflow(workflow_args: WorkflowArgs):
    """Run ``TestWorkflow`` against the stubbed activities and inspect its history.

    Starts a time-skipping test environment, runs a worker on a unique task
    queue with the stubbed activities, waits for the workflow result, and
    checks every ACTIVITY_TASK_FAILED history event against the expected
    activity name.
    """
    queue = str(uuid.uuid4())

    async with await WorkflowEnvironment.start_time_skipping() as test_env:
        worker = Worker(
            test_env.client,
            task_queue=queue,
            workflows=[TestWorkflow],
            activities=[stubbed_activity_failure, stubbed_compensation_activity],
        )
        async with worker:
            handle = await test_env.client.start_workflow(
                TestWorkflow.run,
                workflow_args,
                id=str(uuid.uuid4()),
                task_queue=queue,
            )
            # await test_env.sleep(duration=timedelta(minutes=59, seconds=30))
            await handle.result()

            async for event in handle.fetch_history_events():
                # Only failed-activity events are of interest here.
                if event.event_type != EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED:
                    continue
                failure = event.activity_task_failed_event_attributes.failure
                failed_activity = failure.activity_failure_info.activity_type.name
                assert failed_activity == TestActivities.test_activity_name

The stubbed activity stubbed_activity_failure raises an exception, but the workflow test still hangs until the timeout or the maximum retry count is reached. I tried mocking the execute_activity_args variable so that the maximum retry count is 1, but found that mock.patch doesn’t work inside workflows. I also tried test_env.sleep (commented out in the test), but got the following exception:

self = <temporalio.service._BridgeServiceClient object at 0x12837eb20>
rpc = 'unlock_time_skipping_with_sleep', req = duration {
  seconds: 2400
}

resp_type = <class 'temporal.api.testservice.v1.request_response_pb2.SleepResponse'>

    async def _rpc_call(
        self,
        rpc: str,
        req: google.protobuf.message.Message,
        resp_type: Type[ServiceResponse],
        *,
        service: str,
        retry: bool,
        metadata: Mapping[str, str],
        timeout: Optional[timedelta],
    ) -> ServiceResponse:
        global LOG_PROTOS
        if LOG_PROTOS:
            logger.debug("Service %s request to %s: %s", service, rpc, req)
        try:
            client = await self._connected_client()
            resp = await client.call(
                service=service,
                rpc=rpc,
                req=req,
                resp_type=resp_type,
                retry=retry,
                metadata=metadata,
                timeout=timeout,
            )
            if LOG_PROTOS:
                logger.debug("Service %s response from %s: %s", service, rpc, resp)
            return resp
        except temporalio.bridge.client.RPCError as err:
            # Intentionally swallowing the cause instead of using "from"
            status, message, details = err.args
>           raise RPCError(message, RPCStatusCode(status), details)
E           temporalio.service.RPCError: Timeout expired

TL;DR: I cannot figure out how to bypass the RetryPolicy(maximum_attempts=10) or schedule_to_close_timeout=timedelta(minutes=60) arguments passed to execute_activity when testing a workflow inside the async with await WorkflowEnvironment.start_time_skipping() as env context.

Is there something here I am missing? Any help would be greatly appreciated. Thanks!

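To bypass retries in your stubbed-out activity, raise temporalio.exceptions.ApplicationError("Intentional error", non_retryable=True). A non-retryable application error fails the activity on its first attempt regardless of the retry policy, so the workflow sees the failure immediately instead of waiting for the retries or the timeout to be exhausted.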

1 Like