Hi,
I am using using temporal.io Java SDK with Kotlin. I switched from cadence to Temporal which required me to upgrade Jackson. After upgrade I am noticing deserialization errors with classes. I had to add dummy constructors with data classes. Is there a place where data serialization libraries are set.
Errors looks something like this
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.google.protobuf.UnknownFieldSet$Parser and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
The current java-sdk API makes this a tad brittle, but the approach works for me:
val mapper = ObjectMapper()
// Configure the mapper just like the JacksonJsonPayloadConverter() constructor would
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
mapper.registerModule(JavaTimeModule())
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
// Now add Kotlin support
mapper.registerModule(KotlinModule())
// And build the exact same thing as DefaultDataConverter::defaultDataConverterInstance
val converter = new DefaultDataConverter(
NullPayloadConverter(),
ByteArrayPayloadConverter(),
ProtobufJsonPayloadConverter(),
JacksonJsonPayloadConverter(mapper))
// Now create a workflow client options with it, and move along as before!
val workflowClientOptions = WorkflowClientOptions.newBuilder()
.setDataConverter(converter)
.build();
“No serializer found for class com.google.protobuf.UnknownFieldSet$Parser and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.temporal.api.common.v1.WorkflowExecution["unknownFields"]->com.google.protobuf.UnknownFieldSet["parserForType"])”
Here is my workflow start code that i try to force using GsonJsonPayloadConverter() same error
val workflowOptions = WorkflowOptions.newBuilder()
.setWorkflowExecutionTimeout(Duration.ofDays(10000))
.setTaskQueue(“singer-activity-task-list”)
.setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
.setWorkflowId(id)
.build()
val service = WorkflowServiceStubs.newInstance();
val wfc: WorkflowClient = WorkflowClient.newInstance(service,
WorkflowClientOptions.newBuilder()
.setDataConverter(DefaultDataConverter(GsonJsonPayloadConverter()))
.setNamespace("singer-activity-namespace")
.build())
Do you have any idea what that exception is or what it is trying to do?
My best guess is you only provided the Gson payload converter. Try adding the three other non-JSON payload converters first, and then your GsonJsonPayloadConverter when you build the DefaultDataConverter?
I’m a newbie at this stuff, but it is odd that you’re seeing a Jackson error, so I’m only guessing at this point. In my experience changing the SDK serialization strategy is best done very gently (do as little of it as you can).
Is there a reason you cannot use the default Jackson strategy, and just configure it with the KotlinModule (like in my code snippet)? Does that not work for you? Without the KotlinModule you will surely get deserialization errors for plain Kotlin classes.
You disabled all other converters and kept only Gson one. And it looks like you are trying to pass WorkflowExecution class as a parameter. As WorkflowExecution class is a protobuf the Gson doesn’t know how to serialize it. Make sure to keep Protobuf converter in the list of the converters.
@shyamal
It does work with JacksonJsonPayloadConverter(mapper) as you described it. However it is having trouble handling interfaces. I managed to get it working by adding a custom module with the following code to handle my interfaces
private val objectMapper: ObjectMapper = ObjectMapper()
private val resolver = SimpleAbstractTypeResolver()
.addMapping(SingerActivityCallBackInterface::class.java, SingerController.SingerActivityCallBack::class.java)
private val module = SimpleModule("CustomModel", Version.unknownVersion())
objectMapper.registerModule(KotlinModule())
.registerModule(JavaTimeModule())
.registerModule(module)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
but i have quite a few interfaces and I did not want to couple the object mapper like this.
If you know of alternative let me know. GsonJsonPayloadConverter() works just fine for now, but I’d like to use Jackson for sure.
One thing I notices is the Params and result of the work flow are not serialized! I have a string return type and I was expecting to see it in the front end. Could this be the same problem? For example
Related question! Is there any way to get this result within the workflow. i know if we use cron we can use Workflow.getLastCompletionResult. But i wanted to use periodic loop and wanted to get pervious result.
Thanks! You mean something like “pastResult” in this code. Will this be fault tolerant?
fun singerWorkflow(singerConfig: SingerConfig): List<String> {
val pastResult = null
// for scheduled workflow
if (singerConfig.schedule != null) {
for (i in 0 until Companion.CONTINUE_AS_NEW_FREQUENCY) {
pastResult = singerActivities.stub(pastResult = pastResult)
Workflow.sleep(
Duration.of(singerConfig.schedule.gap,
ChronoUnit.valueOf(singerConfig.schedule.frequency.toString()))
)
}
singerActivitiesContinue.singerWorkflow(singerConfig)
}
// if one off activity
return singerActivities.stub()
}
I meant passing this data as an argument to the continue as new:
fun singerWorkflow(singerConfig: SingerConfig, pastResult: List<String>): List<String> {
// for scheduled workflow
if (singerConfig.schedule != null) {
for (i in 0 until Companion.CONTINUE_AS_NEW_FREQUENCY) {
pastResult = singerActivities.stub(pastResult = pastResult)
Workflow.sleep(
Duration.of(singerConfig.schedule.gap,
ChronoUnit.valueOf(singerConfig.schedule.frequency.toString()))
)
}
singerActivitiesContinue.singerWorkflow(singerConfig, pastResult)
}
// if one off activity
return singerActivities.stub()
}