diff --git a/src/main/scala/za/co/absa/abris/avro/registry/AbstractConfluentRegistryClient.scala b/src/main/scala/za/co/absa/abris/avro/registry/AbstractConfluentRegistryClient.scala new file mode 100644 index 00000000..cf2c1d57 --- /dev/null +++ b/src/main/scala/za/co/absa/abris/avro/registry/AbstractConfluentRegistryClient.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.registry + +import io.confluent.kafka.schemaregistry.avro.AvroSchema +import io.confluent.kafka.schemaregistry.client.{SchemaMetadata, SchemaRegistryClient} +import org.apache.avro.Schema + +import java.util + + +abstract class AbstractConfluentRegistryClient(client: SchemaRegistryClient) extends AbrisRegistryClient { + + override def getAllVersions(subject: String): util.List[Integer] = + client.getAllVersions(subject) + + override def testCompatibility(subject: String, schema: Schema): Boolean = + client.testCompatibility(subject, new AvroSchema(schema)) + + override def register(subject: String, schema: Schema): Int = + client.register(subject, new AvroSchema(schema)) + + override def getLatestSchemaMetadata(subject: String): SchemaMetadata = + client.getLatestSchemaMetadata(subject) + + override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = + client.getSchemaMetadata(subject, version) + + override def getById(schemaId: Int): Schema = { + val parsedSchema = client.getSchemaById(schemaId) + parsedSchema match { + case schema: AvroSchema => schema.rawSchema() + case schema => throw new UnsupportedOperationException(s"Only AvroSchema is supported," + + s" got schema type ${schema.schemaType()}") + } + } +} diff --git a/src/main/scala/za/co/absa/abris/avro/registry/ConfluentMockRegistryClient.scala b/src/main/scala/za/co/absa/abris/avro/registry/ConfluentMockRegistryClient.scala index 5f261669..d9cacae1 100644 --- a/src/main/scala/za/co/absa/abris/avro/registry/ConfluentMockRegistryClient.scala +++ b/src/main/scala/za/co/absa/abris/avro/registry/ConfluentMockRegistryClient.scala @@ -18,25 +18,14 @@ package za.co.absa.abris.avro.registry import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient} -import org.apache.avro.Schema import java.io.IOException -import java.util -class ConfluentMockRegistryClient(client: SchemaRegistryClient) extends AbrisRegistryClient { +class ConfluentMockRegistryClient(client: SchemaRegistryClient) extends AbstractConfluentRegistryClient(client) { def this() = this(new MockSchemaRegistryClient()) - override def getAllVersions(subject: String): util.List[Integer] = - client.getAllVersions(subject) - - override def testCompatibility(subject: String, schema: Schema): Boolean = - client.testCompatibility(subject, schema) - - override def register(subject: String, schema: Schema): Int = - client.register(subject, schema) - /** * MockSchemaRegistryClient is throwing different Exception than the mocked client, this is a workaround */ @@ -49,10 +38,4 @@ class ConfluentMockRegistryClient(client: SchemaRegistryClient) extends AbrisReg throw new RestClientException("No schema registered under subject!", 404, 40401) } } - - override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = - client.getSchemaMetadata(subject, version) - - override def getById(schemaId: Int): Schema = - client.getById(schemaId) } diff --git a/src/main/scala/za/co/absa/abris/avro/registry/ConfluentRegistryClient.scala b/src/main/scala/za/co/absa/abris/avro/registry/ConfluentRegistryClient.scala index 6a49270f..bbf33145 100644 --- a/src/main/scala/za/co/absa/abris/avro/registry/ConfluentRegistryClient.scala +++ b/src/main/scala/za/co/absa/abris/avro/registry/ConfluentRegistryClient.scala @@ -15,34 +15,14 @@ */ package za.co.absa.abris.avro.registry -import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient} +import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient} import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig -import org.apache.avro.Schema -import java.util import scala.collection.JavaConverters._ -class ConfluentRegistryClient(client: SchemaRegistryClient) extends AbrisRegistryClient { +class ConfluentRegistryClient(client: SchemaRegistryClient) extends AbstractConfluentRegistryClient(client) { def this(configs: Map[String,String]) = this(ConfluentRegistryClient.createClient(configs)) - - override def getAllVersions(subject: String): util.List[Integer] = - client.getAllVersions(subject) - - override def testCompatibility(subject: String, schema: Schema): Boolean = - client.testCompatibility(subject, schema) - - override def register(subject: String, schema: Schema): Int = - client.register(subject, schema) - - override def getLatestSchemaMetadata(subject: String): SchemaMetadata = - client.getLatestSchemaMetadata(subject) - - override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = - client.getSchemaMetadata(subject, version) - - override def getById(schemaId: Int): Schema = - client.getById(schemaId) } object ConfluentRegistryClient {