How to get the status of a workflow from another workflow? (incl. handling forced terminations)

:wave: How to get workflow status in another workflow?

I have a SemaphoreWorkflow whose purpose is to ensure that only one workflow runs at a time, based on my business logic. It has a request_semaphore signal with args workflow_id and run_id. A separate workflow sends this signal to the SemaphoreWorkflow, which adds the request to its queue and sends a grant_semaphore signal back to the requesting workflow when it is that workflow's turn.
The SemaphoreWorkflow also has an infinite loop that is supposed to check whether the workflow currently holding the semaphore has ended (completed, terminated, cancelled, etc.), so that the semaphore can be released.

Am I supposed to use DescribeWorkflowExecutionRequest?

ChatGPT suggested this example:

class MyActivity:
    async def describe_workflow(self, workflow_id: str, run_id: str):
        client = ServiceClient.create('localhost', 7233)
        request = DescribeWorkflowExecutionRequest(...)
        response = await client.DescribeWorkflowExecution(request=request)
        return response

However, creating this client instance feels very sketchy, considering I may have a test environment started with WorkflowEnvironment.start_local and may not even know the port. I could possibly move the client to a class variable, though.

I could consider a custom status variable and query inside the workflows, but how would I handle cases where the workflow is force-terminated, e.g. from the Temporal UI? I can’t find any on_end callback that I could put into a workflow and that would be called on any outcome (finished, cancelled, terminated).

More code for context:

import asyncio
from collections import deque
from datetime import timedelta
from typing import Deque, NamedTuple

from temporalio import activity, workflow


@activity.defn
async def check_has_workflow_ended(workflow_id: str, run_id: str) -> bool:
    raise NotImplementedError()


@workflow.defn
class AgentSemaphoreWorkflow:
    """Purpose of this workflow is to ensure an agent is not executing more than one workflow at a time"""

    class AgentTaskIdentifier(NamedTuple):
        workflow_id: str
        run_id: str

    def __init__(self) -> None:
        # Per-instance state; class-level attributes would be shared between instances
        self._task_queue: Deque[AgentSemaphoreWorkflow.AgentTaskIdentifier] = deque()
        self._is_busy = False

    @workflow.run
    async def run(self, agent_id: str) -> None:
        workflow.upsert_search_attributes({"agent_id": [agent_id]})
        while True:
            # Wait for task (blocks until a request_semaphore signal has queued one)
            await workflow.wait_condition(lambda: len(self._task_queue) > 0)
            task = self._task_queue.popleft()
            self._is_busy = True
            workflow_id, run_id = task.workflow_id, task.run_id
            workflow.upsert_search_attributes({"is_busy": [True]})

            # Grant semaphore to task workflow
            workflow.logger.info(
                f"Granting agent execution semaphore to workflow ID {workflow_id}, run ID {run_id}"
            )
            task_handle = workflow.get_external_workflow_handle_for(
                SampleAgentTask.run, workflow_id=workflow_id, run_id=run_id
            )
            await task_handle.signal("grant_semaphore")

            # Wait for task to end
            while True:
                has_ended = await workflow.execute_activity(
                    check_has_workflow_ended,
                    args=[workflow_id, run_id],
                    # Assumed value; an activity timeout is required by the SDK
                    start_to_close_timeout=timedelta(seconds=10),
                )
                if has_ended:
                    workflow.logger.info(
                        f"Releasing agent execution semaphore from workflow ID {workflow_id}, run ID {run_id}"
                    )
                    self._is_busy = False
                    workflow.upsert_search_attributes({"is_busy": [False]})
                    break
                await asyncio.sleep(5)

    @workflow.signal
    async def request_semaphore(self, workflow_id: str, run_id: str) -> None:
        self._task_queue.append(
            self.AgentTaskIdentifier(workflow_id=workflow_id, run_id=run_id)
        )

    @workflow.query
    def is_busy(self) -> bool:
        return self._is_busy

@workflow.defn
class SampleAgentTask:
    _received_agent_semaphore: bool = False

    @workflow.signal
    async def grant_semaphore(self) -> None:
        self._received_agent_semaphore = True

    @workflow.run
    async def run(self, agent_id: str) -> None:
        # Request semaphore, identifying this workflow by its own ID and run ID
        agent_workflow_handle = workflow.get_external_workflow_handle_for(
            AgentSemaphoreWorkflow.run, workflow_id="workflow-id"
        )
        info = workflow.info()
        await agent_workflow_handle.signal(
            "request_semaphore", args=[info.workflow_id, info.run_id]
        )

        # Wait for semaphore
        await workflow.wait_condition(lambda: self._received_agent_semaphore)

        # Now we can begin
        workflow.logger.info(
            f"Running workflow with scraping configuration id = {agent_id}"
        )

You can just use the Temporal client that you pass in when creating the activity class just before registering, e.g.

import temporalio.client
from temporalio import activity


class Activities:
    def __init__(self, client: temporalio.client.Client) -> None:
        self._client = client

    @activity.defn
    async def is_workflow_running(self, workflow_id: str) -> bool:
        # May want to catch not-found RPCError here if it's not required to exist
        desc = await self._client.get_workflow_handle(workflow_id).describe()
        return desc.status == temporalio.client.WorkflowExecutionStatus.RUNNING

Correct, there is no workflow-side way for a workflow to reliably report that it has completed, because some completions occur server side (timeouts and termination). So you have to poll for status. If you need to check the status of multiple workflows, you could use the listing feature on the client.

Also, since you’re polling regularly, consider using the infrequent polling pattern here to avoid unnecessarily growing history while waiting for the workflow to complete (so you’d name the activity wait_for_workflow_complete and have it raise an error while the workflow is still running, retried with a fixed backoff).
