Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protobuf raw message deserializer #4041

Merged
merged 6 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
Expand Down Expand Up @@ -48,6 +49,7 @@ public SerdesInitializer() {
.put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
.put(Base64Serde.name(), Base64Serde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
.build(),
new CustomSerdeLoader()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.provectus.kafka.ui.serdes.builtin;

import com.google.protobuf.UnknownFieldSet;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;

public class ProtobufRawSerde implements BuiltInSerde {

public static String name() {
return "ProtobufDecodeRaw";
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}

@Override
public boolean canSerialize(String topic, Target type) {
return false;
}

@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}

@Override
public Serializer serializer(String topic, Target type) {
throw new UnsupportedOperationException();
}

@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@SneakyThrows
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
try {
UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
iliax marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
throw new ValidationException(e.getMessage());
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package com.provectus.kafka.ui.serdes.builtin;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.Serde;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ProtobufRawSerdeTest {

private static final String DUMMY_TOPIC = "dummy-topic";

private ProtobufRawSerde serde;

@BeforeEach
void init() {
serde = new ProtobufRawSerde();
}

@SneakyThrows
Descriptors.Descriptor getDescriptor() {
return new ProtobufSchema(
"""
syntax = "proto3";
message MyMessage {
int32 my_field = 1;
}
"""
).toDescriptor("MyMessage");
}

@SneakyThrows
private byte[] getProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor());
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
return builder.build().toByteArray();
}

@Test
void deserializeSimpleMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
}

@Test
void deserializeEmptyMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, new byte[0]);
assertThat(deserialized.getResult()).isEqualTo("");
}

@Test
void deserializeInvalidMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Protocol message contained an invalid tag");
}

@Test
void deserializeNullMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, null))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Cannot read the array length");
}

@SneakyThrows
private byte[] getComplexProtobufMessage() {
DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto
kostasdizas marked this conversation as resolved.
Show resolved Hide resolved
.newBuilder()
.setSyntax("proto3")
.setName("test.proto")
.addMessageType(
DescriptorProtos.DescriptorProto.newBuilder()
.setName("MyMessage")
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("my_field")
.setNumber(1)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32)
.build()
)
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("my_nested_message")
.setNumber(2)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE)
.setTypeName("MyNestedMessage")
.build()
)
.build()
)
.addMessageType(
DescriptorProtos.DescriptorProto.newBuilder()
.setName("MyNestedMessage")
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("my_nested_field")
.setNumber(1)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32)
.build()
)
.build()
)
.build();

Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(
fileDescriptorProto, new Descriptors.FileDescriptor[0]);

Descriptors.Descriptor descriptor = fileDescriptor.findMessageTypeByName("MyMessage");
Descriptors.Descriptor nestedDescriptor = fileDescriptor.findMessageTypeByName("MyNestedMessage");

DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedDescriptor);
nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());

return builder.build().toByteArray();
}

@Test
void deserializeNestedMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getComplexProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n 1: 10\n}\n");
}
}