// This may be useful to someone: it can be integrated into an open-source project to serve as an Avro payload converter. It has been tested and works well.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DataConverterException;
import io.temporal.common.converter.PayloadConverter;
import java.lang.reflect.Type;
import java.util.*;
import java.util.Base64;
import java.util.Optional;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
public class AvroKafkaPayloadConverter implements PayloadConverter {
private static final String ENCODING = "Kafka-avro";
private final KafkaAvroDeserializer avroDeserializer;
private final KafkaAvroSerializer avroSerializer;
private final ObjectMapper mapper = new ObjectMapper();
public AvroKafkaPayloadConverter(String schemaRegistryUrl) {
Map<String, Object> config = Map.of("schema.registry.url", schemaRegistryUrl);
this.avroDeserializer = new KafkaAvroDeserializer();
this.avroDeserializer.configure(config, false);
this.avroSerializer = new KafkaAvroSerializer();
this.avroSerializer.configure(config, false);
}
@Override
public Optional<Payload> toData(Object value) {
try {
if (value instanceof ConsumerRecord<?, ?> record) {
return Optional.of(buildPayloadFromSingleRecord(record));
}
if (value instanceof ConsumerRecords<?, ?> records) {
return Optional.of(buildPayloadFromRecords(records));
}
} catch (Exception e) {
throw new DataConverterException("Failed to convert Kafka ConsumerRecord(s) to Payload", e);
}
return Optional.empty();
}
private Payload buildPayloadFromSingleRecord(ConsumerRecord<?, ?> record) throws Exception {
byte[] keyBytes = avroSerializer.serialize(record.topic(), record.key());
byte[] valueBytes = avroSerializer.serialize(record.topic(), record.value());
Map<String, String> kvMap = Map.of("key", Base64.getEncoder()
.encodeToString(keyBytes),
"value", Base64.getEncoder()
.encodeToString(valueBytes));
byte[] jsonData = mapper.writeValueAsBytes(List.of(kvMap));
return Payload.newBuilder()
.putMetadata("encoding", ByteString.copyFromUtf8(ENCODING))
.putMetadata("topic", ByteString.copyFromUtf8(record.topic()))
.putMetadata("record-type", ByteString.copyFromUtf8("single"))
.setData(ByteString.copyFrom(jsonData))
.build();
}
private Payload buildPayloadFromRecords(ConsumerRecords<?, ?> records) throws Exception {
List<Map<String, String>> recordList = new ArrayList<>();
String topic = null;
for (ConsumerRecord<?, ?> record : records) {
topic = record.topic();
byte[] keyBytes = avroSerializer.serialize(topic, record.key());
byte[] valueBytes = avroSerializer.serialize(topic, record.value());
Map<String, String> kvMap = Map.of("key", Base64.getEncoder()
.encodeToString(keyBytes),
"value", Base64.getEncoder()
.encodeToString(valueBytes));
recordList.add(kvMap);
}
byte[] jsonData = mapper.writeValueAsBytes(recordList);
return Payload.newBuilder()
.putMetadata("encoding", ByteString.copyFromUtf8(ENCODING))
.putMetadata("topic", ByteString.copyFromUtf8(topic != null ? topic : ""))
.putMetadata("record-type", ByteString.copyFromUtf8("batch"))
.setData(ByteString.copyFrom(jsonData))
.build();
}
@SuppressWarnings("unchecked")
@Override
public <T> T fromData(Payload payload, Class<T> type, Type genericType) {
String encoding = payload.getMetadataOrDefault("encoding", ByteString.EMPTY)
.toStringUtf8();
if (!ENCODING.equals(encoding)) {
return null;
}
String topic = payload.getMetadataOrDefault("topic", ByteString.EMPTY)
.toStringUtf8();
String recordType = payload.getMetadataOrDefault("record-type", ByteString.EMPTY)
.toStringUtf8();
byte[] jsonData = payload.getData()
.toByteArray();
try {
List<Map<String, String>> rawList = mapper.readValue(jsonData, new TypeReference<>() {});
List<ConsumerRecord<GenericData.Record, GenericData.Record>> recordList = new ArrayList<>();
for (Map<String, String> kv : rawList) {
byte[] keyBytes = Base64.getDecoder()
.decode(kv.get("key"));
byte[] valueBytes = Base64.getDecoder()
.decode(kv.get("value"));
GenericData.Record key = (GenericData.Record) avroDeserializer.deserialize(topic, keyBytes);
GenericData.Record value = (GenericData.Record) avroDeserializer.deserialize(topic, valueBytes);
recordList.add(new ConsumerRecord<>(topic, 0, 0L, key, value));
}
if ("single".equals(recordType) && type.isAssignableFrom(ConsumerRecord.class)) {
return type.cast(recordList.get(0));
}
if ("batch".equals(recordType) && type.isAssignableFrom(ConsumerRecords.class)) {
TopicPartition tp = new TopicPartition(topic, 0);
Map<TopicPartition, List<ConsumerRecord<GenericData.Record, GenericData.Record>>> map = Map.of(tp, recordList);
return type.cast(new ConsumerRecords<>(map));
}
} catch (Exception e) {
throw new DataConverterException("Failed to deserialize Payload into Kafka ConsumerRecords", e);
}
return null;
}
@Override
public String getEncodingType() {
return ENCODING;
}
public void close() {
avroSerializer.close();
avroDeserializer.close();
}
}