Threadsafe transactions #892

Merged
modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -92,7 +92,7 @@ object TransactionalKafkaProducer {
(
Resource.eval(settings.producerSettings.keySerializer),
Resource.eval(settings.producerSettings.valueSerializer),
-   WithProducer(mk, settings)
+   WithTransactionalProducer(mk, settings)
).mapN { (keySerializer, valueSerializer, withProducer) =>
new TransactionalKafkaProducer.WithoutOffsets[F, K, V] {
override def produce[P](
@@ -139,7 +139,7 @@ object TransactionalKafkaProducer {
if (records.isEmpty) F.pure(Chunk.empty)
else {

- withProducer { (producer, blocking) =>
+ withProducer.exclusiveAccess { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
val produce = records
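
The shape of the resulting produce path is worth spelling out. Below is a minimal sketch, assuming the collapsed part of the diff keeps the usual commit-on-success, abort-on-error-or-cancellation release; the name `transactional` and the `body` parameter are hypothetical stand-ins for the elided produce and sendOffsets logic:

import cats.effect.{Async, Outcome}
import cats.effect.implicits._

// Hedged sketch: the entire transaction, from beginTransaction to commit or
// abort, runs inside the semaphore-guarded exclusiveAccess region, so no
// other fiber can start a transaction on the shared producer in between.
def transactional[F[_]: Async, A](
  withProducer: WithTransactionalProducer[F]
)(body: F[A]): F[A] =
  withProducer.exclusiveAccess { (producer, blocking) =>
    blocking(producer.beginTransaction()).bracketCase(_ => body) {
      case (_, Outcome.Succeeded(_)) => blocking(producer.commitTransaction())
      case (_, _)                    => blocking(producer.abortTransaction())
    }
  }
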
28 changes: 2 additions & 26 deletions modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
@@ -6,11 +6,10 @@

package fs2.kafka.internal

- import fs2.kafka.producer.MkProducer
import cats.effect.{Async, Resource}
import cats.syntax.all._
- import fs2.kafka.{KafkaByteProducer, ProducerSettings, TransactionalProducerSettings}
import fs2.kafka.internal.syntax._
+ import fs2.kafka.producer.MkProducer
+ import fs2.kafka.{KafkaByteProducer, ProducerSettings}

private[kafka] sealed abstract class WithProducer[F[_]] {
def apply[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A]
@@ -40,29 +39,6 @@ private[kafka] object WithProducer {
.map(create(_, blockingG))
}

- def apply[F[_], K, V](
-   mk: MkProducer[F],
-   settings: TransactionalProducerSettings[F, K, V]
- )(
-   implicit F: Async[F]
- ): Resource[F, WithProducer[F]] =
-   Resource[F, WithProducer[F]] {
-     mk(settings.producerSettings).flatMap { producer =>
-       val blocking = settings.producerSettings.customBlockingContext
-         .fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)
-
-       val withProducer = create(producer, blocking)
-
-       val initTransactions = withProducer.blocking { _.initTransactions() }
-
-       val close = withProducer.blocking {
-         _.close(settings.producerSettings.closeTimeout.asJava)
-       }
-
-       initTransactions.as((withProducer, close))
-     }
-   }

private def create[F[_]](
producer: KafkaByteProducer,
_blocking: Blocking[F]
74 changes: 74 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/internal/WithTransactionalProducer.scala
@@ -0,0 +1,74 @@
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.internal

import cats.effect.std.Semaphore
import cats.effect.{Async, MonadCancelThrow, Resource}
import cats.implicits._
import fs2.kafka.internal.syntax._
import fs2.kafka.producer.MkProducer
import fs2.kafka.{KafkaByteProducer, TransactionalProducerSettings}

private[kafka] sealed abstract class WithTransactionalProducer[F[_]] {
def apply[A](f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]): F[A]

def exclusiveAccess[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A] = apply {
case (producer, blocking, exclusive) => exclusive(f(producer, blocking))
}

def blocking[A](f: KafkaByteProducer => A): F[A] = apply {
case (producer, blocking, _) => blocking(f(producer))
}
}

private[kafka] object WithTransactionalProducer {
def apply[F[_], K, V](
mk: MkProducer[F],
settings: TransactionalProducerSettings[F, K, V]
)(
implicit F: Async[F]
): Resource[F, WithTransactionalProducer[F]] =
Resource[F, WithTransactionalProducer[F]] {
(mk(settings.producerSettings), Semaphore[F](1)).tupled.flatMap {
case (producer, semaphore) =>
val blocking = settings.producerSettings.customBlockingContext
.fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)

val withProducer = create(producer, blocking, semaphore)

val initTransactions = withProducer.blocking { _.initTransactions() }

/*
Deliberately does not use the exclusive access functionality to close the producer. The close method on
the underlying client waits until the buffer has been flushed to the broker or the timeout is exceeded.
Because the transactional producer _always_ waits until the buffer is flushed and the transaction
committed on the broker before proceeding, upon gaining exclusive access to the producer the buffer will
always be empty. Therefore if we used exclusive access to close the underlying producer, the buffer
would already be empty and the close timeout setting would be redundant.

TLDR: not using exclusive access here preserves the behaviour of the underlying close method and timeout
setting
*/
val close = withProducer.blocking {
_.close(settings.producerSettings.closeTimeout.asJava)
}

initTransactions.as((withProducer, close))
}
}

private def create[F[_]: MonadCancelThrow](
producer: KafkaByteProducer,
_blocking: Blocking[F],
transactionSemaphore: Semaphore[F]
): WithTransactionalProducer[F] = new WithTransactionalProducer[F] {
override def apply[A](
f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]
): F[A] =
f(producer, _blocking, transactionSemaphore.permit.surround)
}
}
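
The locking technique above is worth isolating: a one-permit Semaphore turns `permit.surround` into a function F[A] => F[A] that serializes every action it wraps. A self-contained sketch, independent of fs2-kafka (all names here are illustrative only):

import cats.effect.{IO, IOApp}
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._

object ExclusiveAccessDemo extends IOApp.Simple {
  def run: IO[Unit] =
    Semaphore[IO](1).flatMap { sem =>
      // surround acquires the single permit, runs the action, and releases
      // the permit even on error or cancellation.
      def exclusive[A](fa: IO[A]): IO[A] = sem.permit.surround(fa)

      val tx = exclusive(
        IO.println("begin") *> IO.sleep(100.millis) *> IO.println("commit")
      )

      // Two "transactions" run concurrently, but their begin/commit pairs
      // are serialized by the semaphore rather than interleaved.
      (tx, tx).parTupled.void
    }
}
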
12 changes: 12 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/internal/package.scala
@@ -0,0 +1,12 @@
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka

package object internal {
private[kafka] type ExclusiveAccess[F[_], A] = F[A] => F[A]

}
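
The alias gives the third argument of WithTransactionalProducer#apply a name: it is just a function F[A] => F[A]. A one-permit semaphore yields such a function directly, which is exactly what `create` passes via `transactionSemaphore.permit.surround`. A small hedged illustration (the helper name is made up):

import cats.effect.MonadCancelThrow
import cats.effect.std.Semaphore

// Hypothetical helper: wrap a semaphore's single permit as an
// ExclusiveAccess[F, A], i.e. an F[A] => F[A] that serializes its argument.
def exclusiveAccessFor[F[_]: MonadCancelThrow, A](sem: Semaphore[F]): F[A] => F[A] =
  fa => sem.permit.surround(fa)
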
modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -207,6 +207,67 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
consumed should contain theSameElementsAs records.toList
}

it("should not allow concurrent access to a producer during a transaction") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
val toProduce =
Chunk.seq((0 to 1000000).toList.map(n => s"key-$n" -> s"value-$n"))

val result =
(for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
"id",
producerSettings[IO]
.withRetries(Int.MaxValue)
)
)
recordsToProduce = toProduce.map {
case (key, value) => ProducerRecord(topic, key, value)
}
offsets = toProduce.mapWithIndex {
case (_, i) =>
CommittableOffset[IO](
new TopicPartition(topic, i % 3),
new OffsetAndMetadata(i.toLong),
Some("group"),
_ => IO.unit
)
}
records = TransactionalProducerRecords(
recordsToProduce.zip(offsets).map {
case (record, offset) =>
CommittableProducerRecords.one(
record,
offset
)
}
)
_ <- Stream
.eval(producer.produce(records))
.concurrently(
Stream.eval(
producer.produce(
TransactionalProducerRecords.one(
CommittableProducerRecords.one(
ProducerRecord[String, String](topic, "test", "test"),
CommittableOffset[IO](
new TopicPartition(topic, 0),
new OffsetAndMetadata(0),
Some("group"),
_ => IO.unit
)
)
)
)
)
)
} yield ()).compile.lastOrError.attempt.unsafeRunSync()

assert(result == Right(()))
}
}

it("should abort transactions if committing offsets fails") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)