diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index cf00c9cd819..271cc6d2232 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -32,6 +32,7 @@ public static class Cluster { String protobufFile; String protobufMessageName; Map protobufMessageNameByTopic; + Map protobufMessageNameForKeyByTopic; List kafkaConnect; int jmxPort; boolean jmxSsl; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 1a97fe13a0f..1bb7b766c8b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -29,6 +29,7 @@ public class KafkaCluster { private final Path protobufFile; private final String protobufMessageName; private final Map protobufMessageNameByTopic; + private final Map protobufMessageNameForKeyByTopic; private final Properties properties; private final boolean readOnly; private final boolean disableLogDirsCollection; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java index 3bfd5e9d8d2..81b3a4ef695 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java @@ -32,7 +32,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) { if (cluster.getProtobufFile() != null) { log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName()); return new ProtobufFileRecordSerDe(cluster.getProtobufFile(), - cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName()); + cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageNameForKeyByTopic(), + cluster.getProtobufMessageName()); } else if (cluster.getSchemaRegistry() != null) { log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName()); return new SchemaRegistryAwareRecordSerDe(cluster); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index 7553535999c..a4a0ff117df 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -26,16 +26,16 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Bytes; -//TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok public class ProtobufFileRecordSerDe implements RecordSerDe { private final ProtobufSchema protobufSchema; private final Path protobufSchemaPath; private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter(); private final Map messageDescriptorMap; + private final Map keyMessageDescriptorMap; private final Descriptor defaultMessageDescriptor; public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map messageNameMap, - String defaultMessageName) + Map keyMessageNameMap, String defaultMessageName) throws IOException { this.protobufSchemaPath = protobufSchemaPath; try (final Stream lines = Files.lines(protobufSchemaPath)) { @@ -49,12 +49,11 @@ public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map mess } this.messageDescriptorMap = new HashMap<>(); if (messageNameMap != null) { - for (Map.Entry entry : messageNameMap.entrySet()) { - var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()), - "The given message type is not found in protobuf definition: " - + entry.getValue()); - messageDescriptorMap.put(entry.getKey(), descriptor); - } + populateDescriptors(messageNameMap, messageDescriptorMap); + } + this.keyMessageDescriptorMap = new HashMap<>(); + if (keyMessageNameMap != null) { + populateDescriptors(keyMessageNameMap, keyMessageDescriptorMap); } defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(), "The given message type is not found in protobuf definition: " @@ -62,13 +61,28 @@ public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map mess } } + private void populateDescriptors(Map messageNameMap, Map messageDescriptorMap) { + for (Map.Entry entry : messageNameMap.entrySet()) { + var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()), + "The given message type is not found in protobuf definition: " + + entry.getValue()); + messageDescriptorMap.put(entry.getKey(), descriptor); + } + } + @Override public DeserializedKeyValue deserialize(ConsumerRecord msg) { try { var builder = DeserializedKeyValue.builder(); if (msg.key() != null) { - builder.key(new String(msg.key().get())); - builder.keyFormat(MessageFormat.UNKNOWN); + Descriptor descriptor = keyMessageDescriptorMap.get(msg.topic()); + if (descriptor == null) { + builder.key(new String(msg.key().get())); + builder.keyFormat(MessageFormat.UNKNOWN); + } else { + builder.key(parse(msg.key().get(), descriptor)); + builder.keyFormat(MessageFormat.PROTOBUF); + } } if (msg.value() != null) { builder.value(parse(msg.value().get(), getDescriptor(msg.topic()))); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java index 269b65872a0..e958a82374b 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java @@ -51,8 +51,10 @@ void testDeserialize() throws IOException { var messageNameMap = Map.of( "topic1", "test.Person", "topic2", "test.AddressBook"); + var keyMessageNameMap = Map.of( + "topic2", "test.Person"); var deserializer = - new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null); + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, keyMessageNameMap, null); var msg1 = deserializer .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(personMessage))); @@ -60,8 +62,10 @@ void testDeserialize() throws IOException { assertTrue(msg1.getValue().contains("user1@example.com")); var msg2 = deserializer - .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()), + .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap(personMessage), Bytes.wrap(addressBookMessage))); + assertEquals(MessageFormat.PROTOBUF, msg2.getKeyFormat()); + assertTrue(msg2.getKey().contains("user1@example.com")); assertTrue(msg2.getValue().contains("user2@example.com")); } @@ -69,7 +73,7 @@ void testDeserialize() throws IOException { void testNoDefaultMessageName() throws IOException { // by default the first message type defined in proto definition is used var deserializer = - new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null); + new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, null); var msg = deserializer .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(personMessage))); @@ -80,7 +84,7 @@ void testNoDefaultMessageName() throws IOException { void testDefaultMessageName() throws IOException { var messageNameMap = Map.of("topic1", "test.Person"); var deserializer = - new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook"); + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook"); var msg = deserializer .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(addressBookMessage))); @@ -91,7 +95,7 @@ void testDefaultMessageName() throws IOException { void testSerialize() throws IOException { var messageNameMap = Map.of("topic1", "test.Person"); var serializer = - new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook"); + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook"); var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0); assertNotNull(serialized.value()); }