How to schedule an action in the future from the workflow properly?

How to schedule an action (activity, workflow, cronjob) in the future from the workflow but cancel that activity if workflow does not need it anymore.
Or if parent workflow completes cancel it automatically?

I was thinking creating activity that sleeps for needed period time and then executes that action (like sending an email) and cancels it when I don’t need it anymore?

Is there a better way of achieving this? Maybe CoronJob?

Example use case:

I wan’t to let user to perform certain action. But if he does not do it in the period of time I wan’t to send him and reminder email to complete this action.

There are many ways to achieve this depending on a specific use case.

I wan’t to let user to perform certain action. But if he does not do it in the period of time I wan’t to send him and reminder email to complete this action.

For this you can just call Workflow.await. The following snippet assumes that the user action is delivered through a signal which handler updates userActionPerformed variable.

boolean performed = Workflow.await(timeout, ()->userActionPerformed);
if (!performed) {
   // send reminder email
}

Thank you Maxim

Want to make sure I understand what this line does:

boolean performed = Workflow.await(timeout, ()->userActionPerformed);

Is this like condition() but with timeout?

It’s awaits a timeout period, and watches the variable userActionPerformed ?

It will proceed to next line only if one of the two events will happen, if the timeout passes or variable is changed?

What’s the analog of this in Python SDK?


The approach I was thinking about, it to create an activity that sends an email.

await.sleep(timeout) inside activity and then send email.

From the workflow I I would start an activity, but don’t await. Then I check for signal, if user performed action I cancel handler for email activity.

@activity.defn
async def send_email_cancellable_activity(input: ComposeArgsInput) -> NoReturn:
    send_email_after = 60 * 60 * 2 # 2 hours
    timeout_counter = 0 
    try:
        while timeout_counter <= send_email_after:
            print("Heartbeating activity")
            await asyncio.sleep(1)
            activity.heartbeat("some details")
            timeout_counter += 1  # increment counter by 1
    except asyncio.CancelledError:
        print("Activity cancelled")
        raise
    # perform action here
    send_email()

@workflow.defn
 class WaitForUserActionWithReminderWorkflow:
     @workflow.run
     async def run(self, input: ComposeArgsInput) -> None:
        activity_handle = workflow.start_activity(
            send_email_cancellable_activity,
            ComposeArgsInput(input.arg1, input.arg2),
            start_to_close_timeout=timedelta(minutes=60 * 3), # this should probably be higher then 2 hour inside activity
            heartbeat_timeout=timedelta(seconds=2), # this should probably be higher then hatbeet inside activity
        )
        
        while user_data_not_valid(self.user_data)
            await workflow.wait_condition(lambda: self.user_data)

        activity_handle.cancel()

Is this a reliable way to approach it?
Also I want to make sure I’m not inventing a wheel with this timeout counter in the activity.
It feels like this could be done with help of temporal API.

It looks like you didn’t paste the whole example as it misses the signal handler implementation. To me it looks like the while loop is going to spin as soon as the first time the user_data is set.

It will proceed to next line only if one of the two events will happen, if the timeout passes or variable is changed?

What’s the analog of this in Python SDK?

The wait_condition takes the optional timeout argument.