Skip to content

Commit

Permalink
ISSUE-849: Use a map between topics and message-names when using Prot…
Browse files Browse the repository at this point in the history
…obufFile (provectus#854)

* Use a map between topics and message-names when using ProtobufFile
* Validate the given message names for the topics in ProtobufFileRecordSerDe
  • Loading branch information
zarezadeh authored Sep 13, 2021
1 parent f9d0bbf commit f92acec
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -26,6 +27,7 @@ public static class Cluster {
String keySchemaNameTemplate = "%s-key";
String protobufFile;
String protobufMessageName;
Map<String, String> protobufMessageNameByTopic;
List<ConnectCluster> kafkaConnect;
int jmxPort;
boolean jmxSsl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class KafkaCluster {
private final Throwable lastZookeeperException;
private final Path protobufFile;
private final String protobufMessageName;
private final Map<String, String> protobufMessageNameByTopic;
private final Properties properties;
private final Boolean readOnly;
private final Boolean disableLogDirsCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) {
if (cluster.getProtobufFile() != null) {
log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
cluster.getProtobufMessageName(), objectMapper);
cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName(),
objectMapper);
} else {
log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
return new SchemaRegistryAwareRecordSerDe(cluster);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.serde;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.model.MessageSchema;
Expand All @@ -14,6 +15,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
Expand All @@ -30,15 +33,35 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
private final ObjectMapper objectMapper;
private final Path protobufSchemaPath;
private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
private final Map<String, Descriptor> messageDescriptorMap;
private final Descriptor defaultMessageDescriptor;

public ProtobufFileRecordSerDe(Path protobufSchemaPath, String messageName,
ObjectMapper objectMapper) throws IOException {
public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
String defaultMessageName, ObjectMapper objectMapper)
throws IOException {
this.objectMapper = objectMapper;
this.protobufSchemaPath = protobufSchemaPath;
try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
this.protobufSchema = new ProtobufSchema(
var schema = new ProtobufSchema(
lines.collect(Collectors.joining("\n"))
).copy(messageName);
);
if (defaultMessageName != null) {
this.protobufSchema = schema.copy(defaultMessageName);
} else {
this.protobufSchema = schema;
}
this.messageDescriptorMap = new HashMap<>();
if (messageNameMap != null) {
for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
"The given message type is not found in protobuf definition: "
+ entry.getValue());
messageDescriptorMap.put(entry.getKey(), descriptor);
}
}
defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(),
"The given message type is not found in protobuf definition: "
+ defaultMessageName);
}
}

Expand All @@ -51,7 +74,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
builder.keyFormat(MessageFormat.UNKNOWN);
}
if (msg.value() != null) {
builder.value(parse(msg.value().get()));
builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
builder.valueFormat(MessageFormat.PROTOBUF);
}
return builder.build();
Expand All @@ -60,11 +83,15 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
}
}

private Descriptor getDescriptor(String topic) {
return messageDescriptorMap.getOrDefault(topic, defaultMessageDescriptor);
}

@SneakyThrows
private String parse(byte[] value) {
private String parse(byte[] value, Descriptor descriptor) {
DynamicMessage protoMsg = DynamicMessage.parseFrom(
protobufSchema.toDescriptor(),
new ByteArrayInputStream(value)
descriptor,
new ByteArrayInputStream(value)
);
byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg);
return new String(jsonFromProto);
Expand All @@ -78,7 +105,7 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
if (data == null) {
return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null);
}
DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
try {
JsonFormat.parser().merge(data, builder);
final DynamicMessage message = builder.build();
Expand All @@ -97,8 +124,8 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
public TopicMessageSchema getTopicSchema(String topic) {

final JsonSchema jsonSchema = schemaConverter.convert(
protobufSchemaPath.toUri(),
protobufSchema.toDescriptor()
protobufSchemaPath.toUri(),
getDescriptor(topic)
);
final MessageSchema keySchema = new MessageSchema()
.name(protobufSchema.fullName())
Expand Down
8 changes: 8 additions & 0 deletions kafka-ui-api/src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ kafka:
# address: http://localhost:8083
# jmxPort: 9998
# read-only: true
# -
# name: localUsingProtobufFile
# bootstrapServers: localhost:9092
# protobufFile: messages.proto
# protobufMessageName: GenericMessage
# protobufMessageNameByTopic:
# input-topic: InputMessage
# output-topic: OutputMessage
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.provectus.kafka.ui.serde;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ProtobufFileRecordSerDeTest {

// Sample message of type `test.Person`
private static byte[] personMessage;
// Sample message of type `test.AddressBook`
private static byte[] addressBookMessage;
private static Path protobufSchemaPath;

@BeforeAll
static void setUp() throws URISyntaxException, IOException {
protobufSchemaPath = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader()
.getResource("address-book.proto").toURI());
ProtobufSchema protobufSchema = new ProtobufSchema(Files.readString(protobufSchemaPath));

DynamicMessage.Builder builder = protobufSchema.newMessageBuilder("test.Person");
JsonFormat.parser().merge(
"{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }", builder);
personMessage = builder.build().toByteArray();

builder = protobufSchema.newMessageBuilder("test.AddressBook");
JsonFormat.parser().merge(
"{\"version\": 1, \"people\": ["
+ "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"user2@example.com\" }]}", builder);
addressBookMessage = builder.build().toByteArray();
}

@Test
void testDeserialize() throws IOException {
var messageNameMap = Map.of(
"topic1", "test.Person",
"topic2", "test.AddressBook");
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, new ObjectMapper());
var msg1 = deserializer
.deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(personMessage)));
assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat());
assertTrue(msg1.getValue().contains("user1@example.com"));

var msg2 = deserializer
.deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
Bytes.wrap(addressBookMessage)));
assertTrue(msg2.getValue().contains("user2@example.com"));
}

@Test
void testNoDefaultMessageName() throws IOException {
// by default the first message type defined in proto definition is used
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null,
new ObjectMapper());
var msg = deserializer
.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(personMessage)));
assertTrue(msg.getValue().contains("user1@example.com"));
}

@Test
void testDefaultMessageName() throws IOException {
var messageNameMap = Map.of("topic1", "test.Person");
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
new ObjectMapper());
var msg = deserializer
.deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(addressBookMessage)));
assertTrue(msg.getValue().contains("user2@example.com"));
}

@Test
void testSerialize() throws IOException {
var messageNameMap = Map.of("topic1", "test.Person");
var serializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
new ObjectMapper());
var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
assertNotNull(serialized.value());
}
}
39 changes: 39 additions & 0 deletions kafka-ui-api/src/test/resources/address-book.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// [START declaration]
syntax = "proto3";
package test;

// [END declaration]

// [START java_declaration]
option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";
// [END java_declaration]

// [START messages]
message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;

enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}

message PhoneNumber {
string number = 1;
PhoneType type = 2;
}

repeated PhoneNumber phones = 4;

}

// Our address book file is just one of these.
message AddressBook {
int32 version = 1;
repeated Person people = 2;
}
// [END messages]

0 comments on commit f92acec

Please sign in to comment.