Hi,
I am using the MDCContextPropagator the comments in the Javadocs for ContextPropagator
.
In that class you are always trying to force the types to String.class
which works for regular objects, but it looks like the data values can be JSON objects?
I see you do register a JSonPayloadConverter which handles JSON types, but the code is dying trying to cast the JSON object to String.class.
So, I don’t know if I can or need to check each mime/type of the incoming values and then call the converter myself.
The Global
converter has a bunch of default converters by type, so it is finding the right converter, just failing on the casting.
I tried Object.class
for both type params but that doesn’t compile, so I’m not sure how to handle mixed types.
I printed out the context params coming into my converter and it is dying on the first one which is put the from the open-telemetry/tracing
bridge?
It dies right there and doesn’t process the rest of the context.
This is now coming in the MDC where before it wasn’t but is killing the GlobalDataConverter with the JSON_OBJECT not expected…
KEY => _tracer-data
PAYLOAD => metadata {
key: "encoding"
value: "json/plain"
}
data: "{\"traceparent\":\"00-310dffda8b906e0ea4b21297a22812e3-a7f3aa3e457d7e25-01\"}"```
This is my MDCContextPropagator I am using. I modified it from the original one in the java-sdk comments in the io.temporal.common.context.ContextPropagator
class to handle ‘empty’ json objects.
We were getting empty objects trying to turn on open-telemetry.
/**
* A {@link ContextPropagator} implementation that propagates the SLF4J MDC
* (Mapped Diagnostic Context) across Temporal workflow and activity boundaries.
* This class ensures that MDC entries with keys starting with "X-" are
* propagated.
*/
public class MDCContextPropagator implements ContextPropagator {
public MDCContextPropagator() {
super();
}
/**
* Gets the name of the context propagator.
*
* @return the name of the context propagator, which is the fully qualified
* class name.
*/
@Override
public String getName() {
return this.getClass().getName();
}
/**
* Retrieves the current MDC context to be propagated.
*
* @return a map containing the current MDC context, filtered to include only
* entries with keys starting with "X-".
*/
@Override
public Object getCurrentContext() {
Map<String, String> context = new HashMap<>();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
if (mdcContext != null) {
mdcContext.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("X-"))
.forEach(entry -> context.put(entry.getKey(), entry.getValue()));
}
return context;
}
/**
* Sets the current MDC context from the given context map.
*
* @param context the context map containing MDC entries to be set.
*/
@Override
public void setCurrentContext(Object context) {
if (context instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> contextMap = (Map<String, String>) context;
contextMap.forEach(MDC::put);
}
}
/**
* Serializes the given context map to a map of Payloads.
*
* @param context the context map containing MDC entries to be serialized.
* @return a map of Payloads representing the serialized context.
*/
@Override
public Map<String, Payload> serializeContext(Object context) {
if (!(context instanceof Map)) {
return new HashMap<>();
}
@SuppressWarnings("unchecked")
Map<String, String> contextMap = (Map<String, String>) context;
Map<String, Payload> serializedContext = new HashMap<>();
contextMap.forEach((key, value) -> GlobalDataConverter.get().toPayload(value)
.ifPresent(payload -> serializedContext.put(key, payload)));
return serializedContext;
}
/**
* Deserializes the given map of Payloads to a context map.
*
* @param context the map of Payloads to be deserialized.
* @return a context map containing the deserialized MDC entries.
*/
@Override
public Object deserializeContext(Map<String, Payload> context) {
Map<String, String> contextMap = new HashMap<>();
context.forEach((key, payload) -> {
// Handle empty {} when the data value is empty
// Adding opentracing seems to add a new value with empty data
// and the dataconverter throws an error
//
// {_tracer-data=metadata {
// key: "encoding"
// value: "json/plain"
// }
// data: "{}"
// }
String payloadValue = ""; // default value
// Convert data to string to compare
ByteString data = payload.getData();
// Check the value to see if it "empty"
if (data != null && !data.isEmpty()) {
// Convert to string
String theData = data.toStringUtf8();
// Check if the value isn't {}'s
if (!theData.equals("{}")) {
payloadValue = GlobalDataConverter.get().fromPayload(payload, String.class, String.class);
}
}
// Add the value into the map
contextMap.put(key, payloadValue);
});
return contextMap;
}
}
This is the error:
Caused by: io.temporal.common.converter.DataConverterException: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)
at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1]
at io.temporal.common.converter.JacksonJsonPayloadConverter.fromData(JacksonJsonPayloadConverter.java:101)
at io.temporal.common.converter.PayloadAndFailureDataConverter.fromPayload(PayloadAndFailureDataConverter.java:95)
at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:33)
at com.melloware.petstore.common.utils.MDCContextPropagator.lambda$deserializeContext$4(MDCContextPropagator.java:125)
at java.base/java.util.HashMap.forEach(HashMap.java:1429)
at com.melloware.petstore.common.utils.MDCContextPropagator.deserializeContext(MDCContextPropagator.java:97)
at io.temporal.internal.sync.SyncWorkflowContext.getPropagatedContexts(SyncWorkflowContext.java:1286)
at io.temporal.internal.sync.DeterministicRunnerImpl.getPropagatedContexts(DeterministicRunnerImpl.java:622)
at io.temporal.internal.sync.DeterministicRunnerImpl.newRootThread(DeterministicRunnerImpl.java:463)
at io.temporal.internal.sync.DeterministicRunnerImpl.runUntilAllBlocked(DeterministicRunnerImpl.java:187)
at io.temporal.internal.sync.SyncWorkflow.eventLoop(SyncWorkflow.java:206)
at io.temporal.internal.replay.ReplayWorkflowExecutor.eventLoop(ReplayWorkflowExecutor.java:105)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler$StatesMachinesCallbackImpl.eventLoop(ReplayWorkflowRunTaskHandler.java:410)
at io.temporal.internal.statemachines.WorkflowStateMachines.eventLoop(WorkflowStateMachines.java:721)
at io.temporal.internal.statemachines.WorkflowStateMachines.access$700(WorkflowStateMachines.java:52)
at io.temporal.internal.statemachines.WorkflowStateMachines$WorkflowTaskCommandsListener.workflowTaskStarted(WorkflowStateMachines.java:1239)
at io.temporal.internal.statemachines.WorkflowTaskStateMachine.handleCompleted(WorkflowTaskStateMachine.java:139)
at io.temporal.internal.statemachines.WorkflowTaskStateMachine.handleStarted(WorkflowTaskStateMachine.java:129)
at io.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:46)