How to mock child workflow calls in the Python SDK?

Hello,

I’m looking for advice on how to properly test the following case with the Python SDK.
I have an orchestrator workflow that dispatches some action workflows as short-lived child workflows, but I can’t find an easy way to mock those child workflow invocations. That is, I simply want to ensure that the child workflow is started at the expected moment by the parent’s logic, without “sensing” it through activity invocations made by the child itself, as that muddles the tests considerably:

from temporalio import workflow


@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, params: ParamsV1) -> None:
        await self._run_child()  # <---- I want to check that this happens

    async def _run_child(self):
        return await workflow.execute_child_workflow(
            ChildWorkflow.run,
            ChildWorkflow.Params(
                some="param"
            ),
            parent_close_policy=workflow.ParentClosePolicy.REQUEST_CANCEL,
        )

Essentially, I just want to check that _run_child is invoked inside the parent. However, I’m hesitant to subclass or patch the parent workflow the way I would with ordinary Python code, because I can’t wrap my head around how that would interact with the sandbox, the real worker, and so on.

Any help would be greatly appreciated!
