Skip to content

Commit

Permalink
BE: Refactor SchemaRegistry serialization logic (#4116)
Browse files Browse the repository at this point in the history
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
  • Loading branch information
3 people authored Aug 30, 2023
1 parent 4ec7975 commit b0583a3
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 238 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.provectus.kafka.ui.serdes.builtin.sr;

import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro;
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson;
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;

import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
Expand All @@ -32,13 +34,15 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;


public class SchemaRegistrySerde implements BuiltInSerde {

private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;

public static String name() {
return "SchemaRegistry";
}
Expand Down Expand Up @@ -221,8 +225,8 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
.convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
.toJson();
case JSON ->
//need to use confluent JsonSchema since it includes resolved references
((JsonSchema) parsedSchema).rawSchema().toString();
//need to use confluent JsonSchema since it includes resolved references
((JsonSchema) parsedSchema).rawSchema().toString();
};
}

Expand Down Expand Up @@ -254,35 +258,27 @@ private String schemaSubject(String topic, Target type) {
@Override
public Serializer serializer(String topic, Target type) {
String subject = schemaSubject(topic, type);
var schema = getSchemaBySubject(subject)
.orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject)));
boolean isKey = type == Target.KEY;
SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
.orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
SchemaMetadata meta = getSchemaBySubject(subject)
.orElseThrow(() -> new ValidationException(
String.format("No schema for subject '%s' found", subject)));
ParsedSchema schema = getSchemaById(meta.getId())
.orElseThrow(() -> new IllegalStateException(
String.format("Schema found for id %s, subject '%s'", meta.getId(), subject)));
SchemaType schemaType = SchemaType.fromString(meta.getSchemaType())
.orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType()));
return switch (schemaType) {
case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
case PROTOBUF -> input ->
serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input);
case AVRO -> input ->
serializeAvro((AvroSchema) schema, meta.getId(), input);
case JSON -> input ->
serializeJson((JsonSchema) schema, meta.getId(), input);
};
}

@Override
public Deserializer deserializer(String topic, Target type) {
return new SrDeserializer(topic);
}

///--------------------------------------------------------------

private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
private static final int SR_RECORD_PREFIX_LENGTH = 5;

@RequiredArgsConstructor
private class SrDeserializer implements Deserializer {

private final String topic;

@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return (headers, data) -> {
var schemaId = extractSchemaIdFromMsg(data);
SchemaType format = getMessageFormatBySchemaId(schemaId);
MessageFormatter formatter = schemaRegistryFormatters.get(format);
Expand All @@ -294,7 +290,7 @@ public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
"type", format.name()
)
);
}
};
}

private SchemaType getMessageFormatBySchemaId(int schemaId) {
Expand All @@ -306,7 +302,7 @@ private SchemaType getMessageFormatBySchemaId(int schemaId) {

private int extractSchemaIdFromMsg(byte[] data) {
ByteBuffer buffer = ByteBuffer.wrap(data);
if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
return buffer.getInt();
}
throw new ValidationException(
Expand Down

This file was deleted.

Loading

0 comments on commit b0583a3

Please sign in to comment.