diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java deleted file mode 100644 index 164938bfc70..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.provectus.kafka.ui.serdes.builtin.sr; - -import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; -import java.util.Map; -import org.apache.kafka.common.serialization.Serializer; - -class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer { - - AvroSchemaRegistrySerializer(String topic, boolean isKey, - SchemaRegistryClient client, - SchemaMetadata schema) { - super(topic, isKey, client, schema); - } - - @Override - protected Serializer createSerializer(SchemaRegistryClient client) { - var serializer = new KafkaAvroSerializer(client); - serializer.configure( - Map.of( - "schema.registry.url", "wontbeused", - AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false, - KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true, - AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true - ), - isKey - ); - return serializer; - } - - @Override - protected Object serialize(String value, ParsedSchema schema) { - try { - return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema()); - } catch (Throwable e) { - throw new RuntimeException("Failed to serialize record for topic " + topic, e); - } - - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/JsonSchemaSchemaRegistrySerializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/JsonSchemaSchemaRegistrySerializer.java deleted file mode 100644 index 406ab2db5d4..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/JsonSchemaSchemaRegistrySerializer.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.provectus.kafka.ui.serdes.builtin.sr; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.ValidationException; -import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.json.JsonSchema; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; -import java.util.Map; -import org.apache.kafka.common.serialization.Serializer; - -class JsonSchemaSchemaRegistrySerializer extends SchemaRegistrySerializer { - - private static final ObjectMapper MAPPER = new ObjectMapper(); - - JsonSchemaSchemaRegistrySerializer(String topic, - boolean isKey, - SchemaRegistryClient client, - SchemaMetadata schema) { - super(topic, isKey, client, schema); - } - - @Override - protected Serializer createSerializer(SchemaRegistryClient client) { - var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client); - serializer.configure( - Map.of( - "schema.registry.url", "wontbeused", - AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false, - AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true - ), - isKey - ); - return serializer; - } - - @Override - protected JsonNode serialize(String value, ParsedSchema schema) { - try { - JsonNode json = MAPPER.readTree(value); - ((JsonSchema) schema).validate(json); - return json; - } catch (JsonProcessingException e) { - throw new ValidationException(String.format("'%s' is not valid json", value)); - } catch (org.everit.json.schema.ValidationException e) { - throw new ValidationException( - String.format("'%s' does not fit schema: %s", value, e.getAllMessages())); - } - } - - @KafkaClientInternalsDependant - private class KafkaJsonSchemaSerializerWithoutSchemaInfer - extends KafkaJsonSchemaSerializer { - - KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) { - super(client); - } - - /** - * Need to override original method because it tries to infer schema from input - * by checking 'schema' json field or @Schema annotation on input class, which is not - * possible in our case. So, we just skip all infer logic and pass schema directly. - */ - @Override - public byte[] serialize(String topic, JsonNode rec) { - return super.serializeImpl( - super.getSubjectName(topic, isKey, rec, schema), - rec, - (JsonSchema) schema - ); - } - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/ProtobufSchemaRegistrySerializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/ProtobufSchemaRegistrySerializer.java deleted file mode 100644 index 1af2bb4999c..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/ProtobufSchemaRegistrySerializer.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.provectus.kafka.ui.serdes.builtin.sr; - -import com.google.protobuf.DynamicMessage; -import com.google.protobuf.Message; -import com.google.protobuf.util.JsonFormat; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; -import java.util.Map; -import lombok.SneakyThrows; -import org.apache.kafka.common.serialization.Serializer; - -class ProtobufSchemaRegistrySerializer extends SchemaRegistrySerializer { - - @SneakyThrows - public ProtobufSchemaRegistrySerializer(String topic, boolean isKey, - SchemaRegistryClient client, SchemaMetadata schema) { - super(topic, isKey, client, schema); - } - - @Override - protected Serializer createSerializer(SchemaRegistryClient client) { - var serializer = new KafkaProtobufSerializer<>(client); - serializer.configure( - Map.of( - "schema.registry.url", "wontbeused", - AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false, - AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true - ), - isKey - ); - return serializer; - } - - @Override - protected Message serialize(String value, ParsedSchema schema) { - ProtobufSchema protobufSchema = (ProtobufSchema) schema; - DynamicMessage.Builder builder = protobufSchema.newMessageBuilder(); - try { - JsonFormat.parser().merge(value, builder); - return builder.build(); - } catch (Throwable e) { - throw new RuntimeException("Failed to serialize record for topic " + topic, e); - } - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java index 25d6e8fc88b..4ef0bbe5dd4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -1,5 +1,8 @@ package com.provectus.kafka.ui.serdes.builtin.sr; +import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro; +import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson; +import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto; import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG; @@ -7,7 +10,6 @@ import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.serde.api.DeserializeResult; import com.provectus.kafka.ui.serde.api.PropertyResolver; -import com.provectus.kafka.ui.serde.api.RecordHeaders; import com.provectus.kafka.ui.serde.api.SchemaDescription; import com.provectus.kafka.ui.serdes.BuiltInSerde; import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter; @@ -32,13 +34,15 @@ import java.util.Optional; import java.util.concurrent.Callable; import javax.annotation.Nullable; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.apache.kafka.common.config.SslConfigs; public class SchemaRegistrySerde implements BuiltInSerde { + private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0; + private static final int SR_PAYLOAD_PREFIX_LENGTH = 5; + public static String name() { return "SchemaRegistry"; } @@ -221,8 +225,8 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) { .convert(basePath, ((AvroSchema) parsedSchema).rawSchema()) .toJson(); case JSON -> - //need to use confluent JsonSchema since it includes resolved references - ((JsonSchema) parsedSchema).rawSchema().toString(); + //need to use confluent JsonSchema since it includes resolved references + ((JsonSchema) parsedSchema).rawSchema().toString(); }; } @@ -254,35 +258,27 @@ private String schemaSubject(String topic, Target type) { @Override public Serializer serializer(String topic, Target type) { String subject = schemaSubject(topic, type); - var schema = getSchemaBySubject(subject) - .orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject))); - boolean isKey = type == Target.KEY; - SchemaType schemaType = SchemaType.fromString(schema.getSchemaType()) - .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType())); + SchemaMetadata meta = getSchemaBySubject(subject) + .orElseThrow(() -> new ValidationException( + String.format("No schema for subject '%s' found", subject))); + ParsedSchema schema = getSchemaById(meta.getId()) + .orElseThrow(() -> new IllegalStateException( + String.format("Schema found for id %s, subject '%s'", meta.getId(), subject))); + SchemaType schemaType = SchemaType.fromString(meta.getSchemaType()) + .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType())); return switch (schemaType) { - case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema); - case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema); - case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema); + case PROTOBUF -> input -> + serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input); + case AVRO -> input -> + serializeAvro((AvroSchema) schema, meta.getId(), input); + case JSON -> input -> + serializeJson((JsonSchema) schema, meta.getId(), input); }; } @Override public Deserializer deserializer(String topic, Target type) { - return new SrDeserializer(topic); - } - - ///-------------------------------------------------------------- - - private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0; - private static final int SR_RECORD_PREFIX_LENGTH = 5; - - @RequiredArgsConstructor - private class SrDeserializer implements Deserializer { - - private final String topic; - - @Override - public DeserializeResult deserialize(RecordHeaders headers, byte[] data) { + return (headers, data) -> { var schemaId = extractSchemaIdFromMsg(data); SchemaType format = getMessageFormatBySchemaId(schemaId); MessageFormatter formatter = schemaRegistryFormatters.get(format); @@ -294,7 +290,7 @@ public DeserializeResult deserialize(RecordHeaders headers, byte[] data) { "type", format.name() ) ); - } + }; } private SchemaType getMessageFormatBySchemaId(int schemaId) { @@ -306,7 +302,7 @@ private SchemaType getMessageFormatBySchemaId(int schemaId) { private int extractSchemaIdFromMsg(byte[] data) { ByteBuffer buffer = ByteBuffer.wrap(data); - if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) { + if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) { return buffer.getInt(); } throw new ValidationException( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerializer.java deleted file mode 100644 index a7a202c04f2..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerializer.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.provectus.kafka.ui.serdes.builtin.sr; - -import com.provectus.kafka.ui.serde.api.Serde; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import lombok.SneakyThrows; -import org.apache.kafka.common.serialization.Serializer; - -abstract class SchemaRegistrySerializer implements Serde.Serializer { - protected final Serializer serializer; - protected final String topic; - protected final boolean isKey; - protected final ParsedSchema schema; - - @SneakyThrows - protected SchemaRegistrySerializer(String topic, boolean isKey, SchemaRegistryClient client, - SchemaMetadata schema) { - this.topic = topic; - this.isKey = isKey; - this.serializer = createSerializer(client); - this.schema = client.getSchemaById(schema.getId()); - } - - protected abstract Serializer createSerializer(SchemaRegistryClient client); - - @Override - public byte[] serialize(String input) { - final T read = this.serialize(input, schema); - return this.serializer.serialize(topic, read); - } - - protected abstract T serialize(String value, ParsedSchema schema); -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/Serialize.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/Serialize.java new file mode 100644 index 00000000000..8381382576b --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/Serialize.java @@ -0,0 +1,126 @@ +package com.provectus.kafka.ui.serdes.builtin.sr; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.serde.api.Serde; +import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant; +import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.kafka.schemaregistry.json.jackson.Jackson; +import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer; +import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import lombok.SneakyThrows; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; + +final class Serialize { + + private static final byte MAGIC = 0x0; + private static final ObjectMapper JSON_SERIALIZE_MAPPER = Jackson.newObjectMapper(); //from confluent package + + private Serialize() { + } + + @KafkaClientInternalsDependant("AbstractKafkaJsonSchemaSerializer::serializeImpl") + @SneakyThrows + static byte[] serializeJson(JsonSchema schema, int schemaId, String value) { + JsonNode json; + try { + json = JSON_SERIALIZE_MAPPER.readTree(value); + } catch (JsonProcessingException e) { + throw new ValidationException(String.format("'%s' is not valid json", value)); + } + try { + schema.validate(json); + } catch (org.everit.json.schema.ValidationException e) { + throw new ValidationException( + String.format("'%s' does not fit schema: %s", value, e.getAllMessages())); + } + try (var out = new ByteArrayOutputStream()) { + out.write(MAGIC); + out.write(schemaId(schemaId)); + out.write(JSON_SERIALIZE_MAPPER.writeValueAsBytes(json)); + return out.toByteArray(); + } + } + + @KafkaClientInternalsDependant("AbstractKafkaProtobufSerializer::serializeImpl") + @SneakyThrows + static byte[] serializeProto(SchemaRegistryClient srClient, + String topic, + Serde.Target target, + ProtobufSchema schema, + int schemaId, + String input) { + // flags are tuned like in ProtobufSerializer by default + boolean normalizeSchema = false; + boolean autoRegisterSchema = false; + boolean useLatestVersion = true; + boolean latestCompatStrict = true; + boolean skipKnownTypes = true; + + schema = AbstractKafkaProtobufSerializer.resolveDependencies( + srClient, normalizeSchema, autoRegisterSchema, useLatestVersion, latestCompatStrict, + new HashMap<>(), skipKnownTypes, new DefaultReferenceSubjectNameStrategy(), + topic, target == Serde.Target.KEY, schema + ); + + DynamicMessage.Builder builder = schema.newMessageBuilder(); + JsonFormat.parser().merge(input, builder); + Message message = builder.build(); + MessageIndexes indexes = schema.toMessageIndexes(message.getDescriptorForType().getFullName(), normalizeSchema); + try (var out = new ByteArrayOutputStream()) { + out.write(MAGIC); + out.write(schemaId(schemaId)); + out.write(indexes.toByteArray()); + message.writeTo(out); + return out.toByteArray(); + } + } + + @KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl") + @SneakyThrows + static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) { + var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema()); + try (var out = new ByteArrayOutputStream()) { + out.write(MAGIC); + out.write(schemaId(schemaId)); + Schema rawSchema = schema.rawSchema(); + if (rawSchema.getType().equals(Schema.Type.BYTES)) { + Preconditions.checkState( + avroObject instanceof ByteBuffer, + "Unrecognized bytes object of type: " + avroObject.getClass().getName() + ); + out.write(((ByteBuffer) avroObject).array()); + } else { + boolean useLogicalTypeConverters = true; + BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); + DatumWriter writer = + (DatumWriter) AvroSchemaUtils.getDatumWriter(avroObject, rawSchema, useLogicalTypeConverters); + writer.write(avroObject, encoder); + encoder.flush(); + } + return out.toByteArray(); + } + } + + private static byte[] schemaId(int id) { + return ByteBuffer.allocate(Integer.BYTES).putInt(id).array(); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotation/KafkaClientInternalsDependant.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotation/KafkaClientInternalsDependant.java index 5fe22118a2e..440c9887834 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotation/KafkaClientInternalsDependant.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotation/KafkaClientInternalsDependant.java @@ -5,4 +5,5 @@ * should be marked with this annotation to make further update process easier. */ public @interface KafkaClientInternalsDependant { + String value() default ""; }