Address issue #1699 (config option to deserialize proto message keys) #1729

Merged · 15 commits · Apr 5, 2022
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/documentation.yaml
@@ -18,6 +18,6 @@ jobs:
        uses: urlstechie/urlchecker-action@0.2.31
        with:
          exclude_patterns: localhost,127.0.,192.168.
-         exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com
+         exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/
          print_all: false
          file_types: .md
33 changes: 33 additions & 0 deletions documentation/guides/Protobuf.md
@@ -0,0 +1,33 @@
# Kafkaui Protobuf Support

Kafkaui supports deserializing protobuf messages in two ways:
1. Using Confluent Schema Registry's [protobuf support](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html).
2. Supplying a protobuf file as well as a configuration that maps topic names to protobuf types.

## Configuring Kafkaui with a Protobuf File

To configure Kafkaui to deserialize protobuf messages using a supplied protobuf schema, add the following to the config:
```yaml
kafka:
  clusters:
    - # Cluster configuration omitted.
      # protobufFile is the path to the protobuf schema.
      protobufFile: path/to/my.proto
      # protobufMessageName is the default protobuf type that is used to deserialize
      # the message's value if the topic is not found in protobufMessageNameByTopic.
      protobufMessageName: my.Type1
      # protobufMessageNameByTopic is a mapping of topic names to protobuf types.
      # This mapping is optional; if a topic is not listed here, protobufMessageName
      # is used to deserialize the Kafka message's value.
      protobufMessageNameByTopic:
        topic1: my.Type1
        topic2: my.Type2
      # protobufMessageNameForKey is the default protobuf type that is used to deserialize
      # the message's key if the topic is not found in protobufMessageNameForKeyByTopic.
      protobufMessageNameForKey: my.Type1
      # protobufMessageNameForKeyByTopic is a mapping of topic names to protobuf types.
      # This mapping is optional and is used to deserialize the Kafka message's key.
      # If a protobuf type is not found for a topic's key, the key is deserialized as a string,
      # unless protobufMessageNameForKey is specified.
      protobufMessageNameForKeyByTopic:
        topic1: my.KeyType1
```
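
For illustration, a `my.proto` that matches the configuration above might look like the sketch below. The package and message names (`my.Type1`, `my.Type2`, `my.KeyType1`) and their fields are hypothetical; any message types defined in your schema file can be referenced from the config.

```protobuf
syntax = "proto3";

package my;

// Hypothetical value type for topic1 (also the configured default value type).
message Type1 {
  string name = 1;
}

// Hypothetical value type for topic2.
message Type2 {
  int64 id = 1;
}

// Hypothetical key type for topic1.
message KeyType1 {
  string key = 1;
}
```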
ClustersProperties.java
@@ -31,6 +31,8 @@ public static class Cluster {
    String protobufFile;
    String protobufMessageName;
    Map<String, String> protobufMessageNameByTopic;
+    String protobufMessageNameForKey;
+    Map<String, String> protobufMessageNameForKeyByTopic;
    List<ConnectCluster> kafkaConnect;
    int jmxPort;
    boolean jmxSsl;
KafkaCluster.java
@@ -28,6 +28,8 @@ public class KafkaCluster {
  private final Path protobufFile;
  private final String protobufMessageName;
  private final Map<String, String> protobufMessageNameByTopic;
+  private final String protobufMessageNameForKey;
+  private final Map<String, String> protobufMessageNameForKeyByTopic;
  private final Properties properties;
  private final boolean readOnly;
  private final boolean disableLogDirsCollection;
DeserializationService.java
@@ -32,7 +32,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) {
    if (cluster.getProtobufFile() != null) {
      log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
      return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
-          cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName());
+          cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageNameForKeyByTopic(),
+          cluster.getProtobufMessageName(), cluster.getProtobufMessageNameForKey());
    } else if (cluster.getSchemaRegistry() != null) {
      log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
      return new SchemaRegistryAwareRecordSerDe(cluster);
ProtobufFileRecordSerDe.java
@@ -6,6 +6,7 @@
import com.provectus.kafka.ui.model.MessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
+import com.provectus.kafka.ui.serde.schemaregistry.StringMessageFormatter;
import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
@@ -22,20 +23,26 @@
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;

-//TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok
+@Slf4j
public class ProtobufFileRecordSerDe implements RecordSerDe {
+  private static final StringMessageFormatter FALLBACK_FORMATTER = new StringMessageFormatter();
+
  private final ProtobufSchema protobufSchema;
  private final Path protobufSchemaPath;
  private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
  private final Map<String, Descriptor> messageDescriptorMap;
+  private final Map<String, Descriptor> keyMessageDescriptorMap;
  private final Descriptor defaultMessageDescriptor;
+  private final Descriptor defaultKeyMessageDescriptor;

  public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
-                                 String defaultMessageName)
+                                 Map<String, String> keyMessageNameMap, String defaultMessageName,
+                                 @Nullable String defaultKeyMessageName)
      throws IOException {
    this.protobufSchemaPath = protobufSchemaPath;
    try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
@@ -49,35 +56,70 @@ public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> mess
      }
      this.messageDescriptorMap = new HashMap<>();
      if (messageNameMap != null) {
-        for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
-          var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
-              "The given message type is not found in protobuf definition: "
-                  + entry.getValue());
-          messageDescriptorMap.put(entry.getKey(), descriptor);
-        }
+        populateDescriptors(messageNameMap, messageDescriptorMap);
      }
+      this.keyMessageDescriptorMap = new HashMap<>();
+      if (keyMessageNameMap != null) {
+        populateDescriptors(keyMessageNameMap, keyMessageDescriptorMap);
+      }
      defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(),
          "The given message type is not found in protobuf definition: "
              + defaultMessageName);
+      if (defaultKeyMessageName != null) {
+        // 'schema' is the local ProtobufSchema defined in the collapsed part of this hunk.
+        defaultKeyMessageDescriptor = schema.copy(defaultKeyMessageName).toDescriptor();
+      } else {
+        defaultKeyMessageDescriptor = null;
+      }
    }
  }

+  private void populateDescriptors(Map<String, String> messageNameMap, Map<String, Descriptor> messageDescriptorMap) {
+    for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
+      var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
+          "The given message type is not found in protobuf definition: "
+              + entry.getValue());
+      messageDescriptorMap.put(entry.getKey(), descriptor);
+    }
+  }

  @Override
  public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
-    try {
-      var builder = DeserializedKeyValue.builder();
-      if (msg.key() != null) {
-        builder.key(new String(msg.key().get()));
-        builder.keyFormat(MessageFormat.UNKNOWN);
-      }
-      if (msg.value() != null) {
-        builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
-        builder.valueFormat(MessageFormat.PROTOBUF);
-      }
-      return builder.build();
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e);
-    }
+    var builder = DeserializedKeyValue.builder();
+
+    if (msg.key() != null) {
+      Descriptor descriptor = getKeyDescriptor(msg.topic());
+      if (descriptor == null) {
+        builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
+        builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+      } else {
+        try {
+          builder.key(parse(msg.key().get(), descriptor));
+          builder.keyFormat(MessageFormat.PROTOBUF);
+        } catch (Throwable e) {
+          log.debug("Failed to deserialize key as protobuf, falling back to string formatter", e);
+          builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
+          builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+        }
+      }
+    }
+
+    if (msg.value() != null) {
+      try {
+        builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
+        builder.valueFormat(MessageFormat.PROTOBUF);
+      } catch (Throwable e) {
+        log.debug("Failed to deserialize value as protobuf, falling back to string formatter", e);
+        builder.value(FALLBACK_FORMATTER.format(msg.topic(), msg.value().get()));
+        builder.valueFormat(FALLBACK_FORMATTER.getFormat());
+      }
+    }
+
+    return builder.build();
  }

+  @Nullable
+  private Descriptor getKeyDescriptor(String topic) {
+    return keyMessageDescriptorMap.getOrDefault(topic, defaultKeyMessageDescriptor);
+  }

private Descriptor getDescriptor(String topic) {
@@ -99,40 +141,67 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
                                                  @Nullable String key,
                                                  @Nullable String data,
                                                  @Nullable Integer partition) {
-    if (data == null) {
-      return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null);
-    }
-    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
-    try {
-      JsonFormat.parser().merge(data, builder);
-      final DynamicMessage message = builder.build();
-      return new ProducerRecord<>(
-          topic,
-          partition,
-          Optional.ofNullable(key).map(String::getBytes).orElse(null),
-          message.toByteArray()
-      );
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to merge record for topic " + topic, e);
-    }
+    byte[] keyPayload = null;
+    byte[] valuePayload = null;
+
+    if (key != null) {
+      Descriptor keyDescriptor = getKeyDescriptor(topic);
+      if (keyDescriptor == null) {
+        keyPayload = key.getBytes();
+      } else {
+        DynamicMessage.Builder builder = DynamicMessage.newBuilder(keyDescriptor);
+        try {
+          JsonFormat.parser().merge(key, builder);
+          keyPayload = builder.build().toByteArray();
+        } catch (Throwable e) {
+          throw new RuntimeException("Failed to merge record key for topic " + topic, e);
+        }
+      }
+    }
+
+    if (data != null) {
+      DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
+      try {
+        JsonFormat.parser().merge(data, builder);
+        valuePayload = builder.build().toByteArray();
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to merge record value for topic " + topic, e);
+      }
+    }
+
+    return new ProducerRecord<>(
+        topic,
+        partition,
+        keyPayload,
+        valuePayload);
  }

  @Override
  public TopicMessageSchemaDTO getTopicSchema(String topic) {
+    JsonSchema keyJsonSchema;
+
+    Descriptor keyDescriptor = getKeyDescriptor(topic);
+    if (keyDescriptor == null) {
+      keyJsonSchema = JsonSchema.stringSchema();
+    } else {
+      keyJsonSchema = schemaConverter.convert(
+          protobufSchemaPath.toUri(),
+          keyDescriptor);
+    }
+
-    final JsonSchema jsonSchema = schemaConverter.convert(
-        protobufSchemaPath.toUri(),
-        getDescriptor(topic)
-    );
    final MessageSchemaDTO keySchema = new MessageSchemaDTO()
        .name(protobufSchema.fullName())
        .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(JsonSchema.stringSchema().toJson());
+        .schema(keyJsonSchema.toJson());
+
+    final JsonSchema valueJsonSchema = schemaConverter.convert(
+        protobufSchemaPath.toUri(),
+        getDescriptor(topic));

    final MessageSchemaDTO valueSchema = new MessageSchemaDTO()
        .name(protobufSchema.fullName())
        .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(jsonSchema.toJson());
+        .schema(valueJsonSchema.toJson());

    return new TopicMessageSchemaDTO()
        .key(keySchema)
ProtobufFileRecordSerDeTest.java
@@ -51,25 +51,29 @@ void testDeserialize() throws IOException {
    var messageNameMap = Map.of(
        "topic1", "test.Person",
        "topic2", "test.AddressBook");
+    var keyMessageNameMap = Map.of(
+        "topic2", "test.Person");
    var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null);
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, keyMessageNameMap, null, null);
    var msg1 = deserializer
        .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
            Bytes.wrap(personMessage)));
    assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat());
    assertTrue(msg1.getValue().contains("user1@example.com"));

    var msg2 = deserializer
-        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
+        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap(personMessage),
            Bytes.wrap(addressBookMessage)));
+    assertEquals(MessageFormat.PROTOBUF, msg2.getKeyFormat());
+    assertTrue(msg2.getKey().contains("user1@example.com"));
    assertTrue(msg2.getValue().contains("user2@example.com"));
  }

  @Test
  void testNoDefaultMessageName() throws IOException {
    // by default the first message type defined in proto definition is used
    var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null);
+        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, null, null);
    var msg = deserializer
        .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
            Bytes.wrap(personMessage)));
@@ -80,19 +84,42 @@ void testNoDefaultMessageName() throws IOException {
  void testDefaultMessageName() throws IOException {
    var messageNameMap = Map.of("topic1", "test.Person");
    var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook", null);
    var msg = deserializer
-        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap(addressBookMessage),
            Bytes.wrap(addressBookMessage)));
    assertTrue(msg.getValue().contains("user2@example.com"));
  }

+  @Test
+  void testDefaultKeyMessageName() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var deserializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, messageNameMap, "test.AddressBook",
+            "test.AddressBook");
+    var msg = deserializer
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap(addressBookMessage),
+            Bytes.wrap(addressBookMessage)));
+    assertTrue(msg.getKey().contains("user2@example.com"));
+  }

  @Test
  void testSerialize() throws IOException {
    var messageNameMap = Map.of("topic1", "test.Person");
    var serializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook", null);
    var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
    assertNotNull(serialized.value());
  }

+  @Test
+  void testSerializeKeyAndValue() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var serializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, messageNameMap, "test.AddressBook",
+            "test.AddressBook");
+    var serialized = serializer.serialize("topic1", "{\"name\":\"MyName\"}", "{\"name\":\"MyName\"}", 0);
+    assertNotNull(serialized.key());
+    assertNotNull(serialized.value());
+  }
}
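
For readers wiring this up outside the test suite, a minimal usage sketch follows. It assumes, like the fixtures above, a `protobufSchemaPath` pointing at a schema that defines `test.Person` and a `personMessage` byte array holding a serialized `test.Person`; the five-argument constructor is the one introduced in this PR.

```java
import java.nio.file.Path;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

class ProtobufKeySerdeSketch {
  // Hypothetical fixtures: a schema file defining test.Person, and
  // personMessage holding serialized test.Person bytes.
  static void demo(Path protobufSchemaPath, byte[] personMessage) throws Exception {
    var serde = new ProtobufFileRecordSerDe(
        protobufSchemaPath,
        Map.of("topic1", "test.Person"),  // value types by topic
        Map.of("topic1", "test.Person"),  // key types by topic (new in this PR)
        null,                             // default value type: first type in the schema
        null);                            // default key type: unmapped keys fall back to strings

    var record = new ConsumerRecord<>(
        "topic1", 0, 0L, Bytes.wrap(personMessage), Bytes.wrap(personMessage));
    var result = serde.deserialize(record);
    // With a key descriptor configured, both key and value decode as protobuf.
    System.out.println(result.getKey() + " / " + result.getValue());
  }
}
```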