is there a way to retrieve metadata of failed events from a workflow using Temporal's Python SDK? For instance, let's say I have a workflow called ProcessingWorkflow
from datetime import timedelta
from uuid import UUID

from temporalio import workflow
from temporalio.exceptions import ActivityError, ApplicationError

@workflow.defn(name="ProcessingWorkflow")
class ProcessingWorkflow:
    @workflow.run
    async def run(self, document_id: UUID):
        try:
            result = await workflow.execute_activity(
                some_activity,
                document_id,
                start_to_close_timeout=timedelta(minutes=5),
            )
            return result
        except ActivityError as e:
            raise ApplicationError(
                "processing failed",
                {"document_id": str(document_id)},  # details are positional
                type="ProcessingError",
            ) from e
that receives inputs such as document_id: UUID. Assuming a couple of events (document ids in this case) failed, is there a way to retrieve the ids of all failed events via the Python SDK?
Do you mean detecting which activity executions fail inside a workflow execution, and do you want to detect them within the workflow itself? If so, you are already catching the exceptions, so you can store the document ids of any failed items and return them at the end of execution.
If you want to query failures while the workflow is running, you could add a query returning the failed ids.
If you want to detect workflow failures for past executions, you can do so via the Temporal API for as long as their histories are retained.
One approach would be to list failed workflow executions using https://python.temporal.io/temporalio.client.Client.html#list_workflows
filtered by workflow type and Failed execution status. (N.B. You might need to adjust your visibility store setup to do this.)
You could then get the document id (or other input data) either by adding a document_id query to the workflow and calling that, or by fetching the history and getting the workflow input.
However, this solution isn't really native to Temporal, so there may be an XY problem here.
How are you trying to use the failed execution metadata?
For example, if you want to periodically reprocess the failed documents, then on each failure you could use signal-with-start to notify a long-running workflow that processes the failures and then sleeps for an hour, a day, or whatever period is appropriate.
Hi Charles, thank you for the reply. I'm basically trying to do something similar to a DLQ: retrieving all failed IDs so that I can rerun the pipeline at a later date for the documents that failed
I was wondering how I could implement this suggestion:
either by adding a document_id query to the workflow and calling that, or by fetching the history and getting the workflow input
would it be something like this?
from temporalio.client import Client

async def get_failed_document_ids():
    client = await Client.connect("your-temporal-server-address")

    failed_document_ids = []
    # list_workflows returns an async iterator directly (no await on the
    # call itself); note the visibility list-filter syntax.
    async for workflow in client.list_workflows(
        "WorkflowType = 'ProcessingWorkflow' AND ExecutionStatus = 'Failed'"
    ):
        handle = client.get_workflow_handle(workflow.id, run_id=workflow.run_id)
        # The workflow input lives in the first history event.
        history = await handle.fetch_history()
        started = history.events[0].workflow_execution_started_event_attributes
        # Decoded type depends on your configured data converter.
        args = await client.data_converter.decode(started.input.payloads)
        if args:
            failed_document_ids.append(args[0])
    return failed_document_ids
# Run this inside an asyncio event loop