Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[proposal] Externalize schema registry client #843

Open
wants to merge 6 commits into
base: series/2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lazy val `fs2-kafka` = project
Test / console := (core / Test / console).value
)
.enablePlugins(TypelevelMimaPlugin)
.aggregate(core, vulcan, `vulcan-testkit-munit`)
.aggregate(core, `schema-registry`, vulcan, `vulcan-testkit-munit`)

lazy val core = project
.in(file("modules/core"))
Expand All @@ -52,6 +52,22 @@ lazy val core = project
testSettings
)

lazy val `schema-registry` = project
.in(file("modules/schema-registry"))
.settings(
moduleName := "fs2-kafka-schema-registry",
name := moduleName.value,
dependencySettings ++ Seq(
libraryDependencies ++= Seq(
"io.confluent" % "kafka-schema-registry-client" % confluentVersion
)
),
publishSettings,
scalaSettings,
testSettings
)
.dependsOn(core)

lazy val vulcan = project
.in(file("modules/vulcan"))
.settings(
Expand All @@ -67,7 +83,7 @@ lazy val vulcan = project
scalaSettings,
testSettings
)
.dependsOn(core)
.dependsOn(core, `schema-registry`)

lazy val `vulcan-testkit-munit` = project
.in(file("modules/vulcan-testkit-munit"))
Expand Down Expand Up @@ -177,6 +193,9 @@ lazy val buildInfoSettings = Seq(
BuildInfoKey.map(core / crossScalaVersions) {
case (k, v) => "core" ++ k.capitalize -> v
},
BuildInfoKey.map(`schema-registry` / moduleName) {
case (k, v) => "schemaRegistry" ++ k.capitalize -> v
},
BuildInfoKey.map(vulcan / moduleName) {
case (k, v) => "vulcan" ++ k.capitalize -> v
},
Expand Down
2 changes: 2 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/internal/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ private[kafka] object syntax {
def updatedIfAbsent(k: K, v: => V): Map[K, V] =
if (map.contains(k)) map else map.updated(k, v)

def asJavaMap: util.Map[K, V] =
map.asJava
}

implicit final class MapWrappedValueSyntax[F[_], K, V](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.vulcan
package fs2.kafka.schemaregistry.client

/**
* The available options for [[SchemaRegistryClientSettings#withAuth]].
Expand All @@ -17,15 +17,16 @@ package fs2.kafka.vulcan
sealed abstract class Auth

object Auth {
private[vulcan] final case class BasicAuth(username: String, password: String) extends Auth {
private[schemaregistry] final case class BasicAuth(username: String, password: String)
extends Auth {
override def toString: String = s"Basic($username)"
}

private[vulcan] final case class BearerAuth(token: String) extends Auth {
private[schemaregistry] final case class BearerAuth(token: String) extends Auth {
override def toString: String = "Bearer"
}

private[vulcan] case object NoAuth extends Auth {
private[schemaregistry] case object NoAuth extends Auth {
override def toString: String = "None"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package fs2.kafka.schemaregistry.client

import cats.effect.kernel.Sync
import fs2.kafka.internal.syntax.MapSyntax

trait SchemaRegistryClient[F[_]] {

/**
* Register a schema for a given `Codec` for some type `A`,
* or return the existing schema id if it already exists.
* @param subject The subject name
* @return The schema id
*/
def register(subject: String, schema: ParsedSchema): F[Int]

/**
* Get latest schema for the specified subject
* @param subject The subject name
* @return Latest available schema for the subject
*/
def getLatestSchemaMetadata(subject: String): F[SchemaMetadata]

/**
* Get the schema for the specified subject with the specified version
* @param subject The subject name
* @param version Schema version
* @return
*/
def getSchemaMetadata(subject: String, version: Int): F[SchemaMetadata]

/**
* Get the schema for the specified `id` and cast it to specified type
* @param id Schema id
* @tparam S Schema type
* @return Schema for the specified `id`
*/
def getSchemaById[S <: ParsedSchema](id: Int): F[S]

/**
* Get the wrapped java instance
* @return The wrapped Java instance of `SchemaRegistryClient`
*/
def javaClient: JSchemaRegistryClient
}
object SchemaRegistryClient {

import cats.syntax.all._

private[this] final case class SchemaRegistryClientSettingsImpl[F[_]](
override val javaClient: JSchemaRegistryClient
)(implicit F: Sync[F])
extends SchemaRegistryClient[F] {

override def register(subject: String, schema: ParsedSchema): F[Int] =
F.delay(javaClient.register(subject, schema))

override def getLatestSchemaMetadata(subject: String): F[SchemaMetadata] =
F.delay(javaClient.getLatestSchemaMetadata(subject))

override def getSchemaMetadata(subject: String, version: Int): F[SchemaMetadata] =
F.delay(javaClient.getSchemaMetadata(subject, version))

override def getSchemaById[S <: ParsedSchema](id: Int): F[S] =
F.delay(javaClient.getSchemaById(id).asInstanceOf[S])
}

def apply[F[_]: Sync](baseUrl: String): F[SchemaRegistryClient[F]] =
SchemaRegistryClient(SchemaRegistryClientSettings(baseUrl))

def apply[F[_]: Sync](settings: SchemaRegistryClientSettings): F[SchemaRegistryClient[F]] =
Sync[F]
.delay(
new JCachedSchemaRegistryClient(
settings.baseUrl,
settings.maxCacheSize,
settings.properties.asJavaMap
)
)
.map(fromJava(_))

def fromJava[F[_]: Sync](jclient: JSchemaRegistryClient): SchemaRegistryClient[F] =
new SchemaRegistryClientSettingsImpl[F](jclient)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.vulcan
package fs2.kafka.schemaregistry.client

import cats.effect.Sync
import cats.Show
import fs2.kafka.internal.converters.collection._
import cats.{Eq, Show}

/**
* Describes how to create a `SchemaRegistryClient` and which
Expand All @@ -17,7 +15,7 @@ import fs2.kafka.internal.converters.collection._
*
* Use `SchemaRegistryClient#apply` to create an instance.
*/
sealed abstract class SchemaRegistryClientSettings[F[_]] {
sealed trait SchemaRegistryClientSettings {

/**
* The base URL of the schema registry service.
Expand All @@ -35,13 +33,13 @@ sealed abstract class SchemaRegistryClientSettings[F[_]] {
* Creates a new [[SchemaRegistryClientSettings]] instance
* with the specified [[maxCacheSize]].
*/
def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings[F]
def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings

/**
* Creates a new [[SchemaRegistryClientSettings]] instance
* with the specified authentication details.
*/
def withAuth(auth: Auth): SchemaRegistryClientSettings[F]
def withAuth(auth: Auth): SchemaRegistryClientSettings

/**
* Properties provided when creating a `SchemaRegistryClient`.
Expand All @@ -54,50 +52,33 @@ sealed abstract class SchemaRegistryClientSettings[F[_]] {
* Creates a new [[SchemaRegistryClientSettings]] instance
* including a property with the specified key and value.
*/
def withProperty(key: String, value: String): SchemaRegistryClientSettings[F]
def withProperty(key: String, value: String): SchemaRegistryClientSettings

/**
* Creates a new [[SchemaRegistryClientSettings]] instance
* including properties with the specified keys and values.
*/
def withProperties(properties: (String, String)*): SchemaRegistryClientSettings[F]
def withProperties(properties: (String, String)*): SchemaRegistryClientSettings

/**
* Creates a new [[SchemaRegistryClientSettings]] instance
* including properties with the specified keys and values.
*/
def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings[F]

/**
* Creates a new `SchemaRegistryClient` using the settings
* contained within this [[SchemaRegistryClientSettings]].
*/
def createSchemaRegistryClient: F[SchemaRegistryClient]

/**
* Creates a new [[SchemaRegistryClientSettings]] instance
* with the specified function for creating new instances
* of `SchemaRegistryClient` from settings. The arguments
* are [[baseUrl]], [[maxCacheSize]], and [[properties]].
*/
def withCreateSchemaRegistryClient(
createSchemaRegistryClientWith: (String, Int, Map[String, String]) => F[SchemaRegistryClient]
): SchemaRegistryClientSettings[F]
def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings
}

object SchemaRegistryClientSettings {
private[this] final case class SchemaRegistryClientSettingsImpl[F[_]](

private[this] final case class SchemaRegistryClientSettingsImpl(
override val baseUrl: String,
override val maxCacheSize: Int,
override val properties: Map[String, String],
// format: off
val createSchemaRegistryClientWith: (String, Int, Map[String, String]) => F[SchemaRegistryClient]
// format: on
) extends SchemaRegistryClientSettings[F] {
override def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings[F] =
override val properties: Map[String, String]
) extends SchemaRegistryClientSettings {

override def withMaxCacheSize(maxCacheSize: Int): SchemaRegistryClientSettings =
copy(maxCacheSize = maxCacheSize)

override def withAuth(auth: Auth): SchemaRegistryClientSettings[F] =
override def withAuth(auth: Auth): SchemaRegistryClientSettings =
auth match {
case Auth.BasicAuth(username, password) =>
withProperties(
Expand All @@ -115,23 +96,15 @@ object SchemaRegistryClientSettings {
this
}

override def withProperty(key: String, value: String): SchemaRegistryClientSettings[F] =
override def withProperty(key: String, value: String): SchemaRegistryClientSettings =
copy(properties = properties.updated(key, value))

override def withProperties(properties: (String, String)*): SchemaRegistryClientSettings[F] =
override def withProperties(properties: (String, String)*): SchemaRegistryClientSettings =
copy(properties = this.properties ++ properties.toMap)

override def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings[F] =
override def withProperties(properties: Map[String, String]): SchemaRegistryClientSettings =
copy(properties = this.properties ++ properties)

override def createSchemaRegistryClient: F[SchemaRegistryClient] =
createSchemaRegistryClientWith(baseUrl, maxCacheSize, properties)

override def withCreateSchemaRegistryClient(
createSchemaRegistryClientWith: (String, Int, Map[String, String]) => F[SchemaRegistryClient]
): SchemaRegistryClientSettings[F] =
copy(createSchemaRegistryClientWith = createSchemaRegistryClientWith)

override def toString: String =
s"SchemaRegistryClientSettings(baseUrl = $baseUrl, maxCacheSize = $maxCacheSize)"
}
Expand All @@ -140,15 +113,17 @@ object SchemaRegistryClientSettings {
* Creates a new [[SchemaRegistryClientSettings]] instance
* using the specified base URL of the schema registry.
*/
def apply[F[_]](baseUrl: String)(implicit F: Sync[F]): SchemaRegistryClientSettings[F] =
def apply(baseUrl: String): SchemaRegistryClientSettings =
SchemaRegistryClientSettingsImpl(
baseUrl = baseUrl,
maxCacheSize = 1000,
properties = Map.empty,
createSchemaRegistryClientWith = (baseUrl, maxCacheSize, properties) =>
F.delay(new CachedSchemaRegistryClient(baseUrl, maxCacheSize, properties.asJava))
properties = Map.empty
)

implicit def schemaRegistryClientSettingsShow[F[_]]: Show[SchemaRegistryClientSettings[F]] =
implicit val schemaRegistryClientSettingsShow: Show[SchemaRegistryClientSettings] =
Show.fromToString

implicit val schemaRegistryClientSettingsEq: Eq[SchemaRegistryClientSettings] =
Eq.fromUniversalEquals[SchemaRegistryClientSettingsImpl]
.asInstanceOf[Eq[SchemaRegistryClientSettings]]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package fs2.kafka.schemaregistry

package object client {

/** Alias for `io.confluent.kafka.schemaregistry.client.SchemaMetadata`. */
type SchemaMetadata =
io.confluent.kafka.schemaregistry.client.SchemaMetadata

/** Alias for `io.confluent.kafka.schemaregistry.ParsedSchema`. */
type ParsedSchema =
io.confluent.kafka.schemaregistry.ParsedSchema

/** Alias for `io.confluent.kafka.schemaregistry.client.SchemaRegistryClient`. */
type JSchemaRegistryClient =
io.confluent.kafka.schemaregistry.client.SchemaRegistryClient

/** Alias for `io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient`. */
type JCachedSchemaRegistryClient =
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.vulcan
package fs2.kafka.schemaregistry.client

import org.scalatest.funspec.AnyFunSpec
import org.scalatestplus.scalacheck._
Expand Down
Loading