From 2ad229439eeca7cba8f67d9826d69e68792905d4 Mon Sep 17 00:00:00 2001 From: kostasdizas <254960+kostasdizas@users.noreply.github.com> Date: Mon, 17 Jul 2023 22:30:37 +0100 Subject: [PATCH 1/4] Add protobuf raw message deserializer --- .../kafka/ui/serdes/SerdesInitializer.java | 2 + .../ui/serdes/builtin/ProtobufRawSerde.java | 59 +++++++ .../serdes/builtin/ProtobufRawSerdeTest.java | 150 ++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java index ac3c2241cfc..b60a1f7a983 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java @@ -15,6 +15,7 @@ import com.provectus.kafka.ui.serdes.builtin.Int32Serde; import com.provectus.kafka.ui.serdes.builtin.Int64Serde; import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde; +import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde; import com.provectus.kafka.ui.serdes.builtin.StringSerde; import com.provectus.kafka.ui.serdes.builtin.UInt32Serde; import com.provectus.kafka.ui.serdes.builtin.UInt64Serde; @@ -48,6 +49,7 @@ public SerdesInitializer() { .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class) .put(Base64Serde.name(), Base64Serde.class) .put(UuidBinarySerde.name(), UuidBinarySerde.class) + .put(ProtobufRawSerde.name(), ProtobufRawSerde.class) .build(), new CustomSerdeLoader() ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java new file mode 100644 index 00000000000..1cefee65d3e --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java @@ -0,0 +1,59 @@ +package com.provectus.kafka.ui.serdes.builtin; + +import com.google.protobuf.UnknownFieldSet; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.serde.api.DeserializeResult; +import com.provectus.kafka.ui.serde.api.RecordHeaders; +import com.provectus.kafka.ui.serde.api.SchemaDescription; +import com.provectus.kafka.ui.serdes.BuiltInSerde; +import java.util.Map; +import java.util.Optional; +import lombok.SneakyThrows; + +public class ProtobufRawSerde implements BuiltInSerde { + + public static String name() { + return "ProtobufRaw"; + } + + @Override + public Optional getDescription() { + return Optional.empty(); + } + + @Override + public Optional getSchema(String topic, Target type) { + return Optional.empty(); + } + + @Override + public boolean canSerialize(String topic, Target type) { + return false; + } + + @Override + public boolean canDeserialize(String topic, Target type) { + return true; + } + + @Override + public Serializer serializer(String topic, Target type) { + throw new UnsupportedOperationException(); + } + + @Override + public Deserializer deserializer(String topic, Target type) { + return new Deserializer() { + @SneakyThrows + @Override + public DeserializeResult deserialize(RecordHeaders headers, byte[] data) { + try { + UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data); + return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of()); + } catch (Exception e) { + throw new ValidationException(e.getMessage()); + } + } + }; + } +} \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java new file mode 100644 index 00000000000..5225b2dfaac --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java @@ -0,0 +1,150 @@ +package com.provectus.kafka.ui.serdes.builtin; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.serde.api.Serde; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ProtobufRawSerdeTest { + + private static final String DUMMY_TOPIC = "dummy-topic"; + + private ProtobufRawSerde serde; + + @BeforeEach + void init() { + serde = new ProtobufRawSerde(); + } + + @SneakyThrows + Descriptors.Descriptor getDescriptor() { + DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto + .newBuilder() + .setSyntax("proto3") + .setName("test.proto") + .addMessageType( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("MyMessage") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("my_field") + .setNumber(1) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32) + .build() + ) + .build() + ) + .build(); + + Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, new Descriptors.FileDescriptor[0]); + + return fileDescriptor.findMessageTypeByName("MyMessage"); + } + + @SneakyThrows + private byte[] getProtobufMessage() { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor()); + builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5); + return builder.build().toByteArray(); + } + + @Test + void deserializeSimpleMessage() { + var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE) + .deserialize(null, getProtobufMessage()); + assertThat(deserialized.getResult()).isEqualTo("1: 5\n"); + } + + @Test + void deserializeEmptyMessage() { + var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE) + .deserialize(null, new byte[0]); + assertThat(deserialized.getResult()).isEqualTo(""); + } + + @Test + void deserializeInvalidMessage() { + var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE); + assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 })) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Protocol message contained an invalid tag"); + } + + @Test + void deserializeNullMessage() { + var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE); + assertThatThrownBy(() -> deserializer.deserialize(null, null)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Cannot read the array length"); + } + + @SneakyThrows + private byte[] getComplexProtobufMessage() { + DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto + .newBuilder() + .setSyntax("proto3") + .setName("test.proto") + .addMessageType( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("MyMessage") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("my_field") + .setNumber(1) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32) + .build() + ) + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("my_nested_message") + .setNumber(2) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE) + .setTypeName("MyNestedMessage") + .build() + ) + .build() + ) + .addMessageType( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("MyNestedMessage") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("my_nested_field") + .setNumber(1) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32) + .build() + ) + .build() + ) + .build(); + + Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, new Descriptors.FileDescriptor[0]); + + Descriptors.Descriptor descriptor = fileDescriptor.findMessageTypeByName("MyMessage"); + Descriptors.Descriptor nestedDescriptor = fileDescriptor.findMessageTypeByName("MyNestedMessage"); + + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5); + DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedDescriptor); + nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10); + builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build()); + + return builder.build().toByteArray(); + } + + @Test + void deserializeNestedMessage() { + var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE) + .deserialize(null, getComplexProtobufMessage()); + assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n 1: 10\n}\n"); + } +} \ No newline at end of file From 0c627a4d7b70675742d864bc2efd798eae390f25 Mon Sep 17 00:00:00 2001 From: kostasdizas <254960+kostasdizas@users.noreply.github.com> Date: Fri, 4 Aug 2023 19:25:36 +0100 Subject: [PATCH 2/4] Update serde name --- .../com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java index 1cefee65d3e..221b8b5ea53 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java @@ -13,7 +13,7 @@ public class ProtobufRawSerde implements BuiltInSerde { public static String name() { - return "ProtobufRaw"; + return "ProtobufDecodeRaw"; } @Override From 504962447b2147aa665dc6208b5c8922a25595fc Mon Sep 17 00:00:00 2001 From: kostasdizas <254960+kostasdizas@users.noreply.github.com> Date: Fri, 4 Aug 2023 19:27:58 +0100 Subject: [PATCH 3/4] Use schema text in test --- .../serdes/builtin/ProtobufRawSerdeTest.java | 31 ++++++------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java index 5225b2dfaac..eb068970447 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java @@ -8,6 +8,7 @@ import com.google.protobuf.DynamicMessage; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.serde.api.Serde; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -25,28 +26,14 @@ void init() { @SneakyThrows Descriptors.Descriptor getDescriptor() { - DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto - .newBuilder() - .setSyntax("proto3") - .setName("test.proto") - .addMessageType( - DescriptorProtos.DescriptorProto.newBuilder() - .setName("MyMessage") - .addField( - DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("my_field") - .setNumber(1) - .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32) - .build() - ) - .build() - ) - .build(); - - Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom( - fileDescriptorProto, new Descriptors.FileDescriptor[0]); - - return fileDescriptor.findMessageTypeByName("MyMessage"); + return new ProtobufSchema( + """ + syntax = "proto3"; + message MyMessage { + int32 my_field = 1; + } + """ + ).toDescriptor("MyMessage"); } @SneakyThrows From 4b7d89a198bbd50ee864d24a2d34e948bee39c1d Mon Sep 17 00:00:00 2001 From: kostasdizas <254960+kostasdizas@users.noreply.github.com> Date: Sat, 5 Aug 2023 21:08:45 +0100 Subject: [PATCH 4/4] Update complex schema in test Make schemas more consistent --- .../serdes/builtin/ProtobufRawSerdeTest.java | 71 ++++++------------- 1 file changed, 21 insertions(+), 50 deletions(-) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java index eb068970447..a71e9969a84 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java @@ -25,20 +25,20 @@ void init() { } @SneakyThrows - Descriptors.Descriptor getDescriptor() { + ProtobufSchema getSampleSchema() { return new ProtobufSchema( """ syntax = "proto3"; - message MyMessage { + message Message1 { int32 my_field = 1; } """ - ).toDescriptor("MyMessage"); + ); } @SneakyThrows private byte[] getProtobufMessage() { - DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor()); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1")); builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5); return builder.build().toByteArray(); } @@ -73,55 +73,26 @@ void deserializeNullMessage() { .hasMessageContaining("Cannot read the array length"); } + ProtobufSchema getSampleNestedSchema() { + return new ProtobufSchema( + """ + syntax = "proto3"; + message Message2 { + int32 my_nested_field = 1; + } + message Message1 { + int32 my_field = 1; + Message2 my_nested_message = 2; + } + """ + ); + } + @SneakyThrows private byte[] getComplexProtobufMessage() { - DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto - .newBuilder() - .setSyntax("proto3") - .setName("test.proto") - .addMessageType( - DescriptorProtos.DescriptorProto.newBuilder() - .setName("MyMessage") - .addField( - DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("my_field") - .setNumber(1) - .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32) - .build() - ) - .addField( - DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("my_nested_message") - .setNumber(2) - .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE) - .setTypeName("MyNestedMessage") - .build() - ) - .build() - ) - .addMessageType( - DescriptorProtos.DescriptorProto.newBuilder() - .setName("MyNestedMessage") - .addField( - DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("my_nested_field") - .setNumber(1) - .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32) - .build() - ) - .build() - ) - .build(); - - Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom( - fileDescriptorProto, new Descriptors.FileDescriptor[0]); - - Descriptors.Descriptor descriptor = fileDescriptor.findMessageTypeByName("MyMessage"); - Descriptors.Descriptor nestedDescriptor = fileDescriptor.findMessageTypeByName("MyNestedMessage"); - - DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message1")); builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5); - DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedDescriptor); + DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message2")); nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10); builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());