diff --git a/build.sbt b/build.sbt index 46e2d6486..15020eaee 100644 --- a/build.sbt +++ b/build.sbt @@ -33,7 +33,7 @@ lazy val `fs2-kafka` = project Test / console := (core / Test / console).value ) .enablePlugins(TypelevelMimaPlugin) - .aggregate(core, vulcan, `vulcan-testkit-munit`) + .aggregate(core, `schema-registry`, vulcan, `vulcan-testkit-munit`) lazy val core = project .in(file("modules/core")) @@ -52,6 +52,22 @@ lazy val core = project testSettings ) +lazy val `schema-registry` = project + .in(file("modules/schema-registry")) + .settings( + moduleName := "fs2-kafka-schema-registry", + name := moduleName.value, + dependencySettings ++ Seq( + libraryDependencies ++= Seq( + "io.confluent" % "kafka-schema-registry-client" % confluentVersion + ) + ), + publishSettings, + scalaSettings, + testSettings + ) + .dependsOn(core) + lazy val vulcan = project .in(file("modules/vulcan")) .settings( @@ -67,7 +83,7 @@ lazy val vulcan = project scalaSettings, testSettings ) - .dependsOn(core) + .dependsOn(core, `schema-registry`) lazy val `vulcan-testkit-munit` = project .in(file("modules/vulcan-testkit-munit")) @@ -177,6 +193,9 @@ lazy val buildInfoSettings = Seq( BuildInfoKey.map(core / crossScalaVersions) { case (k, v) => "core" ++ k.capitalize -> v }, + BuildInfoKey.map(`schema-registry` / moduleName) { + case (k, v) => "schemaRegistry" ++ k.capitalize -> v + }, BuildInfoKey.map(vulcan / moduleName) { case (k, v) => "vulcan" ++ k.capitalize -> v }, diff --git a/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala b/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala index e63a96011..987ab10de 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala @@ -122,6 +122,8 @@ private[kafka] object syntax { def updatedIfAbsent(k: K, v: => V): Map[K, V] = if (map.contains(k)) map else map.updated(k, v) + def asJavaMap: util.Map[K, V] = + map.asJava } implicit final class MapWrappedValueSyntax[F[_], K, V]( diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/Auth.scala b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/Auth.scala similarity index 77% rename from modules/vulcan/src/main/scala/fs2/kafka/vulcan/Auth.scala rename to modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/Auth.scala index 618c89df0..85f4bb5db 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/Auth.scala +++ b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/Auth.scala @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package fs2.kafka.vulcan +package fs2.kafka.schemaregistry.client /** * The available options for [[SchemaRegistryClientSettings#withAuth]]. @@ -17,15 +17,16 @@ package fs2.kafka.vulcan sealed abstract class Auth object Auth { - private[vulcan] final case class BasicAuth(username: String, password: String) extends Auth { + private[schemaregistry] final case class BasicAuth(username: String, password: String) + extends Auth { override def toString: String = s"Basic($username)" } - private[vulcan] final case class BearerAuth(token: String) extends Auth { + private[schemaregistry] final case class BearerAuth(token: String) extends Auth { override def toString: String = "Bearer" } - private[vulcan] case object NoAuth extends Auth { + private[schemaregistry] case object NoAuth extends Auth { override def toString: String = "None" } diff --git a/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClient.scala b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClient.scala new file mode 100644 index 000000000..e22e69d3a --- /dev/null +++ b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClient.scala @@ -0,0 +1,83 @@ +package fs2.kafka.schemaregistry.client + +import cats.effect.kernel.Sync +import fs2.kafka.internal.syntax.MapSyntax + +trait SchemaRegistryClient[F[_]] { + + /** + * Register a schema for a given `Codec` for some type `A`, + * or return the existing schema id if it already exists. + * @param subject The subject name + * @return The schema id + */ + def register(subject: String, schema: ParsedSchema): F[Int] + + /** + * Get latest schema for the specified subject + * @param subject The subject name + * @return Latest available schema for the subject + */ + def getLatestSchemaMetadata(subject: String): F[SchemaMetadata] + + /** + * Get the schema for the specified subject with the specified version + * @param subject The subject name + * @param version Schema version + * @return + */ + def getSchemaMetadata(subject: String, version: Int): F[SchemaMetadata] + + /** + * Get the schema for the specified `id` and cast it to specified type + * @param id Schema id + * @tparam S Schema type + * @return Schema for the specified `id` + */ + def getSchemaById[S <: ParsedSchema](id: Int): F[S] + + /** + * Get the wrapped java instance + * @return The wrapped Java instance of `SchemaRegistryClient` + */ + def javaClient: JSchemaRegistryClient +} +object SchemaRegistryClient { + + import cats.syntax.all._ + + private[this] final case class SchemaRegistryClientSettingsImpl[F[_]]( + override val javaClient: JSchemaRegistryClient + )(implicit F: Sync[F]) + extends SchemaRegistryClient[F] { + + override def register(subject: String, schema: ParsedSchema): F[Int] = + F.delay(javaClient.register(subject, schema)) + + override def getLatestSchemaMetadata(subject: String): F[SchemaMetadata] = + F.delay(javaClient.getLatestSchemaMetadata(subject)) + + override def getSchemaMetadata(subject: String, version: Int): F[SchemaMetadata] = + F.delay(javaClient.getSchemaMetadata(subject, version)) + + override def getSchemaById[S <: ParsedSchema](id: Int): F[S] = + F.delay(javaClient.getSchemaById(id).asInstanceOf[S]) + } + + def apply[F[_]: Sync](baseUrl: String): F[SchemaRegistryClient[F]] = + SchemaRegistryClient(SchemaRegistryClientSettings(baseUrl)) + + def apply[F[_]: Sync](settings: SchemaRegistryClientSettings): F[SchemaRegistryClient[F]] = + Sync[F] + .delay( + new JCachedSchemaRegistryClient( + settings.baseUrl, + settings.maxCacheSize, + settings.properties.asJavaMap + ) + ) + .map(fromJava(_)) + + def fromJava[F[_]: Sync](jclient: JSchemaRegistryClient): SchemaRegistryClient[F] = + new SchemaRegistryClientSettingsImpl[F](jclient) +} diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSettings.scala similarity index 60% rename from modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala rename to modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSettings.scala index 8e336a447..79cac8bc6 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala +++ b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSettings.scala @@ -4,11 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package fs2.kafka.vulcan +package fs2.kafka.schemaregistry.client -import cats.effect.Sync -import cats.Show -import fs2.kafka.internal.converters.collection._ +import cats.{Eq, Show} /** * Describes how to create a `SchemaRegistryClient` and which @@ -17,7 +15,7 @@ import fs2.kafka.internal.converters.collection._ * * Use `SchemaRegistryClient#apply` to create an instance. */ -sealed abstract class SchemaRegistryClientSettings[F[_]] { +sealed trait SchemaRegistryClientSettings { /** * The base URL of the schema registry service. @@ -35,13 +33,13 @@ sealed abstract class SchemaRegistryClientSettings[F[_]] { * Creates a new [[SchemaRegistryClientSettings]] instance * with the specified [[maxCacheSize]]. */ - def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings[F] + def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings /** * Creates a new [[SchemaRegistryClientSettings]] instance * with the specified authentication details. */ - def withAuth(auth: Auth): SchemaRegistryClientSettings[F] + def withAuth(auth: Auth): SchemaRegistryClientSettings /** * Properties provided when creating a `SchemaRegistryClient`. @@ -54,50 +52,33 @@ sealed abstract class SchemaRegistryClientSettings[F[_]] { * Creates a new [[SchemaRegistryClientSettings]] instance * including a property with the specified key and value. */ - def withProperty(key: String, value: String): SchemaRegistryClientSettings[F] + def withProperty(key: String, value: String): SchemaRegistryClientSettings /** * Creates a new [[SchemaRegistryClientSettings]] instance * including properties with the specified keys and values. */ - def withProperties(properties: (String, String)*): SchemaRegistryClientSettings[F] + def withProperties(properties: (String, String)*): SchemaRegistryClientSettings /** * Creates a new [[SchemaRegistryClientSettings]] instance * including properties with the specified keys and values. */ - def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings[F] - - /** - * Creates a new `SchemaRegistryClient` using the settings - * contained within this [[SchemaRegistryClientSettings]]. - */ - def createSchemaRegistryClient: F[SchemaRegistryClient] - - /** - * Creates a new [[SchemaRegistryClientSettings]] instance - * with the specified function for creating new instances - * of `SchemaRegistryClient` from settings. The arguments - * are [[baseUrl]], [[maxCacheSize]], and [[properties]]. - */ - def withCreateSchemaRegistryClient( - createSchemaRegistryClientWith: (String, Int, Map[String, String]) => F[SchemaRegistryClient] - ): SchemaRegistryClientSettings[F] + def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings } object SchemaRegistryClientSettings { - private[this] final case class SchemaRegistryClientSettingsImpl[F[_]]( + + private[this] final case class SchemaRegistryClientSettingsImpl( override val baseUrl: String, override val maxCacheSize: Int, - override val properties: Map[String, String], - // format: off - val createSchemaRegistryClientWith: (String, Int, Map[String, String]) => F[SchemaRegistryClient] - // format: on - ) extends SchemaRegistryClientSettings[F] { - override def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings[F] = + override val properties: Map[String, String] + ) extends SchemaRegistryClientSettings { + + override def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings = copy(maxCacheSize = maxCacheSize) - override def withAuth(auth: Auth): SchemaRegistryClientSettings[F] = + override def withAuth(auth: Auth): SchemaRegistryClientSettings = auth match { case Auth.BasicAuth(username, password) => withProperties( @@ -115,23 +96,15 @@ object SchemaRegistryClientSettings { this } - override def withProperty(key: String, value: String): SchemaRegistryClientSettings[F] = + override def withProperty(key: String, value: String): SchemaRegistryClientSettings = copy(properties = properties.updated(key, value)) - override def withProperties(properties: (String, String)*): SchemaRegistryClientSettings[F] = + override def withProperties(properties: (String, String)*): SchemaRegistryClientSettings = copy(properties = this.properties ++ properties.toMap) - override def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings[F] = + override def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings = copy(properties = this.properties ++ properties) - override def createSchemaRegistryClient: F[SchemaRegistryClient] = - createSchemaRegistryClientWith(baseUrl, maxCacheSize, properties) - - override def withCreateSchemaRegistryClient( - createSchemaRegistryClientWith: (String, Int, Map[String, String]) => F[SchemaRegistryClient] - ): SchemaRegistryClientSettings[F] = - copy(createSchemaRegistryClientWith = createSchemaRegistryClientWith) - override def toString: String = s"SchemaRegistryClientSettings(baseUrl = $baseUrl, maxCacheSize = $maxCacheSize)" } @@ -140,15 +113,17 @@ object SchemaRegistryClientSettings { * Creates a new [[SchemaRegistryClientSettings]] instance * using the specified base URL of the schema registry. */ - def apply[F[_]](baseUrl: String)(implicit F: Sync[F]): SchemaRegistryClientSettings[F] = + def apply(baseUrl: String): SchemaRegistryClientSettings = SchemaRegistryClientSettingsImpl( baseUrl = baseUrl, maxCacheSize = 1000, - properties = Map.empty, - createSchemaRegistryClientWith = (baseUrl, maxCacheSize, properties) => - F.delay(new CachedSchemaRegistryClient(baseUrl, maxCacheSize, properties.asJava)) + properties = Map.empty ) - implicit def schemaRegistryClientSettingsShow[F[_]]: Show[SchemaRegistryClientSettings[F]] = + implicit val schemaRegistryClientSettingsShow: Show[SchemaRegistryClientSettings] = Show.fromToString + + implicit val schemaRegistryClientSettingsEq: Eq[SchemaRegistryClientSettings] = + Eq.fromUniversalEquals[SchemaRegistryClientSettingsImpl] + .asInstanceOf[Eq[SchemaRegistryClientSettings]] } diff --git a/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/package.scala b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/package.scala new file mode 100644 index 000000000..adfc25aea --- /dev/null +++ b/modules/schema-registry/src/main/scala/fs2/kafka/schemaregistry/client/package.scala @@ -0,0 +1,20 @@ +package fs2.kafka.schemaregistry + +package object client { + + /** Alias for `io.confluent.kafka.schemaregistry.client.SchemaMetadata`. */ + type SchemaMetadata = + io.confluent.kafka.schemaregistry.client.SchemaMetadata + + /** Alias for `io.confluent.kafka.schemaregistry.ParsedSchema`. */ + type ParsedSchema = + io.confluent.kafka.schemaregistry.ParsedSchema + + /** Alias for `io.confluent.kafka.schemaregistry.client.SchemaRegistryClient`. */ + type JSchemaRegistryClient = + io.confluent.kafka.schemaregistry.client.SchemaRegistryClient + + /** Alias for `io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient`. */ + type JCachedSchemaRegistryClient = + io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient +} diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AuthSpec.scala b/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/AuthSpec.scala similarity index 96% rename from modules/vulcan/src/test/scala/fs2/kafka/vulcan/AuthSpec.scala rename to modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/AuthSpec.scala index 6787ea050..dce44b4d3 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AuthSpec.scala +++ b/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/AuthSpec.scala @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package fs2.kafka.vulcan +package fs2.kafka.schemaregistry.client import org.scalatest.funspec.AnyFunSpec import org.scalatestplus.scalacheck._ diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/SchemaRegistryClientSettingsSpec.scala b/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSettingsSpec.scala similarity index 80% rename from modules/vulcan/src/test/scala/fs2/kafka/vulcan/SchemaRegistryClientSettingsSpec.scala rename to modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSettingsSpec.scala index eb9cb8e77..ac579b6bf 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/SchemaRegistryClientSettingsSpec.scala +++ b/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSettingsSpec.scala @@ -4,15 +4,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package fs2.kafka.vulcan +package fs2.kafka.schemaregistry.client -import cats.effect.IO -import cats.effect.unsafe.implicits.global -import cats.syntax.all._ import org.scalatest.funspec.AnyFunSpec import org.scalatestplus.scalacheck._ +import cats.syntax.all._ final class SchemaRegistryClientSettingsSpec extends AnyFunSpec with ScalaCheckPropertyChecks { + + val settings: SchemaRegistryClientSettings = + SchemaRegistryClientSettings("baseUrl") + describe("SchemaRegistryClientSettings") { it("should provide withMaxCacheSize") { assert { @@ -65,15 +67,11 @@ final class SchemaRegistryClientSettingsSpec extends AnyFunSpec with ScalaCheckP settings.withAuth(Auth.None) assert { - settingsWithAuth.properties - .get("basic.auth.credentials.source") - .isEmpty + !settingsWithAuth.properties.contains("basic.auth.credentials.source") } assert { - settingsWithAuth.properties - .get("schema.registry.basic.auth.user.info") - .isEmpty + !settingsWithAuth.properties.contains("schema.registry.basic.auth.user.info") } } @@ -107,19 +105,6 @@ final class SchemaRegistryClientSettingsSpec extends AnyFunSpec with ScalaCheckP } } - it("should provide withCreateSchemaRegistryClient") { - assert { - settings - .withCreateSchemaRegistryClient { - case _ => IO.raiseError(new RuntimeException) - } - .createSchemaRegistryClient - .attempt - .unsafeRunSync() - .isLeft - } - } - it("should provide toString") { assert { settings.toString == "SchemaRegistryClientSettings(baseUrl = baseUrl, maxCacheSize = 1000)" @@ -131,8 +116,23 @@ final class SchemaRegistryClientSettingsSpec extends AnyFunSpec with ScalaCheckP settings.show == "SchemaRegistryClientSettings(baseUrl = baseUrl, maxCacheSize = 1000)" } } - } - val settings: SchemaRegistryClientSettings[IO] = - SchemaRegistryClientSettings("baseUrl") + it("should provide Eq") { + + val s1 = SchemaRegistryClientSettings("baseUrl") + .withAuth(Auth.Basic("user", "password")) + .withMaxCacheSize(100) + .withProperties("TEST" -> "FOO") + + val s2 = SchemaRegistryClientSettings("baseUrl") + .withAuth(Auth.Basic("user", "password")) + .withMaxCacheSize(100) + .withProperties("TEST" -> "FOO") + + assert { + s1.eqv(s2) + } + } + + } } diff --git a/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSpec.scala b/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSpec.scala new file mode 100644 index 000000000..5bfaa9489 --- /dev/null +++ b/modules/schema-registry/src/test/scala/fs2/kafka/schemaregistry/client/SchemaRegistryClientSpec.scala @@ -0,0 +1,71 @@ +package fs2.kafka.schemaregistry.client + +import cats.effect.IO +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference +import org.scalatest.funspec.AnyFunSpec +import org.scalatestplus.scalacheck._ + +import java.util + +final class SchemaRegistryClientSpec extends AnyFunSpec with ScalaCheckPropertyChecks { + + import cats.effect.unsafe.implicits._ + + describe("SchemaRegistryClient") { + + it("should provide register") { + assert { + SchemaRegistryClient + .fromJava[IO](new MockSchemaRegistryClient()) + .register("subject", StubbedParsedSchema("schema1")) + .unsafeRunSync() == 1 + } + } + + it("should provide getLatestSchemaMetadata") { + assert { + SchemaRegistryClient + .fromJava[IO](new MockSchemaRegistryClient()) + .getLatestSchemaMetadata("subject") + .attempt + .unsafeRunSync() + .isLeft + } + } + + it("should provide getSchemaMetadata") { + assert { + SchemaRegistryClient + .fromJava[IO](new MockSchemaRegistryClient()) + .getSchemaMetadata("subject", 1) + .attempt + .unsafeRunSync() + .isLeft + } + } + + it("should provide getSchemaById") { + val client = SchemaRegistryClient.fromJava[IO](new MockSchemaRegistryClient()) + val schema = StubbedParsedSchema("schema1") + val test = for { + id <- client.register("subject", schema) + schema <- client.getSchemaById[StubbedParsedSchema](id) + } yield schema + + assert { + test.unsafeRunSync() == schema + } + } + } + + case class StubbedParsedSchema(override val name: String) extends ParsedSchema { + override def schemaType(): String = "stub" + override def canonicalString(): String = name + override def references(): util.List[SchemaReference] = + new util.ArrayList[SchemaReference]() + override def isBackwardCompatible(previousSchema: ParsedSchema): util.List[String] = + new util.ArrayList[String]() + override def rawSchema(): AnyRef = null + } +} diff --git a/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala b/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala index 5fe78ad2c..276045ecd 100644 --- a/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala +++ b/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala @@ -6,13 +6,13 @@ package fs2.kafka.vulcan.testkit -import fs2.kafka.vulcan.SchemaRegistryClientSettings import munit.FunSuite import vulcan.Codec import org.apache.avro.SchemaCompatibility import io.confluent.kafka.schemaregistry.avro.AvroSchema import cats.effect.IO import cats.effect.unsafe.implicits.global +import fs2.kafka.schemaregistry.client.{SchemaRegistryClient, SchemaRegistryClientSettings} import org.apache.avro.Schema trait CompatibilityChecker[F[_]] { @@ -32,24 +32,23 @@ trait SchemaSuite extends FunSuite { private def codecAsSchema[A](codec: Codec[A]) = codec.schema.fold(e => fail(e.message), ok => ok) def compatibilityChecker( - clientSettings: SchemaRegistryClientSettings[IO], + clientSettings: SchemaRegistryClientSettings, name: String = "schema-compatibility-checker" - ) = new Fixture[CompatibilityChecker[IO]](name) { + ): Fixture[CompatibilityChecker[IO]] = new Fixture[CompatibilityChecker[IO]](name) { + private var checker: CompatibilityChecker[IO] = null override def apply(): CompatibilityChecker[IO] = checker override def beforeAll(): Unit = - checker = clientSettings.createSchemaRegistryClient + checker = SchemaRegistryClient[IO](clientSettings) .map { client => new CompatibilityChecker[IO] { private def registrySchema(subject: String): IO[Schema] = for { - metadata <- IO.delay(client.getLatestSchemaMetadata(subject)) - schema <- IO.delay( - client.getSchemaById(metadata.getId).asInstanceOf[AvroSchema] - ) + metadata <- client.getLatestSchemaMetadata(subject) + schema <- client.getSchemaById[AvroSchema](metadata.getId) } yield schema.rawSchema() def checkReaderCompatibility[A]( diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala index 488d080de..50e967da6 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala @@ -16,6 +16,7 @@ import java.nio.ByteBuffer final class AvroDeserializer[A] private[vulcan] ( private val codec: Codec[A] ) extends AnyVal { + def using[F[_]]( settings: AvroSettings[F] )(implicit F: Sync[F]): RecordDeserializer[F, A] = @@ -26,31 +27,24 @@ final class AvroDeserializer[A] private[vulcan] ( case (deserializer, schemaRegistryClient) => Deserializer.instance { (topic, _, bytes) => F.defer { - if (bytes == null || bytes.length == 0) { - F.raiseError( + for { + _ <- F.raiseError( new IllegalArgumentException( s"Invalid Avro record: bytes is null or empty" ) - ) - - } else { - val writerSchemaId = - ByteBuffer.wrap(bytes).getInt(1) // skip magic byte - - val writerSchema = { - val schema = schemaRegistryClient.getSchemaById(writerSchemaId) - if (schema.isInstanceOf[AvroSchema]) - schema.asInstanceOf[AvroSchema].rawSchema() - else - null - } - - codec + ).whenA(bytes == null || bytes.length == 0) + // skip magic byte + writerSchemaId = ByteBuffer.wrap(bytes).getInt(1) + writerSchema <- schemaRegistryClient + .getSchemaById[AvroSchema](writerSchemaId) + .map(_.rawSchema()) + .recover(_ => null) + result <- codec .decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match { case Right(a) => F.pure(a) case Left(error) => F.raiseError(error.throwable) } - } + } yield result } } } diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala index 4fca097ae..5409304ad 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala @@ -10,10 +10,13 @@ import cats.effect.Sync import cats.syntax.all._ import fs2.kafka.internal.converters.collection._ import fs2.kafka.internal.syntax._ +import fs2.kafka.schemaregistry.client.{SchemaRegistryClient, SchemaRegistryClientSettings} import io.confluent.kafka.schemaregistry.avro.AvroSchema import org.apache.avro.Schema import vulcan.Codec +import java.util + /** * Describes how to create a `KafkaAvroDeserializer` and a * `KafkaAvroSerializer` and which settings should be used. @@ -28,7 +31,7 @@ sealed abstract class AvroSettings[F[_]] { * The `SchemaRegistryClient` to use for the serializers * and deserializers created from this [[AvroSettings]]. */ - def schemaRegistryClient: F[SchemaRegistryClient] + def schemaRegistryClient: F[SchemaRegistryClient[F]] /** * Creates a new `AvroSettings` instance with the specified @@ -88,7 +91,7 @@ sealed abstract class AvroSettings[F[_]] { * specified `isKey` flag, denoting whether a record key or * value is being deserialized. */ - def createAvroDeserializer(isKey: Boolean): F[(KafkaAvroDeserializer, SchemaRegistryClient)] + def createAvroDeserializer(isKey: Boolean): F[(KafkaAvroDeserializer, SchemaRegistryClient[F])] /** * Register a schema for a given `Codec` for some type `A`, @@ -107,12 +110,12 @@ sealed abstract class AvroSettings[F[_]] { def createAvroSerializer( isKey: Boolean, writerSchema: Option[Schema] - ): F[(KafkaAvroSerializer, SchemaRegistryClient)] + ): F[(KafkaAvroSerializer, SchemaRegistryClient[F])] @deprecated("use the overload that takes an optional writer schema", "2.5.0-M3") final def createAvroSerializer( isKey: Boolean - ): F[(KafkaAvroSerializer, SchemaRegistryClient)] = + ): F[(KafkaAvroSerializer, SchemaRegistryClient[F])] = createAvroSerializer(isKey, writerSchema = None) /** @@ -123,7 +126,7 @@ sealed abstract class AvroSettings[F[_]] { */ def withCreateAvroDeserializer( // format: off - createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)] + createAvroDeserializerWith: (F[SchemaRegistryClient[F]], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient[F])] // format: on ): AvroSettings[F] @@ -135,14 +138,14 @@ sealed abstract class AvroSettings[F[_]] { */ def withCreateAvroSerializer( // format: off - createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] + createAvroSerializerWith: (F[SchemaRegistryClient[F]], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient[F])] // format: on ): AvroSettings[F] @deprecated("use the overload that has an `Option[Schema]` argument", "2.5.0-M3") final def withCreateAvroSerializer( // format: off - createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] + createAvroSerializerWith: (F[SchemaRegistryClient[F]], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient[F])] // format: on ): AvroSettings[F] = withCreateAvroSerializer( @@ -155,18 +158,34 @@ sealed abstract class AvroSettings[F[_]] { * The arguments are [[schemaRegistryClient]], `subject`, and `codec`. */ def withRegisterSchema( - registerSchemaWith: (F[SchemaRegistryClient], String, Codec[_]) => F[Int] + registerSchemaWith: (F[SchemaRegistryClient[F]], String, Codec[_]) => F[Int] ): AvroSettings[F] } object AvroSettings { + + def apply[F[_]: Sync]( + schemaRegistryClientSettings: SchemaRegistryClientSettings + ): AvroSettings[F] = + AvroSettings(SchemaRegistryClient[F](schemaRegistryClientSettings)) + + def apply[F[_]]( + schemaRegistryClient: SchemaRegistryClient[F] + )(implicit F: Sync[F]): AvroSettings[F] = + AvroSettings(F.pure(schemaRegistryClient)) + + def apply[F[_]: Sync]( + schemaRegistryClient: F[SchemaRegistryClient[F]] + ): AvroSettings[F] = + create(schemaRegistryClient) + private[this] final case class AvroSettingsImpl[F[_]]( - override val schemaRegistryClient: F[SchemaRegistryClient], + override val schemaRegistryClient: F[SchemaRegistryClient[F]], override val properties: Map[String, String], // format: off - val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)], - val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)], - val registerSchemaWith: (F[SchemaRegistryClient], String, Codec[_]) => F[Int] + val createAvroDeserializerWith: (F[SchemaRegistryClient[F]], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient[F])], + val createAvroSerializerWith: (F[SchemaRegistryClient[F]], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient[F])], + val registerSchemaWith: (F[SchemaRegistryClient[F]], String, Codec[_]) => F[Int] // format: on ) extends AvroSettings[F] { override def withAutoRegisterSchemas(autoRegisterSchemas: Boolean): AvroSettings[F] = @@ -193,13 +212,13 @@ object AvroSettings { override def createAvroDeserializer( isKey: Boolean - ): F[(KafkaAvroDeserializer, SchemaRegistryClient)] = + ): F[(KafkaAvroDeserializer, SchemaRegistryClient[F])] = createAvroDeserializerWith(schemaRegistryClient, isKey, properties) override def createAvroSerializer( isKey: Boolean, writerSchema: Option[Schema] - ): F[(KafkaAvroSerializer, SchemaRegistryClient)] = + ): F[(KafkaAvroSerializer, SchemaRegistryClient[F])] = createAvroSerializerWith(schemaRegistryClient, isKey, writerSchema, properties) override def registerSchema[A](subject: String)(implicit codec: Codec[A]): F[Int] = @@ -207,20 +226,20 @@ object AvroSettings { override def withCreateAvroDeserializer( // format: off - createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)] + createAvroDeserializerWith: (F[SchemaRegistryClient[F]], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient[F])] // format: on ): AvroSettings[F] = copy(createAvroDeserializerWith = createAvroDeserializerWith) override def withCreateAvroSerializer( // format: off - createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] + createAvroSerializerWith: (F[SchemaRegistryClient[F]], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient[F])] // format: on ): AvroSettings[F] = copy(createAvroSerializerWith = createAvroSerializerWith) override def withRegisterSchema( - registerSchemaWith: (F[SchemaRegistryClient], String, Codec[_]) => F[Int] + registerSchemaWith: (F[SchemaRegistryClient[F]], String, Codec[_]) => F[Int] ): AvroSettings[F] = copy(registerSchemaWith = registerSchemaWith) @@ -228,11 +247,11 @@ object AvroSettings { "AvroSettings$" + System.identityHashCode(this) } - private[this] def withDefaults(properties: Map[String, String]) = + private[this] def withDefaults(properties: Map[String, String]): util.Map[String, String] = properties.updatedIfAbsent("schema.registry.url", "").asJava private[this] def create[F[_]]( - schemaRegistryClient: F[SchemaRegistryClient] + schemaRegistryClient: F[SchemaRegistryClient[F]] )(implicit F: Sync[F]): AvroSettings[F] = AvroSettingsImpl( schemaRegistryClient = schemaRegistryClient, @@ -240,7 +259,7 @@ object AvroSettings { createAvroDeserializerWith = (schemaRegistryClient, isKey, properties) => schemaRegistryClient.flatMap { schemaRegistryClient => F.delay { - val deserializer = new KafkaAvroDeserializer(schemaRegistryClient) + val deserializer = new KafkaAvroDeserializer(schemaRegistryClient.javaClient) deserializer.configure(withDefaults(properties), isKey) (deserializer, schemaRegistryClient) } @@ -249,9 +268,9 @@ object AvroSettings { schemaRegistryClient.flatMap { schemaRegistryClient => F.delay { val serializer = schema match { - case None => new KafkaAvroSerializer(schemaRegistryClient) + case None => new KafkaAvroSerializer(schemaRegistryClient.javaClient) case Some(schema) => - new KafkaAvroSerializer(schemaRegistryClient) { + new KafkaAvroSerializer(schemaRegistryClient.javaClient) { // Overrides the default auto-registration behaviour, which attempts to guess the // writer schema based on the encoded representation used by the Java Avro SDK. // This works for types such as Records, which contain a reference to the exact schema @@ -279,19 +298,9 @@ object AvroSettings { registerSchemaWith = (schemaRegistryClient, subjectName, codec) => { schemaRegistryClient.flatMap { client => codec.schema.leftMap(_.throwable).liftTo[F].flatMap { schema => - F.delay(client.register(subjectName, new AvroSchema(schema))) + client.register(subjectName, new AvroSchema(schema)) } } } ) - - def apply[F[_]]( - schemaRegistryClientSettings: SchemaRegistryClientSettings[F] - )(implicit F: Sync[F]): AvroSettings[F] = - create(schemaRegistryClientSettings.createSchemaRegistryClient) - - def apply[F[_]]( - schemaRegistryClient: SchemaRegistryClient - )(implicit F: Sync[F]): AvroSettings[F] = - create(F.pure(schemaRegistryClient)) } diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala index 4f546cfc4..89f70d88f 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala @@ -10,14 +10,6 @@ import _root_.vulcan.Codec package object vulcan { - /** Alias for `io.confluent.kafka.schemaregistry.client.SchemaRegistryClient`. */ - type SchemaRegistryClient = - io.confluent.kafka.schemaregistry.client.SchemaRegistryClient - - /** Alias for `io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient`. */ - type CachedSchemaRegistryClient = - io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient - /** Alias for `io.confluent.kafka.serializers.KafkaAvroDeserializer`. */ type KafkaAvroDeserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala index fb6718450..1f1eda271 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala @@ -8,12 +8,17 @@ package fs2.kafka.vulcan import cats.effect.IO import cats.effect.unsafe.implicits.global +import fs2.kafka.schemaregistry.client.SchemaRegistryClient import fs2.kafka.Headers import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.scalatest.funspec.AnyFunSpec import vulcan.Codec final class AvroDeserializerSpec extends AnyFunSpec { + + val avroSettings: AvroSettings[IO] = + AvroSettings(SchemaRegistryClient.fromJava[IO](new MockSchemaRegistryClient())) + describe("AvroDeserializer") { it("can create a deserializer") { val deserializer = @@ -47,18 +52,4 @@ final class AvroDeserializerSpec extends AnyFunSpec { } } } - - val schemaRegistryClient: MockSchemaRegistryClient = - new MockSchemaRegistryClient() - - val schemaRegistryClientSettings: SchemaRegistryClientSettings[IO] = - SchemaRegistryClientSettings[IO]("baseUrl") - .withAuth(Auth.Basic("username", "password")) - .withMaxCacheSize(100) - .withCreateSchemaRegistryClient { (_, _, _) => - IO.pure(schemaRegistryClient) - } - - val avroSettings: AvroSettings[IO] = - AvroSettings(schemaRegistryClientSettings) } diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala index 6bf74cf4f..c737fe864 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala @@ -8,12 +8,17 @@ package fs2.kafka.vulcan import cats.effect.IO import cats.effect.unsafe.implicits.global +import fs2.kafka.schemaregistry.client.SchemaRegistryClient import fs2.kafka.Headers import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.scalatest.funspec.AnyFunSpec import vulcan.Codec final class AvroSerializerSpec extends AnyFunSpec { + + val avroSettings: AvroSettings[IO] = + AvroSettings(SchemaRegistryClient.fromJava[IO](new MockSchemaRegistryClient())) + describe("AvroSerializer") { it("can create a serializer") { val serializer = @@ -35,11 +40,16 @@ final class AvroSerializerSpec extends AnyFunSpec { ) )) .unsafeRunSync() - assert( - schemaRegistryClient - .getLatestSchemaMetadata("test-union-topic-value") - .getSchema === """["int","boolean"]""" - ) + + avroSettings.schemaRegistryClient + .flatMap(_.getLatestSchemaMetadata("test-union-topic-value")) + .map( + latestSchemaMetadata => + assert( + latestSchemaMetadata.getSchema === """["int","boolean"]""" + ) + ) + .unsafeRunSync() } it("raises schema errors") { @@ -59,18 +69,4 @@ final class AvroSerializerSpec extends AnyFunSpec { } } } - - val schemaRegistryClient: MockSchemaRegistryClient = - new MockSchemaRegistryClient() - - val schemaRegistryClientSettings: SchemaRegistryClientSettings[IO] = - SchemaRegistryClientSettings[IO]("baseUrl") - .withAuth(Auth.Basic("username", "password")) - .withMaxCacheSize(100) - .withCreateSchemaRegistryClient { (_, _, _) => - IO.pure(schemaRegistryClient) - } - - val avroSettings: AvroSettings[IO] = - AvroSettings(schemaRegistryClientSettings) } diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala index a7be4454c..a577e9b4a 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala @@ -8,10 +8,15 @@ package fs2.kafka.vulcan import cats.effect.IO import cats.effect.unsafe.implicits.global +import fs2.kafka.schemaregistry.client.SchemaRegistryClient import org.scalatest.funspec.AnyFunSpec import org.scalatestplus.scalacheck._ final class AvroSettingsSpec extends AnyFunSpec with ScalaCheckPropertyChecks { + + val settings: AvroSettings[IO] = + AvroSettings[IO](SchemaRegistryClient[IO]("baseUrl")) + describe("AvroSettings") { it("should provide withAutoRegisterSchemas") { forAll { (value: Boolean) => @@ -124,10 +129,4 @@ final class AvroSettingsSpec extends AnyFunSpec with ScalaCheckPropertyChecks { } } } - - val settings: AvroSettings[IO] = - AvroSettings(SchemaRegistryClientSettings[IO]("baseUrl")) - - val settingsWithClient: AvroSettings[IO] = - AvroSettings(null: SchemaRegistryClient) } diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala index 591742428..abff47661 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala @@ -6,18 +6,22 @@ package fs2.kafka.vulcan -import java.time.Instant - -import cats.syntax.all._ +import _root_.vulcan.Codec import cats.effect.IO import cats.effect.unsafe.implicits.global +import cats.syntax.all._ import fs2.kafka._ - -import org.scalatest.funspec.AnyFunSpec +import fs2.kafka.schemaregistry.client.SchemaRegistryClient import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient -import _root_.vulcan.Codec +import org.scalatest.funspec.AnyFunSpec + +import java.time.Instant final class PackageSpec extends AnyFunSpec { + + val avroSettings: AvroSettings[IO] = + AvroSettings(SchemaRegistryClient.fromJava[IO](new MockSchemaRegistryClient())) + describe("avroSerializer") { it("should be available given explicit settings") { avroSerializer[Test].using(avroSettings) @@ -87,18 +91,4 @@ final class PackageSpec extends AnyFunSpec { ).mapN(apply) } } - - val schemaRegistryClient: MockSchemaRegistryClient = - new MockSchemaRegistryClient() - - val schemaRegistryClientSettings: SchemaRegistryClientSettings[IO] = - SchemaRegistryClientSettings[IO]("baseUrl") - .withAuth(Auth.Basic("username", "password")) - .withMaxCacheSize(100) - .withCreateSchemaRegistryClient { (_, _, _) => - IO.pure(schemaRegistryClient) - } - - val avroSettings: AvroSettings[IO] = - AvroSettings(schemaRegistryClientSettings) }