Skip to content

Commit

Permalink
Add protobufMessageNameForKeyByTopic option to config. Message keys are
Browse files Browse the repository at this point in the history
deserialized using a protobuf schema if the config is set. Otherwise
message keys are treated as strings.

Closes provectus#1699
  • Loading branch information
jdechicchis committed Mar 14, 2022
1 parent ed1e2bd commit bb0637e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static class Cluster {
String protobufFile;
String protobufMessageName;
Map<String, String> protobufMessageNameByTopic;
Map<String, String> protobufMessageNameForKeyByTopic;
List<ConnectCluster> kafkaConnect;
int jmxPort;
boolean jmxSsl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class KafkaCluster {
private final Path protobufFile;
private final String protobufMessageName;
private final Map<String, String> protobufMessageNameByTopic;
private final Map<String, String> protobufMessageNameForKeyByTopic;
private final Properties properties;
private final boolean readOnly;
private final boolean disableLogDirsCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) {
if (cluster.getProtobufFile() != null) {
log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName());
cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageNameForKeyByTopic(),
cluster.getProtobufMessageName());
} else if (cluster.getSchemaRegistry() != null) {
log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
return new SchemaRegistryAwareRecordSerDe(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;

//TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok
public class ProtobufFileRecordSerDe implements RecordSerDe {
private final ProtobufSchema protobufSchema;
private final Path protobufSchemaPath;
private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
private final Map<String, Descriptor> messageDescriptorMap;
private final Map<String, Descriptor> keyMessageDescriptorMap;
private final Descriptor defaultMessageDescriptor;

public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
String defaultMessageName)
Map<String, String> keyMessageNameMap, String defaultMessageName)
throws IOException {
this.protobufSchemaPath = protobufSchemaPath;
try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
Expand All @@ -49,26 +49,40 @@ public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> mess
}
this.messageDescriptorMap = new HashMap<>();
if (messageNameMap != null) {
for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
"The given message type is not found in protobuf definition: "
+ entry.getValue());
messageDescriptorMap.put(entry.getKey(), descriptor);
}
populateDescriptors(messageNameMap, messageDescriptorMap);
}
this.keyMessageDescriptorMap = new HashMap<>();
if (keyMessageNameMap != null) {
populateDescriptors(keyMessageNameMap, keyMessageDescriptorMap);
}
defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(),
"The given message type is not found in protobuf definition: "
+ defaultMessageName);
}
}

private void populateDescriptors(Map<String, String> messageNameMap, Map<String, Descriptor> messageDescriptorMap) {
for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
"The given message type is not found in protobuf definition: "
+ entry.getValue());
messageDescriptorMap.put(entry.getKey(), descriptor);
}
}

@Override
public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
try {
var builder = DeserializedKeyValue.builder();
if (msg.key() != null) {
builder.key(new String(msg.key().get()));
builder.keyFormat(MessageFormat.UNKNOWN);
Descriptor descriptor = keyMessageDescriptorMap.get(msg.topic());
if (descriptor == null) {
builder.key(new String(msg.key().get()));
builder.keyFormat(MessageFormat.UNKNOWN);
} else {
builder.key(parse(msg.key().get(), descriptor));
builder.keyFormat(MessageFormat.PROTOBUF);
}
}
if (msg.value() != null) {
builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,29 @@ void testDeserialize() throws IOException {
var messageNameMap = Map.of(
"topic1", "test.Person",
"topic2", "test.AddressBook");
var keyMessageNameMap = Map.of(
"topic2", "test.Person");
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null);
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, keyMessageNameMap, null);
var msg1 = deserializer
.deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(personMessage)));
assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat());
assertTrue(msg1.getValue().contains("user1@example.com"));

var msg2 = deserializer
.deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
.deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap(personMessage),
Bytes.wrap(addressBookMessage)));
assertEquals(MessageFormat.PROTOBUF, msg2.getKeyFormat());
assertTrue(msg2.getKey().contains("user1@example.com"));
assertTrue(msg2.getValue().contains("user2@example.com"));
}

@Test
void testNoDefaultMessageName() throws IOException {
// by default the first message type defined in proto definition is used
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null);
new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, null);
var msg = deserializer
.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(personMessage)));
Expand All @@ -80,7 +84,7 @@ void testNoDefaultMessageName() throws IOException {
void testDefaultMessageName() throws IOException {
var messageNameMap = Map.of("topic1", "test.Person");
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook");
var msg = deserializer
.deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(addressBookMessage)));
Expand All @@ -91,7 +95,7 @@ void testDefaultMessageName() throws IOException {
void testSerialize() throws IOException {
var messageNameMap = Map.of("topic1", "test.Person");
var serializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook");
var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
assertNotNull(serialized.value());
}
Expand Down

0 comments on commit bb0637e

Please sign in to comment.