Implement deserializing binary protobuf encoded message keys (provectus#1729)

* Add protobufMessageNameForKeyByTopic option to config. Message keys are
deserialized using a protobuf schema if the config is set. Otherwise
message keys are treated as strings.

Closes provectus#1699

* Add documentation around kafkaui's protobuf support

* Add protobufMessageNameForKey config option

* Update README with info about default types

* Implement support for protobufMessageNameForKeyByTopic

* fallback to FALLBACK_FORMATTER

* Add ability to publish message with protobuf key

* Change log levels to debug and add @Nullable annotations

* Attempt at fixing documentation workflow

Co-authored-by: Ilya Kuramshin <ilia-2k@rambler.ru>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Co-authored-by: Roman Zabaluev <github@haarolean.dev>
4 people authored Apr 5, 2022
1 parent 4fabb09 commit e8999cf
Showing 7 changed files with 179 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/documentation.yaml
@@ -18,6 +18,6 @@ jobs:
       uses: urlstechie/urlchecker-action@0.2.31
       with:
         exclude_patterns: localhost,127.0.,192.168.
-        exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com
+        exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/
         print_all: false
         file_types: .md
33 changes: 33 additions & 0 deletions documentation/guides/Protobuf.md
@@ -0,0 +1,33 @@
# Kafkaui Protobuf Support

Kafkaui supports deserializing protobuf messages in two ways:
1. Using Confluent Schema Registry's [protobuf support](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html).
2. Supplying a protobuf file as well as a configuration that maps topic names to protobuf types (see the selection sketch below).
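
The sketch below condenses how kafka-ui picks between these two paths per cluster; it mirrors the factory logic in DeserializationService as changed by this commit, but is illustrative rather than the verbatim class.

```java
// Condensed sketch of DeserializationService#createRecordDeserializerForCluster
// after this commit (RecordSerDe, KafkaCluster, and the serde implementations
// are kafka-ui's own types).
RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) throws IOException {
  if (cluster.getProtobufFile() != null) {
    // Way 2: a local protobuf file plus topic-to-type mappings (this guide).
    return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
        cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageNameForKeyByTopic(),
        cluster.getProtobufMessageName(), cluster.getProtobufMessageNameForKey());
  }
  if (cluster.getSchemaRegistry() != null) {
    // Way 1: Confluent Schema Registry protobuf support.
    return new SchemaRegistryAwareRecordSerDe(cluster);
  }
  // Any other configuration falls back to kafka-ui's plain string handling
  // (fallback serde omitted in this sketch).
  throw new UnsupportedOperationException("sketch only");
}
```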

## Configuring Kafkaui with a Protobuf File

To configure Kafkaui to deserialize protobuf messages using a supplied protobuf schema, add the following to the config:
```yaml
kafka:
  clusters:
    - # Cluster configuration omitted.
      # protobufFile is the path to the protobuf schema.
      protobufFile: path/to/my.proto
      # protobufMessageName is the default protobuf type used to deserialize
      # the message's value if the topic is not found in protobufMessageNameByTopic.
      protobufMessageName: my.Type1
      # protobufMessageNameByTopic is a mapping of topic names to protobuf types.
      # This mapping is required and is used to deserialize the Kafka message's value.
      protobufMessageNameByTopic:
        topic1: my.Type1
        topic2: my.Type2
      # protobufMessageNameForKey is the default protobuf type used to deserialize
      # the message's key if the topic is not found in protobufMessageNameForKeyByTopic.
      protobufMessageNameForKey: my.Type1
      # protobufMessageNameForKeyByTopic is a mapping of topic names to protobuf types.
      # This mapping is optional and is used to deserialize the Kafka message's key.
      # If a protobuf type is not found for a topic's key, the key is deserialized as a string,
      # unless protobufMessageNameForKey is specified.
      protobufMessageNameForKeyByTopic:
        topic1: my.KeyType1
```
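
For reference, a minimal sketch (reusing the placeholder path and type names from the YAML above) of how these values feed the serde constructor this commit introduces:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;

class ProtobufConfigExample {
  // ProtobufFileRecordSerDe is kafka-ui's own class; the argument order below
  // matches the constructor added in this commit.
  ProtobufFileRecordSerDe build() throws IOException {
    return new ProtobufFileRecordSerDe(
        Path.of("path/to/my.proto"),                        // protobufFile
        Map.of("topic1", "my.Type1", "topic2", "my.Type2"), // protobufMessageNameByTopic
        Map.of("topic1", "my.KeyType1"),                    // protobufMessageNameForKeyByTopic
        "my.Type1",                                         // protobufMessageName (default value type)
        "my.Type1");                                        // protobufMessageNameForKey (default key type)
  }
}
```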
@@ -31,6 +31,8 @@ public static class Cluster {
     String protobufFile;
     String protobufMessageName;
     Map<String, String> protobufMessageNameByTopic;
+    String protobufMessageNameForKey;
+    Map<String, String> protobufMessageNameForKeyByTopic;
     List<ConnectCluster> kafkaConnect;
     int jmxPort;
     boolean jmxSsl;
@@ -28,6 +28,8 @@ public class KafkaCluster {
   private final Path protobufFile;
   private final String protobufMessageName;
   private final Map<String, String> protobufMessageNameByTopic;
+  private final String protobufMessageNameForKey;
+  private final Map<String, String> protobufMessageNameForKeyByTopic;
   private final Properties properties;
   private final boolean readOnly;
   private final boolean disableLogDirsCollection;
@@ -32,7 +32,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) {
     if (cluster.getProtobufFile() != null) {
       log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
       return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
-          cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName());
+          cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageNameForKeyByTopic(),
+          cluster.getProtobufMessageName(), cluster.getProtobufMessageNameForKey());
     } else if (cluster.getSchemaRegistry() != null) {
       log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
       return new SchemaRegistryAwareRecordSerDe(cluster);
@@ -6,6 +6,7 @@
 import com.provectus.kafka.ui.model.MessageSchemaDTO;
 import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
 import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
+import com.provectus.kafka.ui.serde.schemaregistry.StringMessageFormatter;
 import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
 import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
@@ -22,20 +23,26 @@
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Bytes;

-//TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok
+@Slf4j
 public class ProtobufFileRecordSerDe implements RecordSerDe {
+  private static final StringMessageFormatter FALLBACK_FORMATTER = new StringMessageFormatter();
+
   private final ProtobufSchema protobufSchema;
   private final Path protobufSchemaPath;
   private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
   private final Map<String, Descriptor> messageDescriptorMap;
+  private final Map<String, Descriptor> keyMessageDescriptorMap;
   private final Descriptor defaultMessageDescriptor;
+  private final Descriptor defaultKeyMessageDescriptor;
 
   public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
-                                 String defaultMessageName)
+                                 Map<String, String> keyMessageNameMap, String defaultMessageName,
+                                 @Nullable String defaultKeyMessageName)
       throws IOException {
     this.protobufSchemaPath = protobufSchemaPath;
     try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
@@ -49,35 +56,70 @@ public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
       }
       this.messageDescriptorMap = new HashMap<>();
       if (messageNameMap != null) {
-        for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
-          var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
-              "The given message type is not found in protobuf definition: "
-                  + entry.getValue());
-          messageDescriptorMap.put(entry.getKey(), descriptor);
-        }
+        populateDescriptors(messageNameMap, messageDescriptorMap);
       }
+      this.keyMessageDescriptorMap = new HashMap<>();
+      if (keyMessageNameMap != null) {
+        populateDescriptors(keyMessageNameMap, keyMessageDescriptorMap);
+      }
       defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(),
           "The given message type is not found in protobuf definition: "
               + defaultMessageName);
+      if (defaultKeyMessageName != null) {
+        defaultKeyMessageDescriptor = schema.copy(defaultKeyMessageName).toDescriptor();
+      } else {
+        defaultKeyMessageDescriptor = null;
+      }
     }
   }
 
+  private void populateDescriptors(Map<String, String> messageNameMap, Map<String, Descriptor> messageDescriptorMap) {
+    for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
+      var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
+          "The given message type is not found in protobuf definition: "
+              + entry.getValue());
+      messageDescriptorMap.put(entry.getKey(), descriptor);
+    }
+  }

   @Override
   public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
-    try {
-      var builder = DeserializedKeyValue.builder();
-      if (msg.key() != null) {
-        builder.key(new String(msg.key().get()));
-        builder.keyFormat(MessageFormat.UNKNOWN);
-      }
-      if (msg.value() != null) {
-        builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
-        builder.valueFormat(MessageFormat.PROTOBUF);
-      }
-      return builder.build();
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e);
-    }
+    var builder = DeserializedKeyValue.builder();
+
+    if (msg.key() != null) {
+      Descriptor descriptor = getKeyDescriptor(msg.topic());
+      if (descriptor == null) {
+        builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
+        builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+      } else {
+        try {
+          builder.key(parse(msg.key().get(), descriptor));
+          builder.keyFormat(MessageFormat.PROTOBUF);
+        } catch (Throwable e) {
+          log.debug("Failed to deserialize key as protobuf, falling back to string formatter", e);
+          builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
+          builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+        }
+      }
+    }
+
+    if (msg.value() != null) {
+      try {
+        builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
+        builder.valueFormat(MessageFormat.PROTOBUF);
+      } catch (Throwable e) {
+        log.debug("Failed to deserialize value as protobuf, falling back to string formatter", e);
+        builder.value(FALLBACK_FORMATTER.format(msg.topic(), msg.value().get()));
+        builder.valueFormat(FALLBACK_FORMATTER.getFormat());
+      }
+    }
+
+    return builder.build();
   }
 
+  @Nullable
+  private Descriptor getKeyDescriptor(String topic) {
+    return keyMessageDescriptorMap.getOrDefault(topic, defaultKeyMessageDescriptor);
+  }

private Descriptor getDescriptor(String topic) {
Expand All @@ -99,40 +141,67 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
@Nullable String key,
@Nullable String data,
@Nullable Integer partition) {
if (data == null) {
return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null);
byte[] keyPayload = null;
byte[] valuePayload = null;

if (key != null) {
Descriptor keyDescriptor = getKeyDescriptor(topic);
if (keyDescriptor == null) {
keyPayload = key.getBytes();
} else {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(keyDescriptor);
try {
JsonFormat.parser().merge(key, builder);
keyPayload = builder.build().toByteArray();
} catch (Throwable e) {
throw new RuntimeException("Failed to merge record key for topic " + topic, e);
}
}
}
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
try {
JsonFormat.parser().merge(data, builder);
final DynamicMessage message = builder.build();
return new ProducerRecord<>(
topic,
partition,
Optional.ofNullable(key).map(String::getBytes).orElse(null),
message.toByteArray()
);
} catch (Throwable e) {
throw new RuntimeException("Failed to merge record for topic " + topic, e);

if (data != null) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
try {
JsonFormat.parser().merge(data, builder);
valuePayload = builder.build().toByteArray();
} catch (Throwable e) {
throw new RuntimeException("Failed to merge record value for topic " + topic, e);
}
}

return new ProducerRecord<>(
topic,
partition,
keyPayload,
valuePayload);
}

   @Override
   public TopicMessageSchemaDTO getTopicSchema(String topic) {
-    final JsonSchema jsonSchema = schemaConverter.convert(
-        protobufSchemaPath.toUri(),
-        getDescriptor(topic)
-    );
+    JsonSchema keyJsonSchema;
+
+    Descriptor keyDescriptor = getKeyDescriptor(topic);
+    if (keyDescriptor == null) {
+      keyJsonSchema = JsonSchema.stringSchema();
+    } else {
+      keyJsonSchema = schemaConverter.convert(
+          protobufSchemaPath.toUri(),
+          keyDescriptor);
+    }
+
     final MessageSchemaDTO keySchema = new MessageSchemaDTO()
         .name(protobufSchema.fullName())
         .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(JsonSchema.stringSchema().toJson());
+        .schema(keyJsonSchema.toJson());
 
+    final JsonSchema valueJsonSchema = schemaConverter.convert(
+        protobufSchemaPath.toUri(),
+        getDescriptor(topic));
+
     final MessageSchemaDTO valueSchema = new MessageSchemaDTO()
         .name(protobufSchema.fullName())
         .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(jsonSchema.toJson());
+        .schema(valueJsonSchema.toJson());
 
     return new TopicMessageSchemaDTO()
         .key(keySchema)
@@ -51,25 +51,29 @@ void testDeserialize() throws IOException {
     var messageNameMap = Map.of(
         "topic1", "test.Person",
         "topic2", "test.AddressBook");
+    var keyMessageNameMap = Map.of(
+        "topic2", "test.Person");
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null);
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, keyMessageNameMap, null, null);
     var msg1 = deserializer
         .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(personMessage)));
     assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat());
     assertTrue(msg1.getValue().contains("user1@example.com"));
 
     var msg2 = deserializer
-        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
+        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap(personMessage),
             Bytes.wrap(addressBookMessage)));
+    assertEquals(MessageFormat.PROTOBUF, msg2.getKeyFormat());
+    assertTrue(msg2.getKey().contains("user1@example.com"));
     assertTrue(msg2.getValue().contains("user2@example.com"));
   }
 
   @Test
   void testNoDefaultMessageName() throws IOException {
     // by default the first message type defined in proto definition is used
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null);
+        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, null, null);
     var msg = deserializer
         .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(personMessage)));
@@ -80,19 +84,42 @@ void testNoDefaultMessageName() throws IOException {
   void testDefaultMessageName() throws IOException {
     var messageNameMap = Map.of("topic1", "test.Person");
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook", null);
     var msg = deserializer
-        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap(addressBookMessage),
             Bytes.wrap(addressBookMessage)));
     assertTrue(msg.getValue().contains("user2@example.com"));
   }
 
+  @Test
+  void testDefaultKeyMessageName() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var deserializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, messageNameMap, "test.AddressBook",
+            "test.AddressBook");
+    var msg = deserializer
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap(addressBookMessage),
+            Bytes.wrap(addressBookMessage)));
+    assertTrue(msg.getKey().contains("user2@example.com"));
+  }
+
   @Test
   void testSerialize() throws IOException {
     var messageNameMap = Map.of("topic1", "test.Person");
     var serializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook", null);
     var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
     assertNotNull(serialized.value());
   }
+
+  @Test
+  void testSerializeKeyAndValue() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var serializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, messageNameMap, "test.AddressBook",
+            "test.AddressBook");
+    var serialized = serializer.serialize("topic1", "{\"name\":\"MyName\"}", "{\"name\":\"MyName\"}", 0);
+    assertNotNull(serialized.key());
+    assertNotNull(serialized.value());
+  }
 }
