Temporal Queue Activities

Hi,

One of the workflows we have, process files and perform different operations over them and following the examples in the documentation (samples-java/FileProcessingWorkflowImpl.java at master · temporalio/samples-java · GitHub), we are using taskQueues so the activities that need those files are executed always in the same worker.

The issue we have faced is that when a worker dies (or is taken out of load balance), the next activities are not taken by any other worker (as it should be) and eventually the workflow fails due to a timeout on the activity not being taken.

We would like a mechanism to automatically recover from that, as it’s something it could happen fairly often. Any idea? We were thinking about some kind of reset of the workflow when the timeout is reached, but we don’t if it’s feasible or not.

Regards, Diego.

Specify short activity ScheduleToStart timeout for these activities. When a worker dies the activity tasks are not picked up within this timeout and fail. Then workflow can redispatch them to another worker.

See The 4 Types of Activity timeouts blog post that explains this pattern.

Hi @maxim,

We had already set a ScheduleToStart timeout for these activities, but the workflow isn’t redispatched to another worker and it fails.

Since we’re using files that were downloaded to the worker in previously executed task queue activities, if the activity would be redispatched to a different worker it wouldn’t have the files, that’s why we wanted to reset the workflow from the start so it can be pick by any worker in the pool.

Regards, Diego.

We had already set a ScheduleToStart timeout for these activities, but the workflow isn’t redispatched to another worker and it fails.

If a workflow is not redispatched to another worker it is going to timeout not fail. Are you sure that you handle the activity failure from this timeout appropriately?

True, we have set the timeout, but we are not handling it.

Researching, looks like TimeoutFailure with timeoutType TIMEOUT_TYPE_START_TO_CLOSE is what is thrown, so we would like to retry the whole workflow on that (correct?). In the samples, it’s retried for any error, and I don’t see a way to retry just on an specific exception + a field value (timeoutType).

Why do you want to retry the whole workflow on a single activity failure? Have you considered increasing that particular activity timeout to avoid it failing?

The workflow works with files:

  • Activity1 - download file
  • Activity2 - transform file
  • Activity3 - upload file

Following the Java samples provided, we’re using task queues, so Activity2 and Activity3 are executed in the same worker.

The issue we’re trying to avoid is that if a worker dies, all the workflows running in that host timeout.

Our understanding, is that we would need to re-run in a different the whole workflow, otherwise it won’t have the file downloaded in the Activity1.

The Java sample retries in any failure, but in our use case we would only want to retry on TimeoutFailure with timeoutType TIMEOUT_TYPE_START_TO_CLOSE, right?

The issue we’re trying to avoid is that if a worker dies, all the workflows running in that host timeout.

Let’s be precise on the terminology. Workflows don’t run on a specific worker. So if a worker dies workflows are not affected. The activities running on that worker will timeout and you want to retry the whole sequence on a different most as the example demonstrate.

The main timeout you want to see on the host specific task queue is SCHEDULE_TO_START as it ensures that an activity task is not going to get stuck in the queue for long if the host is down. I highly recommend reading the blog post (or watch associated video) that explains activity timeouts in detail.

The Java sample retries in any failure, but in our use case we would only want to retry on TimeoutFailure with timeoutType TIMEOUT_TYPE_START_TO_CLOSE , right?

I agree that Workflow.retry makes it hard to retry on an exception which is not a top level one, but chained to ActivityFailure. The workaround is to rethrow the cause:

  @Override
  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        RetryOptions.newBuilder()
            .setInitialInterval(Duration.ofSeconds(1))
            .setDoNotRetry("io.temporal.failure.TimeoutFailure")
            .build();
    Workflow.retry(
        retryOptions,
        Optional.of(Duration.ofSeconds(10)),
        () -> {
          try {
            processFileImpl(source, destination);
          } catch (ActivityFailure e) {
            throw (TemporalFailure) e.getCause();
          }
        });
  }

I filed an enhancement request to support Workflow.retry that can execute some application code to decide if retry is needed.

Do you have an example? Because RetryOptions don’t allow to retry under a certain condition, but the other way around with setDoNotRetry() method.

Sorry, posted a partial answer by mistake. Check the sample above.

Hi Maxim,

First, the blog and the whiteboard video was really really helpful.

  @Override
  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        RetryOptions.newBuilder()
            .setInitialInterval(Duration.ofSeconds(1))
            .setDoNotRetry("io.temporal.failure.TimeoutFailure")
            .build();
    Workflow.retry(
        retryOptions,
        Optional.of(Duration.ofSeconds(10)),
        () -> {
          try {
            processFileImpl(source, destination);
          } catch (ActivityFailure e) {
            throw (TemporalFailure) e.getCause();
          }
        });
  }

Wouldn’t this snippet do the contrary of what we need? It will retry the workflow in anything but a TimeoutFailure (and also not taking into account the TIMEOUT_TYPE_SCHEDULE_TO_START).

Would it work if we instead do something like:

  @Override
  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        RetryOptions.newBuilder()
            .setInitialInterval(Duration.ofSeconds(1))
            .setDoNotRetry("com.example.NonRetryableException")
            .build();
    Workflow.retry(
        retryOptions,
        Optional.of(Duration.ofSeconds(10)),
        () -> {
          try {
            processFileImpl(source, destination);
          } catch (ActivityFailure e) {
            if(e.getCause() instanceof TimeoutFailure && 
               ((TimeoutFailure) e.getCause()).getTimeoutType() == TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START) {
              throw e;
            } else {
              throw new NonRetryableException(e);
            }
          }
        });
  }

So we doNotRetry in anything but in a TimeoutFailure + TIMEOUT_TYPE_SCHEDULE_TO_START timeout.

Your code is going to retry only TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START. If it is what you want then it is fine.