WorkflowEnvironment+Worker pytest parametrized fixture not working

Hello, I’m trying to creating the following parametrized pytest fixture for automatically starting the Temporal WorkflowEnvironment and the Worker:

# conftest.py

@dataclass
class TemporalEnv:
    client: Client
    task_queue_name: str


@pytest_asyncio.fixture
async def temporal_env(request: pytest.FixtureRequest) -> AsyncGenerator[TemporalEnv]:
    async with await WorkflowEnvironment.start_time_skipping() as workflow_env:
        task_queue_name = str(uuid.uuid4())
        async with Worker(
            workflow_env.client,
            task_queue=task_queue_name,
            workflows=request.param.workflows,
            activities=request.param.activities):
            yield TemporalEnv(
                client=workflow_env.client, task_queue_name=task_queue_name
            )

The idea is that it could be used like this:

# test_example.py

@activity.defn(name="my_activity")
async def my_activity_mocked() -> bool:
    return True


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "temporal_env",
    [
        {
            "workflows": [MyWorkflow],
            "activities": [my_activity_mocked],
        }
    ],
    indirect=True,
)
async def test_example(temporal_env):
    assert True == await temporal_env.client.execute_workflow(
        MyWorkflow.run,
        task_queue=temporal_env.task_queue_name,
    )

…to save some boilerplate with starting the Worker in each test (of course it is debatable whether this is worth it).

However, it is not working: the test hangs indefinitely. I have tried putting different scopes on the fixture, but it didn’t help. The test suite does reach the test_example but then it hangs on execute_workflow. Probably, there is something weird going one with the double async context managers in the fixture and the parametrization — seems like the client isn’t connecting to the Temporal (test) Server?

Any help/examples appreciated :folded_hands: I’ve looked through GitHub - temporalio/samples-python: Samples for working with the Temporal Python SDK

Okay, I needed to set

@pytest_asyncio.fixture(loop_scope="function")

because I had asyncio_default_fixture_loop_scope = “session” in my pytest.ini.

Leaving this post up in case this fixture is useful to anyone.