Hi Deserialization problem

Hi,
I am using the temporal.io Java SDK with Kotlin. I switched from Cadence to Temporal, which required me to upgrade Jackson. After the upgrade I am seeing deserialization errors with my classes, and I had to add dummy constructors to my data classes. Is there a place where the data serialization library is configured?

The errors look something like this:
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.google.protobuf.UnknownFieldSet$Parser and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)

Best

Switching to Jackson was a very common feature request, as Gson wasn’t being actively developed.

I would recommend checking Jackson documentation for possible solutions.

If you want to change the properties of the Jackson serializer, or even switch back to the Gson one, see the converter package.

The current java-sdk API makes this a tad brittle, but the approach works for me:

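    import com.fasterxml.jackson.annotation.JsonAutoDetect
    import com.fasterxml.jackson.annotation.PropertyAccessor
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.SerializationFeature
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
    import com.fasterxml.jackson.module.kotlin.KotlinModule
    import io.temporal.client.WorkflowClientOptions
    import io.temporal.common.converter.*

    val mapper = ObjectMapper()

    // Configure the mapper just like the JacksonJsonPayloadConverter() constructor would
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
    mapper.registerModule(JavaTimeModule())
    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)

    // Now add Kotlin support
    mapper.registerModule(KotlinModule())

    // And build the exact same thing as DefaultDataConverter::defaultDataConverterInstance
    val converter = DefaultDataConverter(
            NullPayloadConverter(),
            ByteArrayPayloadConverter(),
            ProtobufJsonPayloadConverter(),
            JacksonJsonPayloadConverter(mapper))

    // Now create a workflow client options with it, and move along as before!
    val workflowClientOptions = WorkflowClientOptions.newBuilder()
            .setDataConverter(converter)
            .build()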

Thank you @maxim and @shyamal.

I got it mostly working, mainly by forcing it to use a converter similar to the one you have, @shyamal.

It works great as long as I call the workflow function directly. The sticking point is the exception when I use

WorkflowClient.start(singerWorkflowImpl::singerWorkflow, singerConfig)

I am getting the following exception:

“No serializer found for class com.google.protobuf.UnknownFieldSet$Parser and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.temporal.api.common.v1.WorkflowExecution["unknownFields"]->com.google.protobuf.UnknownFieldSet["parserForType"])”

Here is my workflow start code, where I try to force GsonJsonPayloadConverter(); I get the same error:

    val workflowOptions = WorkflowOptions.newBuilder()
            .setWorkflowExecutionTimeout(Duration.ofDays(10000))
            .setTaskQueue("singer-activity-task-list")
            .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
            .setWorkflowId(id)
            .build()

    val service = WorkflowServiceStubs.newInstance()

    val wfc: WorkflowClient = WorkflowClient.newInstance(service,
            WorkflowClientOptions.newBuilder()
                    .setDataConverter(DefaultDataConverter(GsonJsonPayloadConverter()))
                    .setNamespace("singer-activity-namespace")
                    .build())

Do you have any idea what that exception is or what it is trying to do?

Best

My best guess is you only provided the Gson payload converter. Try adding the three other non-JSON payload converters first, and then your GsonJsonPayloadConverter when you build the DefaultDataConverter?

I’m a newbie at this stuff, but it is odd that you’re seeing a Jackson error, so I’m only guessing at this point. In my experience changing the SDK serialization strategy is best done very gently (do as little of it as you can).

Is there a reason you cannot use the default Jackson strategy, and just configure it with the KotlinModule (like in my code snippet)? Does that not work for you? Without the KotlinModule you will surely get deserialization errors for plain Kotlin classes.

You disabled all the other converters and kept only the Gson one. It also looks like you are trying to pass the WorkflowExecution class as a parameter. WorkflowExecution is a protobuf class, so Gson doesn’t know how to serialize it. Make sure to keep the Protobuf converter in the list of converters.
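
A minimal sketch of that fix, reusing the converter classes from the snippet earlier in the thread. It assumes an SDK version that still ships GsonJsonPayloadConverter (as the one used in this thread does) and that DefaultDataConverter accepts the converters as varargs; the namespace is the one from the question.

```kotlin
import io.temporal.client.WorkflowClientOptions
import io.temporal.common.converter.ByteArrayPayloadConverter
import io.temporal.common.converter.DefaultDataConverter
import io.temporal.common.converter.GsonJsonPayloadConverter
import io.temporal.common.converter.NullPayloadConverter
import io.temporal.common.converter.ProtobufJsonPayloadConverter

// When serializing, converters are tried in order, so the specific ones
// come first and the catch-all JSON converter (Gson here) goes last.
val converter = DefaultDataConverter(
    NullPayloadConverter(),
    ByteArrayPayloadConverter(),
    ProtobufJsonPayloadConverter(), // handles protobuf types such as WorkflowExecution
    GsonJsonPayloadConverter()      // assumption: still available in your SDK version
)

val clientOptions: WorkflowClientOptions = WorkflowClientOptions.newBuilder()
    .setDataConverter(converter)
    .setNamespace("singer-activity-namespace")
    .build()
```

Pass clientOptions to WorkflowClient.newInstance(service, clientOptions) exactly as in the question; only the converter list changes.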

Thanks @maxim & @shyamal, it works.

@shyamal
It does work with JacksonJsonPayloadConverter(mapper) as you described. However, it has trouble handling interfaces. I managed to get it working by adding a custom module, with the following code, to handle my interfaces:

    private val resolver = SimpleAbstractTypeResolver()
            .addMapping(SingerActivityCallBackInterface::class.java, SingerController.SingerActivityCallBack::class.java)

    // Attach the resolver to the module so the interface mapping takes effect
    private val module = SimpleModule("CustomModel", Version.unknownVersion())
            .apply { setAbstractTypes(resolver) }

    private val objectMapper: ObjectMapper = ObjectMapper()
            .registerModule(KotlinModule())
            .registerModule(JavaTimeModule())
            .registerModule(module)
            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
            .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)

But I have quite a few interfaces, and I did not want to couple the object mapper to them like this.
If you know of an alternative, let me know. GsonJsonPayloadConverter() works just fine for now, but I’d definitely like to use Jackson.

Thank you

One thing I noticed is that the params and result of the workflow are not serialized! I have a string return type and I was expecting to see it in the front end. Could this be the same problem? For example, the result should be a string.

Best

Is the result empty in the exported history JSON? It might be a UI issue.

@maxim

Correct, the error must be from the front end. The run history from the command line does show the result payload, which is what I was expecting:

10 WorkflowExecutionCompleted {Result:[[Mock Result:
0.891770742597598]],
WorkflowTaskCompletedEventId:9}

The JSON from the frontend UI still does not have the result, so it is probably a bug:

"workflowTaskCompletedEventId": "9",
      "eventId": "10",
      "eventType": "WorkflowExecutionCompleted",
      "kvps": [
        {
          "key": "result",
          "value": {
            "jsonStringDisplay": "{\n  \"payloads\": []\n}",
            "jsonStringFull": "{\n  \"payloads\": []\n}"
          }
        },
        {
          "key": "workflowTaskCompletedEventId",
          "value": "9"
        }

Related question: is there any way to get this result within the workflow? I know that with cron we can use Workflow.getLastCompletionResult, but I want to use a periodic loop and still get the previous result.

Thanks for the help

What UI/Service version are you using? I believe the latest release fixed a lot of UI related bugs.

If you are doing a periodic loop using continue-as-new, then you can pass the data from the previous run as arguments to the next one.

@maxim
I am using v0.28-web. If there is a new version, I will update.

Thanks! You mean something like “pastResult” in this code. Will this be fault tolerant?

fun singerWorkflow(singerConfig: SingerConfig): List<String> {
    // Nullable var, because it is reassigned on every loop iteration
    var pastResult: List<String>? = null

    // for scheduled workflow
    if (singerConfig.schedule != null) {
        for (i in 0 until Companion.CONTINUE_AS_NEW_FREQUENCY) {
            pastResult = singerActivities.stub(pastResult = pastResult)
            Workflow.sleep(
                    Duration.of(singerConfig.schedule.gap,
                            ChronoUnit.valueOf(singerConfig.schedule.frequency.toString()))
            )
        }

        singerActivitiesContinue.singerWorkflow(singerConfig)
    }
    // if one off activity
    return singerActivities.stub()
}

I meant passing this data as an argument to the continue as new:

fun singerWorkflow(singerConfig: SingerConfig, pastResult: List<String>): List<String> {
    // Kotlin parameters are read-only, so copy the argument into a local var
    var result = pastResult

    // for scheduled workflow
    if (singerConfig.schedule != null) {
        for (i in 0 until Companion.CONTINUE_AS_NEW_FREQUENCY) {
            result = singerActivities.stub(pastResult = result)
            Workflow.sleep(
                    Duration.of(singerConfig.schedule.gap,
                            ChronoUnit.valueOf(singerConfig.schedule.frequency.toString()))
            )
        }

        // Continue as new, handing the latest result to the next run
        singerActivitiesContinue.singerWorkflow(singerConfig, result)
    }
    // if one off activity
    return singerActivities.stub()
}
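
For readers wondering where a stub like singerActivitiesContinue comes from, here is a minimal sketch, assuming it is created with Workflow.newContinueAsNewStub. The SingerWorkflow interface, the SingerConfig fields, and the placeholder result are hypothetical stand-ins, not the poster's actual code.

```kotlin
import io.temporal.workflow.Workflow
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod

// Hypothetical stand-in for the SingerConfig used in the thread.
data class SingerConfig(val name: String = "")

// Hypothetical workflow interface with the two-argument signature shown above.
@WorkflowInterface
interface SingerWorkflow {
    @WorkflowMethod
    fun singerWorkflow(singerConfig: SingerConfig, pastResult: List<String>): List<String>
}

class SingerWorkflowImpl : SingerWorkflow {
    // Calling a method on this stub completes the current run and starts a
    // fresh one with the given arguments; the call does not return normally.
    private val continueAsNew: SingerWorkflow =
        Workflow.newContinueAsNewStub(SingerWorkflow::class.java)

    override fun singerWorkflow(singerConfig: SingerConfig, pastResult: List<String>): List<String> {
        // Placeholder for the periodic loop; a real workflow would call activities here.
        val result = pastResult + "run finished"
        return continueAsNew.singerWorkflow(singerConfig, result)
    }
}
```

Because the previous result travels as a workflow argument, it is recorded in the new run's history like any other input.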

Just a correction: there is a newer version, 0.28.1, which has the fix for Payloads in the Web UI.

OK, great, thank you @maxim. I did not realize that you could do that.

@Ruslan is there a Docker release for it? If not, I will just wait to upgrade, since this is on a Kubernetes instance.

Thank you

Yes, there is a Docker release for 0.28.1.
