Java SDK support for Reactive framework

I am trying to create and execute workflows in Spring webflux(Project reactor)…
I have simple server that retrieves user data from postgres and data is available to application as Mono. When activity call completes, temporal tries to serialize data and in doing so it throws below exception
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of reactor.core.publisher.Mono (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Looks like sdk is not able to deserialize Mono.

Please not that if we get User object from Mono by invoking block() method on Mono, workflow succeeds. But doing this will defeat the purpose of using reactive streams.

I tried to convert this application to regular Springboot web application and it works but fails to work for spring reactive.

Any solution to this problem? Does Java SDK has support for reactive programming?

Thanks,
Dhanraj

An activity invocation is essentially a remote procedure call with workflows and activities potentially residing on different machines. That’s why activity arguments and results should be serialized. Passing a stream to a remote process doesn’t make sense, so the error is expected.

So Java SDK does not support for reactive programming, right @maxim?

I’m not sure what reactive programming is and why you need it as part of a Temporal workflow. What is the actual problem you are trying to solve?

I’m trying to build workflows in Spring webflux, activity use WebClient to sent htttp request to client, data will response as reactor.core.publisher.Mono, it throws exception Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of reactor.core.publisher.Mono (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
So I think temporal-sdk cannot deserialize Mono

Cant use mono/flux for payloads (inputs and results). On client side you can wait for results async if needed.

Can I custom DataConverter to convert mono/flux to java object and set to WorkflowClient ?

Would probably not do that. Data converter is called within workflow task and to get value from mono you have to block. Workflow task have timeout.

Whats use case, can you maybe show example?

My workflow has an asynchronous activity completion to another client, but this client response too slow, so when a lot of request to workflow create hundreds thread, so I want to build workflow in Webflux, do you have any solution ?

You could invoke activity with short StartToClose timeout to notify client and then in workflow wait for completion (client would signal when completed instead of using activity completion client).

when a lot of request to workflow create hundreds thread, so I want to build workflow in Webflux, do you have any solution ?

Requests that workflow creates shouldn’t create threads unless you requested this. Could you post a snippet of your workflow code that creates threads?

Hi @maxim

CONTROLLER
@GetMapping("/flowNonBlocking")
public Mono<ActivityResult> flowNonBlocking() throws Exception {
	return senderService.nonBlocking();
}

SERVICE
public Mono<ActivityResult> nonBlocking() throws Exception {
	var workflowOptions = workflowUtils.getWorkflowOptions(AllFunction.TEST);
	var workflow = workflowUtils.buildWorkflow(TestWorkflow.class, workflowOptions);
	return workflow.nonBlocking();
}

WORKFLOW
public Mono<ActivityResult> nonBlocking() throws Exception {
	return mainActivities.deductHttp();
}

ACTIVITY
public Mono<ActivityResult> deductHttp() {
	return adapter.deductHttp("id1");
}

ADAPTER
public Mono<ActivityResult> deductHttp(String id) {
	var dto = ActivityRequest.builder()
			.transactionId("1111")
			.accountId("id1")
			.build();
	return webClient.post()
			.uri("/api/deduct")
			.bodyValue(dto)
			.retrieve()
			.bodyToMono(ActivityResult.class);
}

I just build a simple spring api, call http to get data from client, but client response too slow, so when I call this api 500 times per second, then will create 500 threads waiting, so i build spring webflux to reduce threads.

Use manual activity completion to execute activity asynchronously. I believe you can write an adapter to reactive framework to do this.

ACTIVITY
public ActivityResult deductHttp() {
        decuctHttp("id1");
        return null;
}

public void deductHttp(String id, ManualActivityCompletionClient client ) {
       ActivityExecutionContext context = Activity.getExecutionContext();
       ManualActivityCompletionClient client = context.useLocalManualCompletion(); 
	// call client.complete or client.fail from the HTTP request completion callback.
}

for invoking the workflow, your workflow method should appear blocking, return ActivityResult directly, not a mono.

SERVICE
public Mono<ActivityResult> nonBlocking() throws Exception {
	var workflowOptions = workflowUtils.getWorkflowOptions(AllFunction.TEST);
	var workflow = workflowUtils.buildWorkflow(TestWorkflow.class, workflowOptions);
	return workflow.nonBlocking();
}

WORKFLOW
public Mono<ActivityResult> nonBlocking() throws Exception {
	return mainActivities.deductHttp();
}

when invoking the workflow, bridge the two worlds like this:

public Mono<ActivityResult> nonBlocking() throws Exception {
	var workflowOptions = workflowUtils.getWorkflowOptions(AllFunction.TEST);
	var workflow = workflowUtils.buildWorkflow(TestWorkflow.class, workflowOptions);
	return Mono.fromFuture(WorkflowClient.execute(workflow::nonBlocking));
}

@peter_royal We don’t support passing Mono type objects as inputs and outputs of workflows and activities. So your sample is incorrect.

Hey guys. We’re using Kotlin coroutines to build non-thread blocking activity code, so maybe our experience could be helpful here (the method we use is exactly what @maxim described above, so I’m not really adding much here, this is just another sample to his post).

For Kotlin coroutines, we’re using a code pattern like the one below (the actual code is more complex due to various error-handling branches but this is the core piece)

fun activityMethod(): String {
    val activityCompletionClient = Activity.getExecutionContext().useLocalManualCompletion()
    launch {
        try {
            activityCompletionClient.complete(suspendingActivityMethod())
        } catch (e: Exception) {
            activityCompletionClient.fail(e)
        }
    }
    return ""
}

private suspend fun suspendingActivityMethod(): String {
    // … activity business logic that can use Kotlin coroutines for non-blocking operations
}

Perhaps a similar pattern can be used for building reactive activity code

public String activityMethod() {
    ActivityExecutionContext activityContext = Activity.getExecutionContext();
    ManualActivityCompletionClient activityCompletionClient = activityContext.useLocalManualCompletion();

    reactiveActivityMethod(activityContext)
            .subscribe(activityCompletionClient::complete, activityCompletionClient::fail);

    return null;
}

private Mono<String> reactiveActivityMethod(ActivityExecutionContext activityContext) {
    // … reactive activity code
}

I’m not an expert on using Reactor, so I bet the code above has to be extended to better handle exceptional cases, but hopefully, it should give a starting point. One important note - the reactiveActivityMethod in the example above doesn’t run on a Temporal-managed thread pool so the Activity.getExecutionContext() won’t work there, if you need activity context there it has to be propagated via an argument.

P.S. Regardless of how the activity is built, the workflow code should not be using coroutines, Monos, or anything else other than Temporal workflow primitives.

1 Like

ah, yes @maxim - cut and paste fail; the workflow should return ActivityResult and it can be turned into a Mono by creating one from the CompletableFuture returned from the WorkflowClient

1 Like

Thank you so much, I change workflow return object and turn it into Mono in service and it work well

SERVICE
public Mono<ActivityResult> nonBlocking(TransactionRequest dto) throws Exception {
   var workflowOptions = workflowUtils.getWorkflowOptions(AllFunction.BLOCKING);
   var workflow = workflowUtils.buildWorkflow(NonBlockingWorkflow.class, workflowOptions);
   return Mono.fromFuture(WorkflowClient.execute(() -> workflow.getDataNonBlocking(dto)));
}

Hi @maxim can I ask one more question?
With every workflow I create a worker

@Slf4j
@Component
@RequiredArgsConstructor
public class NonBlockingWorker {
    final WorkflowClient workflowClient;
    final WorkerFactoryOptions defaultWorkerFactoryOptions;
    final WorkerOptions defaultWorkerOptions;
    final WorkflowImplementationOptions defaultWorkflowImplementationOptions;
    final MainAdapter mainAdapter;

    @PostConstruct
    public void createWorker() {
        log.info("Registering NonBlockingWorker..");
        var workerFactory = WorkerFactory.newInstance(workflowClient, defaultWorkerFactoryOptions);
        var worker = workerFactory.newWorker(AllFunction.BLOCKING.name(), defaultWorkerOptions);

        var completionClient = workflowClient.newActivityCompletionClient();
        worker.registerWorkflowImplementationTypes(defaultWorkflowImplementationOptions, NonBlockingWorkflowImpl.class);
        worker.registerActivitiesImplementations(new MainActivitiesImpl(mainAdapter, completionClient));
        workerFactory.start();
    }
}

So this will be create multiple WorkerFactory, is it good or bad ?

This is wasteful as each factory will have its own cache and thread pools. Also, I wouldn’t create a worker per workflow, which is also very wasteful. Unless you have special requirements use a single worker for all workflows and activities in the process.