org.apache.kafka.clients.consumer.ConsumerRecords to workflow

Is it possible to pass org.apache.kafka.clients.consumer.ConsumerRecords directly to a Temporal workflow?

I’m reading messages from Kafka as Avro, and I already have services implemented that consume this format. I want to integrate Temporal by passing the Kafka message into a workflow. If I can pass the ConsumerRecords directly, the activity can reuse the existing logic and save a lot of effort.

Is this possible? Has anyone done this?

I haven’t personally tried it, but if it’s serializable by Jackson it should be fine. Did you try it out and run into issues?

By default, ConsumerRecord<K, V> from Apache Kafka is not directly serializable by Jackson.

Looks like it’s not that simple, since Avro depends on a schema.

You can write your own DataConverter to support Avro.
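For context, in the Java SDK this means implementing io.temporal.common.converter.PayloadConverter (getEncodingType, toData, fromData) and registering it on the client. A minimal registration sketch, assuming a custom converter class (the name is a placeholder):

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.serviceclient.WorkflowServiceStubs;

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();

// Keep the default payload converters and layer the custom Avro one on top.
DataConverter converter = DefaultDataConverter.newDefaultInstance()
        .withPayloadConverterOverrides(new MyAvroPayloadConverter()); // placeholder class

WorkflowClient client = WorkflowClient.newInstance(
        service,
        WorkflowClientOptions.newBuilder()
                .setDataConverter(converter)
                .build());

Workers created from this client pick up the same converter, so it applies on both the client and worker side.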

Tried it and ran into some issues. Will look into it.

Avro serialization and deserialization depend on the schema registry. Do I need to add the schema registry dependency to the data converter?

Yes, I don’t see any other way. How do Kafka and other products do it?
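(For reference, the Confluent Avro (de)serializers used in the converter below ship as io.confluent:kafka-avro-serializer from Confluent’s Maven repository at https://packages.confluent.io/maven/; that artifact pulls in the schema registry client as a transitive dependency.)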

This may be useful for someone. It can be integrated into an open-source project to serve as an Avro payload converter. It has been tested and works like a charm.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DataConverterException;
import io.temporal.common.converter.PayloadConverter;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class AvroKafkaPayloadConverter implements PayloadConverter {

    private static final String ENCODING = "Kafka-avro";
    private final KafkaAvroDeserializer avroDeserializer;
    private final KafkaAvroSerializer avroSerializer;
    private final ObjectMapper mapper = new ObjectMapper();

    public AvroKafkaPayloadConverter(String schemaRegistryUrl) {
        Map<String, Object> config = Map.of("schema.registry.url", schemaRegistryUrl);
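        // isKey=false configures both in value mode; note the converter below also runs
        // record keys through them, so key schemas register under the value subject
        // (<topic>-value) with the default naming strategy.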
        this.avroDeserializer = new KafkaAvroDeserializer();
        this.avroDeserializer.configure(config, false);
        this.avroSerializer = new KafkaAvroSerializer();
        this.avroSerializer.configure(config, false);
    }

    @Override
    public Optional<Payload> toData(Object value) {
        try {
            if (value instanceof ConsumerRecord<?, ?> record) {
                return Optional.of(buildPayloadFromSingleRecord(record));
            }

            if (value instanceof ConsumerRecords<?, ?> records) {
                return Optional.of(buildPayloadFromRecords(records));
            }
        } catch (Exception e) {
            throw new DataConverterException("Failed to convert Kafka ConsumerRecord(s) to Payload", e);
        }
        return Optional.empty();
    }

    private Payload buildPayloadFromSingleRecord(ConsumerRecord<?, ?> record) throws Exception {
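        // Assumption: key and value are non-null; a tombstone record (null key or
        // value) would hit a NullPointerException in the Base64/Map.of calls below.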
        byte[] keyBytes = avroSerializer.serialize(record.topic(), record.key());
        byte[] valueBytes = avroSerializer.serialize(record.topic(), record.value());

        Map<String, String> kvMap = Map.of(
                "key", Base64.getEncoder().encodeToString(keyBytes),
                "value", Base64.getEncoder().encodeToString(valueBytes));

        byte[] jsonData = mapper.writeValueAsBytes(List.of(kvMap));

        return Payload.newBuilder()
                      .putMetadata("encoding", ByteString.copyFromUtf8(ENCODING))
                      .putMetadata("topic", ByteString.copyFromUtf8(record.topic()))
                      .putMetadata("record-type", ByteString.copyFromUtf8("single"))
                      .setData(ByteString.copyFrom(jsonData))
                      .build();
    }

    private Payload buildPayloadFromRecords(ConsumerRecords<?, ?> records) throws Exception {
        List<Map<String, String>> recordList = new ArrayList<>();
        String topic = null;
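        // Note: only a single topic is stored in the payload metadata, so this
        // assumes every record in the batch comes from the same topic.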

        for (ConsumerRecord<?, ?> record : records) {
            topic = record.topic();

            byte[] keyBytes = avroSerializer.serialize(topic, record.key());
            byte[] valueBytes = avroSerializer.serialize(topic, record.value());

            Map<String, String> kvMap = Map.of(
                    "key", Base64.getEncoder().encodeToString(keyBytes),
                    "value", Base64.getEncoder().encodeToString(valueBytes));

            recordList.add(kvMap);
        }

        byte[] jsonData = mapper.writeValueAsBytes(recordList);

        return Payload.newBuilder()
                      .putMetadata("encoding", ByteString.copyFromUtf8(ENCODING))
                      .putMetadata("topic", ByteString.copyFromUtf8(topic != null ? topic : ""))
                      .putMetadata("record-type", ByteString.copyFromUtf8("batch"))
                      .setData(ByteString.copyFrom(jsonData))
                      .build();
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T fromData(Payload payload, Class<T> type, Type genericType) {
        String encoding = payload.getMetadataOrDefault("encoding", ByteString.EMPTY)
                                 .toStringUtf8();
        if (!ENCODING.equals(encoding)) {
            return null;
        }

        String topic = payload.getMetadataOrDefault("topic", ByteString.EMPTY)
                              .toStringUtf8();
        String recordType = payload.getMetadataOrDefault("record-type", ByteString.EMPTY)
                                   .toStringUtf8();
        byte[] jsonData = payload.getData()
                                 .toByteArray();

        try {
            List<Map<String, String>> rawList = mapper.readValue(jsonData, new TypeReference<>() {});
            List<ConsumerRecord<GenericData.Record, GenericData.Record>> recordList = new ArrayList<>();

            for (Map<String, String> kv : rawList) {
                byte[] keyBytes = Base64.getDecoder()
                                        .decode(kv.get("key"));
                byte[] valueBytes = Base64.getDecoder()
                                          .decode(kv.get("value"));

                GenericData.Record key = (GenericData.Record) avroDeserializer.deserialize(topic, keyBytes);
                GenericData.Record value = (GenericData.Record) avroDeserializer.deserialize(topic, valueBytes);

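                // Partition, offset, and timestamp are not round-tripped through the
                // payload; reconstructed records default to partition 0 / offset 0.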
                recordList.add(new ConsumerRecord<>(topic, 0, 0L, key, value));
            }

            if ("single".equals(recordType) && type.isAssignableFrom(ConsumerRecord.class)) {
                return type.cast(recordList.get(0));
            }

            if ("batch".equals(recordType) && type.isAssignableFrom(ConsumerRecords.class)) {
                TopicPartition tp = new TopicPartition(topic, 0);
                Map<TopicPartition, List<ConsumerRecord<GenericData.Record, GenericData.Record>>> map = Map.of(tp, recordList);
                return type.cast(new ConsumerRecords<>(map));
            }

        } catch (Exception e) {
            throw new DataConverterException("Failed to deserialize Payload into Kafka ConsumerRecords", e);
        }

        return null;
    }

    @Override
    public String getEncodingType() {
        return ENCODING;
    }

    public void close() {
        avroSerializer.close();
        avroDeserializer.close();
    }
}
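To close the loop, a hypothetical usage sketch; the workflow interface, task queue name, and registry URL are invented for illustration, with the converter registered on the client as sketched earlier in the thread:

import io.temporal.client.WorkflowOptions;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

// Hypothetical workflow interface that accepts the Kafka batch directly.
@WorkflowInterface
public interface KafkaBatchWorkflow {
    @WorkflowMethod
    void process(ConsumerRecords<GenericData.Record, GenericData.Record> records);
}

// client was built with new AvroKafkaPayloadConverter("http://localhost:8081") registered.
KafkaBatchWorkflow wf = client.newWorkflowStub(
        KafkaBatchWorkflow.class,
        WorkflowOptions.newBuilder()
                .setTaskQueue("kafka-task-queue") // hypothetical task queue
                .build());
wf.process(records); // records: the ConsumerRecords batch polled from Kafka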