Hi, my goal is to patch a function that I call inside a workflow. This function is not an activity, just a regular async function. My test setup is similar to what you have in the examples.
When I try to patch this function like this:
async def test_wf(client: Client):
) as mock_execute_something:
mock_execute_something.return_value = 'invalid_response'
task_queue_name = config.TEMPORAL_PYTHON_TASK_QUEUE
worker = Worker(
async with worker:
handle = await client.start_workflow(
but the function is not patched for some reason.
Workflows run in a sandbox by default, so a patch applied in the test process does not necessarily reach the code the workflow actually runs. Workflow testing is meant to use the WorkflowEnvironment from temporalio.testing (see this part of the README). Workflow code should not be mocked during testing, because all of it works together to build history (but you can replace activities, the only external thing a workflow calls). What is the use case for replacing this workflow code?
I just need to test a case when the function I want to mock returns bad data.
Returns bad data as a result of what? As a result of an activity call inside that function? Then you can register a mock activity with your test worker that returns the bad data. Workflow code should usually be tested holistically for determinism reasons, so you don't want to mock workflow code; you want to mock the external pieces that code may call.