Cannot get all workflow executions

I have an error in my logs indicating that an activity keeps failing in the workflow with ID execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8:

Completing activity as failed ({'activity_id': '1', 'activity_type': 'send_email', 'attempt': 23928, 'namespace': 'default', 'task_queue': 'execute-flow-task-queue', 'workflow_id': 'execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8', 'workflow_run_id': 'c2baa95b-8660-42e5-a451-221a05cf54c0', 'workflow_type': 'ExecuteFlowWorkflow'})

So I want to run a Python debug script to look for the same problem in other workflows, but temporal_client.list_workflows() does not return all of the workflows that are currently running.

To demonstrate this, I wrote a small Python script that retrieves the problematic workflow by its ID and then tries to find it via temporal_client.list_workflows(). The workflow is not found, even though the number of workflows returned by temporal_client.list_workflows() does not exceed the page size limit.

import asyncio

async def main():
    temporal_settings = TemporalSettings()
    temporal_client = await create_temporal_client(temporal_settings)

    workflow_handle = temporal_client.get_workflow_handle("execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8")
    print("Workflow id:", workflow_handle.id)

    workflow_executions_count = 0
    async for workflow_execution in temporal_client.list_workflows(page_size=1000):
        workflow_executions_count += 1

        if workflow_execution.id != workflow_handle.id:
            continue

        print("Workflow execution found")
        break

    else:
        print("Workflow execution not found")

    print("Workflow executions count:", workflow_executions_count)

asyncio.run(main())

Output:

Workflow id: execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8
Workflow execution not found
Workflow executions count: 145
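As a narrower debugging step, the visibility store can be asked for one exact workflow ID via a list-filter query string instead of paging through every execution. A sketch, assuming the Python SDK's list_workflows(query) support; the helper function is mine, not part of the SDK:

```python
def workflow_id_query(workflow_id: str) -> str:
    """Build a Temporal visibility ("list filter") query matching one workflow ID."""
    return f'WorkflowId = "{workflow_id}"'

# Intended usage (requires a connected temporalio client):
#   async for e in temporal_client.list_workflows(
#       workflow_id_query("execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8")
#   ):
#       print(e.id, e.status)
print(workflow_id_query("execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8"))
```

If that query returns nothing while describe() succeeds, the discrepancy is isolated to the visibility store rather than to pagination in the client code.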

Also, I cannot find it in the web UI:

We use Docker images for the Temporal server and UI with the following versions:

temporalio/auto-setup:1.22
temporalio/ui:2.21.3

Getting a workflow handle is not a network call. You can get a workflow handle for any ID you'd like; this will not fail. It is simply a handle to make calls on. Try await workflow_handle.describe() to actually make a server call and see whether the workflow exists.

I’ve added a workflow_handle.describe() call. Here is the modified code and the new output:

import asyncio

async def main():
    temporal_settings = TemporalSettings()
    temporal_client = await create_temporal_client(temporal_settings)

    workflow_handle = temporal_client.get_workflow_handle("execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8")
    print("Workflow id:", workflow_handle.id)

    workflow_description = await workflow_handle.describe()
    print("Description: ", str(workflow_description).replace("\n", ""))

    workflow_executions_count = 0
    async for workflow_execution in temporal_client.list_workflows(page_size=1000):
        workflow_executions_count += 1

        if workflow_execution.id != workflow_handle.id:
            continue

        print("Workflow execution found")
        break

    else:
        print("Workflow execution not found")

    print("Workflow executions count:", workflow_executions_count)

asyncio.run(main())

Output:
Workflow id: execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8
Description:  WorkflowExecutionDescription(close_time=None, data_converter=DataConverter(payload_converter_class=<class 'flows.temporal.pydantic_data_converter.PydanticPayloadConverter'>, payload_codec=None, failure_converter_class=<class 'temporalio.converter.DefaultFailureConverter'>, payload_converter=<flows.temporal.pydantic_data_converter.PydanticPayloadConverter object at 0x7fea3a68b390>, failure_converter=<temporalio.converter.DefaultFailureConverter object at 0x7fea3eb57d10>), execution_time=datetime.datetime(2024, 5, 31, 12, 54, 18, 248542, tzinfo=datetime.timezone.utc), history_length=28, id='execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8', parent_id=None, parent_run_id=None, raw_info=execution {  workflow_id: "execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8"  run_id: "c2baa95b-8660-42e5-a451-221a05cf54c0"}type {  name: "ExecuteFlowWorkflow"}start_time {  seconds: 1717160058  nanos: 248542716}status: WORKFLOW_EXECUTION_STATUS_RUNNINGhistory_length: 28execution_time {  seconds: 1717160058  nanos: 248542716}memo {}search_attributes {  indexed_fields {    key: "workspaceId"    value {      metadata {        key: "type"        value: "Text"      }      metadata {        key: "encoding"        value: "json/plain"      }      data: ""018c1bec-682c-7518-992a-ec875e8efe63""    }  }  indexed_fields {    key: "reservationId"    value {      metadata {        key: "type"        value: "Text"      }      metadata {        key: "encoding"        value: "json/plain"      }      data: ""018fa93d-9afe-7128-a5d1-4eecf405f097""    }  }  indexed_fields {    key: "personId"    value {      metadata {        key: "type"        value: "Text"      }      metadata {        key: "encoding"        value: "json/plain"      }      data: ""018f9bbe-63f3-7f12-a390-55e6bf3f2a86""    }  }  indexed_fields {    key: "flowId"    value {      metadata {        key: "type"        value: "Text"      }      metadata {        key: "encoding"        value: "json/plain"      }      data: 
""018f9d15-1b5f-70ba-b1bb-4f9794575d95""    }  }  indexed_fields {    key: "BuildIds"    value {      metadata {        key: "type"        value: "KeywordList"      }      metadata {        key: "encoding"        value: "json/plain"      }      data: "["unversioned","unversioned:6bab872a30ffc490c48e1ecbb6a979cb"]"    }  }}auto_reset_points {  points {    build_id: "6bab872a30ffc490c48e1ecbb6a979cb"    run_id: "c2baa95b-8660-42e5-a451-221a05cf54c0"    first_workflow_task_completed_id: 4    create_time {      seconds: 1717160058      nanos: 451842839    }    resettable: true  }}task_queue: "execute-flow-task-queue"state_transition_count: 48272history_size_bytes: 12213most_recent_worker_version_stamp {  build_id: "6bab872a30ffc490c48e1ecbb6a979cb"}, run_id='c2baa95b-8660-42e5-a451-221a05cf54c0', search_attributes={'BuildIds': ['unversioned', 'unversioned:6bab872a30ffc490c48e1ecbb6a979cb'], 'workspaceId': ['018c1bec-682c-7518-992a-ec875e8efe63'], 'flowId': ['018f9d15-1b5f-70ba-b1bb-4f9794575d95'], 'reservationId': ['018fa93d-9afe-7128-a5d1-4eecf405f097'], 'personId': ['018f9bbe-63f3-7f12-a390-55e6bf3f2a86']}, start_time=datetime.datetime(2024, 5, 31, 12, 54, 18, 248542, tzinfo=datetime.timezone.utc), status=<WorkflowExecutionStatus.RUNNING: 1>, task_queue='execute-flow-task-queue', typed_search_attributes=TypedSearchAttributes(search_attributes=[SearchAttributePair(key=_SearchAttributeKey(_name='BuildIds', _indexed_value_type=<SearchAttributeIndexedValueType.KEYWORD_LIST: 7>, _value_type=typing.Sequence[str]), value=['unversioned', 'unversioned:6bab872a30ffc490c48e1ecbb6a979cb']), SearchAttributePair(key=_SearchAttributeKey(_name='flowId', _indexed_value_type=<SearchAttributeIndexedValueType.TEXT: 1>, _value_type=<class 'str'>), value='018f9d15-1b5f-70ba-b1bb-4f9794575d95'), SearchAttributePair(key=_SearchAttributeKey(_name='personId', _indexed_value_type=<SearchAttributeIndexedValueType.TEXT: 1>, _value_type=<class 'str'>), 
value='018f9bbe-63f3-7f12-a390-55e6bf3f2a86'), SearchAttributePair(key=_SearchAttributeKey(_name='reservationId', _indexed_value_type=<SearchAttributeIndexedValueType.TEXT: 1>, _value_type=<class 'str'>), value='018fa93d-9afe-7128-a5d1-4eecf405f097'), SearchAttributePair(key=_SearchAttributeKey(_name='workspaceId', _indexed_value_type=<SearchAttributeIndexedValueType.TEXT: 1>, _value_type=<class 'str'>), value='018c1bec-682c-7518-992a-ec875e8efe63')]), workflow_type='ExecuteFlowWorkflow', raw_description=execution_config {  task_queue {    name: "execute-flow-task-queue"    kind: TASK_QUEUE_KIND_NORMAL  }  default_workflow_task_timeout {    seconds: 10  }}workflow_execution_info {  execution {    workflow_id: "execute-flow-97bc4090-4cb4-4652-9841-7f01b8fda6f8"    run_id: "c2baa95b-8660-42e5-a451-221a05cf54c0"  }  type {    name: "ExecuteFlowWorkflow"  }  start_time {    seconds: 1717160058    nanos: 248542716  }  status: WORKFLOW_EXECUTION_STATUS_RUNNING  history_length: 28  execution_time {    seconds: 1717160058    nanos: 248542716  }  memo {  }  search_attributes {    indexed_fields {      key: "workspaceId"      value {        metadata {          key: "type"          value: "Text"        }        metadata {          key: "encoding"          value: "json/plain"        }        data: ""018c1bec-682c-7518-992a-ec875e8efe63""      }    }    indexed_fields {      key: "reservationId"      value {        metadata {          key: "type"          value: "Text"        }        metadata {          key: "encoding"          value: "json/plain"        }        data: ""018fa93d-9afe-7128-a5d1-4eecf405f097""      }    }    indexed_fields {      key: "personId"      value {        metadata {          key: "type"          value: "Text"        }        metadata {          key: "encoding"          value: "json/plain"        }        data: ""018f9bbe-63f3-7f12-a390-55e6bf3f2a86""      }    }    indexed_fields {      key: "flowId"      value {        metadata {          key: "type" 
         value: "Text"        }        metadata {          key: "encoding"          value: "json/plain"        }        data: ""018f9d15-1b5f-70ba-b1bb-4f9794575d95""      }    }    indexed_fields {      key: "BuildIds"      value {        metadata {          key: "type"          value: "KeywordList"        }        metadata {          key: "encoding"          value: "json/plain"        }        data: "["unversioned","unversioned:6bab872a30ffc490c48e1ecbb6a979cb"]"      }    }  }  auto_reset_points {    points {      build_id: "6bab872a30ffc490c48e1ecbb6a979cb"      run_id: "c2baa95b-8660-42e5-a451-221a05cf54c0"      first_workflow_task_completed_id: 4      create_time {        seconds: 1717160058        nanos: 451842839      }      resettable: true    }  }  task_queue: "execute-flow-task-queue"  state_transition_count: 48272  history_size_bytes: 12213  most_recent_worker_version_stamp {    build_id: "6bab872a30ffc490c48e1ecbb6a979cb"  }}pending_activities {  activity_id: "1"  activity_type {    name: "send_email"  }  state: PENDING_ACTIVITY_STATE_SCHEDULED  attempt: 24127  scheduled_time {    seconds: 1719577393    nanos: 361389191  }  last_failure {    message: "unhandled errors in a TaskGroup (1 sub-exception)"    stack_trace: "  File \"/root/.cache/pypoetry/virtualenvs/flows-sFuKiTz_-py3.11/lib/python3.11/site-packages/temporalio/worker/_activity.py\", line 447, in _run_activity\n    result = await impl.execute_activity(input)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/root/.cache/pypoetry/virtualenvs/flows-sFuKiTz_-py3.11/lib/python3.11/site-packages/temporalio/worker/_activity.py\", line 703, in execute_activity\n    return await input.fn(*input.args)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/flows/flows/temporal/workflows/execute_flow.py\", line 49, in send_email\n    rendered_template = await self._templates_service.render_template(send_email_action.template)\n                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/flows/flows/infrastructure/template_service/httpx.py\", line 27, in render_template\n    async with asyncio.TaskGroup() as tg:\n\n  File \"/usr/local/lib/python3.11/asyncio/taskgroups.py\", line 147, in __aexit__\n    raise me from None\n"    application_failure_info {      type: "ExceptionGroup"    }  }  last_worker_identity: "8@e1fed2719dab"})
Workflow execution not found
Workflow executions count: 143

As you can see, the workflow exists, but it is still not returned in the list of all workflows.

Is it possible to provide a standalone replication? I attempted to replicate using your code (with a different workflow ID, of course) and the workflow appears in the list. Note that workflow listing is eventually consistent, so it may not appear there right away. Does the workflow appear in the list in the UI, and yet not with this code, after you've confirmed it's in the UI? If possible, an exact replication could help.

Hi! We spent a lot of time trying to create a reliably reproducible example, but unfortunately we failed. Our workflows and the execution of their activities are generated dynamically by a complicated algorithm, and we noticed that on the same data the workflow may or may not appear; we were not able to find a relationship between the input data and the appearance of the error. Unfortunately, we can only track the workflows that cause errors in our system, so we can't exclude that workflows completed without errors may also not be displayed. We don't even know how many workflows we have in total to compare against the number of displayed workflows.

If it helps, we can share how we start the workflow and the activities, as well as an example payload from a workflow that is displayed:

await self._temporal_client.start_workflow(
    ExecuteFlowWorkflow.run,
    flow,  # a Pydantic model
    id=f"execute-flow-{uuid4()}",
    task_queue=self._temporal_settings.task_queue,
    search_attributes=TypedSearchAttributes([
        SearchAttributePair(SearchAttributeKey.for_text("flowId"), str(flow.id)),
        SearchAttributePair(SearchAttributeKey.for_text("personId"), str(person_id)),
        SearchAttributePair(SearchAttributeKey.for_text("reservationId"), str(reservation_id)),
        SearchAttributePair(SearchAttributeKey.for_text("workspaceId"), str(workspace_id)),
    ])
)

Inside workflow:

    @workflow.run
    async def run(self, flow: Flow):
        for action in flow.actions:
            if isinstance(action, TriggerAction):
                trigger = action
                break
        else:
            return

        call_next_actions = [
            CallNextAction(action=action, parent_action=trigger)
            for action in flow.actions if action.id in set(trigger.call_next_action_ids)
        ]

        while True:
            if not call_next_actions:
                break

            call_next_actions, actions_to_wait = self._filter_actions_to_wait(call_next_actions)
            if not call_next_actions:
                if actions_to_wait:
                    while True:
                        await workflow.wait_condition(lambda: self._has_unread_incoming_message)
                        self._has_unread_incoming_message = False

                        call_next_actions, actions_to_wait = self._filter_actions_to_wait(
                            call_next_actions + actions_to_wait
                        )

                        if call_next_actions:
                            break
                else:
                    break

            call_next_actions = await self._execute_current_actions(call_next_actions, flow)
            call_next_actions += actions_to_wait
....
    async def _execute_action(self, action: FlowAction, flow: Flow) -> None:
        if isinstance(action, WaitAction) and action.call_next_action_ids:
            if isinstance(action, WaitReservationBookedAction):
                if not action.departure_starts_on:
                    return None

                now = workflow.now().replace(tzinfo=None)

                if action.type == WaitReservationBookedActionType.Before:
                    seconds = int(
                        ((action.departure_starts_on - now) - action.value).total_seconds()
                    )

                elif action.type == WaitReservationBookedActionType.After:
                    seconds = int(
                        ((action.departure_starts_on - now) + action.value).total_seconds()
                    )

                else:
                    return None

                if seconds > 0:
                    await workflow.execute_activity_method(
                        FlowActivities.wait,
                        seconds,
                        start_to_close_timeout=timedelta(seconds=seconds + 60)
                    )

            else:
                if action.seconds > 0:
                    await workflow.execute_activity_method(
                        FlowActivities.wait,
                        action.seconds,
                        start_to_close_timeout=timedelta(seconds=action.seconds + 60)
                    )

        elif isinstance(action, SendEmailAction):
            email_ids = await workflow.execute_activity_method(
                FlowActivities.send_email,
                args=(action, flow),
                start_to_close_timeout=timedelta(minutes=1)
            )

            if action.call_next_action_ids and email_ids:
                send_id = workflow.uuid4()
                self._emails_to_wait += [
                    WaitedEmail(
                        id=email_id,
                        action_id=action.id,
                        call_next_action_ids=action.call_next_action_ids,
                        send_id=send_id
                    )
                    for email_id in email_ids
                ]

class FlowActivities:
    def __init__(
            self,
            email_repository: EmailRepository,
            templates_service: TemplatesService,
    ) -> None:
        self._email_repository = email_repository
        self._templates_service = templates_service

    @activity.defn
    async def send_email(
            self,
            send_email_action: SendEmailAction | SendReservationBookedEmailAction,
            flow: Flow,
    ) -> list[str] | None:
        email_create_dtos = []

        if isinstance(send_email_action, SendReservationBookedEmailAction):
            for person in send_email_action.to:
                encoded_id = base64.b64encode(f'["Person", "{person.id}"]'.encode()).decode()
                send_email_action.template.variables["person"]["variables"]["personId"] = encoded_id
                rendered_template = await self._templates_service.render_template(send_email_action.template)

                email_create_dtos.append(
                    EmailCreateDto(
                        subject=rendered_template.subject,
                        body=rendered_template.body,
                        to=person,
                        workspace=flow.workspace,
                    )
                )
        else:
            rendered_template = await self._templates_service.render_template(send_email_action.template)

            email_create_dtos = [
                EmailCreateDto(
                    subject=rendered_template.subject,
                    body=rendered_template.body,
                    to=flow.target_person,
                    workspace=flow.workspace,
                )
            ]

        email_ids = await self._email_repository.create(email_create_dtos)

        return list(map(str, email_ids)) if email_ids else None

    @activity.defn
    async def wait(self, seconds: int) -> None:
        await asyncio.sleep(seconds)

Payload:

[
  {
    "id": "018fcbab-6642-7b8a-ab59-43ded6b563bb",
    "workspace": {
      "id": "018c1bec-682c-7518-992a-ec875e8efe63",
      "reservation_email": "our_email"
    },
    "actions": [
      {
        "id": "flow_node_1717109505357",
        "order": 2,
        "call_next_action_ids": [],
        "action_type": "reservation_send_email",
        "template": {
          "subject": "email_subject",
          "body": "email_text",
          "variables": {
            "person": {
              "query": "query PersonQuery($personId: ID!) { personById(id: $personId) { contacts { firstName } } }",
              "variables": {
                "personId": "WyJQZXJzb24iLCAiMDE5MDdhOWMtNGI3OS03Y2Q3LTk0ZjgtNTcxYjJmNTc1NDMyIl0="
              }
            }
          }
        },
        "destination": "all_guests",
        "entity": "per_invoice",
        "to": [
          {
            "id": "01907a9c-4b79-7cd7-94f8-571b2f575432",
            "email": "real_person_email",
            "point_of_contact_id": "018c1c40-21c9-7c67-89bc-07d611dc740f"
          }
        ]
      },
      {
        "id": "flow_node_1717108919120",
        "order": 1,
        "call_next_action_ids": [
          "flow_node_1717109505357"
        ],
        "action_type": "wait_reservation",
        "seconds": 432000,
        "type": "after_event",
        "departure_starts_on": "2024-07-24T11:16:00",
        "value": "P5D"
      },
      {
        "id": "flow_node_1717108853210",
        "order": 0,
        "call_next_action_ids": [
          "flow_node_1717108919120"
        ],
        "action_type": "trigger"
      }
    ],
    "target_person": {
      "id": "01907a9c-4b79-7cd7-94f8-571b2f575432",
      "email": "real_person_email",
      "point_of_contact_id": "018c1c40-21c9-7c67-89bc-07d611dc740f"
    }
  }
]
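Since every execution is started with custom search attributes (workspaceId, flowId, and so on), the visibility store could also be queried or counted server-side, which would answer the "how many workflows do we have in total" question. A sketch, assuming the SDK's list_workflows/count_workflows accept a list-filter string; the helper below is hypothetical:

```python
def workspace_query(workspace_id: str, running_only: bool = True) -> str:
    """Hypothetical helper: list-filter for one workspace's executions."""
    query = f'workspaceId = "{workspace_id}"'
    if running_only:
        query += ' AND ExecutionStatus = "Running"'
    return query

# Intended usage (requires a connected temporalio client):
#   total = await temporal_client.count_workflows(workspace_query(ws_id))
print(workspace_query("018c1bec-682c-7518-992a-ec875e8efe63"))
```

Comparing such a server-side count with the number of executions actually yielded by list_workflows() for the same filter would show directly whether executions are being dropped.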

Could this be related to the workflow storage settings? All the workflows that are causing errors and not being displayed are relatively old (created more than two weeks ago).
(image)

Are you able to reliably replicate? Can you continually replicate while reducing the code further and further?

Possibly. I assume they aren’t in the UI either? Completed workflows will be archived/removed after the retention period.