feat(topicdata) : custom deserializer for avro raw format (#843)
close #833
bgranvea authored and tchiotludo committed Oct 24, 2021
1 parent bd6347d commit c731681
Showing 15 changed files with 462 additions and 12 deletions.
30 changes: 30 additions & 0 deletions README.md
@@ -343,6 +343,36 @@ akhq:
More examples about Protobuf deserialization can be found in [tests](./src/test/java/org/akhq/utils).
Info about the descriptor files generation can be found in [test resources](./src/test/resources/protobuf_proto).

#### Avro deserialization

Avro messages encoded with the Schema Registry are automatically decoded if the registry is configured (see [Kafka cluster](#kafka-cluster-configuration)).
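
For context, a registry-encoded message carries a small framing header that raw messages lack. Here is a minimal sketch of that framing (the class and method names are illustrative, not part of this commit; the TIBCO magic byte comes from `Record.java` below):

```java
import java.nio.ByteBuffer;

// Confluent wire format: [magic byte][4-byte schema id][Avro binary body].
public class WireFormatSketch {
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();      // 0x00 for Confluent, (byte) 0x80 for TIBCO
        if (magic != 0x00 && magic != (byte) 0x80) {
            throw new IllegalArgumentException("not a registry-framed message");
        }
        return buf.getInt();         // big-endian schema id; the Avro body follows
    }
}
```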

You can also decode raw binary Avro messages, that is, messages encoded directly with [DatumWriter](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/DatumWriter.html) without any header.
You must provide a `schemas-folder` and mappings that associate a `topic-regex` with a schema file name. A schema can be
specified for message keys with `key-schema-file` and/or for values with `value-schema-file`.

Here is an example configuration:

```yaml
akhq:
  connections:
    kafka:
      properties:
        # standard kafka properties
      deserialization:
        avro-raw:
          schemas-folder: "/app/avro_schemas"
          topics-mapping:
            - topic-regex: "album.*"
              value-schema-file: "Album.avsc"
            - topic-regex: "film.*"
              value-schema-file: "Film.avsc"
            - topic-regex: "test.*"
              key-schema-file: "Key.avsc"
              value-schema-file: "Value.avsc"
```

Examples can be found in [tests](./src/test/java/org/akhq/utils).
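
Producing such raw messages is straightforward; here is a minimal producer-side sketch (assumptions: a broker at `localhost:9092`, a hypothetical topic `album.rock` matching the `album.*` regex above, and the `Album.avsc` schema from this commit's test resources):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.io.ByteArrayOutputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

public class RawAvroProducerExample {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(Path.of("/app/avro_schemas/Album.avsc").toFile());

        GenericRecord album = new GenericData.Record(schema);
        album.put("title", "Abbey Road");
        album.put("artist", List.of("The Beatles"));
        album.put("releaseYear", 1969);
        album.put("songTitle", List.of("Come Together"));

        // Raw binary encoding: no magic byte, no schema id, just the Avro body
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(album, encoder);
        encoder.flush();

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("album.rock", out.toByteArray()));
        }
    }
}
```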

### AKHQ Configuration Bootstrap OAuth2

#### Requirement Library Strimzi
14 changes: 14 additions & 0 deletions src/main/java/org/akhq/configs/AvroTopicsMapping.java
@@ -0,0 +1,14 @@
package org.akhq.configs;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class AvroTopicsMapping {
    String topicRegex;
    String keySchemaFile;
    String valueSchemaFile;
}
25 changes: 19 additions & 6 deletions src/main/java/org/akhq/configs/Connection.java
@@ -18,7 +18,7 @@
public class Connection extends AbstractProperties {
SchemaRegistry schemaRegistry;
List<Connect> connect;
ProtobufDeserializationTopicsMapping deserialization;
Deserialization deserialization = new Deserialization();
UiOptions uiOptions = new UiOptions();

public Connection(@Parameter String name) {
@@ -38,11 +38,24 @@ public static class SchemaRegistry {
}

@Getter
@Data
@ConfigurationProperties("deserialization.protobuf")
public static class ProtobufDeserializationTopicsMapping {
String descriptorsFolder;
List<TopicsMapping> topicsMapping = new ArrayList<>();
@ConfigurationProperties("deserialization")
public static class Deserialization {
ProtobufDeserializationTopicsMapping protobuf;
AvroDeserializationTopicsMapping avroRaw;

@Data
@ConfigurationProperties("protobuf")
public static class ProtobufDeserializationTopicsMapping {
String descriptorsFolder;
List<TopicsMapping> topicsMapping = new ArrayList<>();
}

@Data
@ConfigurationProperties("avro-raw")
public static class AvroDeserializationTopicsMapping {
String schemasFolder;
List<AvroTopicsMapping> topicsMapping = new ArrayList<>();
}
}

@Data
20 changes: 19 additions & 1 deletion src/main/java/org/akhq/models/Record.java
@@ -10,6 +10,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonDeserializer;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
@@ -54,6 +55,9 @@ public class Record {
@JsonIgnore
private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@JsonIgnore
private AvroToJsonDeserializer avroToJsonDeserializer;

@Getter(AccessLevel.NONE)
private byte[] bytesKey;

@@ -89,7 +93,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte

public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue, Topic topic) {
ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
} else {
@@ -111,6 +115,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
this.avroToJsonDeserializer = avroToJsonDeserializer;
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.avroToJsonSerializer = avroToJsonSerializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
@@ -197,6 +202,19 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
return new String(payload);
}
}

if (avroToJsonDeserializer != null) {
try {
String record = avroToJsonDeserializer.deserialize(topic.getName(), payload, isKey);
if (record != null) {
return record;
}
} catch (Exception exception) {
this.exceptions.add(exception.getMessage());

return new String(payload);
}
}
return new String(payload);
}
}
src/main/java/org/akhq/repositories/CustomDeserializerRepository.java
@@ -1,6 +1,8 @@
package org.akhq.repositories;

import org.akhq.modules.KafkaModule;
import org.akhq.utils.AvroToJsonDeserializer;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ProtobufToJsonDeserializer;

import javax.inject.Inject;
@@ -12,15 +14,28 @@
public class CustomDeserializerRepository {
@Inject
private KafkaModule kafkaModule;
@Inject
private AvroToJsonSerializer avroToJsonSerializer;
private final Map<String, ProtobufToJsonDeserializer> protobufToJsonDeserializers = new HashMap<>();
private final Map<String, AvroToJsonDeserializer> avroToJsonDeserializers = new HashMap<>();

public ProtobufToJsonDeserializer getProtobufToJsonDeserializer(String clusterId) {
if (!this.protobufToJsonDeserializers.containsKey(clusterId)) {
this.protobufToJsonDeserializers.put(
clusterId,
new ProtobufToJsonDeserializer(this.kafkaModule.getConnection(clusterId).getDeserialization())
new ProtobufToJsonDeserializer(this.kafkaModule.getConnection(clusterId).getDeserialization().getProtobuf())
);
}
return this.protobufToJsonDeserializers.get(clusterId);
}

public AvroToJsonDeserializer getAvroToJsonDeserializer(String clusterId) {
if (!this.avroToJsonDeserializers.containsKey(clusterId)) {
this.avroToJsonDeserializers.put(
clusterId,
new AvroToJsonDeserializer(this.kafkaModule.getConnection(clusterId).getDeserialization().getAvroRaw(), this.avroToJsonSerializer)
);
}
return this.avroToJsonDeserializers.get(clusterId);
}
}
4 changes: 3 additions & 1 deletion src/main/java/org/akhq/repositories/RecordRepository.java
@@ -440,6 +440,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null,
this.avroToJsonSerializer,
this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
this.customDeserializerRepository.getAvroToJsonDeserializer(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId)),
topic
@@ -458,6 +459,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null,
this.avroToJsonSerializer,
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
this.customDeserializerRepository.getAvroToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)),
topic
@@ -1262,4 +1264,4 @@ private static class EndOffsetBound {
private final KafkaConsumer<byte[], byte[]> consumer;
}
}

2 changes: 1 addition & 1 deletion src/main/java/org/akhq/utils/AvroDeserializer.java
@@ -41,7 +41,7 @@ public static Map<String, Object> recordDeserializer(GenericRecord record) {
.getFields()
.stream()
.collect(
HashMap::new,
LinkedHashMap::new, // preserve schema field order
(m, v) -> m.put(
v.name(),
AvroDeserializer.objectDeserializer(record.get(v.name()), v.schema())
150 changes: 150 additions & 0 deletions src/main/java/org/akhq/utils/AvroToJsonDeserializer.java
@@ -0,0 +1,150 @@
package org.akhq.utils;

import io.micronaut.core.serialize.exceptions.SerializationException;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.AvroTopicsMapping;
import org.akhq.configs.Connection;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Deserializer for messages in raw Avro binary format, driven by the topics-mapping config.
 */
@Slf4j
public class AvroToJsonDeserializer {
    private final DecoderFactory decoderFactory = DecoderFactory.get();
    private final Map<String, Schema> keySchemas;
    private final Map<String, Schema> valueSchemas;
    private final List<AvroTopicsMapping> topicsMapping;
    private final String avroSchemasFolder;
    private final AvroToJsonSerializer avroToJsonSerializer;

    public AvroToJsonDeserializer(Connection.Deserialization.AvroDeserializationTopicsMapping avroDeserializationTopicsMapping, AvroToJsonSerializer avroToJsonSerializer) {
        if (avroDeserializationTopicsMapping == null) {
            this.keySchemas = new HashMap<>();
            this.valueSchemas = new HashMap<>();
            this.topicsMapping = new ArrayList<>();
            this.avroSchemasFolder = null;
            this.avroToJsonSerializer = null;
        } else {
            this.avroSchemasFolder = avroDeserializationTopicsMapping.getSchemasFolder();
            this.topicsMapping = avroDeserializationTopicsMapping.getTopicsMapping();
            this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile);
            this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile);
            this.avroToJsonSerializer = avroToJsonSerializer;
        }
    }

    /**
     * Load Avro schemas from the schemas folder.
     *
     * @return map whose keys are topic regexes and whose values are Avro schemas
     */
    private Map<String, Schema> buildSchemas(Function<AvroTopicsMapping, String> schemaFileMapper) {
        Map<String, Schema> allSchemas = new HashMap<>();
        for (AvroTopicsMapping mapping : topicsMapping) {
            String schemaFile = schemaFileMapper.apply(mapping);

            if (schemaFile != null) {
                try {
                    Schema schema = loadSchemaFile(mapping, schemaFile);
                    allSchemas.put(mapping.getTopicRegex(), schema);
                } catch (IOException e) {
                    throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e);
                }
            }
        }
        return allSchemas;
    }

    Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException {
        if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) {
            String fullPath = avroSchemasFolder + File.separator + schemaFile;
            return new Schema.Parser().parse(Path.of(fullPath).toFile());
        }
        throw new FileNotFoundException("Avro schema file not found for topic regex [" +
            mapping.getTopicRegex() + "]: schemas folder is not specified or does not exist.");
    }

    /**
     * Deserialize from raw Avro binary format to JSON.
     * Messages must have been encoded directly with {@link org.apache.avro.io.DatumWriter}, not {@link org.apache.avro.file.DataFileWriter} or {@link org.apache.avro.message.BinaryMessageEncoder}.
     * The topic name must match a {@code topic-regex} from the {@code akhq.connections.[clusterName].deserialization.avro-raw.topics-mapping} config,
     * and a schema must be set for the key or the value in that mapping.
     *
     * @param topic current topic name
     * @param buffer binary data to decode
     * @param isKey whether the data is a message key or a value
     * @return the decoded string, or {@code null} if no matching configuration or schema is found
     */
    public String deserialize(String topic, byte[] buffer, boolean isKey) {
        AvroTopicsMapping matchingConfig = findMatchingConfig(topic);
        if (matchingConfig == null) {
            log.debug("No Avro deserialization config found for topic [{}]", topic);
            return null;
        }

        if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) {
            throw new SerializationException(String.format("Avro deserialization is configured for topic [%s], " +
                "but no schema is specified for either the key or the value.", topic));
        }

        Schema schema;
        if (isKey) {
            schema = keySchemas.get(matchingConfig.getTopicRegex());
        } else {
            schema = valueSchemas.get(matchingConfig.getTopicRegex());
        }

        if (schema == null) {
            return null;
        }

        String result;
        try {
            result = tryToDeserializeWithSchemaFile(buffer, schema);
        } catch (Exception e) {
            throw new SerializationException(String.format("Cannot deserialize message with Avro deserializer " +
                "for topic [%s] and schema [%s]", topic, schema.getFullName()), e);
        }
        return result;
    }

    private AvroTopicsMapping findMatchingConfig(String topic) {
        for (AvroTopicsMapping mapping : topicsMapping) {
            if (topic.matches(mapping.getTopicRegex())) {
                return new AvroTopicsMapping(
                    mapping.getTopicRegex(),
                    mapping.getKeySchemaFile(), mapping.getValueSchemaFile());
            }
        }
        return null;
    }

    private String tryToDeserializeWithSchemaFile(byte[] buffer, Schema schema) throws IOException {
        DatumReader<?> reader = new GenericDatumReader<>(schema);
        Object result = reader.read(null, decoderFactory.binaryDecoder(buffer, null));

        // primitive Avro types decode to objects that are not GenericRecords; render them directly
        if (!(result instanceof GenericRecord)) {
            return String.valueOf(result);
        }

        GenericRecord record = (GenericRecord) result;
        return avroToJsonSerializer.toJson(record);
    }
}
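
The javadoc on `deserialize` above stresses that only plain `DatumWriter` output is supported. Here is a self-contained sketch contrasting that raw encoding with Avro's container-file format (the inlined `Key` schema and class name are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class AvroEncodingsExample {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"org.akhq\"," +
            "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "42");

        // 1. Raw DatumWriter output: field bodies only, no header (what AvroToJsonDeserializer expects)
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(raw, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // 2. Container-file format: "Obj" magic, embedded schema, sync markers (NOT supported here)
        ByteArrayOutputStream container = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> fileWriter =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            fileWriter.create(schema, container);
            fileWriter.append(record);
        }

        System.out.println(raw.size() + " raw bytes vs " + container.size() + " container bytes");
    }
}
```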
src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java
@@ -29,7 +29,7 @@ public class ProtobufToJsonDeserializer {
private final List<TopicsMapping> topicsMapping;
private final String protobufDescriptorsFolder;

public ProtobufToJsonDeserializer(Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
public ProtobufToJsonDeserializer(Connection.Deserialization.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
if (protobufDeserializationTopicsMapping == null) {
this.descriptors = new HashMap<>();
this.topicsMapping = new ArrayList<>();
11 changes: 11 additions & 0 deletions src/test/avro/Album.avsc
@@ -0,0 +1,11 @@
{
  "name": "AlbumAvro",
  "namespace": "org.akhq",
  "type": "record",
  "fields": [
    {"name": "title", "type": "string"},
    {"name": "artist", "type": {"type": "array", "items": "string"}},
    {"name": "releaseYear", "type": "int"},
    {"name": "songTitle", "type": {"type": "array", "items": "string"}}
  ]
}
