Hi! We spent a lot of time trying to create a guaranteed reproducible example, but unfortunately we failed. Our workflow and the execution of their activations are generated dynamically by complicated algorithm and we noticed that on the same data, the workflow may or may not appear and unfortunately, we were not able to find the relationship between the input data and the appearance of the error. Unfortunately, we can only track the workflows that cause any errors in our system. We can’t exclude that workflow completed without errors may also not be displayed. We don’t event know how many workflows we have in total to compare with number of displayed workflows.
If it helps, we can provide how we run wokflow and activiti, as well as an example payload from wokflow that is displayed:
await self._temporal_client.start_workflow(
ExecuteFlowWorkflow.run,
flow, # It is a pydantic model
id=f"execute-flow-{uuid4()}",
task_queue=self._temporal_settings.task_queue,
search_attributes=TypedSearchAttributes([
SearchAttributePair(SearchAttributeKey.for_text(f"flowId"), str(flow.id)),
SearchAttributePair(SearchAttributeKey.for_text(f"personId"), str(person_id)),
SearchAttributePair(SearchAttributeKey.for_text(f"reservationId"), str(reservation_id)),
SearchAttributePair(SearchAttributeKey.for_text(f"workspaceId"), str(workspace_id)),
])
)
Inside workflow:
@workflow.run
async def run(self, flow: Flow):
for action in flow.actions:
if isinstance(action, TriggerAction):
trigger = action
break
else:
return
call_next_actions = [
CallNextAction(action=action, parent_action=trigger)
for action in flow.actions if action.id in set(trigger.call_next_action_ids)
]
while True:
if not call_next_actions:
break
call_next_actions, actions_to_wait = self._filter_actions_to_wait(call_next_actions)
if not call_next_actions:
if actions_to_wait:
while True:
await workflow.wait_condition(lambda: self._has_unread_incoming_message)
self._has_unread_incoming_message = False
call_next_actions, actions_to_wait = self._filter_actions_to_wait(
call_next_actions + actions_to_wait
)
if call_next_actions:
break
else:
break
call_next_actions = await self._execute_current_actions(call_next_actions, flow)
call_next_actions += actions_to_wait
....
async def _execute_action(self, action: FlowAction, flow: Flow) -> None:
if isinstance(action, WaitAction) and action.call_next_action_ids:
if isinstance(action, WaitReservationBookedAction):
if not action.departure_starts_on:
return None
now = workflow.now().replace(tzinfo=None)
if action.type == WaitReservationBookedActionType.Before:
seconds = int(
((action.departure_starts_on - now) - action.value).total_seconds()
)
elif action.type == WaitReservationBookedActionType.After:
seconds = int(
((action.departure_starts_on - now) + action.value).total_seconds()
)
else:
return None
if seconds > 0:
await workflow.execute_activity_method(
FlowActivities.wait,
seconds,
start_to_close_timeout=timedelta(seconds=seconds + 60)
)
else:
if action.seconds > 0:
await workflow.execute_activity_method(
FlowActivities.wait,
action.seconds,
start_to_close_timeout=timedelta(seconds=action.seconds + 60)
)
elif isinstance(action, SendEmailAction):
email_ids = await workflow.execute_activity_method(
FlowActivities.send_email,
args=(action, flow),
start_to_close_timeout=timedelta(minutes=1)
)
if action.call_next_action_ids and email_ids:
send_id = workflow.uuid4()
self._emails_to_wait += [
WaitedEmail(
id=email_id,
action_id=action.id,
call_next_action_ids=action.call_next_action_ids,
send_id=send_id
)
for email_id in email_ids
]
class FlowActivities:
def __init__(
self,
email_repository: EmailRepository,
templates_service: TemplatesService,
) -> None:
self._email_repository = email_repository
self._templates_service = templates_service
@activity.defn
async def send_email(
self,
send_email_action: SendEmailAction | SendReservationBookedEmailAction,
flow: Flow,
) -> list[str]:
email_create_dtos = []
if isinstance(send_email_action, SendReservationBookedEmailAction):
for person in send_email_action.to:
encoded_id = base64.b64encode(f'["Person", "{person.id}"]'.encode()).decode()
send_email_action.template.variables["person"]["variables"]["personId"] = encoded_id
rendered_template = await self._templates_service.render_template(send_email_action.template)
email_create_dtos.append(
EmailCreateDto(
subject=rendered_template.subject,
body=rendered_template.body,
to=person,
workspace=flow.workspace,
)
)
else:
rendered_template = await self._templates_service.render_template(send_email_action.template)
email_create_dtos = [
EmailCreateDto(
subject=rendered_template.subject,
body=rendered_template.body,
to=flow.target_person,
workspace=flow.workspace,
)
]
email_ids = await self._email_repository.create(email_create_dtos)
return list(map(str, email_ids)) if email_ids else None
@activity.defn
async def wait(self, second: int) -> None:
await asyncio.sleep(second)
Payload:
[
{
"id": "018fcbab-6642-7b8a-ab59-43ded6b563bb",
"workspace": {
"id": "018c1bec-682c-7518-992a-ec875e8efe63",
"reservation_email": "our_email"
},
"actions": [
{
"id": "flow_node_1717109505357",
"order": 2,
"call_next_action_ids": [],
"action_type": "reservation_send_email",
"template": {
"subject": "email_subject",
"body": "email_text",
"variables": {
"person": {
"query": "query PersonQuery($personId: ID!) { personById(id: $personId) { contacts { firstName } } }",
"variables": {
"personId": "WyJQZXJzb24iLCAiMDE5MDdhOWMtNGI3OS03Y2Q3LTk0ZjgtNTcxYjJmNTc1NDMyIl0="
}
}
}
},
"destination": "all_guests",
"entity": "per_invoice",
"to": [
{
"id": "01907a9c-4b79-7cd7-94f8-571b2f575432",
"email": "real_person_email",
"point_of_contact_id": "018c1c40-21c9-7c67-89bc-07d611dc740f"
}
]
},
{
"id": "flow_node_1717108919120",
"order": 1,
"call_next_action_ids": [
"flow_node_1717109505357"
],
"action_type": "wait_reservation",
"seconds": 432000,
"type": "after_event",
"departure_starts_on": "2024-07-24T11:16:00",
"value": "P5D"
},
{
"id": "flow_node_1717108853210",
"order": 0,
"call_next_action_ids": [
"flow_node_1717108919120"
],
"action_type": "trigger"
}
],
"target_person": {
"id": "01907a9c-4b79-7cd7-94f8-571b2f575432",
"email": "real_person_email",
"point_of_contact_id": "018c1c40-21c9-7c67-89bc-07d611dc740f"
}
}
]
Could this be related to the workflow storage settings? Because all the workflows that are causing errors and not displaying are relatively old (created more than two weeks ago)
