Hello! I am attempting to test the workflow below to ensure that the compensation activity (implemented with the saga design pattern in mind) is executed correctly. The issue here is that both the test_activity
and compensation_activity
have retry policies of 10 maximum attempts and a timeout of 60 minutes before reaching an un-retryable state, causing the workflow tests to hang. I have tried various methods of bypassing the activity retries or skipping the time spent between retries with no success.
class TestActivities:
test_activity_name = 'test_activity'
compensation_activity_name = 'compensation_activity'
@activity.defn(name=test_activity_name)
async def test_activity() -> Optional[dict]:
...
@activity.defn(name=compensation_activity_name)
async def compensation_activity() -> None:
...
@workflow.defn
class TestWorkflow:
@workflow.run
async def run(self, args: WorkflowArgs):
workflow_compensations = Compensations()
execute_activity_args = {
"start_to_close_timeout": timedelta(seconds=10),
"schedule_to_close_timeout": timedelta(minutes=60),
"retry_policy": RetryPolicy(maximum_attempts=10)
}
workflow_compensations.add(TestActivities.compensation_activity, **execute_activity_args)
try:
await workflow.execute_activity(TestActivities.test_activity, **execute_activity_args)
except (ActivityError, Exception) as exc:
if isinstance(exc, ActivityError) and exc.retry_state != RetryState.IN_PROGRESS:
workflow_compensations.compensate()
else:
workflow_compensations.compensate()
Failing test:
@activity.defn(name=TestActivities.activity_1_name)
async def stubbed_activity_failure() -> Optional[dict]:
raise RuntimeError("Update failed.")
@activity.defn(name=TestActivities.compensation_activity_name)
async def stubbed_compensation_activity() -> None:
return
@pytest.mark.asyncio
async def test_workflow(workflow_args: WorkflowArgs):
task_queue_name = str(uuid.uuid4())
async with await WorkflowEnvironment.start_time_skipping() as test_env:
async with Worker(
test_env.client,
task_queue=task_queue_name,
workflows=[TestWorkflow],
activities=[stubbed_activity_failure, stubbed_compensation_activity],
):
workflow_handler = await test_env.client.start_workflow(
TestWorkflow.run,
workflow_args,
id=str(uuid.uuid4()),
task_queue=task_queue_name
)
# await test_env.sleep(duration=timedelta(minutes=59, seconds=30))
await workflow_handler.result()
async for event in workflow_handler.fetch_history_events():
if event.event_type == EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED:
assert event.activity_task_failed_event_attributes.failure.activity_failure_info.activity_type.name == TestActivities.test_activity_name
The stubbed activity stubbed_activity_failure
raises an exception, but the workflow test still hangs until the timeout or retry count is reached. I have tried mocking the execute_activity_args
variable to have a maximum retry count of 1 until I found that mock.patch doesn’t work inside workflows. I have also attempted test_env.sleep
(commented out in the test), but get the following exception:
self = <temporalio.service._BridgeServiceClient object at 0x12837eb20>
rpc = 'unlock_time_skipping_with_sleep', req = duration {
seconds: 2400
}
resp_type = <class 'temporal.api.testservice.v1.request_response_pb2.SleepResponse'>
async def _rpc_call(
self,
rpc: str,
req: google.protobuf.message.Message,
resp_type: Type[ServiceResponse],
*,
service: str,
retry: bool,
metadata: Mapping[str, str],
timeout: Optional[timedelta],
) -> ServiceResponse:
global LOG_PROTOS
if LOG_PROTOS:
logger.debug("Service %s request to %s: %s", service, rpc, req)
try:
client = await self._connected_client()
resp = await client.call(
service=service,
rpc=rpc,
req=req,
resp_type=resp_type,
retry=retry,
metadata=metadata,
timeout=timeout,
)
if LOG_PROTOS:
logger.debug("Service %s response from %s: %s", service, rpc, resp)
return resp
except temporalio.bridge.client.RPCError as err:
# Intentionally swallowing the cause instead of using "from"
status, message, details = err.args
> raise RPCError(message, RPCStatusCode(status), details)
E temporalio.service.RPCError: Timeout expired
TLDR; I cannot figure out how to bypass the RetryPolicy(maximum_attempts=10)
or schedule_to_close_timeout=timedelta(minutes=60)
args passed to execute_activity
when I am attempting to test a workflow using the async with await WorkflowEnvironment.start_time_skipping() as env
context.
Is there something here I am missing? Any help would be greatly appreciated. Thanks!