What is the best practice for a polling activity?

What’s the best practice for waiting on either a signal or a polling activity before continuing a workflow (unblocking workflow.Select(ctx))?

Should you have one activity that contains something like this:

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
	select {
	case <-ticker.C:
		// do some polling; return the result once the resource is ready
	case <-ctx.Done():
		// context canceled or deadline exceeded
		return ctx.Err()
	}
}

Or should the for loop live in the workflow, with the activity performing just a single poll?
With the first approach the activity is very long-running, and I’m not sure what happens if the worker executing it is stopped. I’m also not sure how to cancel the activity from the workflow if a signal is received before the activity returns.

In the end, either a successful poll OR a signal should be able to unblock workflow.Select(ctx). How can this be achieved?

We currently use the RetryPolicy with no backoff (a coefficient of 1) to mimic polling. The activity returns an error when there’s no result, and as soon as there is a result, the retrying stops. The downside is that errors get logged. It also doesn’t seem like a good pattern to begin with.

More information on our use case: Selector with cancellation

It depends on how frequently you want to poll.

  • For infrequent polls (every minute or slower), use server-side retry. Specify a RetryPolicy (or RetryOptions for Java) when invoking the activity. In the RetryPolicy, specify a backoff coefficient of 1 and an initial interval equal to the poll interval. Then fail the activity whenever the polled resource is not ready, and the server will retry it up to the retry policy's expiration interval. So what you are doing is absolutely OK. I’ve updated the issue to suppress logging in this case.

  • For very frequent polls (every few seconds or faster), implement the polling inside an activity as a loop that polls and then sleeps for the poll interval. To ensure that the polling activity is restarted promptly after a worker failure or restart, the activity has to heartbeat on every iteration. Note that heartbeating only works if a HeartbeatTimeout shorter than the StartToClose timeout is specified. Use an appropriate RetryPolicy for restarting such a failed activity.

  • In the rare case where polling requires periodically executing a sequence of activities, or where the activity arguments must change between retries, a child workflow can be used. The trick is that the parent is not aware of the child calling continue-as-new; it is only notified when the child completes or fails. So if the child executes the sequence of activities in a loop and periodically calls continue-as-new, the parent is not affected until the child completes.

When using server-side retry, does the workflow history get bloated with the retry events?

No. With server-side retry, a single ActivityTaskStarted and ActivityTaskCompleted/Failed/TimedOut pair is written to the history, and only when the activity completes or fails after all the retries. The ActivityTaskStarted.attempt field shows the number of retries executed.

To see which activities are currently executing or being retried, look at the UI summary page or the output of the CLI workflow describe command. The describe output includes the last failure information for an activity that is being retried.

Is there a way (Java SDK) to configure the activity methods separately when creating an Activity stub?

For example:

@ActivityInterface
public interface RemoteServiceActivity {
    String startJob(Object payload);
    void waitForJob(String jobId);
}

For starting a job, my retry logic is driven by whether the remote service is inaccessible and how long I am willing to wait for it to come back online, while the retry logic for waiting on a remote job is driven more by how long the job will take and how often I should check in on the job progress. These result in very different retry configurations.

RemoteServiceActivity stub = Workflow.newActivityStub(
    RemoteServiceActivity.class, 
    /* conflicting settings here */
);

Do I need to split the interface (the implementation could handle both, I assume) in order to create separately-configurable stubs, or is there a less verbose way to approach this?

I do not see Activity.heartbeat() in the Java SDK (version 0.29.0):

package io.temporal.activity;

import io.temporal.internal.sync.ActivityInternal;
import io.temporal.internal.sync.WorkflowInternal;

public final class Activity {
    public static ActivityExecutionContext getExecutionContext() {
        return ActivityInternal.getExecutionContext();
    }

    public static RuntimeException wrap(Throwable e) {
        return WorkflowInternal.wrap(e);
    }

    private Activity() {
    }
}

Is activity heartbeating possible in Java?

Could you create a separate topic for each question? That would make it easier for future readers to find information.

Do I need to split the interface (the implementation could handle both, I assume) in order to create separately-configurable stubs, or is there a less verbose way to approach this?

Currently, there is no mechanism to configure each activity method independently; we plan to add such support. For now, create multiple stubs for the same interface, each with its own options.

Is activity heartbeating possible in Java?

Yes:

      // Inside an activity implementation; progress is the details object to record
      ActivityExecutionContext ctx = Activity.getExecutionContext();
      ctx.heartbeat(progress);

For the case of infrequent polls, the suggestion is:

“Then fail the activity in case the polled resource is not ready”

Is there a best way (in Java) to fail an activity? Should the activity just sleep and let the startToCloseTimeout expire?

Thanks!

Currently, throwing an exception is the way to go, but letting the activity time out also works. The best way to let an activity time out is not to call sleep, which occupies a thread, but to call Activity.getExecutionContext().doNotCompleteOnReturn() and return.

Thanks, Maxim, that makes sense.

In my case, I have a workflow that starts a childWorkflow that runs the activity. When the activity throws the exception (I’m thinking unchecked), does Temporal automatically intercept and deal with it?

Yes, by default all the activities are retried unless you disable the retries through RetryOptions.

For frequent polling, would it be better to create a new activity, say “isActivityDone”, and from the Workflow call that with a Workflow.sleep(t) in between polling?

Would your suggestion of polling inside the activity cause the workflow to hold up its thread and prevent Temporal from swapping it out of the thread cache?

For frequent polling, would it be better to create a new activity, say “isActivityDone”, and from the Workflow call that with a Workflow.sleep(t) in between polling?

In my opinion, this would have the side effect of growing the workflow history very quickly and potentially hitting the 50K-event history limit. In that case, your workflow would have to call continueAsNew after some number X of activity executions. Doing the polling in a single activity avoids this problem.

Here is a PR, still awaiting review, that implements the three methods mentioned above (Java SDK).

Thanks for the quick response and PR.

So frequent polling within an activity is not going to hold up the thread?

Do you mean the workflow thread? If you invoke that activity synchronously (waiting for it to complete), yes, but you would still be able to handle signals and queries. If you invoke it asynchronously, it blocks only when you wait on its result (for example, when you call .get() on the Promise returned by an Async.function call).

If the activity is implemented synchronously, it holds a thread. You can implement it asynchronously to avoid holding a thread.

A workflow, even when blocked, doesn’t hold the worker thread.

It’s been a while since the last comment, but the examples still use this approach. One side effect of throwing exceptions to trigger retries is false-positive failures in activity metrics and traces.

Is there a way to retry an activity without triggering the temporal_activity_execution_failed metric?