diff --git a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java index e7abbad90..94c72fa6b 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java @@ -5,7 +5,7 @@ import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -20,18 +20,23 @@ public ProducerRecord create(String topic, @Nullable String key, @Nullable String value, @Nullable Map headers) { + + Headers kafkaHeaders = createHeaders(headers); + return new ProducerRecord<>( topic, partition, - key == null ? null : keySerializer.serialize(key), - value == null ? null : valuesSerializer.serialize(value), - headers == null ? null : createHeaders(headers) + key == null ? null : keySerializer.serialize(key, kafkaHeaders), + value == null ? null : valuesSerializer.serialize(value, kafkaHeaders), + kafkaHeaders ); } - private Iterable
createHeaders(Map clientHeaders) { + private Headers createHeaders(Map clientHeaders) { RecordHeaders headers = new RecordHeaders(); - clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes()))); + if (clientHeaders != null) { + clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes()))); + } return headers; } diff --git a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java index b0fdc9834..0aac6f1fa 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java +++ b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java @@ -10,6 +10,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Headers; @Slf4j @RequiredArgsConstructor @@ -80,7 +81,17 @@ public boolean canDeserialize(String topic, Serde.Target type) { public Serde.Serializer serializer(String topic, Serde.Target type) { return wrapWithClassloader(() -> { var serializer = serde.serializer(topic, type); - return input -> wrapWithClassloader(() -> serializer.serialize(input)); + return new Serde.Serializer() { + @Override + public byte[] serialize(String input) { + return wrapWithClassloader(() -> serializer.serialize(input)); + } + + @Override + public byte[] serialize(String input, Headers headers) { + return wrapWithClassloader(() -> serializer.serialize(input, headers)); + } + }; }); } diff --git a/serde-api/build.gradle b/serde-api/build.gradle index 7268a37b7..913116e3b 100644 --- a/serde-api/build.gradle +++ b/serde-api/build.gradle @@ -15,6 +15,10 @@ tasks.register('javadocJar', Jar) { from javadoc.destinationDir } +dependencies { + implementation libs.kafka.clients +} + artifacts { archives sourceJar, javadocJar } diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java index 24a0f6f87..b9f812b62 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java @@ -2,6 +2,7 @@ import java.io.Closeable; import java.util.Optional; +import org.apache.kafka.common.header.Headers; /** * Main interface of serialization/deserialization logic. @@ -121,6 +122,10 @@ interface Serializer { * @return serialized bytes. Can be null if input is null or empty string. */ byte[] serialize(String input); + + default byte[] serialize(String input, Headers headers) { + return serialize(input); + } } /**