How to wait for a workflow query result, which is filled asynchronously

I have the following problem:

I have a service that starts a workflow. I would like the service to wait for an intermediate result of the workflow before continuing.
To solve this, I do the following:

  1. Start an AwaitWorkflow that waits for a signal:
@WorkflowInterface
public interface AwaitOrderWorkflow {

    @WorkflowMethod
    OrderResponse executeAwaitOrderResponse();

    @SignalMethod
    void orderResponseReceived(OrderResponse orderResponse);
}

public class AwaitOrderWorkflowImpl implements AwaitOrderWorkflow {

    private Optional<OrderResponse> orderResponse = Optional.empty();

    @Override
    public OrderResponse executeAwaitOrderResponse() {
        Workflow.await(Duration.ofSeconds(30), () -> orderResponse.isPresent());
        return orderResponse.orElseThrow();
    }

    @Override
    public void orderResponseReceived(OrderResponse orderResponse) {
        this.orderResponse = Optional.of(orderResponse);
    }
}
  2. Start the main workflow.
  3. Once the main workflow reaches a certain intermediate state, it uses an activity to send a signal to the associated AwaitOrderWorkflow:
    @Override
    public void signalOrderCreated(OrderResponse orderResponse, UUID orderReference) {
        var workFlowId = String.format("AwaitOrderWorkflow-%s", orderReference);
        var workflow = workflowClient.newWorkflowStub(AwaitOrderWorkflow.class, workFlowId);
        workflow.orderResponseReceived(orderResponse);
    }
  4. The external service calls .get() on the CompletableFuture that holds the result of the executeAwaitOrderResponse() method:
var orderResponse = orderResponseCompletableFuture.get(30, TimeUnit.SECONDS);

Question:
Is this the right way to do this, or is there an easier, built-in way to achieve it?

Does your service need to poll for the completion of the intermediate result, or could the workflow run an activity which would make an API call into your service with the intermediate result?

Hi Andrew,

I don't know if I understand exactly what you mean, but I will try to explain what you need to know to answer my question.

The service is called by a REST controller that just needs to return a response stating the result of the first asynchronous step of the main workflow. So I think this cannot be done by subscribing to some topic, but requires polling.
I have thought about several options to do this:

  1. Write a polling mechanism in the service that polls the workflow query method until the query returns the desired state
  2. Use Temporal to do the polling. I prefer this because we use Temporal anyway and if we could use the internal Temporal polling mechanism we would not have to write that ourselves in the service code.
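
For comparison, option 1 (polling from the service) could be sketched roughly as below. This is only a minimal illustration in plain Java: `StatePoller` and `pollUntilPresent` are hypothetical names, and the `Supplier` stands in for whatever call the service would make to the workflow's query method.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper: polls a supplier until it yields a value or the timeout elapses. */
final class StatePoller {

    private StatePoller() {}

    static <T> T pollUntilPresent(Supplier<Optional<T>> query, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            // Assumption: in the real service this would invoke the workflow's query method.
            Optional<T> result = query.get();
            if (result.isPresent()) {
                return result.get();
            }
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("No result within " + timeout);
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }
}
```

A caller would pass something like `() -> Optional.ofNullable(stub.getOrderResponse())`, where `getOrderResponse` is an assumed query method. The cost of this approach is visible in the loop: the service owns the timeout, the interval, and the interruption handling.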

The question that I have is if there is some other (easier) way to achieve the same without the need for an extra AwaitWorkflow. For example, would it be possible to have a query method that returns a completable future or some asynchronous result?

I wonder if you could use an update to do this?

The service starts a workflow and then calls an update.

The initialization of the workflow would create a workflow promise (Workflow (temporal-sdk 1.24.1 API)). The main function of the workflow calls .get() on the promise to wait until the intermediate result is complete.

When the workflow receives the update, it processes the workflow up to the point where the intermediate result has been completed. It calls .complete() on the promise to signal the main function to continue. It then returns the intermediate result from the update back to the service.

The main function of the workflow continues with the rest of the processing.

Yes, the update is the way to implement this requirement.

Oh, I imagine you could also have the main function of the workflow generate the intermediate result and complete the promise, and then the update handler would be the one to wait on the promise. Same result, but clearer code.
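
The control flow of this second variant can be illustrated with a plain-Java analogy. This is not Temporal code: `CompletableFuture` merely stands in for the workflow promise, and `OrderFlowAnalogy` and its method names are hypothetical. Inside real workflow code you would use a promise from `Workflow.newPromise()` instead of `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;

/** Plain-Java stand-in for the workflow; NOT Temporal code. */
final class OrderFlowAnalogy {

    // Plays the role of the workflow promise shared by the main function and the update handler.
    private final CompletableFuture<String> interimResult = new CompletableFuture<>();

    /** Plays the role of the workflow's main function. */
    String run() {
        String order = "order-created";   // 1. produce the interim result
        interimResult.complete(order);     // 2. release anyone waiting for it
        return "workflow-finished";        // 3. continue with the rest of the work
    }

    /** Plays the role of the update handler: blocks until the interim result exists. */
    String waitForInterimResult() {
        return interimResult.join();
    }
}
```

If `run()` is executed on another thread (for example via `CompletableFuture.supplyAsync(flow::run)`), a caller of `waitForInterimResult()` returns as soon as step 2 has happened, regardless of how long step 3 takes.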


Thanks for your answers. I tried to make an implementation using a CompletablePromise. However, I still get an error, so I don't know if the implementation matches what you tried to explain.
The code that I have now:

The service that starts the main workflow and returns a response:

@Service
public class MainService {

    private final WorkflowClient workflowClient;

    public MainService(WorkflowClient workflowClient) {
        this.workflowClient = workflowClient;
    }

    public MainWorkflowResponse runMainWorkflow(int runId) {
        MainWorkflow workflow = workflowClient.newWorkflowStub(MainWorkflow.class, WORKFLOW_OPTIONS.toBuilder()
                .setTaskQueue(MainWorkflowProcess.MAIN_TASK_QUEUE)
                .setWorkflowId("MainWorkflow-" + runId)
                .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY)
                .build());
        return new MainWorkflowResponse(workflow.executeWorkflow().get());
    }
}

And the workflow itself:

@WorkflowInterface
public interface MainWorkflow {

    @WorkflowMethod
    CompletablePromise<String> executeWorkflow();

    @SignalMethod
    void sendSignal();
}

public class MainWorkflowImpl implements MainWorkflow {

    private CompletablePromise<String> response;

    @Override
    public CompletablePromise<String> executeWorkflow() {
        response = Workflow.newPromise();
        return response;
    }

    @Override
    public void sendSignal() {
        System.out.println("signal received!");
        response.complete("Completed!");
    }
}

But when I send a signal to the workflow, I get the following error:

Caused by: io.temporal.common.converter.DataConverterException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.uber.m3.tally.NoopScope and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.temporal.internal.sync.CompletablePromiseImpl["runner"]->io.temporal.internal.sync.DeterministicRunnerImpl["threads"]->java.util.TreeSet[0]->io.temporal.internal.sync.WorkflowThreadImpl["workflowThreadExecutor"]->io.temporal.worker.ActiveThreadReportingExecutor["workflowThreadPool"]->java.util.concurrent.ThreadPoolExecutor["threadFactory"]->io.temporal.worker.WorkerFactory$$Lambda/0x00000008008ff898["arg$1"]->io.temporal.worker.WorkerFactory["metricsScope"])

Additional remark: I do not know if you understood the problem correctly:

I do NOT want a workflow to wait for an update of the outside world.
I want the outside world to wait for a certain state of the workflow, while the workflow continues after that state.

You’re attempting to return a promise from the workflow. A promise is an object which cannot be communicated to a different process. Instead, you can call .get() on the promise to obtain its value and return that value from your workflow (assuming that the value is plain data that can be converted to JSON).

But it will be simpler, I think, to let the main function of the workflow do the entire workflow, and to use the update to wait for the interim result.

Yes.

private CompletablePromise<String> response;

I haven’t actually run any of this code, so I don’t know if this is correct, but I’d try initializing the promise variable so that it’s available to both the main function and the update function.

private CompletablePromise<String> interimResult = Workflow.newPromise();

Now the main function can execute the entire workflow:

  • produce the interim result
  • call .complete() on the promise with the interim result
  • continue with the rest of the workflow execution

The return type of the main function should be whatever should be returned by the workflow as a whole, if anything.

Next you’ll need an update, not a signal (Workflow message passing - Temporal Java SDK feature guide | Temporal Documentation). Unlike a signal, an update can return a value to the caller, and unlike a query, its handler is allowed to block, for example by waiting for a promise to complete.

@UpdateMethod
String waitForInterimResult() {
    return interimResult.get();
}

Again, I haven’t actually run any of this code, so I don’t know if this is correct!

Now your service needs to start the workflow but not wait for the entire workflow to execute; that is, you want to start the workflow asynchronously (for example with WorkflowClient.start), not synchronously wait for it to complete. (Temporal Client - Java SDK feature guide | Temporal Documentation)

After the service has started the workflow, it can synchronously call the update method to wait for, and to get, the interim result. This will wait for the update to return, which in turn waits for the promise to be completed.

Here is a sample that implements this pattern.

Direct link: temporal-java-samples/core/src/main/java/io/temporal/samples/bookingsyncsaga at c90b76aef9ac794e4b13f5ee82f46a05d2c6a4c7 · mfateev/temporal-java-samples · GitHub

@maxim if the booking fails, the booking promise is not completed, the workflow runs the compensation actions, the workflow terminates… and then… the client calling trip1.waitForBooking() gets an error because it’s waiting for the update result but the workflow has terminated without returning one? Oh, I missed the booking.completeExceptionally(e) line.
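
The failure path discussed here can be shown with another plain-Java analogy. Again, this is not Temporal code and not the linked sample: `CompletableFuture` stands in for the booking promise, and `FailedBookingAnalogy`, `book`, and the error message are made up for illustration. The point is only that completing the promise exceptionally gives the waiting caller an error instead of leaving it hanging.

```java
import java.util.concurrent.CompletableFuture;

/** Plain-Java stand-in for a workflow whose booking step fails; NOT Temporal code. */
final class FailedBookingAnalogy {

    // Plays the role of the booking promise the update handler waits on.
    private final CompletableFuture<String> booking = new CompletableFuture<>();

    /** Plays the role of the workflow's main function. */
    void run() {
        try {
            booking.complete(book());
        } catch (IllegalStateException e) {
            // Propagate the failure to whoever is waiting, then compensate.
            booking.completeExceptionally(e);
            // Compensation actions would run here.
        }
    }

    /** Plays the role of the update handler; join() throws if the booking failed. */
    String waitForBooking() {
        return booking.join();
    }

    // Simulated booking step that always fails.
    private String book() {
        throw new IllegalStateException("hotel unavailable");
    }
}
```

With `CompletableFuture`, the waiting side sees a `CompletionException` whose cause is the original failure. How the error is wrapped for a Temporal client is not shown here.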

OK, that is indeed exactly what I need. Thanks for the help and the example! We will try to implement this, and I will let you know if it succeeds.