Test parent workflow with child workflow which is waiting for a signal

I am trying to write tests for a workflow that has a child workflow, the child workflow is waiting for a signal to resolve.
The thing is that I need to send this signal, I am trying to get the child workflow handle with:

handle_child = client.get_workflow_handle("child_workflow_id")

The error:

temporalio.service.RPCError: sql: no rows in result set

But seems like I am doing it in a moment of time when the parent is running but the child is not, so I am waiting for the parent to have the status “Running”:

WorkflowExecutionStatus.RUNNING == (await handle_parent.describe()).status

but how I can check if the child workflow started to get the handler properly?


async def test_workflow(client: Client):
    task_queue_name = str(uuid.uuid4())
    expectedResult = "done"

    async with Worker(
        client,
        task_queue=task_queue_name,
        workflows=[ParentWorkflow, ChildWorkflow, SignalWorkflow],
        activities=[send_front_email],
    ):

        handle_parent = await client.start_workflow(
            ParentWorkflow.run,
            params,
            id="some-id",
            task_queue=config.TASK_QUEUE,
        )
        WorkflowExecutionStatus.RUNNING == (await handle_parent.describe()).status
        # How to get the handle for child workflow

        handle_child = client.get_workflow_handle(child_workflow_id)

        await handle_pr.signal("FORM_SUBMITTED", json.dumps(dataclasses.asdict(signalData)))

        assert await handle_parent.result() == expectedResult

It’s not a problem with the code it works fine when I run the flow manually, but it’s not working properly in tests.

1 Like

It may be coincidence in non-test run that your handle started the child before you tried to signal the child.

You can check that the child is running the same way you check the parent is running. But if you don’t want to poll describe(), you can instead poll the parent workflow via query and have it tell you whether it started the child.

But the better approach is probably to signal the parent from the client and have the parent delegate the signal down to the child workflow it knows it has or hasn’t started.

Do I have to implement query on my parent workflow to request a status of the exact child workflow? Or there is builtin query I can pull to get the status of child workflows and activities?
I guess maybe describe () will return some info about child workflow?

Why

client.list_workflows()

have only parent workflow inside?

If workflow with ID=X does not exist client will return an empty handle or will fail?

No, you can also request status via top-level client workflow handle describe() like any other workflow

Child workflows are (mostly) just like any workflows to the client and you can access them as such

Exactly

Probably because your parent workflow hasn’t run far enough to start the child yet.

Getting a handle from a client does not communicate with the server. It is not async, it just creates the handle for making workflow-specific calls. describe() will fail if the workflow does not exist.

sleep(30) doesn’t help to wait for a child, the output is the same only parent is there.

I wrote a quick example to check this:

import asyncio
import logging

from temporalio import workflow
from temporalio.worker import Worker
from temporalio.testing import WorkflowEnvironment


@workflow.defn
class MyWorkflowParent:
    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Running parent workflow")
        await workflow.execute_child_workflow(
            MyWorkflowChild.run,
            id="my-child-workflow",
        )
        workflow.logger.info("Completing parent workflow")


@workflow.defn
class MyWorkflowChild:
    def __init__(self) -> None:
        self.got_signal = False

    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Running child workflow, waiting on signal")
        await workflow.wait_condition(lambda: self.got_signal)
        workflow.logger.info("Completing child workflow")

    @workflow.signal
    def signal(self) -> None:
        self.got_signal = True


async def main():
    # Uncomment the line below to see logging
    logging.basicConfig(level=logging.INFO)

    logging.info("Starting local server")
    async with await WorkflowEnvironment.start_local() as env:
        logger = logging.getLogger("root")
        logger.info("logging in main", extra={"foo": "in main"})

        logging.info("Starting worker")
        async with Worker(
            env.client,
            task_queue="my-task-queue",
            workflows=[MyWorkflowParent, MyWorkflowChild],
        ):
            logging.info("Starting parent")
            handle = await env.client.start_workflow(
                MyWorkflowParent.run,
                id="my-parent-workflow",
                task_queue="my-task-queue",
            )

            logging.info("Waiting 5 seconds then listing workflows")
            await asyncio.sleep(5)
            async for workflow in env.client.list_workflows():
                logging.info("List returned workflow: %s" % workflow.id)

            logging.info("Signalling child")
            child_handle = env.client.get_workflow_handle_for(
                MyWorkflowChild.run,
                "my-child-workflow",
            )
            await child_handle.signal(MyWorkflowChild.signal)

            logging.info("Waiting 5 seconds then confirming workflow complete")
            await asyncio.sleep(5)
            await handle.result()
            logging.info("Parent workflow complete")


if __name__ == "__main__":
    asyncio.run(main())

This logs the following output:

INFO:root:Starting local server
INFO:root:logging in main
INFO:root:Starting worker
INFO:root:Starting parent
INFO:root:Waiting 5 seconds then listing workflows
INFO:temporalio.workflow:Running parent workflow ({'attempt': 1, 'namespace': 'default', 'run_id': 'dd58c921-4e2e-4550-a178-6b516c9365f4', 'task_queue': 'my-task-queue', 'workflow_id': 'my-parent-workflow', 'workflow_type': 'MyWorkflowParent'})
INFO:temporalio.workflow:Running child workflow, waiting on signal ({'attempt': 1, 'namespace': 'default', 'run_id': '4b0297b1-e819-404c-bfd8-1581ba4849d6', 'task_queue': 'my-task-queue', 'workflow_id': 'my-child-workflow', 'workflow_type': 'MyWorkflowChild'})
INFO:root:List returned workflow: my-child-workflow
INFO:root:List returned workflow: my-parent-workflow
INFO:root:Signalling child
INFO:root:Waiting 5 seconds then confirming workflow complete
INFO:temporalio.workflow:Completing child workflow ({'attempt': 1, 'namespace': 'default', 'run_id': '4b0297b1-e819-404c-bfd8-1581ba4849d6', 'task_queue': 'my-task-queue', 'workflow_id': 'my-child-workflow', 'workflow_type': 'MyWorkflowChild'})
INFO:temporalio.workflow:Completing parent workflow ({'attempt': 1, 'namespace': 'default', 'run_id': 'dd58c921-4e2e-4550-a178-6b516c9365f4', 'task_queue': 'my-task-queue', 'workflow_id': 'my-parent-workflow', 'workflow_type': 'MyWorkflowParent'})
INFO:root:Parent workflow complete
INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities

Notice how the child is returned from list as expected

1 Like