How to wait for a workflow query result, which is filled asynchronously

I have the following problem:

I have a service that starts a workflow. I would like the service to wait for an intermediate result of the workflow before continuing.
To solve this, i do the following:

  1. Start an AwaitWorkflow that waits for a signal:
@WorkflowInterface
public interface AwaitOrderWorkflow {

    @WorkflowMethod
    OrderResponse executeAwaitOrderResponse();

    @SignalMethod
    void orderResponseReceived(OrderResponse orderResponse);
}

public class AwaitOrderWorkflowImpl implements AwaitOrderWorkflow {

    private Optional<OrderResponse> orderResponse = Optional.empty();

    @Override
    public CreateOrderResponse executeAwaitOrderResponse() {
        Workflow.await(Duration.ofSeconds(30), () -> orderResponse.isPresent());
        return orderResponse.orElseThrow();
    }

    @Override
    public void orderResponseReceived(OrderResponse orderResponse) {
        this.orderResponse = Optional.of(orderResponse);
    }
}
  1. Start the main workflow
  2. Once the main workflow is at a certain intermediate state, it uses an activity to send a signal to the associated AwaitOrderWorkflow:
    @Override
    public void signalOrderCreated(OrderResponse orderResponse, UUID orderReference) {
        var workFlowId = String.format("AwaitOrderWorkflow-%s", orderReference);
        var workflow = workflowClient.newWorkflowStub(AwaitOrderResponseWorkflow.class, workFlowId);
        workflow.orderResponseReceived(orderResponse);
    }
  1. The external service does a .get on the completable future response executeAwaitOrderResponse() method:
var orderResponse = orderResponseCompletableFuture.get(30, TimeUnit.SECONDS);

Question:
Is this a right way to do this, or is there something in place to perform this in an easier way?

Does your service need to poll for the completion of the intermediate result, or could the workflow run an activity which would make an API call into your service with the intermediate result?

Hi Andrew,

I dont know if I understand exactly what you mean, but I will try to explain what you need to know to answer my question.

The service is called by a REST controller that just needs to return a response that states the result of the first asynchronous step of the main workflow. So I think this can not be done by subscribing to some topic, but requires polling.
I have thought about several options to do this:

  1. Write a polling mechanism in the service that polls the workflow query method until the query returns the desired state
  2. Use Temporal to do the polling. I prefer this because we use Temporal anyway and if we could use the internal Temporal polling mechanism we would not have to write that ourselves in the service code.

The question that I have is if there is some other (easier) way to achieve the same without the need for an extra AwaitWorkflow. For example, would it be possible to have a query method that returns a completable future or some asynchronous result?

I wonder if you could use an update to do this?

The service starts a workflow and then calls an update.

The initialization of the workflow would create a workflow promise (Workflow (temporal-sdk 1.24.1 API)). The main function of the workflow calls .get() on the promise to wait until the intermediate result is complete.

When the workflow receives the update, it processes the workflow up to the point where the intermediate result has been completed. It calls .complete() on the promise to signal the main function to continue. It then returns the intermediate result from the update back to the service.

The main function of the workflow continues with the rest of the processing.

Yes, the update is the way to implement this requirement.

Oh, I imagine you could also have the main function of the workflow generate the intermediate result and complete the promise, and then the update handler would be the one to wait on the promise. Same result, but clearer code.

1 Like

Thanks for your answers. I tried to make an implementation using a CompletablePromise. However I still get an error, so I dont know if the implementation matches what you tried to explain.
The code that I have now:

The service that starts the main workflow and returns a response:

@Service
public class MainService {

    private final WorkflowClient workflowClient;

    public MainService(WorkflowClient workflowClient) {
        this.workflowClient = workflowClient;
    }

    public MainWorkflowResponse runMainWorkflow(int runId) {
        MainWorkflow workflow = workflowClient.newWorkflowStub(MainWorkflow.class, WORKFLOW_OPTIONS.toBuilder()
                .setTaskQueue(MainWorkflowProcess.MAIN_TASK_QUEUE)
                .setWorkflowId("MainWorkflow-" + runId)
                .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY)
                .build());
        return new MainWorkflowResponse(workflow.executeWorkflow().get());
    }
}

And the workflow itself:

@WorkflowInterface
public interface MainWorkflow {

    @WorkflowMethod
    CompletablePromise<String> executeWorkflow();

    @SignalMethod
    void sendSignal();
}

public class MainWorkflowImpl implements MainWorkflow {

    private CompletablePromise<String> response;

    @Override
    public CompletablePromise<String> executeWorkflow() {
        response = Workflow.newPromise();
        return response;
    }

    @Override
    public void sendSignal() {
        System.out.println("signal received!");
        response.complete("Completed!");
    }
}

But when I send a signal to the workflow, I get the following error:

Caused by: io.temporal.common.converter.DataConverterException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.uber.m3.tally.NoopScope and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.temporal.internal.sync.CompletablePromiseImpl["runner"]->io.temporal.internal.sync.DeterministicRunnerImpl["threads"]->java.util.TreeSet[0]->io.temporal.internal.sync.WorkflowThreadImpl["workflowThreadExecutor"]->io.temporal.worker.ActiveThreadReportingExecutor["workflowThreadPool"]->java.util.concurrent.ThreadPoolExecutor["threadFactory"]->io.temporal.worker.WorkerFactory$$Lambda/0x00000008008ff898["arg$1"]->io.temporal.worker.WorkerFactory["metricsScope"])

Additional remark: I do not know if you understood the problem correctly:

I do NOT want a workflow to wait for an update of the outside world.
I want the outside world to wait for a certain state of the workflow, while the workflow continues after that state.

You’re attempting to return a promise from the workflow. A promise is an object which cannot be communicated to a different process. You can call .get() on a promise to get the value of the promise, and return that from your workflow (assuming that the value is plain data that can be converted to JSON)

But it will be simpler, I think, to let the main function of the workflow do the entire workflow, and to use the update to wait for the interim result.

Yes.

private CompletablePromise<String> response;

I haven’t actually run any of this code, so I don’t know if this is correct, but I’d try initializing the promise variable so that it’s available to both the main function and the update function.

private CompletablePromise<String> interimResult = Workflow.newPromise();

Now the main function can execute the entire workflow:

  • produce the interim result
  • call .complete() on the promise with the interim result
  • continue with the rest of the workflow execution

The return type of the main function should be whatever should be returned by the workflow as a whole, if anything.

Next you’ll need an update, not a signal. (Workflow message passing - Temporal Java SDK feature guide | Temporal Documentation). Only updates can take actions such as waiting for a promise to complete.

@UpdateMethod
String waitForInterimResult() {
    return interimResult.get();
}

Again, I haven’t actually run any of this code, so I don’t know if this is correct!

Now your service needs to start the workflow but not wait for the entire workflow to execute; that is, you want to asynchronously start the workflow, not synchronously wait for for it to complete. (Temporal Client - Java SDK feature guide | Temporal Documentation)

After the service has started the workflow, it can synchronously call the update method to wait for, and to get, the interim result. This will wait for the update to return, which in turn waits for the promise to be completed.

Here is a sample that implements this pattern.

Direct link:temporal-java-samples/core/src/main/java/io/temporal/samples/bookingsyncsaga at c90b76aef9ac794e4b13f5ee82f46a05d2c6a4c7 · mfateev/temporal-java-samples · GitHub

@maxim if the booking fails, the booking promise is not completed, the workflow runs the compensation actions, the workflow terminates… and then… the client calling trip1.waitForBooking() gets an error because it’s waiting for the update result but the workflow has terminated without returning one? Oh, I missed the booking.completeExceptionally(e) line.

Ok, that is indeed exactly what I need. Thanks for the help and the example! We will try to implement this, I will let it know if it succeeds.

Hi,

I work in the same team as MathijsB. He’s on holidays now so I’m taking over. We’ve implemented the suggestions as described above using the @UpdateMethod annotation. It seemed to work well, however we get some intermittent errors in the logs:

Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 29.781889118s. [closed=[], open=[[remote_addr=ip-10-98-36-163.eu-central-1.compute.internal/10.98.36.163:6033]]]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.updateWorkflowExecution(WorkflowServiceGrpc.java:4456)
	at io.temporal.internal.client.external.GenericWorkflowClientImpl.lambda$update$20(GenericWorkflowClientImpl.java:335)
	at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:69)
	at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60)
	at io.temporal.internal.client.external.GenericWorkflowClientImpl.update(GenericWorkflowClientImpl.java:329)
	at io.temporal.internal.client.RootWorkflowClientInvoker.startUpdate(RootWorkflowClientInvoker.java:338)
	at io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase.startUpdate(WorkflowClientCallsInterceptorBase.java:66)
	at io.temporal.opentracing.internal.OpenTracingWorkflowClientCallsInterceptor.startUpdate(OpenTracingWorkflowClientCallsInterceptor.java:136)
	at io.temporal.client.WorkflowStubImpl.startUpdate(WorkflowStubImpl.java:334)
	... 67 common frames omitted

It’s intermittent because it doesn’t always happen. We can’t really explain why this is happening sometimes. The context is what is described above, we get a message in our spring controller as the starting point, the controller calls the workflow. Inside the workflow there’s an

private final CompletablePromise<CreateOrderResponse> interimResult = Workflow.newPromise();

which at some point will call the @UpdateMethod in the workflow as shown below:

        orderAdministrationActivities.createOrder(new AddOrderRequest(newOrder, handleSalesOrderRequest.session()), receiveInterimResult);
        Workflow.await(() -> createOrderResponse.isPresent());
        interimResult.complete(createOrderResponse.orElseThrow());

At that point the controller proceeds its course as the workflow simultaneously continues as well.

Do you guys have any idea why this error occurs and how to fix it?

@emileg What Server and Java SDK version are you running on ?

I was curious if the client might have a 30 second timeout performing a synchronous update, but I just tried it with the HelloUpdate Java sample and it worked fine. So that isn’t it.

An alternative implementation would be to poll from the client using a query. In that case you wouldn’t need to use a promise, you could simply set a variable when the interim result was produced. But I don’t know if the issue you’re running into would have anything to do with updates specifically.

I suggest seeing if you can put together a minimal example that reproduces the issue.

Because of a deadline we decided to go for a different solution as we couldn’t get a clear understanding as to why this was happening. Answers to questions above:

  • We’re using java 21.
  • a minimal example would not necessarily point out the error as it’s intermittent.

Temporal server version: 1.22.7
Client version: io.temporal:temporal-sdk:jar:1.24.1