Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
Revert "ADAPT-1701: update schema-registry client and kafka version (#13
Browse files Browse the repository at this point in the history
)" (#19)

This reverts commit 8d47998.
  • Loading branch information
shreedhar-kc authored Feb 29, 2024
1 parent 8d47998 commit b013766
Show file tree
Hide file tree
Showing 31 changed files with 331 additions and 416 deletions.
5 changes: 0 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,3 @@ build.properties
#ensime http://ensime.github.io/
.ensime
.ensime_cache

.bsp

# ignore temp files and folders
__temp*
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object ConfluentSchemaRegistry extends LoggingAdapter {

private val cachedClients = CacheBuilder
.newBuilder()
.build[SchemaRegistryClientInfo, ConfluentSchemaRegistry](
.build(
new CacheLoader[SchemaRegistryClientInfo, ConfluentSchemaRegistry] {

def load(info: SchemaRegistryClientInfo): ConfluentSchemaRegistry = {
Expand Down
314 changes: 147 additions & 167 deletions avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala

Large diffs are not rendered by default.

204 changes: 91 additions & 113 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package hydra.avro.registry

import com.typesafe.config.ConfigFactory
import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient}
import org.apache.avro.Schema
import org.scalatest.concurrent.ScalaFutures
Expand Down Expand Up @@ -53,7 +52,7 @@ class ConfluentSchemaRegistrySpec

override def beforeAll(): Unit = {
id =
ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, new AvroSchema(schema))
ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, schema)
}

describe("When creating a schema registry client") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import org.apache.avro.{Schema, SchemaBuilder}
import org.scalatest.BeforeAndAfterAll
import com.github.sebruck.EmbeddedRedis
import hydra.avro.registry.RedisSchemaRegistryClient.IntSchemaMetadataMapBinCodec
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}
import io.github.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import net.manub.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import redis.embedded.RedisServer
Expand Down Expand Up @@ -61,16 +60,15 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
val schema1: Schema = SchemaBuilder.record("Test1").fields()
.requiredString("testing1").endRecord()

val schema12: Schema = SchemaBuilder.record("Test1").fields()
.requiredString("testing1").optionalString("testing22").endRecord()
val schema12: Schema = SchemaBuilder.record("Test12").fields()
.requiredString("testing1").endRecord()

val schema2: Schema = SchemaBuilder.record("Test2").fields()
.requiredString("testing2").endRecord()

//register two schemas with different clients
val redisClientResult = redisClient.register(topicName1, new AvroSchema(schema1))
val cachedClientResult = cachedClient.register(topicName2 , new AvroSchema(schema2))

val redisClientResult = redisClient.register(topicName1, schema1)
val cachedClientResult = cachedClient.register(topicName2 ,schema2)

//test the getAllSubjectsById method
assert(redisClient.getAllSubjectsById(redisClientResult).contains(topicName1))
Expand All @@ -79,17 +77,17 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
assert(cachedClient.getAllSubjectsById(cachedClientResult).contains(topicName2))

//test the getById method
redisClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1)
cachedClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1)
redisClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2)
cachedClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2)
redisClient.getById(redisClientResult) shouldBe schema1
cachedClient.getById(redisClientResult) shouldBe schema1
redisClient.getById(cachedClientResult) shouldBe schema2
cachedClient.getById(cachedClientResult) shouldBe schema2

//test the getBySubjectAndId method
Thread.sleep(3000)
assert(redisClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1)))
assert(cachedClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1)))
assert(redisClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2)))
assert(cachedClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2)))
assert(redisClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1))
assert(cachedClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1))
assert(redisClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2))
assert(cachedClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2))


//test the getAllSubjects method
Expand All @@ -111,10 +109,10 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

//test the getId method
Thread.sleep(3000)
redisClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult
cachedClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult
redisClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult
cachedClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult
redisClient.getId(topicName1, schema1) shouldBe redisClientResult
cachedClient.getId(topicName1, schema1) shouldBe redisClientResult
redisClient.getId(topicName2, schema2) shouldBe cachedClientResult
cachedClient.getId(topicName2, schema2) shouldBe cachedClientResult

//test the getLatestSchemaMetadata method
val schemaMetadata1 = new SchemaMetadata(1, 1,schema1.toString)
Expand All @@ -125,13 +123,13 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
schemaMetadata1Result.getVersion shouldBe schemaMetadata1.getVersion
schemaMetadata1Result.getSchema shouldBe schemaMetadata1.getSchema

redisClient.register(topicName1, new AvroSchema(schema12))
redisClient.register(topicName1, schema12)
val schemaMetadata2Result = redisClient.getLatestSchemaMetadata(topicName1)
schemaMetadata2Result.getId shouldBe schemaMetadata2.getId
schemaMetadata2Result.getVersion shouldBe schemaMetadata2.getVersion
schemaMetadata2Result.getSchema shouldBe schemaMetadata2.getSchema

//test the getSchemaMetadata method
//test the getSchemaMetadata method
val metadata1 = redisClient.getSchemaMetadata(topicName1, 1)
metadata1.getId shouldBe schemaMetadata1.getId
metadata1.getVersion shouldBe schemaMetadata1.getVersion
Expand All @@ -145,9 +143,9 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

//test the getVersion method
Thread.sleep(3000)
redisClient.getVersion(topicName1, new AvroSchema(schema1)) shouldBe 1
redisClient.getVersion(topicName1, new AvroSchema(schema12)) shouldBe 2
redisClient.getVersion(topicName2, new AvroSchema(schema2)) shouldBe 1
redisClient.getVersion(topicName1, schema1) shouldBe 1
redisClient.getVersion(topicName1, schema12) shouldBe 2
redisClient.getVersion(topicName2, schema2) shouldBe 1

//test the deleteSchemaVersion method
//the latest metadata is in this test -> test the getLatestSchemaMetadata method
Expand All @@ -162,7 +160,7 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

intercept[RestClientException] {
redisClient.getLatestSchemaMetadata(topicName2)
}.getMessage shouldBe "Subject 'topicB' not found.; error code: 40401"
}.getMessage shouldBe "Subject not found.; error code: 40401"

succeed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers
it must "do retries for getAllVersions when SchemaRegistry throws error" in {
val expectedVersions = List(1, 2, 3)
val mockSchemaRegistryClient = mock[SchemaRegistryClient]
(mockSchemaRegistryClient.getAllVersions(_ : String))
(mockSchemaRegistryClient.getAllVersions _)
.expects(*)
.throws(new RestClientException("error", 0, 50005))
.repeat(2)

(mockSchemaRegistryClient.getAllVersions(_ : String))
(mockSchemaRegistryClient.getAllVersions _)
.expects(*)
.returns(expectedVersions.map(Integer.valueOf).asJava)

Expand Down Expand Up @@ -426,7 +426,7 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers

it must "fail if all attempts were used" in {
val mockSchemaRegistryClient = mock[SchemaRegistryClient]
(mockSchemaRegistryClient.getAllVersions(_ : String))
(mockSchemaRegistryClient.getAllVersions _)
.expects(*)
.throws(new RestClientException("error", 0, 50005))
.repeat(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package hydra.avro.resource

import hydra.avro.registry.SchemaRegistryException
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.avro.{Schema, SchemaBuilder}
import org.scalatest.BeforeAndAfterEach
Expand Down Expand Up @@ -209,7 +208,7 @@ class SchemaResourceLoaderSpec
"returns the same underlying key schema instance if the registry metadata hasn't changed"
) {
val client = new MockSchemaRegistryClient
client.register(testKeySchema.getFullName + "-key", new AvroSchema(testKeySchema))
client.register(testKeySchema.getFullName + "-key", testKeySchema)
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand All @@ -232,7 +231,7 @@ class SchemaResourceLoaderSpec
) {
val nschema = newValueSchema(testValueSchema.getNamespace, "ntest")
val client = new MockSchemaRegistryClient
client.register(nschema.getFullName + "-value", new AvroSchema(nschema))
client.register(nschema.getFullName + "-value", nschema)
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand All @@ -259,7 +258,7 @@ class SchemaResourceLoaderSpec
false,
fields.asJava
)
client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema))
client.register(nschema.getFullName + "-value", evolvedSchema)
eventually {
whenReady(loader.retrieveValueSchema(nschema.getFullName)) {
schemaResource => (schemaResource.schema eq nschema) shouldBe false
Expand All @@ -272,7 +271,7 @@ class SchemaResourceLoaderSpec
) {
val nschema = newKeySchema(testKeySchema.getNamespace, "ntest")
val client = new MockSchemaRegistryClient
client.register(nschema.getFullName + "-value", new AvroSchema(nschema))
client.register(nschema.getFullName + "-value", nschema)
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand Down Expand Up @@ -301,7 +300,7 @@ class SchemaResourceLoaderSpec
false,
fields.asJava
)
client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema))
client.register(nschema.getFullName + "-value", evolvedSchema)
eventually {
whenReady(loader.retrieveValueSchema(nschema.getFullName)) {
schemaResource => (schemaResource.schema eq nschema) shouldBe false
Expand Down
27 changes: 13 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ lazy val defaultSettings = Seq(
excludeDependencies += "org.slf4j" % "slf4j-log4j12",
excludeDependencies += "log4j" % "log4j",
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-compress" % "1.24.0",
"io.netty" % "netty-codec" % "4.1.77.Final",
// "org.apache.zookeeper" % "zookeeper" % "3.7.2", -- snyk vulnerability fix
"org.xerial.snappy" % "snappy-java" % "1.1.10.4",
"org.apache.avro" % "avro" % Dependencies.avroVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.3",
"org.apache.kafka" %% "kafka" % "2.8.2" % "test",
"io.confluent" %% "kafka-schema-registry" % "6.2.1" % "test",
"io.confluent" %% "kafka-avro-serializer" % "6.2.1" % "test",
"org.apache.commons" % "commons-lang3" % "3.12.0",
"org.apache.commons" % "commons-compress" % "1.22",
"org.apache.commons" % "lang3" % "3.1.0",
"io.confluent" % "kafka-schema-registry" % "5.4.2" % "test",
"io.confluent" % "kafka-avro-serializer" % "5.4.2" % "test"
),
addCompilerPlugin(
"org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full
Expand Down Expand Up @@ -113,10 +109,10 @@ lazy val core = Project(
.settings(
moduleSettings,
name := "hydra-core",
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps ++ Dependencies.kafkaSchemaRegistryDep,
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps,
dependencyOverrides ++= Seq(
"org.apache.kafka" %% "kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "2.8.2"
"io.confluent" % "kafka-schema-registry" % "5.4.2",
"io.confluent" % "kafka-avro-serializer" % "5.4.2"
)
)

Expand All @@ -130,8 +126,8 @@ lazy val kafka = Project(
name := "hydra-kafka",
libraryDependencies ++= Dependencies.kafkaDeps,
dependencyOverrides ++= Seq(
"org.apache.kafka" %% "kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "2.8.2"
"io.confluent" % "kafka-schema-registry" % "5.4.2",
"io.confluent" % "kafka-avro-serializer" % "5.4.2"
)
)

Expand All @@ -145,6 +141,9 @@ lazy val avro = Project(
libraryDependencies ++= Dependencies.avroDeps
)

val sbSettings =
defaultSettings ++ Test.testSettings ++ noPublishSettings ++ restartSettings

lazy val ingest = Project(
id = "ingest",
base = file("ingest")
Expand Down
9 changes: 0 additions & 9 deletions common/src/main/scala/hydra/common/util/ClassUtils.scala

This file was deleted.

6 changes: 3 additions & 3 deletions core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientExcept
import org.apache.avro.{Schema, SchemaParseException}
import org.apache.kafka.common.PartitionInfo
import scalacache.cachingF
import io.confluent.kafka.schemaregistry.avro.AvroSchema

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.concurrent.Future
Expand Down Expand Up @@ -173,8 +173,8 @@ class SchemaRegistryActor(

val subject = getSubject(schema)
log.debug(s"Registering schema ${schema.getFullName}: $json")
val schemaId = registry.registryClient.register(subject, new AvroSchema(schema))
val version = registry.registryClient.getVersion(subject, new AvroSchema(schema))
val schemaId = registry.registryClient.register(subject, schema)
val version = registry.registryClient.getVersion(subject, schema)
RegisterSchemaResponse(SchemaResource(schemaId, version, schema))
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/hydra/core/ingest/Ingestor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import hydra.core.akka.InitializingActor
import hydra.core.monitor.HydraMetrics
import hydra.core.protocol._
import hydra.core.transport.{AckStrategy, HydraRecord, RecordFactory}
import hydra.common.util.ClassUtils
import org.apache.commons.lang3.ClassUtils

import scala.concurrent.Future
import scala.util.{Success, Try}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import hydra.avro.resource.SchemaResource
import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig
import hydra.core.akka.SchemaRegistryActor._
import hydra.core.protocol.HydraApplicationError
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.avro.Schema.Parser
import org.apache.avro.SchemaBuilder
import org.scalatest.BeforeAndAfterAll
Expand Down Expand Up @@ -59,9 +58,9 @@ class SchemaRegistryActorSpec
.endRecord

override def beforeAll() = {
client.register("hydra.test.Tester-value", new AvroSchema(testSchema))
client.register("hydra.test.TesterWithKey-key", new AvroSchema(testKeySchema))
client.register("hydra.test.TesterWithKey-value", new AvroSchema(testSchema))
client.register("hydra.test.Tester-value", testSchema)
client.register("hydra.test.TesterWithKey-key", testKeySchema)
client.register("hydra.test.TesterWithKey-value", testSchema)
}

val listener = TestProbe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.{Node, PartitionInfo}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import spray.json.{JsArray, JsObject, JsValue, RootJsonFormat}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

import scala.collection.immutable.Map
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.typelevel.log4cats.Logger
import hydra.common.config.KafkaConfigUtils._
import kafka.server.KafkaConfig

/**
* Internal interface to interact with the KafkaAdminClient from FS2 Kafka.
* Provides a live version for production usage and a test version for integration testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.pluralsight.hydra.avro.JsonConverter
import hydra.core.transport.AckStrategy
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.commons.lang3.StringUtils

/**
* Created by alexsilva on 10/30/15.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package hydra.kafka.producer

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import hydra.core.transport.AckStrategy
import org.apache.commons.lang3.StringUtils

/**
* Created by alexsilva on 11/30/15.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package hydra.kafka.producer

import hydra.core.transport.HydraRecord
import hydra.common.util.ClassUtils
import org.apache.commons.lang3.ClassUtils
import org.apache.kafka.clients.producer.ProducerRecord

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package hydra.kafka.producer

import hydra.core.transport.AckStrategy

import org.apache.commons.lang3.StringUtils

/**
* Created by alexsilva on 11/30/15.
Expand Down
Loading

0 comments on commit b013766

Please sign in to comment.