Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
temp all changes
Browse files Browse the repository at this point in the history
  • Loading branch information
shreedhar-kc committed Jan 24, 2024
1 parent 74139d1 commit add0da3
Show file tree
Hide file tree
Showing 18 changed files with 86 additions and 48 deletions.
6 changes: 3 additions & 3 deletions .jvmopts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-XX:MaxPermSize=512m
-Xms256m
-Xmx1g
-XX:MaxPermSize=1g
-Xms1g
-Xmx2g
-XX:+CMSClassUnloadingEnabled
-XX:+UseConcMarkSweepGC
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,8 @@ class RedisSchemaRegistryClient(restService: RestService,

override def parseSchema(schemaType: String, schemaString: String,
references: util.List[SchemaReference]): Optional[ParsedSchema] = {
val parsedSchema = new AvroSchemaProvider().parseSchemaOrElseThrow(schemaString, references, false)
Optional.of(parsedSchema)
val parsedSchema = new AvroSchemaProvider().parseSchema(schemaString, references, false)
parsedSchema
}

override def register(subject: String, schema: ParsedSchema): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ConfluentSchemaRegistrySpec
""".stripMargin)

val client = ConfluentSchemaRegistry.forConfig(config, emptySchemaRegistrySecurityConfig).registryClient
val field = client.getClass.getDeclaredField("cacheCapacity")
val field = client.getClass.getDeclaredField("identityMapCapacity")
field.setAccessible(true)
assert(1234 === field.get(client))
}
Expand Down
19 changes: 12 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ lazy val defaultSettings = Seq(
// "org.apache.commons" % "commons-lang3" % "3.13.0",
"org.apache.commons" % "commons-compress" % "1.24.0",
// "org.apache.commons" % "lang3" % "3.1.0",
"io.confluent" %% "kafka-schema-registry" % "7.2.2" % "test" excludeAll(ExclusionRule("org.apache.kafka"), ExclusionRule("org.apache.zookeeper")),
"io.confluent" %% "kafka-avro-serializer" % "7.2.2" % "test"
"io.confluent" %% "kafka-schema-registry" % "6.2.1" % "test", // excludeAll(ExclusionRule("org.apache.kafka")),
"io.confluent" %% "kafka-avro-serializer" % "6.2.1" % "test",
// "org.apache.kafka" %% "kafka" % "2.8.2" % "test",
// "org.apache.zookeeper" % "zookeeper" % "3.5.9" % "test"
// "org.apache.zookeeper" % "zookeeper" % "3.5.9",
// "org.apache.zookeeper" % "zookeeper" % "3.5.9" % "test"
),
addCompilerPlugin(
"org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full
Expand All @@ -36,6 +40,7 @@ lazy val defaultSettings = Seq(
"-deprecation",
"-unchecked",
"-Ypartial-unification"
// "-Ylog-classpath"
),
javacOptions in Compile ++= Seq(
"-encoding",
Expand Down Expand Up @@ -109,10 +114,10 @@ lazy val core = Project(
.settings(
moduleSettings,
name := "hydra-core",
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps,
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps ++ Dependencies.kafkaSchemaRegistryDep,
dependencyOverrides ++= Seq(
"io.confluent" %% "kafka-schema-registry" % "7.2.2" exclude("org.apache.kafka", "kafka-clients"),
"io.confluent" %% "kafka-avro-serializer" % "7.2.2"
"org.apache.kafka" %% "kafka" % "2.8.2" % "compile,runtime",
"org.apache.kafka" % "kafka-clients" % "2.8.2" % "compile,runtime"
)
)

Expand All @@ -126,8 +131,8 @@ lazy val kafka = Project(
name := "hydra-kafka",
libraryDependencies ++= Dependencies.kafkaDeps,
dependencyOverrides ++= Seq(
"io.confluent" %% "kafka-schema-registry" % "7.2.2" exclude("org.apache.kafka", "kafka-clients"),
"io.confluent" %% "kafka-avro-serializer" % "7.2.2"
"org.apache.kafka" %% "kafka" % "2.8.2" % "compile,runtime",
"org.apache.kafka" % "kafka-clients" % "2.8.2" % "compile,runtime"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.{Node, PartitionInfo}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import spray.json.{JsArray, JsObject, JsValue, RootJsonFormat}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

import scala.collection.immutable.Map
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ object KafkaUtils extends ConfigSupport {
def producerSettings(cfg: Config, kafkaClientSecurityConfig: KafkaClientSecurityConfig): Map[String, ProducerSettings[Any, Any]] = {
val clientsConfig = cfg.getConfig(s"$applicationName.kafka.clients")
val clients = clientsConfig.root().entrySet().asScala.map(_.getKey)
clients
val res = clients
.map(client => client -> producerSettings[Any, Any](client, cfg, kafkaClientSecurityConfig))
.toMap
res
}

def consumerSettings[K, V](
Expand Down
1 change: 1 addition & 0 deletions ingestors/kafka/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ hydra_kafka {
key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
metadata.fetch.timeout.ms = 1000
schema.registry.url = ${hydra_kafka.schema.registry.url}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import hydra.kafka.algebras.KafkaClientAlgebra.getOptionalGenericRecordDeseriali
import hydra.kafka.util.KafkaUtils.TopicDetails
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.joda.time.DurationFieldType.seconds
import org.scalatest.{BeforeAndAfterAll, stats}
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package hydra.kafka.algebras

import cats.effect.{Concurrent, ContextShift, IO, Timer}
import hydra.avro.registry.SchemaRegistry
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.avro.generic.GenericRecord
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package hydra.kafka.consumer
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import hydra.kafka.consumer.KafkaConsumerProxy._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.TopicPartition
import org.scalatest.matchers.should.Matchers
import org.scalatest.funspec.AnyFunSpecLike
Expand All @@ -37,7 +37,7 @@ class KafkaConsumerProxySpec
with ImplicitSender {

implicit val config =
EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3111)
EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3121)

override def beforeAll() = {
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class BootstrapEndpointSpec

implicit val embeddedKafkaConfig = EmbeddedKafkaConfig(
kafkaPort = 8012,
zooKeeperPort = 3111,
zooKeeperPort = 3011,
customBrokerProperties = Map("auto.create.topics.enable" -> "false")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import hydra.kafka.model.RequiredField
import hydra.kafka.model.TopicMetadataV2Request.Subject
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.{Node, PartitionInfo}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -59,7 +59,7 @@ class TopicMetadataEndpointSpec
Slf4jLogger.getLogger[F]

implicit val kafkaConfig: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3111)
EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3789)

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import hydra.kafka.model.TopicMetadata
import hydra.kafka.producer.AvroRecord
import hydra.kafka.services.StreamsManagerActor.{GetMetadata, GetMetadataResponse}
import hydra.kafka.services.TopicBootstrapActor._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, KafkaUnavailableException}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, KafkaUnavailableException}
import org.apache.avro.Schema
import org.apache.kafka.common.serialization.StringSerializer
import org.joda.time.DateTime
Expand Down Expand Up @@ -48,7 +48,7 @@ class TopicBootstrapActorSpec

implicit val embeddedKafkaConfig = EmbeddedKafkaConfig(
kafkaPort = 8012,
zooKeeperPort = 3111,
zooKeeperPort = 3241,
customBrokerProperties = Map("auto.create.topics.enable" -> "false")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class KafkaMetricsSpec

implicit val config = EmbeddedKafkaConfig(
kafkaPort = 8012,
zooKeeperPort = 3111,
zooKeeperPort = 3114,
customBrokerProperties = Map(
"auto.create.topics.enable" -> "false",
"offsets.topic.replication.factor" -> "1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import hydra.kafka.producer.{JsonRecord, KafkaRecordMetadata, StringRecord}
import hydra.kafka.transport.KafkaProducerProxy.ProduceToKafka
import hydra.kafka.transport.KafkaTransport.RecordProduceError
import hydra.kafka.util.KafkaUtils
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.scalatest.BeforeAndAfterAll
Expand All @@ -32,7 +32,7 @@ class KafkaProducerProxySpec

implicit val config = EmbeddedKafkaConfig(
kafkaPort = 8012,
zooKeeperPort = 3111,
zooKeeperPort = 3133,
customBrokerProperties = Map("auto.create.topics.enable" -> "false")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ class KafkaTransportSpec

it("forwards to the right proxy") {
val ack: TransportCallback =
(d: Long, m: Option[RecordMetadata], e: Option[Throwable]) =>
ingestor.ref ! "DONE"
(d: Long, m: Option[RecordMetadata], e: Option[Throwable]) => {
val msg = if(e.isDefined) e.get.getMessage else "DONE"
ingestor.ref ! msg
}

val rec =
StringRecord("transport_test", "key", "payload", AckStrategy.NoAck)
transport ! Deliver(rec, 1, ack)
Expand All @@ -104,13 +107,18 @@ class KafkaTransportSpec
}

it("publishes errors to the stream") {
val rec = JsonRecord(
"transport_test",
Some("key"),
"""{"name":"alex"}""",
AckStrategy.NoAck
)
transport ! Deliver(rec)
// val rec = JsonRecord(
// "transport_test",
// Some("key"),
// """{"name":"alex"}""",
// AckStrategy.NoAck
// )
val ack: TransportCallback =
(d: Long, m: Option[RecordMetadata], e: Option[Throwable]) =>
streamActor.ref ! "DONE"
val rec =
StringRecord("transport_test", "key", "payload", AckStrategy.NoAck)
transport ! Deliver(rec, -1, ack)
streamActor.expectMsgPF() {
case RecordProduceError(deliveryId, r, err) =>
deliveryId shouldBe -1
Expand Down
45 changes: 33 additions & 12 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object Dependencies {
val catsRetryVersion = "2.1.0"
val catsVersion = "2.4.2"
val cirisVersion = "1.2.1"
val confluentVersion = "7.2.2"
val confluentVersion = "6.2.1"
val fs2KafkaVersion = "1.4.1"
val jacksonCoreVersion = "2.10.4"
val jacksonDatabindVersion = "2.10.4"
Expand All @@ -21,7 +21,7 @@ object Dependencies {
val kafkaVersion = "2.8.2"
val kamonPVersion = "2.1.10"
val kamonVersion = "2.1.10"
val log4jVersion = "2.17.1"
val log4jVersion = "2.22.1"
val refinedVersion = "0.9.20"
val reflectionsVersion = "0.9.12"
val scalaCacheVersion = "0.28.0"
Expand Down Expand Up @@ -73,8 +73,10 @@ object Dependencies {
val retry = "com.softwaremill.retry" %% "retry" % "0.3.3"

val embeddedKafka = Seq(
"io.github.embeddedkafka" %% "embedded-kafka" % "2.8.1" % "test" excludeAll(ExclusionRule("org.apache.kafka"), ExclusionRule("org.apache.zookeeper")),
"org.apache.kafka" %% "kafka" % kafkaVersion % "test"
"io.github.embeddedkafka" %% "embedded-kafka" % "2.8.1" % "test"
// excludeAll(ExclusionRule("org.apache.kafka"), ExclusionRule("org.apache.zookeeper"))
// "org.apache.kafka" %% "kafka" % kafkaVersion % "test",
// "org.apache.zookeeper" % "zookeeper" % "3.5.9" % "test"
)

lazy val kamon = Seq(
Expand All @@ -88,15 +90,25 @@ object Dependencies {
"org.apache.kafka" %% "kafka" % kafkaVersion
) ++ kafkaClients ++ embeddedKafka

val confluent: Seq[ModuleID] =
val kafkaAvroSerializer: Seq[ModuleID] =
Seq("io.confluent" % "kafka-avro-serializer" % confluentVersion).map(
_.excludeAll(
ExclusionRule(organization = "org.codehaus.jackson"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "org.apache.kafka", name = "kafka-clients")
ExclusionRule(organization = "org.apache.kafka")
)
)

val kafkaSchemaRegistry: Seq[ModuleID] = Seq("io.confluent" % "kafka-schema-registry-client" % confluentVersion).map(
_.excludeAll(
ExclusionRule(organization = "org.scala-lang.modules"),
ExclusionRule(organization = "org.apache.kafka", "kafka-clients"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "org.scala-lang.modules"),
ExclusionRule(organization = "com.typesafe.scala-logging")
)
)

val awsMskIamAuth = Seq("software.amazon.msk" % "aws-msk-iam-auth" % "1.1.4")

val awsSdk = Seq(
Expand Down Expand Up @@ -175,9 +187,16 @@ object Dependencies {
val junit = "junit" % "junit" % "4.13.1" % module

val embeddedKafka =
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % confluentVersion % module excludeAll(
ExclusionRule("io.github.embeddedkafka"), ExclusionRule("org.apache.zookeeper"), ExclusionRule("org.apache.kafka")
)
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % confluentVersion % module
//excludeAll(
// ExclusionRule("io.github.embeddedkafka"),
// ExclusionRule("io.confluent", "kafka-schema-registry-client")
// )
// "io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % confluentVersion % module
// excludeAll(
//// ExclusionRule("io.github.embeddedkafka"),
// ExclusionRule("org.apache.zookeeper"), ExclusionRule("org.apache.kafka")
// )

val scalatestEmbeddedRedis = "com.github.sebruck" %% "scalatest-embedded-redis" % scalaTestEmbeddedRedisVersion % module

Expand Down Expand Up @@ -210,21 +229,23 @@ object Dependencies {
akka ++ Seq(avro, ciris, refined, enumeratum) ++ cats ++ logging ++ joda ++ testDeps ++ kafkaClients ++ awsMskIamAuth

val avroDeps: Seq[ModuleID] =
baseDeps ++ confluent ++ jackson ++ guavacache ++ catsEffect ++ redisCache
baseDeps ++ kafkaAvroSerializer ++ jackson ++ guavacache ++ catsEffect ++ redisCache

val coreDeps: Seq[ModuleID] = akka ++ baseDeps ++
Seq(
reflections,
retry
) ++ guavacache ++ confluent ++ kamon ++ redisCache
) ++ guavacache ++ kafkaAvroSerializer ++ kamon ++ redisCache

val ingestDeps: Seq[ModuleID] = coreDeps ++ akkaHttpHal ++ embeddedKafka ++ Seq(sprayJson)

val kafkaDeps: Seq[ModuleID] = coreDeps ++ Seq(
akkaKafkaStream,
refined
) ++ kafka ++ akkaHttpHal ++ vulcan ++ fs2Kafka ++ integrationDeps
) ++ kafka ++ akkaHttpHal ++ vulcan ++ fs2Kafka ++ integrationDeps ++ kafkaSchemaRegistry

val awsAuthDeps: Seq[ModuleID] = awsSdk

val kafkaSchemaRegistryDep = kafkaSchemaRegistry

}
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ logLevel := Level.Warn

resolvers += Classpaths.sbtPluginReleases

//addSbtPlugin("io.get-coursier" % "sbt-coursier" % "2.0.8")
//addSbtPlugin("ch.epfl.scala" % "sbt-missinglink" % "0.3.1")
addDependencyTreePlugin
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.2.2-RC2")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
Expand Down

0 comments on commit add0da3

Please sign in to comment.