What is the recommended way to execute a workflow asynchronously?

We are planning to adopt the Temporal workflow engine to replace our existing one.

  1. Our workflow consists of a lot of REST calls, email actions, DB transactions, etc.
  2. Our existing workflow engine is written as blocking code.
  3. I am trying out Temporal's async way of executing a workflow. For that, I am starting my workflow like this:
    WorkflowExecution workflowExecution = WorkflowClient.start(workFlow::execute, "myworkflow");

My question is: how should I write my execute method? I am currently trying this:

    public CompletableFuture<String> execute(String input) {
        CompletableFuture<String> result = new CompletableFuture<>();
        logger.info("Welcome to workflow");
        CompletableFuture<String> completableFuture;
        logger.info("calling activity 2 --> which will make an async REST call using Spring WebFlux");
        completableFuture = activity.apiCallTwo("ome");
        // The REST call returns a CompletableFuture<ResponseEntity>
        completableFuture.whenCompleteAsync((resp, err) -> {
            logger.info("Workflow completed ...{}", resp);
            result.complete(resp);
        });
        return result;
    }

It looks like non-blocking code to me. I have three questions:
a. Is this the recommended way to write a workflow in an async, non-blocking style?
b. I am seeing this stack trace:

Exception in thread "main" io.temporal.client.WorkflowFailedException: workflowId='cQCfwEdMFp', runId='e8abf0a1-a901-4618-b2dd-c2ce062f279f', workflowType='HelloWorkFlow', retryState=RETRY_STATE_RETRY_POLICY_NOT_SET, workflowTaskCompletedEventId=10
at io.temporal.internal.sync.WorkflowStubImpl.mapToWorkflowFailureException(WorkflowStubImpl.java:422)
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:358)
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:327)
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315)
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270)
at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178)
at com.sun.proxy.$Proxy3.execute(Unknown Source)
at ai.jiffy.temporal.cadence.jiffycadence.client.HelloClient.main(HelloClient.java:32)
Caused by: io.temporal.failure.ApplicationFailure: message='com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "done" (class java.util.concurrent.CompletableFuture), not marked as ignorable (2 known properties: "stack", "result"])
at [Source: (byte[])"{"result":null,"stack":null,"done":false,"cancelled":false,"completedExceptionally":false,"numberOfDependents":0}"; line: 1, column: 41] (through reference chain: java.util.concurrent.CompletableFuture["done"])', type='io.temporal.common.converter.DataConverterException', nonRetryable=false
at io.temporal.common.converter.JacksonJsonPayloadConverter.fromData(JacksonJsonPayloadConverter.java:82)
c. Looking at Promise: with a CompletableFuture I can get the result asynchronously (like thenAsync). Is Promise's thenApply similar?
Could you please give some guidance on this?


You don’t need async

  1. Our existing workflow engine is written as blocking code.
  2. I am trying out Temporal's async way of executing a workflow.

Any reason you want to use async to implement your workflow? I would recommend implementing your workflow synchronously. Your code is going to be much simpler this way.

Any particular reason you want it to be async?

Promise vs CompletableFuture.

Never use CompletableFuture or any other standard Java synchronization primitives or native Java threads inside the workflow code. Only use the APIs that Temporal provides for this.

For example, Promise and its implementation CompletablePromise (see the temporal-sdk 1.22.3 javadoc) should be used instead of Future and CompletableFuture.

See this section of the documentation on what exactly is prohibited inside the workflow code.


Will it be OK if I write blocking code inside a ForkJoinPool thread?
I just want to confirm that point.
For example: inside the workflow, I am planning to make a REST call that may take 30 minutes to return a response. Can I make a normal blocking REST call for this?

It is not OK, as using Java threads inside the workflow code is not allowed, and ForkJoinPool uses Java threads directly. Use the Async class methods to start threads inside the workflow code instead.

Workflow code cannot make API calls directly. Instead, any such code should reside in activities, and workflow code should invoke activities. There is no limit on how long an activity can run. It can run for 30 minutes, 30 days, or a year.
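To make the shape of that advice concrete, here is a minimal stand-alone sketch of a workflow written in the plain blocking style, with the REST call pushed behind an activity interface. It deliberately has no Temporal dependency so it compiles on its own: `ApiActivities`, `HelloWorkflowSketch`, and the fake activity are hypothetical names for illustration, and the comments only describe where the SDK's `@ActivityInterface`, `@WorkflowMethod`, and `Workflow.newActivityStub` would normally come in. Note that `execute` returns a plain `String` rather than a `CompletableFuture<String>`, which is what the data converter in the stack trace above failed to deserialize.

```java
// Demo entry point: wires a fake activity into the workflow sketch and runs it.
class SyncWorkflowDemo {
    public static void main(String[] args) {
        // Fake activity standing in for the real REST call (assumption for this sketch).
        ApiActivities fake = input -> "response for " + input;
        System.out.println(new HelloWorkflowSketch(fake).execute("ome"));
    }
}

// Hypothetical activity contract. In a real Temporal project this interface would be
// annotated with @ActivityInterface, and the REST call would live in its implementation.
interface ApiActivities {
    String apiCallTwo(String input);
}

// Hypothetical workflow implementation. In a real Temporal project the activity stub
// would be created with Workflow.newActivityStub(...) rather than passed to a constructor,
// and execute would implement a method annotated with @WorkflowMethod.
class HelloWorkflowSketch {
    private final ApiActivities activities;

    HelloWorkflowSketch(ApiActivities activities) {
        this.activities = activities;
    }

    // Plain blocking style: invoke the activity, wait for its result,
    // and return a simple serializable value.
    String execute(String input) {
        String response = activities.apiCallTwo(input);
        return "Workflow completed: " + response;
    }
}
```

The constructor injection is only there to keep the sketch testable without a Temporal server; it is not how the SDK wires activities.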

Can you please advise on this?
I am trying to run a workflow completely async. It looks like Promise#get is a blocking call. Is there anything similar to CompletableFuture#whenCompleteAsync when using the Promise class?
Promise#thenApply does not trigger the workflow execution unless the get() method is called.

See this answer.

Maybe I am not understanding the async behavior.

The Promise#get documentation says “Waits if necessary for the computation to complete or fail, and then returns its result.”
This blocks the main workflow thread. Is there a mechanism to register a callback to process the results asynchronously?

Yes, there is. You can use:

  • Promise.thenApply
  • Promise.handle
  • Promise.thenCompose

which have a behavior similar to CompletableFuture.

To initiate asynchronous behavior, use the Async class. For example, to run a function in a different thread you can write:

public String someFunction(String arg) { ... }

// Somewhere in the workflow code:
// Execute someFunction in a different thread.
Promise<String> f1 = Async.function(() -> someFunction("bar"));
// Handle the f1 completion asynchronously.
Promise<String> appended = f1.thenApply((r)->r + "!");

But you didn’t answer the most important question I asked. Why are you trying to use async? It is much more complicated than synchronous code and practically never necessary when writing Temporal workflows. Have you seen this post that explains that it is OK to block workflow threads for a long time?


We are used to Java async/reactive libraries exposing these methods and patterns:

CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> {…} )
.thenCompose(s -> CompletableFuture.supplyAsync(() -> {…}));

Since completableFuture.get() blocks, we register a callback to receive the result asynchronously, as shown below:

completableFuture.whenCompleteAsync((r, e) -> {…});

Hence we are trying to program Temporal using a similar pattern.
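For readers unfamiliar with the pattern being described, here is a small runnable sketch of it in plain Java. It is ordinary application code, not workflow code: as noted earlier in this thread, `CompletableFuture` must not be used inside a Temporal workflow. The class name `CallbackChainDemo` and the two string-appending steps are made up for illustration; they stand in for the elided `{…}` bodies, whose real contents are unknown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CallbackChainDemo {
    // Two hypothetical asynchronous steps, chained with thenCompose so that
    // the second step starts only after the first one has produced its value.
    static CompletableFuture<String> chain(String input) {
        return CompletableFuture
                .supplyAsync(() -> input + "-step1")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "-step2"));
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        // Register a callback instead of calling get(): the calling thread is not blocked
        // by the computation itself.
        chain("req").whenCompleteAsync((result, error) -> {
            System.out.println(error == null ? "result: " + result : "failed: " + error);
            done.countDown();
        });

        // The latch only keeps the JVM alive until the callback has run;
        // the async tasks use daemon threads from the common pool.
        done.await(5, TimeUnit.SECONDS);
    }
}
```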

Using Temporal, I could see that a Promise.get call blocked for 30 seconds while making a REST call (we intentionally delayed the REST call by 30 seconds to test this). Can I assume that the thread is blocked while retrieving the results? Isn't it better to wait asynchronously rather than block with a get? We are only trying to get rid of the blocking calls :)

We are translating a JSON DSL, so we would like to trigger the next activity when the Promise completes asynchronously, something like promise.whenCompleteAsync({/* trigger next */});

I re-read your answer on threads. As I understand it, Temporal caches the execution state in threads, and this could show up as lots of blocked threads in thread dumps. Threads are also kicked out to make space for others when the pool is maxed out. I am still trying to understand this: let's say many workflows are waiting for Promise#get to return. Does it mean one such call will be kicked out to make space for a new workflow to be triggered?

I have seen single-threaded applications like Redis scale out to a large number of clients. Also, having moved many blocking server components to async Java, this could be a misunderstanding on my side. I am still getting used to the Temporal APIs.

To handle 1000 concurrent workflows, what should be an ideal pool size?


Currently, there is no way to implement workflow code fully asynchronously, because as soon as the method annotated with @WorkflowMethod returns, the workflow completes. So even if you write your code using the async approach, you'll have to block the workflow thread to avoid premature workflow completion. We might add the ability to release the workflow thread in the future, but it is needed only for really high-scale applications that absolutely need caching.

For your low-throughput use case, I would stick to synchronous execution and let the Temporal SDK handle the threads.

To handle 1000 concurrent workflows, what should be an ideal pool size?

The pool size doesn’t depend on the number of concurrent executions. It depends on the number of state transitions that these executions need per second. You can have a thread pool of 10 threads keeping up with millions of concurrent workflows if they don’t need to do state transitions at the same time.
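As a rough intuition for that claim, here is a toy model in plain Java. It is not Temporal code and does not reproduce how the SDK schedules or caches workflows; it only assumes that each "workflow" needs one very short state transition, and shows a fixed pool of 10 threads working through 100,000 of them while never running more than 10 at once. The class `PoolSizingToy` and its counters are invented for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizingToy {
    // Runs one short "state transition" per workflow on a fixed pool.
    // Returns {completed transitions, peak number of transitions running at once}.
    static int[] run(int workflows, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workflows; i++) {
            pool.execute(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest concurrency seen
                completed.incrementAndGet();           // the (trivial) transition itself
                active.decrementAndGet();
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] r = run(100_000, 10);
        System.out.println("completed=" + r[0] + " peak concurrent=" + r[1]);
    }
}
```

In this model the pool size caps how many transitions run at the same instant, not how many workflows exist, which is the distinction the answer above draws. Real sizing should still come from load testing.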

I would recommend prototyping your use case using the synchronous approach and testing it under your expected load. If you find any performance issues, let us know and we will help you.


Thank you, I will test and get back with data.

@Aswath_Murugan I am looking for a similar solution, where in an activity I make a POST call to trigger a task in another service and then keep making GET calls to monitor the status of the task.
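One possible shape for such a submit-then-poll activity body, sketched in plain Java under several assumptions: `TaskClient` is a hypothetical wrapper around the other service's POST and GET endpoints, the status strings "RUNNING" and "DONE" are invented, and `fakeClient` exists only so the sketch runs without a network. Since activities are allowed to block, a simple loop with `Thread.sleep` is used here. A long-running Temporal activity would typically also report heartbeats, which this sketch leaves out.

```java
import java.time.Duration;

class PollingSketch {
    // Hypothetical client for the remote service (not a real library API).
    interface TaskClient {
        String submit(String payload);  // stands in for the POST; returns a task id
        String status(String taskId);   // stands in for the GET; returns a status string
    }

    // Submits the task, then polls until it leaves the RUNNING state or maxPolls is reached.
    static String submitAndWait(TaskClient client, String payload, Duration interval, int maxPolls) {
        String taskId = client.submit(payload);
        for (int i = 0; i < maxPolls; i++) {
            String status = client.status(taskId);
            if (!"RUNNING".equals(status)) {
                return status;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling " + taskId, e);
            }
        }
        throw new IllegalStateException("task " + taskId + " still running after " + maxPolls + " polls");
    }

    // Fake client for the demo: reports RUNNING for the first runningPolls checks, then DONE.
    static TaskClient fakeClient(int runningPolls) {
        return new TaskClient() {
            private int polls = 0;

            public String submit(String payload) {
                return "task-1";
            }

            public String status(String taskId) {
                return polls++ < runningPolls ? "RUNNING" : "DONE";
            }
        };
    }

    public static void main(String[] args) {
        String status = submitAndWait(fakeClient(2), "job", Duration.ofMillis(10), 5);
        System.out.println("final status: " + status);
    }
}
```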