I am trying to create and execute workflows in Spring WebFlux (Project Reactor)…
I have a simple server that retrieves user data from Postgres, and the data is available to the application as a Mono. When the activity call completes, Temporal tries to serialize the data and throws the exception below:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of reactor.core.publisher.Mono (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
It looks like the SDK is not able to deserialize Mono.
Please note that if we get the User object from the Mono by invoking block() on it, the workflow succeeds. But doing this defeats the purpose of using reactive streams.
I tried converting this application to a regular Spring Boot web application and it works, but it fails with Spring reactive.
Is there any solution to this problem? Does the Java SDK support reactive programming?
An activity invocation is essentially a remote procedure call with workflows and activities potentially residing on different machines. That’s why activity arguments and results should be serialized. Passing a stream to a remote process doesn’t make sense, so the error is expected.
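To make the point above concrete, here is a minimal JDK-only sketch. It is not Temporal code: `CompletableFuture` stands in for `Mono`, Java's built-in serialization stands in for the Jackson-based conversion seen in the stack trace, and `User` and `canSerialize` are hypothetical names. The idea it illustrates is that a pending handle cannot be turned into bytes, while the resolved value can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

final class SerializationBoundarySketch {

    // Hypothetical result type: a plain value object that can cross a process boundary.
    static final class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        User(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Returns true when the object can be written out as bytes.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            // NotSerializableException (an IOException) lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<User> pending =
                CompletableFuture.completedFuture(new User("id1", "Ann"));
        // The async handle itself is not serializable, even when already completed.
        System.out.println("future serializable: " + canSerialize(pending));
        // The value it resolves to is.
        System.out.println("value serializable: " + canSerialize(pending.join()));
    }
}
```

In the same spirit, an activity interface would declare the concrete result type (for example User) rather than a Mono of it, and any reactive plumbing stays inside the activity implementation.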
I’m trying to build workflows in Spring WebFlux. The activity uses WebClient to send an HTTP request to a client, and the response comes back as a reactor.core.publisher.Mono. It throws this exception:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of reactor.core.publisher.Mono (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
So I think the Temporal SDK cannot deserialize Mono.
My workflow uses asynchronous activity completion against another client, but that client responds too slowly, so a high volume of requests to the workflow creates hundreds of threads. That is why I want to build the workflow in WebFlux. Do you have any solution?
You could invoke an activity with a short StartToClose timeout to notify the client, and then wait in the workflow for completion (the client would send a signal when it is done instead of using the activity completion client).
"when a lot of requests come to the workflow, it creates hundreds of threads, so I want to build the workflow in WebFlux. Do you have any solution?"
Requests that a workflow makes shouldn’t create threads unless you explicitly ask for that. Could you post a snippet of your workflow code that creates threads?
CONTROLLER
    @GetMapping("/flowNonBlocking")
    public Mono<ActivityResult> flowNonBlocking() throws Exception {
        return senderService.nonBlocking();
    }
SERVICE
    public Mono<ActivityResult> nonBlocking() throws Exception {
        var workflowOptions = workflowUtils.getWorkflowOptions(AllFunction.TEST);
        var workflow = workflowUtils.buildWorkflow(TestWorkflow.class, workflowOptions);
        return workflow.nonBlocking();
    }
WORKFLOW
    public Mono<ActivityResult> nonBlocking() throws Exception {
        return mainActivities.deductHttp();
    }
ACTIVITY
    public Mono<ActivityResult> deductHttp() {
        return adapter.deductHttp("id1");
    }
ADAPTER
    public Mono<ActivityResult> deductHttp(String id) {
        var dto = ActivityRequest.builder()
                .transactionId("1111")
                .accountId("id1")
                .build();
        return webClient.post()
                .uri("/api/deduct")
                .bodyValue(dto)
                .retrieve()
                .bodyToMono(ActivityResult.class);
    }
I just built a simple Spring API that makes an HTTP call to get data from a client, but the client responds too slowly. When I call this API 500 times per second, it creates 500 waiting threads, so I moved to Spring WebFlux to reduce the number of threads.
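The thread concern described above can be illustrated with a small JDK-only sketch. It does not use WebClient or Temporal: `slowCall` is a hypothetical stand-in for the slow HTTP client, built on `CompletableFuture.delayedExecutor`, so a call that is waiting does not hold a thread of its own. Treat it as an illustration of the non-blocking idea, not as the actual service code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class NonBlockingWaitSketch {

    // Hypothetical slow remote call: the result is produced after delayMillis,
    // and no thread is parked per call while the delay elapses.
    static CompletableFuture<String> slowCall(int id, long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "result-" + id, delayed);
    }

    // Starts all calls at once, then blocks only the calling thread until every result is in.
    static List<String> callAll(int count, long delayMillis) {
        List<CompletableFuture<String>> inFlight = IntStream.range(0, count)
                .mapToObj(i -> slowCall(i, delayMillis))
                .collect(Collectors.toList());
        return inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> results = callAll(500, 200);
        System.out.println("completed calls: " + results.size());
    }
}
```

Here 500 calls are in flight at the same time, yet the waiting is handled by a scheduler and a small shared pool rather than by 500 dedicated threads.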
Hey guys. We’re using Kotlin coroutines to build non-thread blocking activity code, so maybe our experience could be helpful here (the method we use is exactly what @maxim described above, so I’m not really adding much here, this is just another sample to his post).
For Kotlin coroutines, we’re using a code pattern like the one below (the actual code is more complex due to various error-handling branches but this is the core piece)
fun activityMethod(): String {
    val activityCompletionClient = Activity.getExecutionContext().useLocalManualCompletion()
    launch {
        try {
            activityCompletionClient.complete(suspendingActivityMethod())
        } catch (e: Exception) {
            activityCompletionClient.fail(e)
        }
    }
    return ""
}
private suspend fun suspendingActivityMethod(): String {
    // … activity business logic that can use Kotlin coroutines for non-blocking operations
}
Perhaps a similar pattern can be used for building reactive activity code
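A JDK-only sketch of that shape, mirroring the Kotlin sample, might look like the following. The names are assumptions: `CompletionClient` is a hypothetical stand-in for the client that `Activity.getExecutionContext().useLocalManualCompletion()` returns in the Kotlin code, and `CompletableFuture` stands in for a `Mono`, so this shows only the control flow and is not Temporal or Reactor code.

```java
import java.util.concurrent.CompletableFuture;

final class ManualCompletionSketch {

    // Hypothetical stand-in for the manual completion client used in the Kotlin sample.
    interface CompletionClient {
        void complete(Object result);
        void fail(Throwable failure);
    }

    // Stand-in for the reactive business logic; with Reactor this would return a Mono.
    static CompletableFuture<String> reactiveActivityMethod() {
        return CompletableFuture.supplyAsync(() -> "done");
    }

    // The activity method returns right away; the real outcome is reported later
    // through the completion client, from whichever thread finishes the async work.
    static String activityMethod(CompletionClient client) {
        reactiveActivityMethod().whenComplete((result, error) -> {
            if (error != null) {
                client.fail(error);
            } else {
                client.complete(result);
            }
        });
        return ""; // placeholder, as in the Kotlin sample
    }
}
```

With Reactor, the `whenComplete` callback would presumably become a `subscribe` call on the Mono with the same two callbacks, and the completion client would be obtained on the activity thread before the asynchronous work starts, as the Kotlin sample does.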
I’m not an expert on using Reactor, so I bet the code above has to be extended to better handle exceptional cases, but hopefully, it should give a starting point. One important note - the reactiveActivityMethod in the example above doesn’t run on a Temporal-managed thread pool so the Activity.getExecutionContext() won’t work there, if you need activity context there it has to be propagated via an argument.
P.S. Regardless of how the activity is built, the workflow code should not be using coroutines, Monos, or anything else other than Temporal workflow primitives.
ah, yes @maxim - cut and paste fail; the workflow should return ActivityResult and it can be turned into a Mono by creating one from the CompletableFuture returned from the WorkflowClient
Hi @maxim can I ask one more question?
With every workflow I create a worker
@Slf4j
@Component
@RequiredArgsConstructor
public class NonBlockingWorker {
    final WorkflowClient workflowClient;
    final WorkerFactoryOptions defaultWorkerFactoryOptions;
    final WorkerOptions defaultWorkerOptions;
    final WorkflowImplementationOptions defaultWorkflowImplementationOptions;
    final MainAdapter mainAdapter;

    @PostConstruct
    public void createWorker() {
        log.info("Registering NonBlockingWorker..");
        var workerFactory = WorkerFactory.newInstance(workflowClient, defaultWorkerFactoryOptions);
        var worker = workerFactory.newWorker(AllFunction.BLOCKING.name(), defaultWorkerOptions);
        var completionClient = workflowClient.newActivityCompletionClient();
        worker.registerWorkflowImplementationTypes(defaultWorkflowImplementationOptions, NonBlockingWorkflowImpl.class);
        worker.registerActivitiesImplementations(new MainActivitiesImpl(mainAdapter, completionClient));
        workerFactory.start();
    }
}
So this will create multiple WorkerFactory instances. Is that good or bad?
This is wasteful, as each factory will have its own cache and thread pools. I also wouldn’t create a worker per workflow, which is very wasteful too. Unless you have special requirements, use a single worker for all workflows and activities in the process.