Hello,
I’m looking for advice on how to properly test the following case with the Python SDK:
I have an orchestrator workflow that dispatches some action workflows as short-lived child workflows, but I can’t find an easy way to mock those child workflow invocations. I simply want to ensure that the child workflow is started at the expected moment by the parent’s logic. I don’t want to “sense” it through activity invocations made by the child itself, as that muddles the tests considerably:
@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, params: ParamsV1) -> None:
        await self._run_child()  # <---- I want to check that this happens

    async def _run_child(self):
        return await workflow.execute_child_workflow(
            ChildWorkflow.run,
            ChildWorkflow.Params(
                some="param"
            ),
            parent_close_policy=workflow.ParentClosePolicy.REQUEST_CANCEL,
        )
Essentially, I just want to check that _run_child is invoked inside the parent. I’m hesitant to subclass or patch the parent workflow, as I would for ordinary Python code, because I can’t wrap my head around how that is supposed to interact with the sandbox, the real worker, and so on.
Any help would be greatly appreciated!