Can't serialize instances of ApplicationFailure

I’ve found a problem with the class ApplicationFailure (Java SDK 1.0.4).

Background for context: within an activity, to return an exception and indicate whether it is retryable or non-retryable, you use ApplicationFailure#newFailure(..) or ApplicationFailure#newNonRetryableFailure(..). Within the workflow, this is wrapped in an ActivityFailure exception, whose cause is the ApplicationFailure.

This works, because the SDK converts the exception into a Failure (for return to the server) with FailureConverter.exceptionToFailure(..).

Now imagine we’re using an activity with doNotCompleteOnReturn(). You complete the activity by calling an instance of ActivityCompletionClient. It doesn’t seem wise to call ActivityCompletionClient from within the workflow; therefore, you have to call it from an activity (preferably a local one; I briefly considered sideEffect(..), but I don’t think that’s applicable here), and that activity is itself called from a workflow. However, if you pass an instance of ApplicationFailure into the activity, Jackson can’t serialize it. You get:

io.temporal.common.converter.DataConverterException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.google.protobuf.UnknownFieldSet$Parser and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.temporal.failure.ApplicationFailure["failure"]->java.util.Optional["value"]->io.temporal.api.failure.v1.Failure["unknownFields"]->com.google.protobuf.UnknownFieldSet["parserForType"])

The issue is the field TemporalFailure#failure: Jackson doesn’t know how to serialize it. The failure appears to be a convenience when building the message to send back to Temporal.

I did try creating a new ApplicationFailure, but it cannot be serialized even with an empty failure value.

The only solution I can see is not to pass the exception at all, but to manually pass the message and type values into an activity, build the ApplicationFailure there, and then send it via ActivityCompletionClient.

Can we consider changing ApplicationFailure so that it can be serialized with Jackson? This might be as simple as marking the failure field to be ignored by Jackson (for example with @JsonIgnore).

Many thanks!

Sean

The following code works fine:

      completionClient.completeExceptionally(
          taskToken, ApplicationFailure.newFailure("foo", "bar", new FailureDetails("data1")));

How does the code that fails the activity look in your case?
Here is the end-to-end sample:

package io.temporal.samples.hello;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.ActivityCompletionClient;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

/**
 * Sample Temporal workflow that demonstrates asynchronous activity implementation.
 *
 * <p>To execute this example a locally running Temporal service instance is required. You can
 * follow instructions on how to set up your Temporal service here:
 * https://github.com/temporalio/temporal/blob/master/README.md#download-and-start-temporal-server-locally
 */
public class HelloAsyncActivityCompletion {

  // Define the task queue name
  static final String TASK_QUEUE = "HelloAsyncActivityCompletionTaskQueue";

  // Define the workflow unique id
  static final String WORKFLOW_ID = "HelloAsyncActivityCompletionWorkflow";

  public static class FailureDetails {
    public String data;

    public FailureDetails() {}

    public FailureDetails(String data) {
      this.data = data;
    }

    public String getData() {
      return data;
    }

    @Override
    public String toString() {
      return "FailureDetails{" + "data='" + data + '\'' + '}';
    }
  }

  /**
   * Define the Workflow Interface. It must contain one method annotated with @WorkflowMethod.
   *
   * <p>Workflow code includes core processing logic. It shouldn't contain any heavyweight
   * computations, non-deterministic code, network calls, database operations, etc. All those things
   * should be handled by Activities.
   *
   * @see io.temporal.workflow.WorkflowInterface
   * @see io.temporal.workflow.WorkflowMethod
   */
  @WorkflowInterface
  public interface GreetingWorkflow {

    /**
     * This method is executed when the workflow is started. The workflow completes when the
     * workflow method finishes execution.
     */
    @WorkflowMethod
    String getGreeting(String name);
  }

  /**
   * Define the Activity Interface. Activities are building blocks of any temporal workflow and
   * contain any business logic that could perform long running computation, network calls, etc.
   *
   * <p>Annotating activity methods with @ActivityMethod is optional
   *
   * @see io.temporal.activity.ActivityInterface
   * @see io.temporal.activity.ActivityMethod
   */
  @ActivityInterface
  public interface GreetingActivities {

    /** Define the activity method which can be called during workflow execution */
    String composeGreeting(String greeting, String name);
  }

  // Define the workflow implementation which implements the getGreeting workflow method.
  public static class GreetingWorkflowImpl implements GreetingWorkflow {

    /**
     * Define the GreetingActivities stub. Activity stubs are proxies for activity invocations that
     * are executed outside of the workflow thread on the activity worker, that can be on a
     * different host. Temporal is going to dispatch the activity results back to the workflow and
     * unblock the stub as soon as activity is completed on the activity worker.
     *
     * <p>Let's take a look at each {@link ActivityOptions} defined:
     *
     * <p>The "setScheduleToCloseTimeout" option sets the overall timeout that the workflow is
     * willing to wait for activity to complete. For this example it is set to 10 seconds.
     */
    private final GreetingActivities activities =
        Workflow.newActivityStub(
            GreetingActivities.class,
            ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build());

    @Override
    public String getGreeting(String name) {
      // This is a blocking call that returns only after the activity has completed.
      return activities.composeGreeting("Hello", name);
    }
  }

  /**
   * Implementation of our workflow activity interface. It overwrites the defined composeGreeting
   * activity method.
   */
  static class GreetingActivitiesImpl implements GreetingActivities {

    /**
     * ActivityCompletionClient is used to asynchronously complete activities. In this example we
     * will use this client alongside with {@link
     * io.temporal.activity.ActivityExecutionContext#doNotCompleteOnReturn()} which means our
     * activity method will not complete when it returns, however is expected to be completed
     * asynchronously using the client.
     */
    private final ActivityCompletionClient completionClient;

    GreetingActivitiesImpl(ActivityCompletionClient completionClient) {
      this.completionClient = completionClient;
    }

    @Override
    public String composeGreeting(String greeting, String name) {

      // Get the activity execution context
      ActivityExecutionContext context = Activity.getExecutionContext();

      // Set a correlation token that can be used to complete the activity asynchronously
      byte[] taskToken = context.getTaskToken();

      /*
       * For the example we will use a {@link java.util.concurrent.ForkJoinPool} to execute our
       * activity. In real-life applications this could be any service. The composeGreetingAsync
       * method is the one that will actually complete workflow action execution.
       */
      ForkJoinPool.commonPool().execute(() -> composeGreetingAsync(taskToken, greeting, name));
      context.doNotCompleteOnReturn();

      // Since we have set doNotCompleteOnReturn(), the workflow action method return value is
      // ignored.
      return "ignored";
    }

    // Method that will complete action execution using the defined ActivityCompletionClient
    private void composeGreetingAsync(byte[] taskToken, String greeting, String name) {
      String result = greeting + " " + name + "!";

      // Complete our workflow activity using ActivityCompletionClient
      //      completionClient.complete(taskToken, result);
      completionClient.completeExceptionally(
          taskToken, ApplicationFailure.newFailure("foo", "bar", new FailureDetails("data1")));
    }
  }

  /**
   * With our Workflow and Activities defined, we can now start execution. The main method starts
   * the worker and then the workflow.
   */
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    // Define the workflow service.
    WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();

    /*
     * Define the workflow client. It is a Temporal service client used to start, signal, and query
     * workflows
     */
    WorkflowClient client = WorkflowClient.newInstance(service);

    /*
     * Define the workflow factory. It is used to create workflow workers for a specific task queue.
     */
    WorkerFactory factory = WorkerFactory.newInstance(client);

    /*
     * Define the workflow worker. Workflow workers listen to a defined task queue and process
     * workflows and activities.
     */
    Worker worker = factory.newWorker(TASK_QUEUE);

    /*
     * Register our workflow implementation with the worker.
     * Workflow implementations must be known to the worker at runtime in
     * order to dispatch workflow tasks.
     */
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);

    /*
     * Register our workflow activity implementation with the worker. Since workflow activities are
     * stateless and thread-safe, we need to register a shared instance. We create our
     * ActivityCompletionClient and pass it to the workflow activity implementation
     */
    ActivityCompletionClient completionClient = client.newActivityCompletionClient();
    worker.registerActivitiesImplementations(new GreetingActivitiesImpl(completionClient));

    /*
     * Start all the workers registered for a specific task queue.
     * The started workers then start polling for workflows and activities.
     */
    factory.start();

    // Create the workflow client stub. It is used to start our workflow execution.
    GreetingWorkflow workflow =
        client.newWorkflowStub(
            GreetingWorkflow.class,
            WorkflowOptions.newBuilder()
                .setWorkflowId(WORKFLOW_ID)
                .setTaskQueue(TASK_QUEUE)
                .build());

    /*
     * Here we use {@link io.temporal.client.WorkflowClient} to execute our workflow asynchronously.
     * It gives us back a {@link java.util.concurrent.CompletableFuture}. We can then call its get
     * method to block and wait until a result is available.
     */
    CompletableFuture<String> greeting = WorkflowClient.execute(workflow::getGreeting, "World");

    // Wait for workflow execution to complete and display its results.
    System.out.println(greeting.get());
    System.exit(0);
  }
}

Greetings. Many thanks for replying! If I may, this code exhibits the problem (tried with both 1.0.4 and 1.0.7 SDKs with associated server versions). It’s a slightly different error message – but still tied to the fact that it can’t be serialized.

package org.example.cancel;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

public class ActivitySerializationExample {

    @ActivityInterface
    public interface TaskCompletionActivity {

        @ActivityMethod
        void fail(byte[] taskToken, ApplicationFailure af);
    }

    public static class TaskCompletionActivityImpl implements TaskCompletionActivity {

        @Override
        public void fail(byte[] taskToken, ApplicationFailure af) {

            /*
             * Here we would call the completion client.
             */
        }
    }

    @WorkflowInterface
    public interface TaskCompletionWorkflow {

        @WorkflowMethod
        void complete();
    }

    public static class TaskCompletionWorkflowImpl implements TaskCompletionWorkflow {

        private final TaskCompletionActivity activities =
            Workflow.newActivityStub(
                TaskCompletionActivity.class,
                ActivityOptions.newBuilder()
                    .setScheduleToCloseTimeout(Duration.ofSeconds(10))
                    .setTaskQueue(TASK_QUEUE)
                    .build());

        @Override
        public void complete() {

            /*
             * Pretend we've received an exception from an activity, and we're going to pass to an
             * activity whose only purpose is to complete the token.
             *
             * IT FAILS WHEN TRYING TO SERIALIZE THE INSTANCE of ApplicationFailure.
             */
            activities.fail(
                "hello".getBytes(StandardCharsets.UTF_8),
                ApplicationFailure.newNonRetryableFailure("blah", "test"));
        }
    }


    private static final String TASK_QUEUE = "queue";

    /**
     * With our Workflow and Activities defined, we can now start execution. The main method starts
     * the worker and then the workflow.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
        WorkflowClient client = WorkflowClient.newInstance(service);
        WorkerFactory factory = WorkerFactory.newInstance(client);
        Worker worker = factory.newWorker(TASK_QUEUE);

        worker.registerWorkflowImplementationTypes(TaskCompletionWorkflowImpl.class);
        worker.registerActivitiesImplementations(new TaskCompletionActivityImpl());

        factory.start();

        // Create the workflow client stub. It is used to start our workflow execution.
        TaskCompletionWorkflow workflow =
            client.newWorkflowStub(
                TaskCompletionWorkflow.class,
                WorkflowOptions.newBuilder()
                    .setWorkflowId(UUID.randomUUID().toString())
                    .setTaskQueue(TASK_QUEUE)
                    .build());

        WorkflowClient.execute(workflow::complete).get();

        System.exit(0);
    }
}

ApplicationFailure and any other Java exceptions are not serializable at least through a JSON converter. I would recommend creating a value class with the data that you care about to use as a parameter to your activity.
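
As a rough illustration of the value-class suggestion, here is a minimal sketch. The class name FailureInfo and its fields are hypothetical (they are not part of the Temporal SDK); it simply carries the message, type, and retryability flag so that a JSON converter only ever sees plain fields.

```java
/**
 * Hypothetical value class (NOT part of the Temporal SDK). It carries only the
 * failure data the completing activity needs, so no exception object is ever
 * passed as an activity argument.
 */
final class FailureInfo {
    private String message;
    private String type;
    private boolean nonRetryable;

    // No-arg constructor plus getters/setters: the shape JSON mappers usually expect.
    public FailureInfo() {}

    public FailureInfo(String message, String type, boolean nonRetryable) {
        this.message = message;
        this.type = type;
        this.nonRetryable = nonRetryable;
    }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    public boolean isNonRetryable() { return nonRetryable; }
    public void setNonRetryable(boolean nonRetryable) { this.nonRetryable = nonRetryable; }

    @Override
    public String toString() {
        return "FailureInfo{message='" + message + "', type='" + type
            + "', nonRetryable=" + nonRetryable + '}';
    }
}
```

Inside the receiving activity, the flag would then decide between ApplicationFailure.newNonRetryableFailure(..) and ApplicationFailure.newFailure(..) before handing the result to the completion client.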

Hmm. In using ApplicationFailure directly you get to control the nonRetryable field. If I just send the payload, I’d have to have a flag in the payload retryable/non-retryable, and create the exception from it. For now, I have just coded the activities as:

void failure(byte[] taskToken, String message, String type, Object... details);
void nonRetryableFailure(byte[] taskToken, String message, String type, Object... details);

Which exactly maps to the statics on ApplicationFailure.

As an alternative, how safe would it be to use sideEffect(..) (to call ActivityCompletionClient directly) from the workflow instead?

Sean

Would you explain what you are trying to achieve? I’ve never seen before the need to pass failures as arguments. They were always returned as a result of a call.

It’s actually quite straightforward. Imagine I have a queue, and that queue is to sequentially process commands (backup, restore, etc.) on a single SQL Server instance. The commands are processed in activities, but might fail. Each command also carries the taskToken of the parent activity, so that when the command processing is finished, we either complete or fail the activity identified by that token. In the failure case, I just want to send the exception back, indicating retryable/non-retryable.

@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void execute(final DatabaseServer databaseServer) {

    while (!queue.isEmpty()) {
        DatabaseCommands command = queue.poll();

        try {

            try {

                // We heartbeat to ensure that the parent activity is still valid.
                taskCompletionActivity.heartbeat(command.getTaskToken(), "running");

                // Execute each command iteratively.
                command.getCommands().forEach(
                    c -> commandActivity.execute(databaseServer, c));

            } catch (ActivityCompletionException e) {
                // The command task is no longer valid. Ignore it and continue.
            }

            taskCompletionActivity.complete(command.getTaskToken());

        } catch (ActivityFailure e) {

            if (e.getCause() instanceof ApplicationFailure) {

                ApplicationFailure af = (ApplicationFailure)e.getCause();

                if (af.isNonRetryable()) {
                    taskCompletionActivity.nonRetryableFailure(
                        command.getTaskToken(), af.getMessage(), af.getType());
                } else {
                    taskCompletionActivity.failure(
                        command.getTaskToken(), af.getMessage(), af.getType());
                }
            } else {

                // The cause is not an ApplicationFailure, but we HAVE to fail the activity.
                taskCompletionActivity.failure(command.getTaskToken(), e.getMessage(), UNKNOWN);
            }
        } catch (Exception e) {

            // Don't know if we'd reach this code, but we HAVE to fail the activity.
            taskCompletionActivity.failure(command.getTaskToken(), e.getMessage(), UNKNOWN);
        }
    }
}

I’m confused. Is this code of a workflow or an activity?

Workflow. Basically, trying to restore 100s of dbs to a server, and we have to ensure they’re done sequentially.

Then I’m even more confused. Why are you completing and heartbeating activities from the workflow?

Have you seen this sample?

Yup, you’d sent a version of it to me a while back, thanks! The only difference is that we have a callback on each command/task. But, no matter, will refactor to move the task completion into the activity doing the processing. I’d already figured that, as coded, it’s far too chatty.

Feel free to post your new solution here if you want feedback.

Adding more detail to the use-case...

Thank you! Now I fully understand what you are doing.

I personally would change this to:

  1. The invoking workflow would run the “enqueue command” with a short timeout.
  2. The invoking workflow would block waiting for the “command done/failed” signal up to some timeout.
  3. The queue workflow would receive the signal and execute DB commands one by one.
  4. Upon command completion/failure a signal is sent to the invoking workflow with the DB command result.
  5. The invoking workflow continues its execution upon receiving the completion signal.
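
The message flow in the steps above can be modelled in plain Java, purely as an analogy. This is not Temporal code: the class SequentialCommandQueue and its methods are hypothetical, a single-threaded executor stands in for the queue workflow, and a CompletableFuture stands in for the “command done/failed” signal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Plain-Java model of the flow (hypothetical names, no Temporal APIs involved). */
final class SequentialCommandQueue implements AutoCloseable {

    // One daemon worker thread: commands run strictly one at a time, in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "command-queue");
        t.setDaemon(true);
        return t;
    });

    /** Steps 1, 3, 4: enqueue a command; the future completes (or fails) when it has run. */
    <T> CompletableFuture<T> enqueue(Supplier<T> command) {
        return CompletableFuture.supplyAsync(command, worker);
    }

    /** Steps 2 and 5: block for the outcome, but only up to the given timeout. */
    static <T> T awaitResult(CompletableFuture<T> done, long timeoutSeconds) {
        try {
            return done.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("command failed or timed out", e);
        }
    }

    @Override
    public void close() {
        worker.shutdown();
    }
}
```

A caller would enqueue a command and then call awaitResult on the returned future. In the real design, the wait would be a blocking wait on a signal inside the invoking workflow, and the result would arrive as the signal payload.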

Greetings. I think I understand. To paraphrase, I think you’re saying: from the invoking workflow (a) don’t use a manually completed activity and (b) receive a signal instead.

I have some concerns:

• Using a signal there is nothing (apart from namespace authentication) to prevent any workflow calling the signal. I suppose you could randomize, say, a UUID, that you expect to receive back. However, a manual activity gives you this for free, with a taskToken.

• With a manually completed activity I don’t need to implement a @SignalMethod. If you think of it in terms of public/private APIs, I don’t want the calling/invoking workflow to even know that the signal exists. This can be hidden with something like the following, but it’s burdensome.

// exists within namespace: database-public-namespace
@WorkflowInterface
public interface RestoreDatabaseWorkflow {
    @WorkflowMethod
    void restore(RestoreDatabase restoreDatabase);
}

// exists within namespace: database-private-namespace
@WorkflowInterface
public interface RestoreDatabasePrivateWorkflow {
    @WorkflowMethod
    void restorePrivate(RestoreDatabase restoreDatabase);
    @SignalMethod
    void restoreComplete();
}

Is there some reason that makes the signal more efficient/reliable than manually completed activities?

Many thanks!

• Using a signal there is nothing (apart from namespace authentication) to prevent any workflow calling the signal. I suppose you could randomize, say, a UUID, that you expect to receive back. However, a manual activity gives you this for free, with a taskToken.

Any code can complete an activity by workflowId and activityId. So, no added security here.
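
For reference, the UUID idea from the quoted concern could look roughly like the plain-Java sketch below. The class CorrelatedCompletion is hypothetical and uses no Temporal APIs. It only filters out stray or duplicate completions; it is not an authentication mechanism.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical holder that accepts a completion only if it carries the expected id. */
final class CorrelatedCompletion {

    // Random id handed out with the request; the completion must echo it back.
    private final String expectedId = UUID.randomUUID().toString();
    private String result; // null until a matching completion arrives

    String expectedId() {
        return expectedId;
    }

    /** Returns true if the completion was accepted, false if the id is wrong or a result is already set. */
    synchronized boolean complete(String correlationId, String result) {
        Objects.requireNonNull(result, "result");
        if (!expectedId.equals(correlationId) || this.result != null) {
            return false;
        }
        this.result = result;
        return true;
    }

    synchronized Optional<String> result() {
        return Optional.ofNullable(result);
    }
}
```

In a workflow, the same check would sit inside the signal handler, with the id passed along when the command is enqueued.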

• With a manually completed activity I don’t need to implement a @SignalMethod. If you think of it in terms of public/private APIs, I don’t want the calling/invoking workflow to even know that the signal exists. This can be hidden with something like the following, but it’s burdensome.

You can use Workflow.registerListener to dynamically register an implementation of an interface that contains a @SignalMethod. This avoids exposing the signal method on the main workflow interface.

Is there some reason that makes the signal more efficient/reliable than manually completed activities?

Far fewer moving parts. In my solution, there are only two activities (one is needed only because signalWithStart is not yet supported from the workflow code). No heartbeating is needed either.

Understood. Many thanks!