Handling user feedback, retries, and timeouts

Originally on Slack.

I have a few points I want to verify, plus some questions. They are mostly in the context of the Go client.

  1. On the topic of human interaction, let’s say I need to capture a user’s details (e.g. through a signal), act on them (e.g. through an activity), and give a response to the user. Would the simplest option here be a signal and a query? And is it generally acceptable to query for structs rather than simple strings?

  2. If I have a somewhat long-running activity that fails midway through because the VM running the worker shuts down, I’m guessing the activity will eventually fail assuming it has a timeout? And I can handle this case by setting a retry policy?

  3. What’s the best way to handle a workflow timeout? For example, I want to carry out some compensation action if a workflow times out.

  1. The simplest is signal and then query. Temporal guarantees read after write consistency for this interaction. Yes, structs are supported as query results. This answer gives more options for implementing synchronous workflow updates.
  2. Correct, the simplest way to retry it is by setting a retry policy.
  3. Don’t rely on the workflow timeout for any business logic, as it is essentially a “kill -9” on a timer. Use timers inside the workflow for any business-related timeouts. In your case, set a timer that performs cleanup when it fires.

I’m giving this a try now.

Assuming I want a workflow-wide timeout with a cleanup activity, would the idea be to use a selector and a timer, like so:

selector := workflow.NewSelector(ctx)
timerFuture := workflow.NewTimer(ctx, workflowWideTimeout)
selector.AddFuture(timerFuture, func(f workflow.Future) {
    if ShouldCleanup {
        _ = workflow.ExecuteActivity(ctx, CleanupActivity).Get(ctx, nil)
    }
})

And for every activity:

f := workflow.ExecuteActivity(ctx, DoSomethingActivity)
selector.AddFuture(f, func(f workflow.Future) {
    _ = f.Get(ctx, &doSomethingResult)
})

Followed by a:

selector.Select(ctx)

So that either the timeout kicks in or the activity completes.

If so, I suppose my next question is, what about error handling within the future? How does that get propagated back to the workflow? Would that be a matter of mutating some err, and checking for it?

I think your approach is unnecessarily complicated. I would start a separate goroutine to execute the main workflow logic.

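func MyWorkflow(ctx workflow.Context) error {
    cancellableCtx, cancelHandler := workflow.WithCancel(ctx)
    // Run the main workflow logic in its own workflow goroutine.
    workflow.Go(cancellableCtx, func(ctx workflow.Context) {
        MyWorkflowImp(ctx)
    })
    // Wait for the business-level timeout, then cancel the main logic.
    if err := workflow.Sleep(ctx, CleanupInterval); err != nil {
        return err
    }
    cancelHandler()
    // The cleanup runs on the parent ctx, so the cancellation above does not affect it.
    return workflow.ExecuteActivity(ctx, CleanupActivity).Get(ctx, nil)
}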

I see! That’s very helpful.

I’m guessing the workflow will continue to run even after the goroutine finishes? And I can make it exit when the goroutine finishes by using a timer and a channel:

selector := workflow.NewSelector(ctx)
selector.AddFuture(workflow.NewTimer(ctx, CleanupTimeout), func(f workflow.Future) {})

cancellableCtx, cancelHandler := workflow.WithCancel(ctx)
doneCh := workflow.NewChannel(ctx)
selector.AddReceive(doneCh, func(c workflow.ReceiveChannel, m bool) {})

workflow.Go(cancellableCtx, func(ctx workflow.Context) {
    MyWorkflowImp(ctx)
    doneCh.Send(ctx, "done")
})

selector.Select(ctx)

cancelHandler()

workflow.ExecuteActivity(ctx, CleanupActivity).Get(ctx, nil)

This would work. Instead of doneCh you could also use a Future, created through workflow.NewFuture.

Is there a similar example with the Java SDK? Specifically I’m wondering if there’s a way to create a newCancellationScope where the containing activities do not need to be invoked explicitly async.

Invoke CancellationScope.run through Async.procedure.

I see, thanks. Does this look correct?

val cancellationScope = Workflow.newCancellationScope(Runnable { syncBusinessLogic() })
val procedure = Async.procedure { cancellationScope.run() }
try {
    procedure.get(7, TimeUnit.DAYS)
} catch (e: TimeoutException) {
    cancellationScope.cancel()
    cleanup()
}

Yes. You might also wait for the completion of the cancellation of syncBusinessLogic.

Thanks. How do I do that? The return type of cancel() is void.

It depends on your business logic. For example, you could pass a SettablePromise into syncBusinessLogic as a parameter and then wait on it during cleanup.

Wouldn’t calling cancel() ensure that no further activities defined within the cancellationScope are run?

You might wait for the cancellation of already running activities, for example. It is also possible to run compensations in a disconnected context.

I see. I had hoped that CancellationScope.cancel would propagate the cancellation to any containing activities. Is there no way to achieve that behavior?

It does propagate. But an activity can take 10 hours to acknowledge cancellation and you can decide to wait for it.

Oh I see, for long-running activities? I.e. similar to a Java thread checking its interrupted state, the activity would need to check if it’s been signaled for cancellation?

Would an activity in a waiting-to-retry state be automatically canceled via propagation?

activity would need to check if it’s been signaled for cancellation?

An activity must heartbeat to learn about cancellation.

Would an activity in a waiting-to-retry state be automatically canceled via propagation?

Yes, an activity waiting for retry will be canceled immediately.