From d915de4fd811265c8303cd61b10ac2cf9caea0ce Mon Sep 17 00:00:00 2001 From: Kostas Dizas <254960+kostasdizas@users.noreply.github.com> Date: Fri, 11 Aug 2023 09:47:28 +0100 Subject: [PATCH] Add protobuf raw message deserializer (#4041) Implemented a Protobuf Raw deserialiser that works like protoc --decode_raw. This is a no config alternative to the existing ProtobufFileSerde. Co-authored-by: Ilya Kuramshin --- .../kafka/ui/serdes/SerdesInitializer.java | 2 + .../ui/serdes/builtin/ProtobufRawSerde.java | 59 ++++++++++ .../serdes/builtin/ProtobufRawSerdeTest.java | 108 ++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java index c833d9fc726..6e28c2fdcfb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java @@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.Int32Serde; import com.provectus.kafka.ui.serdes.builtin.Int64Serde; import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde; +import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde; import com.provectus.kafka.ui.serdes.builtin.StringSerde; import com.provectus.kafka.ui.serdes.builtin.UInt32Serde; import com.provectus.kafka.ui.serdes.builtin.UInt64Serde; @@ -50,6 +51,7 @@ public SerdesInitializer() { .put(Base64Serde.name(), Base64Serde.class) .put(HexSerde.name(), HexSerde.class) .put(UuidBinarySerde.name(), UuidBinarySerde.class) + .put(ProtobufRawSerde.name(), ProtobufRawSerde.class) .build(), new CustomSerdeLoader() ); diff --git 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java
new file mode 100644
index 00000000000..221b8b5ea53
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java
@@ -0,0 +1,77 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.google.protobuf.UnknownFieldSet;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.RecordHeaders;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+
+/**
+ * Deserialize-only serde that renders protobuf payloads without needing a schema.
+ *
+ * <p>The bytes are parsed into an {@link UnknownFieldSet} and its {@code toString()} is returned
+ * as the result, so fields appear under their numbers rather than names. Serialization is
+ * rejected: {@link #canSerialize} returns {@code false} and {@link #serializer} throws.
+ */
+public class ProtobufRawSerde implements BuiltInSerde {
+
+  /** Returns the name this serde is registered under. */
+  public static String name() {
+    return "ProtobufDecodeRaw";
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns a deserializer that renders the payload's fields keyed by field number.
+   *
+   * <p>Any exception raised while parsing (including for {@code null} data) is rethrown as a
+   * {@link ValidationException} with the same message and the original exception as cause.
+   */
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return new Deserializer() {
+      @SneakyThrows
+      @Override
+      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+        try {
+          UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
+          return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
+        } catch (Exception e) {
+          // Keep the parser's message as the validation message, but attach the original
+          // exception as the cause so its stack trace is not lost.
+          ValidationException failure = new ValidationException(e.getMessage());
+          failure.initCause(e);
+          throw failure;
+        }
+      }
+    };
+  }
+}
\ No newline at end of file
diff --git 
a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java
new file mode 100644
index 00000000000..a71e9969a84
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java
@@ -0,0 +1,122 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.Serde;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ProtobufRawSerde}: payloads are built from small proto3 schemas and the
+ * deserializer output is compared with the expected field-number rendering.
+ */
+class ProtobufRawSerdeTest {
+
+  private static final String DUMMY_TOPIC = "dummy-topic";
+
+  private ProtobufRawSerde serde;
+
+  @BeforeEach
+  void init() {
+    serde = new ProtobufRawSerde();
+  }
+
+  /** Schema with a single int32 field (field number 1). */
+  @SneakyThrows
+  ProtobufSchema getSampleSchema() {
+    return new ProtobufSchema(
+        """
+        syntax = "proto3";
+        message Message1 {
+          int32 my_field = 1;
+        }
+        """
+    );
+  }
+
+  /** Serialized {@code Message1} with {@code my_field = 5}. */
+  @SneakyThrows
+  private byte[] getProtobufMessage() {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1"));
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeSimpleMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
+  }
+
+  @Test
+  void deserializeEmptyMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, new byte[0]);
+    assertThat(deserialized.getResult()).isEqualTo("");
+  }
+
+  @Test
+  void deserializeInvalidMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Protocol message contained an invalid tag");
+  }
+
+  @Test
+  void deserializeNullMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    // Only the exception type is asserted: the message for null input appears to be the
+    // JVM-generated NullPointerException text, which varies with the JVM version and flags.
+    assertThatThrownBy(() -> deserializer.deserialize(null, null))
+        .isInstanceOf(ValidationException.class);
+  }
+
+  /** Schema where {@code Message1} embeds a {@code Message2} as field number 2. */
+  ProtobufSchema getSampleNestedSchema() {
+    return new ProtobufSchema(
+        """
+        syntax = "proto3";
+        message Message2 {
+          int32 my_nested_field = 1;
+        }
+        message Message1 {
+          int32 my_field = 1;
+          Message2 my_nested_message = 2;
+        }
+        """
+    );
+  }
+
+  /** Serialized {@code Message1} with {@code my_field = 5} and a nested {@code my_nested_field = 10}. */
+  @SneakyThrows
+  private byte[] getComplexProtobufMessage() {
+    // Parse the schema once so both builders use descriptors from the same schema instance.
+    ProtobufSchema schema = getSampleNestedSchema();
+
+    DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(schema.toDescriptor("Message2"));
+    nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
+
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(schema.toDescriptor("Message1"));
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());
+
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeNestedMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getComplexProtobufMessage());
+    // Nested fields are indented by two spaces in protobuf text format.
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n  1: 10\n}\n");
+  }
+}
\ No newline at end of file