Workflow attempting deserialization when it shouldn't

I have a Vert.x/Temporal setup where Vert.x receives Kafka records and passes them to workflows.

So, Vert.x maps the input to my model:

PortalZone zoneAfter = recordAfter.mapTo(PortalZone.class);

This is the first deserialization, and it works correctly.
We are concerned here with the name object, which is the only property that uses the custom deserializer.
The issue is that when the workflow is triggered, the input is received correctly (with a correctly deserialized name), but something behind the scenes triggers a second deserialization, which crashes and ends up overwriting the first one.

I ended up removing all my workflow code just to make sure it was not my code:

public String ProcessPortalZone(PortalZoneKafkaRecord zoneRecord) {
    return "Zone code: " + zoneRecord.zoneAfter.code
            + " | En Name: " + zoneRecord.zoneAfter.name.en
            + " | Ar Name: " + zoneRecord.zoneAfter.name.ar;
}

Even with no logic in the workflow, it still triggers a second deserialization.
The only way to avoid triggering the second deserialization is to comment my workflow out of the worker entirely, which shows that the issue is related to Temporal.

Screenshot of input/output of the workflow:

The second deserialization happens some time between the workflow starting and ending, and it passes the string "{" to the deserializer, which of course crashes with this error:

com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (String)"{"; line: 1, column: 1])

It seems that just passing the object to the workflow causes a deserialization to happen when the workflow starts.


By default Temporal uses Jackson for serialization and deserialization of arguments.
When you request workflow execution via your client, and pass a data input, for example:

myworkflowStub.ProcessPortalZone(myZoneRecordObj);

as you have shown in the screenshot, the data input is serialized using the default data converter (JacksonJsonPayloadConverter). Data converters in Temporal are fully pluggable.

When your workflow execution starts, this data input is deserialized and passed as input to your workflow method.

Regarding the error, can you show the whole workflow history? If your workflow code invokes an activity or a child workflow, the data input is going to be serialized again and recorded in the workflow history.

The workflow currently is exactly like the code. It does nothing, just returns a string. It’s still triggering the second deserialization with an input of {

From the screenshot you provided, it looks as if the workflow execution completed with a result. Can you show at what point you are getting the Jackson error?

Yes, the result is:

return "Zone code: " + zoneRecord.zoneAfter.code + " | En Name: " + zoneRecord.zoneAfter.name.en + " | Ar Name: " + zoneRecord.zoneAfter.name.ar;

which just prints the code and the deserialized name property. As I said, the workflow itself has no logic aside from the return, so I cannot pinpoint why it is attempting to deserialize the input again. From the image in the original post, you can see that the workflow actually receives correctly deserialized input.

Ok so we are talking about deserialization of the workflow result right?
Workflow results are deserialized when in your client code you wait for results of workflow execution.

Sorry, when you say “request it in your client code” what exactly do you mean?

Do you mean that any time I access any of the workflow's input args (in this case zoneRecord), it will trigger a deserialization? Because this happens even if I don’t access any of the input args. The following still triggers the second deserialization:

public String ProcessCountryPortalZone(CountryPortalZoneKafkaRecord zoneRecord) {
    return "hello";
}

For example, in your client code if you start workflow execution and wait for its results (same really if you invoke it async and then wait):

MyWorkflow workflow = client.newWorkflowStub(
        MyWorkflow.class,
        WorkflowOptions.newBuilder()
                .setWorkflowId(workflowId)
                .setTaskQueue(taskQueue)
                .build()
);

String result = workflow.execute(someInput);

In this case your workflow results (which you see noted in the workflow history) would be deserialized into the String type result.

Well, I’m not doing either of those (I suppose it’s still async, but my point is that I’m not trying to assign its result to a variable).

This is how I’m calling it (in a Vert.x event handler):

WorkflowClient.start(workflow::ProcessPortalZone, zoneRecord);
LOG.debug(LogColor.PURPLE + "Portal Zones Workflow executed" + LogColor.PURPLE);
return Future.succeededFuture();

Is there any way I can disable deserialization in the workflow? I just want the name to be deserialized without doing it manually but if this behaviour can’t be changed I guess I have to bite the bullet and do it without using the Deserializer at all. Do you have any suggestions?

In this case JacksonJsonPayloadConverter is called twice: once to serialize your input (zoneRecord) to JSON (which you see as the workflow input in the history), and a second time to deserialize that input when the workflow execution starts (your @WorkflowMethod, which takes a CountryPortalZoneKafkaRecord in its signature).
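A minimal sketch of that round trip, using plain Jackson rather than Temporal itself. The Name and PortalZone classes are hypothetical stand-ins modeled on the fields used in the workflow method (code, name.en, name.ar), and the plain ObjectMapper here is an assumption: Temporal's default converter uses its own configured mapper. The point it illustrates is that step 1 writes the argument as JSON and step 2 reads it back into the declared parameter type, so any custom deserializer on these classes runs again on the worker and sees whatever JSON shape step 1 produced.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RoundTripSketch {

    // Hypothetical model classes, shaped after zoneAfter.code / name.en / name.ar.
    public static class Name {
        public String en;
        public String ar;
    }

    public static class PortalZone {
        public String code;
        public Name name;
    }

    // Plain ObjectMapper (assumption); Temporal's default converter configures its own.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Step 1 (client): the workflow argument is written out as JSON.
    public static String clientSerialize(PortalZone zone) {
        try {
            return MAPPER.writeValueAsString(zone);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 2 (worker): the JSON is read back into the @WorkflowMethod parameter type.
    // Any custom deserializer attached to PortalZone or its fields runs again here.
    public static PortalZone workerDeserialize(String json) {
        try {
            return MAPPER.readValue(json, PortalZone.class);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PortalZone zone = new PortalZone();
        zone.code = "Z1";
        zone.name = new Name();
        zone.name.en = "North";
        zone.name.ar = "Shamal";

        String json = clientSerialize(zone);
        // name is written as a nested JSON object, not as a JSON-encoded string
        System.out.println(json);
        System.out.println(workerDeserialize(json).name.en);
    }
}
```

Note that after step 1 the name field is a nested object in the JSON, which may not be the shape the original Kafka record used for it.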

I don’t understand:

I just want the name to be deserialized

Can you please elaborate?

Note that converters are pluggable: you can define a custom PayloadConverter with your own logic and configure the DefaultDataConverter with a list of converters that includes it.

Okay so I took a look at the converters and this is all way over my head. I won’t be able to define custom payload converters.

and then second time to deserialize the input when workflow execution starts (your @WorkflowMethod which takes in a type CountryPortalZoneKafkaRecord in its signature).

Could you explain why the second call is breaking then? Because my input is not a {

As far as DataConverter goes, you can create your own or extend existing ones as well. See for example https://github.com/temporalio/samples-java/blob/main/src/main/java/io/temporal/samples/encryptedpayloads/CryptDataConverter.java

You can register your custom data converter via WorkflowClientOptions.

You can also replace the payload converter for a specific encoding type, for example:

DefaultDataConverter ddc =
        DefaultDataConverter.newDefaultInstance()
            .withPayloadConverterOverrides(myJacksonJsonPayloadConverter);
...
WorkflowClientOptions wco = WorkflowClientOptions.newBuilder().setDataConverter(ddc).build();

where myJacksonJsonPayloadConverter is the custom payload converter you want to use to override the default one. You can also set it up globally with DefaultDataConverter.setDefaultDataConverter.
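A sketch of that wiring which needs no custom converter class, assuming you already have an ObjectMapper configured the way you want (for example, one that knows how to handle the name field). JacksonJsonPayloadConverter accepts an ObjectMapper in its constructor, and withPayloadConverterOverrides swaps it in for the default JSON converter. ConverterConfigSketch and its method names are illustrative only, and a bare new ObjectMapper() may not match the SDK's default mapper configuration.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.JacksonJsonPayloadConverter;
import io.temporal.serviceclient.WorkflowServiceStubs;

public final class ConverterConfigSketch {

    private ConverterConfigSketch() {}

    // Build a DataConverter whose JSON handling uses the given ObjectMapper.
    // The override is matched by encoding type, so only the JSON converter is
    // replaced; the other default payload converters stay in place.
    public static DataConverter jsonConverterFrom(ObjectMapper mapper) {
        return DefaultDataConverter.newDefaultInstance()
                .withPayloadConverterOverrides(new JacksonJsonPayloadConverter(mapper));
    }

    // Create a client that uses the converter. Passing this same client to
    // WorkerFactory.newInstance(client) makes the worker deserialize with the
    // same converter the client used to serialize.
    public static WorkflowClient clientWith(WorkflowServiceStubs service, DataConverter converter) {
        WorkflowClientOptions options =
                WorkflowClientOptions.newBuilder().setDataConverter(converter).build();
        return WorkflowClient.newInstance(service, options);
    }
}
```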


I am not sure why this is happening. Can you provide your workflow history where this happens and a full stack trace? I think that would help.

There isn’t anything in the workflow history since the workflow is empty as I’ve shown before. I’m not hiding code or anything, it is literally just an empty workflow for now until I fix this. As for the stack trace, this is what happens when the deserializer crashes - it’s not very informative, at least nothing I can see. It’s basically just saying "I can’t deserialize { ".

com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (String)"{"; line: 1, column: 1])
 at [Source: (String)"{"; line: 1, column: 2]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:662)
	at com.fasterxml.jackson.core.base.ParserBase._handleEOF(ParserBase.java:490)
	at com.fasterxml.jackson.core.base.ParserBase._eofAsNextChar(ParserBase.java:502)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2390)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:677)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3548)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3516)
	at com.saifxhatem.models.NameDeserializer.deserialize(NameDeserializer.java:29)
	at com.saifxhatem.models.NameDeserializer(NameDeserializer.java:11)
	at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3643)
	at io.temporal.common.converter.JacksonJsonPayloadConverter.fromData(JacksonJsonPayloadConverter.java:80)
	at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:132)
	at io.temporal.common.converter.DataConverter.arrayFromPayloads(DataConverter.java:104)
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:288)
	at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:53)
	at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:131)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:110)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
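One possible reading of this trace, offered only as a hypothesis since NameDeserializer's source isn't shown: the frame at NameDeserializer.java:29 calls ObjectMapper.readValue on the string "{". That is what happens if the deserializer takes the current token's text (for example with p.getText()) and re-parses it. If name arrives from Kafka as a JSON-encoded string, that text is the whole inner document; but when Jackson writes a Name object for the workflow argument, it comes out as a nested JSON object, so on the worker the current token is START_OBJECT and its text is just "{". The sketch below (all class names hypothetical) accepts both shapes.

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TolerantNameSketch {

    public static class Name {
        public String en;
        public String ar;
    }

    // Hypothetical replacement for NameDeserializer. Accepts `name` in two shapes:
    //   "name": "{\"en\":\"...\",\"ar\":\"...\"}"  (JSON-encoded string, assumed Kafka shape)
    //   "name": {"en":"...","ar":"..."}            (nested object, how Jackson writes a Name)
    public static class TolerantNameDeserializer extends JsonDeserializer<Name> {
        private static final ObjectMapper INNER = new ObjectMapper();

        @Override
        public Name deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
            if (p.hasToken(JsonToken.VALUE_STRING)) {
                // String shape: the text is itself a JSON document, so parse it.
                return INNER.readValue(p.getText(), Name.class);
            }
            // Object shape: bind the nested object directly. Calling p.getText()
            // here would return only "{", the text of the START_OBJECT token.
            return ctxt.readValue(p, Name.class);
        }
    }

    public static class PortalZone {
        public String code;

        // Annotated on the field only, so ctxt.readValue(p, Name.class) above
        // uses Jackson's default bean deserializer and does not recurse.
        @JsonDeserialize(using = TolerantNameDeserializer.class)
        public Name name;
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static PortalZone parse(String json) {
        try {
            return MAPPER.readValue(json, PortalZone.class);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static String write(PortalZone zone) {
        try {
            return MAPPER.writeValueAsString(zone);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A quick local check is parse(write(parse(kafkaJson))): a deserializer that re-parses p.getText() fails on the second parse with the same JsonEOFException shown above, while the tolerant version succeeds.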

Have you tried debugging JacksonJsonPayloadConverter.java at line 80 and checking the value of content.getData().toByteArray()?
I'm having a hard time reproducing this; I tried multiple things without luck. If you are able to provide a small reproducer, that would really help.
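Following the debugging suggestion above, one way to see the raw bytes without stepping through the SDK is to wrap the JSON converter and print each payload before it is decoded. This is a sketch, not an SDK feature: LoggingPayloadConverter is a hypothetical class implementing Temporal's PayloadConverter interface. It reports the wrapped converter's encoding type ("json/plain" for JacksonJsonPayloadConverter), which is what lets withPayloadConverterOverrides replace the default JSON converter with it.

```java
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DataConverterException;
import io.temporal.common.converter.PayloadConverter;
import java.lang.reflect.Type;
import java.util.Optional;

// Hypothetical debugging wrapper: logs incoming payloads, then delegates.
public final class LoggingPayloadConverter implements PayloadConverter {
    private final PayloadConverter delegate;

    public LoggingPayloadConverter(PayloadConverter delegate) {
        this.delegate = delegate;
    }

    @Override
    public String getEncodingType() {
        // Reuse the delegate's encoding type so the override matches it.
        return delegate.getEncodingType();
    }

    @Override
    public Optional<Payload> toData(Object value) throws DataConverterException {
        return delegate.toData(value);
    }

    @Override
    public <T> T fromData(Payload content, Class<T> valueType, Type valueGenericType)
            throws DataConverterException {
        // The same bytes the JSON converter will parse, decoded as UTF-8 text.
        System.err.println("[" + getEncodingType() + "] incoming payload: "
                + content.getData().toStringUtf8());
        return delegate.fromData(content, valueType, valueGenericType);
    }
}
```

It could be registered on the worker's client with DefaultDataConverter.newDefaultInstance().withPayloadConverterOverrides(new LoggingPayloadConverter(new JacksonJsonPayloadConverter())) set on its WorkflowClientOptions.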