Add seekToBeginning option, reuse instances of kafka (de)serializers #179

Merged · 3 commits · Apr 7, 2020

Changes from all commits
2 changes: 1 addition & 1 deletion build.sbt
@@ -32,7 +32,7 @@ lazy val warnUnusedImport = Seq(
 lazy val sharedSettings = warnUnusedImport ++ Seq(
   organization := "io.monix",
   scalaVersion := "2.12.10",
-  crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.0"),
+  crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.1"),
 
   scalacOptions ++= Seq(
     // warnings
5 changes: 3 additions & 2 deletions kafka-0.10.x/src/main/resources/monix/kafka/default.conf
@@ -70,8 +70,9 @@ kafka {
   # Number of requests that KafkaProducerSink
   # can push in parallel
   monix.producer.sink.parallelism = 100
-  # Triggers a seekToEnd when the observable starts
-  monix.observable.seekEnd.onStart = false
+  # Triggers either seekToEnd or seekToBeginning when the observable starts
+  # Possible values: end, beginning, no-seek
+  monix.observable.seek.onStart = "no-seek"
   # Possible values: sync, async
   monix.observable.commit.type = "sync"
   # Possible values: before-ack, after-ack or no-ack
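For context, a minimal sketch of overriding the new setting programmatically instead of through default.conf — the broker address and group id below are placeholder assumptions, not values from this PR:

import monix.kafka.KafkaConsumerConfig
import monix.kafka.config.ObservableSeekOnStart

// Equivalent to monix.observable.seek.onStart = "beginning":
// every assigned partition is replayed from its earliest offset
// when the KafkaConsumerObservable starts.
val consumerCfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("127.0.0.1:9092"), // placeholder broker
  groupId = "seek-demo",                     // placeholder group id
  observableSeekOnStart = ObservableSeekOnStart.Beginning
)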
1 change: 1 addition & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala
@@ -33,6 +33,7 @@ private[kafka] object Commit {
 
   val empty: Commit = new Commit {
     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
+
     override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
       Task.unit
   }
3 changes: 2 additions & 1 deletion kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala
@@ -49,7 +49,8 @@ object Deserializer {
   implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] =
     Deserializer[A](
       className = des.getClass.getName,
-      classType = des.getClass
+      classType = des.getClass,
+      constructor = _ => des
     )
 
   /** Alias for the function that provides an instance of
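A sketch of what the added constructor = _ => des means in practice: the implicit Kafka deserializer instance is now reused as-is instead of being re-created reflectively per consumer; StringDeserializer here is just an illustrative stand-in for a stateful deserializer:

import monix.kafka.Deserializer
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer, StringDeserializer}

// The derived monix Deserializer's constructor now returns this exact
// instance, rather than instantiating des.getClass via reflection.
implicit val underlying: KafkaDeserializer[String] = new StringDeserializer
val des: Deserializer[String] = Deserializer.fromKafkaDeserializer[String]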
4 changes: 2 additions & 2 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala
@@ -243,7 +243,7 @@ final case class KafkaConsumerConfig(
     retryBackoffTime: FiniteDuration,
     observableCommitType: ObservableCommitType,
     observableCommitOrder: ObservableCommitOrder,
-    observableSeekToEndOnStart: Boolean,
+    observableSeekOnStart: ObservableSeekOnStart,
     properties: Map[String, String]) {
 
   def toMap: Map[String, String] = properties ++ Map(
@@ -428,7 +428,7 @@ object KafkaConsumerConfig {
       retryBackoffTime = config.getInt("retry.backoff.ms").millis,
       observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
       observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
-      observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
+      observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")),
       properties = Map.empty
     )
   }
3 changes: 2 additions & 1 deletion kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala
@@ -64,7 +64,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
     implicit val s = scheduler
     val feedTask = consumer.flatMap { c =>
       // Seeks to the end or the beginning of all partitions, if configured
-      if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection)
+      if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection)
+      else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection)
       // A task to execute on both cancellation and normal termination
       val onCancel = cancelTask(c)
       runLoop(c, out).guarantee(onCancel)
2 changes: 2 additions & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
@@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
   private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis
 
   case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit {
+
     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
       Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
         case (k, v) => k -> new OffsetAndMetadata(v)
       }.asJava))))
+
     override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
       Task {
         blocking(consumer.synchronized(consumer.commitAsync(batch.map {
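For reference, a sketch of how CommitWithConsumer is exercised through the manual-commit API — the topic, group id, and broker address are placeholders, not part of this diff:

import monix.execution.Scheduler.Implicits.global
import monix.kafka.{CommittableOffsetBatch, KafkaConsumerConfig, KafkaConsumerObservable}

val cfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("127.0.0.1:9092"), // placeholder broker
  groupId = "manual-commit-demo",            // placeholder group id
  enableAutoCommit = false
)

// Buffers 100 messages, then commits their offsets through
// commitBatchSync (blocking, synchronized on the consumer).
val task = KafkaConsumerObservable
  .manualCommit[String, String](cfg, List("my-topic"))
  .bufferTumbling(100)
  .mapEval(msgs => CommittableOffsetBatch(msgs.map(_.committableOffset)).commitSync())
  .completedL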
28 changes: 16 additions & 12 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
@@ -38,17 +38,17 @@ import scala.util.{Failure, Success}
   * - An error with `Task.raiseError` which will finish the stream with an error.
   */
 final class KafkaProducerSink[K, V] private (
-  producer: Coeval[KafkaProducer[K, V]],
-  shouldTerminate: Boolean,
-  parallelism: Int,
-  onSendError: Throwable => Task[Ack])
-  extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {
+    producer: Coeval[KafkaProducer[K, V]],
+    shouldTerminate: Boolean,
+    parallelism: Int,
+    onSendError: Throwable => Task[Ack])
+    extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {
 
   require(parallelism >= 1, "parallelism >= 1")
 
   def createSubscriber(
-    cb: Callback[Throwable, Unit],
-    s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
+      cb: Callback[Throwable, Unit],
+      s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
     val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self =>
       implicit val scheduler: Scheduler = s
       private[this] val p = producer.memoize
@@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private (
 
 object KafkaProducerSink extends StrictLogging {
 
-  private[this] val onSendErrorDefault = (ex: Throwable) => Task {
-    logger.error("Unexpected error in KafkaProducerSink", ex)
-    Continue
-  }
+  private[this] val onSendErrorDefault = (ex: Throwable) =>
+    Task {
+      logger.error("Unexpected error in KafkaProducerSink", ex)
+      Continue
+    }
 
   /** Builder for [[KafkaProducerSink]]. */
   def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
@@ -126,6 +127,9 @@ object KafkaProducerSink extends StrictLogging {
     apply(producer, parallelism, onSendErrorDefault)
 
   /** Builder for [[KafkaProducerSink]]. */
-  def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
+  def apply[K, V](
+      producer: Coeval[KafkaProducer[K, V]],
+      parallelism: Int,
+      onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
     new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError)
 }
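The third apply overload (reformatted above) takes a caller-supplied onSendError; a minimal sketch under assumed values — the broker address and the stop-on-failure policy are illustrative, not part of this PR:

import monix.eval.{Coeval, Task}
import monix.execution.Ack
import monix.execution.Ack.Stop
import monix.execution.Scheduler.Implicits.global
import monix.kafka.{KafkaProducer, KafkaProducerConfig, KafkaProducerSink}

val producerCfg = KafkaProducerConfig.default.copy(
  bootstrapServers = List("127.0.0.1:9092") // placeholder broker
)

// Unlike onSendErrorDefault (log and Continue), this sink
// stops the stream on the first send failure.
val sink = KafkaProducerSink[String, String](
  producer = Coeval(KafkaProducer[String, String](producerCfg, global)),
  parallelism = 4,
  onSendError = (_: Throwable) => Task.now(Stop: Ack)
)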
3 changes: 2 additions & 1 deletion kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala
@@ -49,7 +49,8 @@ object Serializer {
   implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] =
     Serializer[A](
       className = ser.getClass.getName,
-      classType = ser.getClass
+      classType = ser.getClass,
+      constructor = _ => ser
     )
 
   /** Alias for the function that provides an instance of
60 changes: 60 additions & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala
@@ -0,0 +1,60 @@
+package monix.kafka.config
+
+import com.typesafe.config.ConfigException.BadValue
+
+/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting
+  * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]]
+  *
+  * Available options:
+  *
+  *  - [[ObservableSeekOnStart.End]]
+  *  - [[ObservableSeekOnStart.Beginning]]
+  *  - [[ObservableSeekOnStart.NoSeek]]
+  */
+sealed trait ObservableSeekOnStart extends Serializable {
+  def id: String
+
+  def isSeekBeginning: Boolean =
+    this match {
+      case ObservableSeekOnStart.Beginning => true
+      case _ => false
+    }
+
+  def isSeekEnd: Boolean =
+    this match {
+      case ObservableSeekOnStart.End => true
+      case _ => false
+    }
+}
+
+object ObservableSeekOnStart {
+
+  @throws(classOf[BadValue])
+  def apply(id: String): ObservableSeekOnStart =
+    id match {
+      case End.id => End
+      case Beginning.id => Beginning
+      case NoSeek.id => NoSeek
+      case _ =>
+        throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id")
+    }
+
+  /** Calls `consumer.seekToEnd()` when starting consumer.
+    */
+  case object End extends ObservableSeekOnStart {
+    val id = "end"
+  }
+
+  /** Calls `consumer.seekToBeginning()` when starting consumer.
+    */
+  case object Beginning extends ObservableSeekOnStart {
+    val id = "beginning"
+  }
+
+  /** Calls neither `consumer.seekToEnd()` nor `consumer.seekToBeginning()`
+    * when starting consumer.
+    */
+  case object NoSeek extends ObservableSeekOnStart {
+    val id = "no-seek"
+  }
+}
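A quick usage illustration of the companion's apply — the string values mirror what monix.observable.seek.onStart accepts, and anything else raises BadValue:

import monix.kafka.config.ObservableSeekOnStart

val end = ObservableSeekOnStart("end")         // ObservableSeekOnStart.End
val begin = ObservableSeekOnStart("beginning") // ObservableSeekOnStart.Beginning

assert(end.isSeekEnd && !end.isSeekBeginning)
assert(begin.isSeekBeginning && !begin.isSeekEnd)
// ObservableSeekOnStart("latest") would throw
// com.typesafe.config.ConfigException.BadValue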
9 changes: 3 additions & 6 deletions kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
@@ -18,6 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
 
   val commitCallbacks: List[Commit] = List.fill(4)(new Commit {
     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
+
     override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
       Task.unit
   })
@@ -33,9 +34,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
     val partitions = offsets.map(_.topicPartition)
     val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets)
 
-    received.foreach { batch =>
-      partitions should contain allElementsOf batch.offsets.keys
-    }
+    received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys }
 
     received.size should be <= 4
   }
@@ -64,9 +63,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
       .mergeMap(i => createConsumer(i.toInt, topicName).take(500))
       .bufferTumbling(2000)
      .map(CommittableOffsetBatch.mergeByCommitCallback)
-      .map { offsetBatches =>
-        assert(offsetBatches.length == 4)
-      }
+      .map { offsetBatches => assert(offsetBatches.length == 4) }
       .completedL
 
     Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
4 changes: 1 addition & 3 deletions kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala
@@ -127,9 +127,7 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
     val listT = consumer
       .executeOn(io)
       .bufferTumbling(count)
-      .map { messages =>
-        messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset))
-      }
+      .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) }
       .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) }
       .headL
@@ -127,6 +127,7 @@ class AFailingSerializer extends ASerializer {
 }
 
 class AHalfFailingSerializer extends ASerializer {
+
   override def serialize(topic: String, data: A): Array[Byte] = {
     if (data.value.toInt % 2 == 0) super.serialize(topic, data)
     else throw new RuntimeException("fail")
5 changes: 3 additions & 2 deletions kafka-0.11.x/src/main/resources/monix/kafka/default.conf
@@ -72,8 +72,9 @@ kafka {
   # Number of requests that KafkaProducerSink
   # can push in parallel
   monix.producer.sink.parallelism = 100
-  # Triggers a seekToEnd when the observable starts
-  monix.observable.seekEnd.onStart = false
+  # Triggers either seekToEnd or seekToBeginning when the observable starts
+  # Possible values: end, beginning, no-seek
+  monix.observable.seek.onStart = "no-seek"
   # Possible values: sync, async
   monix.observable.commit.type = "sync"
   # Possible values: before-ack, after-ack or no-ack
1 change: 1 addition & 0 deletions kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala
@@ -33,6 +33,7 @@ private[kafka] object Commit {
 
   val empty: Commit = new Commit {
     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
+
     override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
       Task.unit
   }
3 changes: 2 additions & 1 deletion kafka-0.11.x/src/main/scala/monix/kafka/Deserializer.scala
@@ -49,7 +49,8 @@ object Deserializer {
   implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] =
     Deserializer[A](
       className = des.getClass.getName,
-      classType = des.getClass
+      classType = des.getClass,
+      constructor = _ => des
     )
 
   /** Alias for the function that provides an instance of
4 changes: 2 additions & 2 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala
@@ -248,7 +248,7 @@ final case class KafkaConsumerConfig(
     retryBackoffTime: FiniteDuration,
     observableCommitType: ObservableCommitType,
     observableCommitOrder: ObservableCommitOrder,
-    observableSeekToEndOnStart: Boolean,
+    observableSeekOnStart: ObservableSeekOnStart,
     properties: Map[String, String]) {
 
   def toMap: Map[String, String] = properties ++ Map(
@@ -435,7 +435,7 @@ object KafkaConsumerConfig {
       retryBackoffTime = config.getInt("retry.backoff.ms").millis,
      observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
       observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
-      observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
+      observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")),
       properties = Map.empty
     )
   }
3 changes: 2 additions & 1 deletion kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala
@@ -64,7 +64,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
     implicit val s = scheduler
     val feedTask = consumer.flatMap { c =>
       // Seeks to the end or the beginning of all partitions, if configured
-      if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection)
+      if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection)
+      else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection)
       // A task to execute on both cancellation and normal termination
       val onCancel = cancelTask(c)
       runLoop(c, out).guarantee(onCancel)
2 changes: 2 additions & 0 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
@@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
   private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis
 
   case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit {
+
     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
       Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
         case (k, v) => k -> new OffsetAndMetadata(v)
       }.asJava))))
+
     override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
       Task {
         blocking(consumer.synchronized(consumer.commitAsync(batch.map {
28 changes: 16 additions & 12 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
@@ -38,17 +38,17 @@ import scala.util.{Failure, Success}
   * - An error with `Task.raiseError` which will finish the stream with an error.
   */
 final class KafkaProducerSink[K, V] private (
-  producer: Coeval[KafkaProducer[K, V]],
-  shouldTerminate: Boolean,
-  parallelism: Int,
-  onSendError: Throwable => Task[Ack])
-  extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {
+    producer: Coeval[KafkaProducer[K, V]],
+    shouldTerminate: Boolean,
+    parallelism: Int,
+    onSendError: Throwable => Task[Ack])
+    extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {
 
   require(parallelism >= 1, "parallelism >= 1")
 
   def createSubscriber(
-    cb: Callback[Throwable, Unit],
-    s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
+      cb: Callback[Throwable, Unit],
+      s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
     val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self =>
       implicit val scheduler: Scheduler = s
       private[this] val p = producer.memoize
@@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private (
 
 object KafkaProducerSink extends StrictLogging {
 
-  private[this] val onSendErrorDefault = (ex: Throwable) => Task {
-    logger.error("Unexpected error in KafkaProducerSink", ex)
-    Continue
-  }
+  private[this] val onSendErrorDefault = (ex: Throwable) =>
+    Task {
+      logger.error("Unexpected error in KafkaProducerSink", ex)
+      Continue
+    }
 
   /** Builder for [[KafkaProducerSink]]. */
   def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
@@ -126,6 +127,9 @@ object KafkaProducerSink extends StrictLogging {
     apply(producer, parallelism, onSendErrorDefault)
 
   /** Builder for [[KafkaProducerSink]]. */
-  def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
+  def apply[K, V](
+      producer: Coeval[KafkaProducer[K, V]],
+      parallelism: Int,
+      onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
     new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError)
 }
3 changes: 2 additions & 1 deletion kafka-0.11.x/src/main/scala/monix/kafka/Serializer.scala
@@ -49,7 +49,8 @@ object Serializer {
   implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] =
     Serializer[A](
       className = ser.getClass.getName,
-      classType = ser.getClass
+      classType = ser.getClass,
+      constructor = _ => ser
     )
 
   /** Alias for the function that provides an instance of