Add new transactional producer #130

Merged
merged 28 commits on May 14, 2019

Commits
7cae7b9 Add consumer setting for read isolation level. (danxmoran, May 8, 2019)
76360a3 Add producer settings for transaction configuration. (danxmoran, May 9, 2019)
f13ba67 Add group ID to CommittableOffset. (danxmoran, May 9, 2019)
2f2b81c Add TransactionalProducerMessage class. (danxmoran, May 9, 2019)
5cd0888 Add TransactionalKafkaProducer. (danxmoran, May 9, 2019)
90622bc Scalafmt. (danxmoran, May 9, 2019)
9868758 Change to use IsolationLevel coproduct (May 9, 2019)
57b5045 Remove ConsumerSettings#groupId (May 9, 2019)
e121aaa Fix docs for ProducerSettings#withTransactionTimeout (May 9, 2019)
c76c532 Fix test and scalafmt. (danxmoran, May 10, 2019)
2af6089 Track consumer group IDs in CommittableOffsetBatch. (danxmoran, May 10, 2019)
9bb30f7 Don't force a 1:1 match between record and offset. (danxmoran, May 10, 2019)
404ff20 Update cats-effect to 1.3.0 (May 12, 2019)
cfbaa82 Add internal ByteProducer alias (May 12, 2019)
dfe89da Change to use CommittableProducerRecords (May 12, 2019)
7542af6 Add missing trailing comma (May 12, 2019)
c0daf56 Formatting (May 12, 2019)
5a33f73 Remove unused group ID tracking from offset batches. (danxmoran, May 12, 2019)
754e4a7 Add docs to CommittableProducerRecords. (danxmoran, May 12, 2019)
c23ccd7 Add docs to TransactionalProducerMessage. (danxmoran, May 12, 2019)
a133308 Add test for TransactionalProducerMessage with zero records. (danxmoran, May 12, 2019)
88d1784 Change to not require same type constructors (May 13, 2019)
d2a889c Add functions for explicitly specifying effect type (May 14, 2019)
c570cfc Change to cleanup CommittableOffset (May 14, 2019)
a74f246 Change to remove duplication across producers (May 14, 2019)
7d0c1f1 Change TransactionalProducerMessage to use Chunk (May 14, 2019)
ce8c0d7 Change TransactionalProducerMessage to use Chunk (May 14, 2019)
720ce88 Fix compilation on scala 2.11 (May 14, 2019)
1 change: 1 addition & 0 deletions build.sbt
@@ -32,6 +32,7 @@ lazy val docs = project
lazy val dependencySettings = Seq(
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"org.typelevel" %% "cats-effect" % "1.3.0",
"org.apache.kafka" % "kafka-clients" % kafkaVersion
),
libraryDependencies ++= Seq(
24 changes: 22 additions & 2 deletions src/main/scala/fs2/kafka/CommittableOffset.scala
@@ -16,6 +16,7 @@

package fs2.kafka

import cats.instances.string._
import cats.Show
import cats.syntax.show._
import fs2.kafka.internal.instances._
@@ -47,6 +48,15 @@ sealed abstract class CommittableOffset[F[_]] {
*/
def offsetAndMetadata: OffsetAndMetadata

/**
* The consumer group ID of the consumer that fetched the
* [[offsetAndMetadata]] for the [[topicPartition]] from
* Kafka.<br>
* <br>
* Required for committing messages within a transaction.
*/
def consumerGroupId: Option[String]

/**
* The [[topicPartition]] and [[offsetAndMetadata]] as a `Map`.
* This is provided for convenience and is always guaranteed to
@@ -90,10 +100,12 @@ object CommittableOffset {
def apply[F[_]](
topicPartition: TopicPartition,
offsetAndMetadata: OffsetAndMetadata,
consumerGroupId: Option[String],
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
): CommittableOffset[F] = {
val _topicPartition = topicPartition
val _offsetAndMetadata = offsetAndMetadata
val _consumerGroupId = consumerGroupId
val _commit = commit

new CommittableOffset[F] {
@@ -103,6 +115,9 @@
override val offsetAndMetadata: OffsetAndMetadata =
_offsetAndMetadata

override val consumerGroupId: Option[String] =
_consumerGroupId

override def offsets: Map[TopicPartition, OffsetAndMetadata] =
Map(_topicPartition -> _offsetAndMetadata)

@@ -116,10 +131,15 @@
_commit

override def toString: String =
- Show[CommittableOffset[F]].show(this)
+ consumerGroupId match {
+ case Some(consumerGroupId) =>
+ show"CommittableOffset($topicPartition -> $offsetAndMetadata, $consumerGroupId)"
+ case None =>
+ show"CommittableOffset($topicPartition -> $offsetAndMetadata)"
+ }
}
}

implicit def committableOffsetShow[F[_]]: Show[CommittableOffset[F]] =
- Show.show(co => show"CommittableOffset(${co.topicPartition} -> ${co.offsetAndMetadata})")
+ Show.fromToString
}
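
To illustrate the new field, here is a minimal sketch of constructing a CommittableOffset directly. The topic, offset, and group values are placeholders, and the commit function is a no-op stand-in; in practice a KafkaConsumer supplies all of these:

import cats.effect.IO
import fs2.kafka.CommittableOffset
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

object CommittableOffsetExample extends App {
  val offset: CommittableOffset[IO] =
    CommittableOffset[IO](
      topicPartition = new TopicPartition("topic", 0),
      offsetAndMetadata = new OffsetAndMetadata(42L),
      consumerGroupId = Some("my-group"),
      commit = _ => IO.unit // no-op stand-in for the consumer's commit function
    )

  // Prints roughly: CommittableOffset(topic-0 -> 42, my-group).
  // Without a group ID, the trailing element is omitted.
  println(offset)
}

Note how the group ID flows into toString, which the Show instance now reuses via Show.fromToString.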
76 changes: 76 additions & 0 deletions src/main/scala/fs2/kafka/CommittableProducerRecords.scala
@@ -0,0 +1,76 @@
/*
* Copyright 2018-2019 OVO Energy Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.kafka

import cats.Foldable

/**
* [[CommittableProducerRecords]] represents zero or more [[ProducerRecord]]s
* and a [[CommittableOffset]], used by [[TransactionalKafkaProducer]] to
* produce the records and commit the offset atomically.<br>
* <br>
* [[CommittableProducerRecords]]s can be created using one of the following options.<br>
* <br>
* - `CommittableProducerRecords#apply` to produce zero or more records
* within the same transaction as the offset is committed.<br>
* - `CommittableProducerRecords#one` to produce exactly one record within
* the same transaction as the offset is committed.
*/
sealed abstract class CommittableProducerRecords[F[_], G[+ _], +K, +V] {

/** The records to produce. Can be empty to simply commit the offset. */
def records: G[ProducerRecord[K, V]]

/** The offset to commit. */
def committableOffset: CommittableOffset[F]

/** The `Foldable` instance for `G[_]`. Required by [[TransactionalKafkaProducer]]. */
def foldable: Foldable[G]
}

object CommittableProducerRecords {
private[this] final class CommittableProducerRecordsImpl[F[_], G[+ _], +K, +V](
override val records: G[ProducerRecord[K, V]],
override val committableOffset: CommittableOffset[F],
override val foldable: Foldable[G]
) extends CommittableProducerRecords[F, G, K, V] {
override def toString: String =
s"CommittableProducerRecords($records, $committableOffset)"
}

/**
* Creates a new [[CommittableProducerRecords]] for producing zero or
* more [[ProducerRecord]]s and committing an offset atomically within
* a transaction.
*/
def apply[F[_], G[+ _], K, V](
records: G[ProducerRecord[K, V]],
committableOffset: CommittableOffset[F]
)(implicit G: Foldable[G]): CommittableProducerRecords[F, G, K, V] =
new CommittableProducerRecordsImpl(records, committableOffset, G)

/**
* Creates a new [[CommittableProducerRecords]] for producing exactly
* one [[ProducerRecord]] and committing an offset atomically within
* a transaction.
*/
def one[F[_], K, V](
record: ProducerRecord[K, V],
committableOffset: CommittableOffset[F]
): CommittableProducerRecords[F, Id, K, V] =
apply[F, Id, K, V](record, committableOffset)
}
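
A usage sketch of the two constructors, assuming the fs2.kafka.ProducerRecord(topic, key, value) constructor and an offset obtained from a consumed message; topics, keys, and values are placeholders:

import cats.effect.IO
import cats.instances.list._
import fs2.kafka._

def sketch(offset: CommittableOffset[IO]): Unit = {
  // Zero or more records produced in the same transaction that
  // commits the offset; Foldable[List] is resolved implicitly.
  val many: CommittableProducerRecords[IO, List, String, String] =
    CommittableProducerRecords(
      List(
        ProducerRecord("topic", "key-1", "value-1"),
        ProducerRecord("topic", "key-2", "value-2")
      ),
      offset
    )

  // Exactly one record, via the Id-based convenience constructor.
  val single: CommittableProducerRecords[IO, Id, String, String] =
    CommittableProducerRecords.one(
      ProducerRecord("topic", "key", "value"),
      offset
    )

  println((many, single))
}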
41 changes: 41 additions & 0 deletions src/main/scala/fs2/kafka/ConsumerGroupException.scala
@@ -0,0 +1,41 @@
/*
* Copyright 2018-2019 OVO Energy Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.kafka

import org.apache.kafka.common.KafkaException

/**
* [[ConsumerGroupException]] indicates that one of the following
* two conditions occurred before records were produced transactionally
* with the [[TransactionalKafkaProducer]].<br>
* <br>
* - There were [[CommittableOffset]]s without a consumer group ID.<br>
* - There were [[CommittableOffset]]s for multiple consumer group IDs.
*/
sealed abstract class ConsumerGroupException(groupIds: Set[String])
extends KafkaException({
val groupIdsString = groupIds.toList.sorted.mkString(", ")
s"multiple or missing consumer group ids in transaction [$groupIdsString]"
})

private[kafka] object ConsumerGroupException {
def apply(groupIds: Set[String]): ConsumerGroupException =
new ConsumerGroupException(groupIds) {
override def toString: String =
s"fs2.kafka.ConsumerGroupException: $getMessage"
}
}
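
Because the companion is private[kafka], user code never constructs this exception, only observes it. A hedged sketch of handling it, where produceTransactionally is a hypothetical stand-in for a pipeline running the transactional producer:

import cats.effect.IO
import fs2.kafka.ConsumerGroupException

def guarded(produceTransactionally: IO[Unit]): IO[Unit] =
  produceTransactionally.handleErrorWith {
    case e: ConsumerGroupException =>
      // The transaction's offsets had missing or mixed consumer group IDs.
      IO(println(s"rejected transaction: ${e.getMessage}"))
    case other =>
      IO.raiseError(other)
  }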
21 changes: 21 additions & 0 deletions src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -223,6 +223,18 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withDefaultApiTimeout(defaultApiTimeout: FiniteDuration): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* isolation level. This is equivalent to setting the following
* property using the [[withProperty]] function, except you can
* specify it with an [[IsolationLevel]] instead of a `String`.
*
* {{{
* ConsumerConfig.ISOLATION_LEVEL_CONFIG
* }}}
*/
def withIsolationLevel(isolationLevel: IsolationLevel): ConsumerSettings[F, K, V]

/**
* Includes a property with the specified `key` and `value`.
* The key should be one of the keys in `ConsumerConfig`,
@@ -464,6 +476,15 @@ object ConsumerSettings {
defaultApiTimeout.toMillis.toString
)

override def withIsolationLevel(isolationLevel: IsolationLevel): ConsumerSettings[F, K, V] =
withProperty(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
isolationLevel match {
case IsolationLevel.ReadCommittedIsolationLevel => "read_committed"
case IsolationLevel.ReadUncommittedIsolationLevel => "read_uncommitted"
}
)

override def withProperty(key: String, value: String): ConsumerSettings[F, K, V] =
copy(properties = properties.updated(key, value))

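
A small sketch of the new setter applied to an existing settings value, using only API shown in this diff; the two forms below write the same underlying property:

import fs2.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig

def readCommitted[F[_], K, V](
  settings: ConsumerSettings[F, K, V]
): ConsumerSettings[F, K, V] =
  settings.withIsolationLevel(IsolationLevel.ReadCommitted)

// Equivalent untyped form:
def readCommittedRaw[F[_], K, V](
  settings: ConsumerSettings[F, K, V]
): ConsumerSettings[F, K, V] =
  settings.withProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")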
46 changes: 46 additions & 0 deletions src/main/scala/fs2/kafka/IsolationLevel.scala
@@ -0,0 +1,46 @@
/*
* Copyright 2018-2019 OVO Energy Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.kafka

/**
* The available options for [[ConsumerSettings#withIsolationLevel]].<br>
* <br>
* Available options include:<br>
* - [[IsolationLevel#ReadCommitted]] to only read committed records,<br>
* - [[IsolationLevel#ReadUncommitted]] to also read uncommitted records.
*/
sealed abstract class IsolationLevel

object IsolationLevel {
private[kafka] case object ReadCommittedIsolationLevel extends IsolationLevel {
override def toString: String = "ReadCommitted"
}

private[kafka] case object ReadUncommittedIsolationLevel extends IsolationLevel {
override def toString: String = "ReadUncommitted"
}

/**
* Option to only read committed records.
*/
val ReadCommitted: IsolationLevel = ReadCommittedIsolationLevel

/**
* Option to read both committed and uncommitted records.
*/
val ReadUncommitted: IsolationLevel = ReadUncommittedIsolationLevel
}
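
Since the concrete case objects are private to fs2.kafka, downstream code compares against the two public values. A hypothetical helper mirroring the mapping inside ConsumerSettings#withIsolationLevel:

import fs2.kafka.IsolationLevel

def toConfigValue(level: IsolationLevel): String =
  level match {
    case IsolationLevel.ReadCommitted => "read_committed"
    case _                            => "read_uncommitted" // IsolationLevel.ReadUncommitted
  }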
2 changes: 1 addition & 1 deletion src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -609,7 +609,7 @@ private[kafka] object KafkaConsumer {
polls <- Resource.liftF(Queue.bounded[F, Request[F, K, V]](1))
ref <- Resource.liftF(Ref.of[F, State[F, K, V]](State.empty))
streamId <- Resource.liftF(Ref.of[F, Int](0))
- withConsumer <- WithConsumer.of(settings)
+ withConsumer <- WithConsumer(settings)
actor = new KafkaConsumerActor(settings, ref, requests, withConsumer)
actor <- startConsumerActor(requests, polls, actor)
polls <- startPollScheduler(polls, settings.pollInterval)