How to get workflow status in another workflow?
I have a SeamphoreWorkflow, which has the purpose of ensuring only 1 workflow can run simultaneously based on my business logic. It contains a request_semaphore
signal with args workflow_id
and run_id
. A separate workflow sends a signal to this SemaphoreWorkflow, it adds the request to the queue and then sends a grant_semaphore
signal back to the requesting workflow when it is ready.
Anyway, it has an infinite loop supposed to check whether the execution of the workflow, currently having the semaphore, has ended (completed, terminated, cancelled, etc.), so that it can take away the semaphore.
Am I supposed to use DescribeWorkflowExecutionRequest
?
ChatGPT is telling me this example:
class MyActivity:
@activity.method
async def describe_workflow(self, workflow_id: str, run_id: str):
client = ServiceClient.create('localhost', 7233)
request = DescribeWorkflowExecutionRequest(
namespace="default",
execution=WorkflowExecution(
workflow_id=workflow_id,
run_id=run_id,
),
)
response = await client.DescribeWorkflowExecution(request=request)
return response
However, creating this client instance feels very sketchy; considering I may have a test environment, started with WorkflowEnvironment.start_local
and not even know the port. I could possibly move client
to a class variable though.
I could consider a custom status
variable & query
inside the workflows but how would I handle cases when the workflow is force terminated, e.g. with Temporal UI? I don’t find any on_end
callback I can put into workflow, which would be called at the end of any outcome (finished, cancelled, terminated).
More code for context:
@activity.defn
async def check_has_workflow_ended(workflow_id: str, run_id: str) -> bool:
raise NotImplementedError()
@workflow.defn
class AgentSemaphoreWorkflow:
"""Purpose of this workflow is to ensure an agent is not executing more than one workflow at a time"""
class AgentTaskIdentifier(NamedTuple):
workflow_id: str
run_id: str
_task_queue: Queue[AgentTaskIdentifier] = Queue()
_is_busy: bool = False
@workflow.run
async def run(self, agent_id: str) -> None:
workflow.upsert_search_attributes({"agent_id": ["agent_id"]})
while True:
# Wait for task
if self._task_queue.qsize() == 0:
await asyncio.sleep(5)
continue
task = self._task_queue.get(block=False)
self._is_busy = True
workflow_id, run_id = task.workflow_id, task.run_id
workflow.upsert_search_attributes({"is_busy": [True]})
# Grant semaphore to task workflow
workflow.logger.debug(
f"Granting agent execution semaphore to workflow ID {workflow_id}, run ID {run_id}"
)
task_handle = workflow.get_external_workflow_handle_for(
AgentSemaphoreWorkflow.run, workflow_id="workflow-id"
)
await task_handle.signal("grant_semaphore")
# Wait for task to end
while True:
has_ended = await workflow.execute_activity_method(
check_has_workflow_ended, args=[workflow_id, run_id]
)
if has_ended:
workflow.logger.debug(
f"Releasing agent execution semaphore from workflow ID {workflow_id}, run ID {run_id}"
)
self._is_busy = False
workflow.upsert_search_attributes({"is_busy": [False]})
break
await asyncio.sleep(5)
@workflow.signal
async def request_semaphore(self, workflow_id: str, run_id: str) -> None:
self._task_queue.put(
AgentSemaphoreWorkflow.AgentTaskIdentifier(
workflow_id=workflow_id, run_id=run_id
)
)
@workflow.query
def is_busy(self) -> bool:
return self._is_busy
@workflow.defn
class SampleAgentTask:
_received_agent_semaphore: bool = False
@workflow.signal
async def grant_semaphore(self) -> None:
self._received_agent_semaphore = True
@workflow.run
async def run(self, agent_id: str) -> None:
# Request semaphore
agent_workflow_handle = workflow.get_external_workflow_handle_for(
AgentSemaphoreWorkflow.run, workflow_id="workflow-id"
)
await agent_workflow_handle.signal(
"request_semaphore",
args=[workflow.info().workflow_id, workflow.info().run_id],
)
# Wait for semaphore
while not self._received_agent_semaphore:
await asyncio.sleep(1)
# Now we can begin
workflow.logger.info(
f"Running workflow with scraping configuration id = {agent_id}"
)