Skip to content

Commit

Permalink
Add protobuf raw message deserializer (#4041)
Browse files Browse the repository at this point in the history
Implemented a Protobuf Raw deserialiser that works like protoc --decode_raw. This is a no config alternative to the existing ProtobufFileSerde.

Co-authored-by: Ilya Kuramshin <iliax@proton.me>
  • Loading branch information
kostasdizas and iliax authored Aug 11, 2023
1 parent 150fc21 commit d915de4
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
Expand Down Expand Up @@ -50,6 +51,7 @@ public SerdesInitializer() {
.put(Base64Serde.name(), Base64Serde.class)
.put(HexSerde.name(), HexSerde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
.build(),
new CustomSerdeLoader()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.provectus.kafka.ui.serdes.builtin;

import com.google.protobuf.UnknownFieldSet;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;

public class ProtobufRawSerde implements BuiltInSerde {

public static String name() {
return "ProtobufDecodeRaw";
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}

@Override
public boolean canSerialize(String topic, Target type) {
return false;
}

@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}

@Override
public Serializer serializer(String topic, Target type) {
throw new UnsupportedOperationException();
}

@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@SneakyThrows
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
try {
UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
} catch (Exception e) {
throw new ValidationException(e.getMessage());
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.provectus.kafka.ui.serdes.builtin;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.Serde;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ProtobufRawSerdeTest {

private static final String DUMMY_TOPIC = "dummy-topic";

private ProtobufRawSerde serde;

@BeforeEach
void init() {
serde = new ProtobufRawSerde();
}

@SneakyThrows
ProtobufSchema getSampleSchema() {
return new ProtobufSchema(
"""
syntax = "proto3";
message Message1 {
int32 my_field = 1;
}
"""
);
}

@SneakyThrows
private byte[] getProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1"));
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
return builder.build().toByteArray();
}

@Test
void deserializeSimpleMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
}

@Test
void deserializeEmptyMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, new byte[0]);
assertThat(deserialized.getResult()).isEqualTo("");
}

@Test
void deserializeInvalidMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Protocol message contained an invalid tag");
}

@Test
void deserializeNullMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, null))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Cannot read the array length");
}

ProtobufSchema getSampleNestedSchema() {
return new ProtobufSchema(
"""
syntax = "proto3";
message Message2 {
int32 my_nested_field = 1;
}
message Message1 {
int32 my_field = 1;
Message2 my_nested_message = 2;
}
"""
);
}

@SneakyThrows
private byte[] getComplexProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message1"));
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message2"));
nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());

return builder.build().toByteArray();
}

@Test
void deserializeNestedMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getComplexProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n 1: 10\n}\n");
}
}

0 comments on commit d915de4

Please sign in to comment.