How to get status of workflow in other workflow? (incl. handling forced terminations)

:wave: How to get workflow status in another workflow?

I have a SemaphoreWorkflow, whose purpose is to ensure that only one workflow can run at a time based on my business logic. It has a request_semaphore signal that takes workflow_id and run_id. A separate workflow sends this signal to the SemaphoreWorkflow, which adds the request to a queue and later sends a grant_semaphore signal back to the requesting workflow when it is ready.
The SemaphoreWorkflow also has an infinite loop that is supposed to check whether the workflow currently holding the semaphore has ended (completed, terminated, cancelled, etc.), so that it can take the semaphore back.

Am I supposed to use DescribeWorkflowExecutionRequest?

ChatGPT suggests this example:

class MyActivity:
    @activity.method
    async def describe_workflow(self, workflow_id: str, run_id: str):
        client = ServiceClient.create('localhost', 7233)
        request = DescribeWorkflowExecutionRequest(
            namespace="default",
            execution=WorkflowExecution(
                workflow_id=workflow_id,
                run_id=run_id,
            ),
        )
        response = await client.DescribeWorkflowExecution(request=request)
        return response

However, creating this client instance feels very sketchy, considering I may have a test environment started with WorkflowEnvironment.start_local and not even know the port. I could move the client to a class variable, though.

I could consider a custom status variable and query inside the workflows, but how would I handle cases where the workflow is force-terminated, e.g. from the Temporal UI? I can't find any on_end callback I could put in the workflow that would be called regardless of outcome (completed, cancelled, terminated).

More code for context:

import asyncio
from datetime import timedelta
from queue import Queue
from typing import NamedTuple

from temporalio import activity, workflow


@activity.defn
async def check_has_workflow_ended(workflow_id: str, run_id: str) -> bool:
    raise NotImplementedError()


@workflow.defn
class AgentSemaphoreWorkflow:
    """Purpose of this workflow is to ensure an agent is not executing more than one workflow at a time"""

    class AgentTaskIdentifier(NamedTuple):
        workflow_id: str
        run_id: str

    def __init__(self) -> None:
        # Instance state; class-level mutable defaults would be shared across
        # workflow runs in the same worker process
        self._task_queue: Queue[AgentSemaphoreWorkflow.AgentTaskIdentifier] = Queue()
        self._is_busy: bool = False

    @workflow.run
    async def run(self, agent_id: str) -> None:
        workflow.upsert_search_attributes({"agent_id": [agent_id]})
        while True:
            # Wait for a task to be queued (wait_condition avoids timer polling)
            await workflow.wait_condition(lambda: not self._task_queue.empty())
            task = self._task_queue.get(block=False)
            self._is_busy = True
            workflow_id, run_id = task.workflow_id, task.run_id
            workflow.upsert_search_attributes({"is_busy": [True]})

            # Grant semaphore to task workflow
            workflow.logger.debug(
                f"Granting agent execution semaphore to workflow ID {workflow_id}, run ID {run_id}"
            )
            task_handle = workflow.get_external_workflow_handle_for(
                SampleAgentTask.run, workflow_id=workflow_id, run_id=run_id
            )
            await task_handle.signal(SampleAgentTask.grant_semaphore)

            # Wait for task to end
            while True:
                has_ended = await workflow.execute_activity(
                    check_has_workflow_ended,
                    args=[workflow_id, run_id],
                    start_to_close_timeout=timedelta(seconds=10),
                )
                if has_ended:
                    workflow.logger.debug(
                        f"Releasing agent execution semaphore from workflow ID {workflow_id}, run ID {run_id}"
                    )
                    self._is_busy = False
                    workflow.upsert_search_attributes({"is_busy": [False]})
                    break
                await asyncio.sleep(5)

    @workflow.signal
    async def request_semaphore(self, workflow_id: str, run_id: str) -> None:
        self._task_queue.put(
            AgentSemaphoreWorkflow.AgentTaskIdentifier(
                workflow_id=workflow_id, run_id=run_id
            )
        )

    @workflow.query
    def is_busy(self) -> bool:
        return self._is_busy


@workflow.defn
class SampleAgentTask:
    def __init__(self) -> None:
        self._received_agent_semaphore = False

    @workflow.signal
    async def grant_semaphore(self) -> None:
        self._received_agent_semaphore = True

    @workflow.run
    async def run(self, agent_id: str) -> None:
        # Request semaphore
        agent_workflow_handle = workflow.get_external_workflow_handle_for(
            AgentSemaphoreWorkflow.run, workflow_id="workflow-id"
        )
        await agent_workflow_handle.signal(
            "request_semaphore",
            args=[workflow.info().workflow_id, workflow.info().run_id],
        )

        # Wait for semaphore
        await workflow.wait_condition(lambda: self._received_agent_semaphore)

        # Now we can begin
        workflow.logger.info(
            f"Running workflow with scraping configuration id = {agent_id}"
        )

You can just use the Temporal client that you pass into the activity class when you create it, just before registering it with the worker, e.g.

class Activities:
    def __init__(self, client: temporalio.client.Client) -> None:
        self._client = client

    @activity.defn
    async def is_workflow_running(self, workflow_id: str) -> bool:
        # May want to catch not-found RPCError here if it's not required to exist
        desc = await self._client.get_workflow_handle(workflow_id).describe()
        return desc.status == temporalio.client.WorkflowExecutionStatus.RUNNING

Correct, there is no reliable workflow-side way for a workflow to report that it has completed, because some completions happen server-side (timeouts and terminations). So you have to poll for status. If you need to check the status of multiple workflows, you could use the listing feature on the client.

Also, since you’re polling regularly, consider using the infrequent polling pattern here to avoid unnecessarily growing history while waiting for the workflow to complete: have the activity (call it wait_for_workflow_complete) raise an error while the target is still running, and let an activity retry policy with fixed backoff do the polling.
