From 6bea9d7cf90275a2d9ef8f6bc1cad30974573bac Mon Sep 17 00:00:00 2001 From: charlibot Date: Fri, 10 May 2019 13:29:23 +0200 Subject: [PATCH 1/2] With transactional partitioned source Co-authored-by: Sean Glover --- .travis.yml | 9 +- build.sbt | 3 + ...0-partitioned-transactions-source.excludes | 5 + .../scala/akka/kafka/ConsumerMessage.scala | 6 +- .../kafka/internal/CommittableSources.scala | 66 ++- .../CommittingProducerSinkStage.scala | 2 +- .../kafka/internal/DefaultProducerStage.scala | 22 +- .../kafka/internal/DeferredProducer.scala | 35 +- .../akka/kafka/internal/MessageBuilder.scala | 8 +- .../akka/kafka/internal/PlainSources.scala | 34 +- .../akka/kafka/internal/SubSourceLogic.scala | 303 +++++++++----- .../internal/TransactionalProducerStage.scala | 80 +++- ...ource.scala => TransactionalSources.scala} | 198 ++++++++- .../akka/kafka/javadsl/Transactional.scala | 28 +- .../akka/kafka/scaladsl/Transactional.scala | 44 +- docs/src/main/paradox/transactions.md | 46 +++ .../kafka/testkit/internal/KafkaTestKit.scala | 4 +- tests/src/it/resources/logback-test.xml | 2 +- .../TransactionsPartitionedSourceSpec.scala | 127 ++++++ .../akka/kafka/TransactionsSourceSpec.scala | 189 +++++++++ .../docs/javadsl/TransactionsExampleTest.java | 50 +++ tests/src/test/resources/application.conf | 3 + tests/src/test/resources/logback-test.xml | 2 +- .../src/test/scala/akka/kafka/Repeated.scala | 42 ++ .../scala/akka/kafka/TransactionsOps.scala | 212 ++++++++++ .../akka/kafka/internal/ProducerSpec.scala | 43 +- .../akka/kafka/scaladsl/IntegrationSpec.scala | 86 ++++ .../scaladsl/PartitionedSourcesSpec.scala | 4 - .../kafka/scaladsl/TransactionsSpec.scala | 391 ++++++++---------- .../docs/scaladsl/TransactionsExample.scala | 41 ++ 30 files changed, 1661 insertions(+), 424 deletions(-) create mode 100644 core/src/main/mima-filters/1.1.0.backwards.excludes/PR930-partitioned-transactions-source.excludes rename core/src/main/scala/akka/kafka/internal/{TransactionalSource.scala => TransactionalSources.scala} (54%) create mode 100644 tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala create mode 100644 tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala create mode 100644 tests/src/test/scala/akka/kafka/Repeated.scala create mode 100644 tests/src/test/scala/akka/kafka/TransactionsOps.scala diff --git a/.travis.yml b/.travis.yml index adcd1ed44..9b47a5e3d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,11 +5,6 @@ services: - docker before_install: - # upgrade to a later docker-compose which supports services.kafka.scale - - sudo rm /usr/local/bin/docker-compose - - curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > docker-compose - - chmod +x docker-compose - - sudo mv docker-compose /usr/local/bin # fetch full history for correct current and previous version detection - git fetch --unshallow # using jabba for custom jdk management @@ -80,6 +75,10 @@ jobs: name: "Publish API and reference documentation" stages: + # runs on master commits and PRs + - name: debug + if: NOT tag =~ ^v + # runs on master commits and PRs - name: check if: NOT tag =~ ^v diff --git a/build.sbt b/build.sbt index aa2610e2e..220e0af58 100644 --- a/build.sbt +++ b/build.sbt @@ -183,6 +183,9 @@ lazy val `alpakka-kafka` = | tests/it:test | run integration tests backed by Docker containers | + | tests/testOnly -- -t "A consume-transform-produce cycle must complete in happy-path scenario" + | run a single test with an exact name (use -z for partial match) + | | 
benchmarks/it:testOnly *.AlpakkaKafkaPlainConsumer | run a single benchmark backed by Docker containers """.stripMargin diff --git a/core/src/main/mima-filters/1.1.0.backwards.excludes/PR930-partitioned-transactions-source.excludes b/core/src/main/mima-filters/1.1.0.backwards.excludes/PR930-partitioned-transactions-source.excludes new file mode 100644 index 000000000..20b29f433 --- /dev/null +++ b/core/src/main/mima-filters/1.1.0.backwards.excludes/PR930-partitioned-transactions-source.excludes @@ -0,0 +1,5 @@ +ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.ConsumerMessage$PartitionOffsetCommittedMarker$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.withCommittedMarker") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.unapply") diff --git a/core/src/main/scala/akka/kafka/ConsumerMessage.scala b/core/src/main/scala/akka/kafka/ConsumerMessage.scala index 7957edce9..5af56eecc 100644 --- a/core/src/main/scala/akka/kafka/ConsumerMessage.scala +++ b/core/src/main/scala/akka/kafka/ConsumerMessage.scala @@ -96,9 +96,6 @@ object ConsumerMessage { def withMetadata(metadata: String) = PartitionOffsetMetadata(key, offset, metadata) - @InternalApi private[kafka] def withCommittedMarker(committedMarker: CommittedMarker) = - PartitionOffsetCommittedMarker(key, offset, committedMarker) - override def toString = s"PartitionOffset(key=$key,offset=$offset)" override def equals(other: Any): Boolean = other match { @@ -138,7 +135,8 @@ object ConsumerMessage { @InternalApi private[kafka] final case class PartitionOffsetCommittedMarker( override val key: GroupTopicPartition, override val offset: Long, - private[kafka] val committedMarker: CommittedMarker + private[kafka] val committedMarker: CommittedMarker, + private[kafka] val fromPartitionedSource: Boolean ) extends PartitionOffset(key, offset) /** diff --git a/core/src/main/scala/akka/kafka/internal/CommittableSources.scala b/core/src/main/scala/akka/kafka/internal/CommittableSources.scala index 4ed72fc6c..33f1d2da8 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittableSources.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittableSources.scala @@ -10,11 +10,12 @@ import akka.annotation.InternalApi import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset, CommittableOffsetBatch} import akka.kafka._ import akka.kafka.internal.KafkaConsumerActor.Internal.{Commit, CommitSingle, CommitWithoutReply} +import akka.kafka.internal.SubSourceLogic._ import akka.kafka.scaladsl.Consumer.Control import akka.pattern.AskTimeoutException import akka.stream.SourceShape import akka.stream.scaladsl.Source -import akka.stream.stage.GraphStageLogic +import akka.stream.stage.{AsyncCallback, GraphStageLogic} import akka.util.Timeout import akka.{Done, NotUsed} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata} @@ -102,17 +103,34 @@ private[kafka] final class CommittableSubSource[K, V]( ) { override protected def logic( shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])] - ): GraphStageLogic with Control = - new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke) - with 
CommittableMessageBuilder[K, V] - with MetricsControl { - override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record) - override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG) - lazy val committer: KafkaAsyncConsumerCommitterRef = { - val ec = materializer.executionContext - new KafkaAsyncConsumerCommitterRef(consumerActor, settings.commitTimeout)(ec) - } + ): GraphStageLogic with Control = { + + val factory = new SubSourceStageLogicFactory[K, V, CommittableMessage[K, V]] { + def create( + shape: SourceShape[CommittableMessage[K, V]], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int + ): SubSourceStageLogic[K, V, CommittableMessage[K, V]] = + new CommittableSubSourceStageLogic(shape, + tp, + consumerActor, + subSourceStartedCb, + subSourceCancelledCb, + actorNumber, + settings, + _metadataFromRecord) + } + new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, + settings, + subscription, + getOffsetsOnAssign, + onRevoke, + subSourceStageLogicFactory = factory) + } } /** @@ -191,3 +209,29 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A case _ => false } } + +@InternalApi +private class CommittableSubSourceStageLogic[K, V]( + shape: SourceShape[CommittableMessage[K, V]], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int, + consumerSettings: ConsumerSettings[K, V], + _metadataFromRecord: ConsumerRecord[K, V] => String = CommittableMessageBuilder.NoMetadataFromRecord +) extends SubSourceStageLogic[K, V, CommittableMessage[K, V]](shape, + tp, + consumerActor, + subSourceStartedCb, + subSourceCancelledCb, + actorNumber) + with CommittableMessageBuilder[K, V] { + + override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record) + override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG) + lazy val committer: KafkaAsyncConsumerCommitterRef = { + val ec = materializer.executionContext + new KafkaAsyncConsumerCommitterRef(consumerActor, consumerSettings.commitTimeout)(ec) + } +} diff --git a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala index 1de9c77ab..76995ae86 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala @@ -73,7 +73,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, // ---- initialization override def preStart(): Unit = { super.preStart() - resolveProducer() + resolveProducer(stage.producerSettings) } /** When the producer is set up, the sink pulls and schedules the first commit. 
*/ diff --git a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala index 4726ea595..0184f8437 100644 --- a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala @@ -51,6 +51,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: private lazy val decider: Decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) protected val awaitingConfirmation = new AtomicInteger(0) + private var inIsClosed = false private var completionState: Option[Try[Done]] = None @@ -60,7 +61,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: override def preStart(): Unit = { super.preStart() - resolveProducer() + resolveProducer(stage.settings) } def checkForCompletion(): Unit = @@ -85,11 +86,14 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: failStage(ex) } - def postSend(msg: Envelope[K, V, P]) = () + def filterSend(msg: Envelope[K, V, P]): Boolean = true + + def postSend(msg: Envelope[K, V, P]): Unit = () override protected def producerAssigned(): Unit = resumeDemand() protected def resumeDemand(tryToPull: Boolean = true): Unit = { + log.debug("Resume demand") setHandler(stage.out, new OutHandler { override def onPull(): Unit = tryPull(stage.in) }) @@ -99,21 +103,28 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: } } - protected def suspendDemand(): Unit = + protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = { + // not permitted to access stage logic members from constructor + if (!fromStageLogicConstructor) log.debug("Suspend demand") setHandler( stage.out, new OutHandler { override def onPull(): Unit = () } ) + } // suspend demand until a Producer has been created - suspendDemand() + suspendDemand(fromStageLogicConstructor = true) setHandler( stage.in, new InHandler { - override def onPush(): Unit = produce(grab(stage.in)) + override def onPush(): Unit = { + val msg = grab(stage.in) + if (filterSend(msg)) + produce(msg) + } override def onUpstreamFinish(): Unit = { inIsClosed = true @@ -183,5 +194,4 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: closeProducer() super.postStop() } - } diff --git a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala index 24d4a311f..24300dac9 100644 --- a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala +++ b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala @@ -15,6 +15,24 @@ import org.apache.kafka.clients.producer.Producer import scala.util.control.NonFatal import scala.util.{Failure, Success} +/** + * INTERNAL API + */ +@InternalApi +private[kafka] object DeferredProducer { + + /** + * The [[ProducerAssignmentLifecycle]] allows us to change track the status of the aynchronous producer assignment + * within the stage. This is useful when we need to manage different behavior during the assignment process. For + * example, in [[TransactionalProducerStageLogic]] we match on the lifecycle when extracting the transactional.id + * of the first message received from a partitioned source. 
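+   *
+   * The lifecycle only ever moves forward: `Unassigned` -> `AsyncCreateRequestSent` -> `Assigned`.
+   * `resolveProducer` either assigns the producer immediately (when the producer future has already
+   * completed) or records `AsyncCreateRequestSent`, and `assignProducer` performs the transition to
+   * `Assigned` before invoking `producerAssigned()`.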
+ */ + sealed trait ProducerAssignmentLifecycle + case object Unassigned extends ProducerAssignmentLifecycle + case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle + case object Assigned extends ProducerAssignmentLifecycle +} + /** * INTERNAL API */ @@ -22,8 +40,11 @@ import scala.util.{Failure, Success} private[kafka] trait DeferredProducer[K, V] { self: GraphStageLogic with StageIdLogging => + import DeferredProducer._ + /** The Kafka producer may be created lazily, assigned via `preStart` in `assignProducer`. */ protected var producer: Producer[K, V] = _ + protected var producerAssignmentLifecycle: ProducerAssignmentLifecycle = Unassigned protected def producerSettings: ProducerSettings[K, V] protected def producerAssigned(): Unit @@ -31,11 +52,12 @@ private[kafka] trait DeferredProducer[K, V] { private def assignProducer(p: Producer[K, V]): Unit = { producer = p + changeProducerAssignmentLifecycle(Assigned) producerAssigned() } - final protected def resolveProducer(): Unit = { - val producerFuture = producerSettings.createKafkaProducerAsync()(materializer.executionContext) + final protected def resolveProducer(settings: ProducerSettings[K, V]): Unit = { + val producerFuture = settings.createKafkaProducerAsync()(materializer.executionContext) producerFuture.value match { case Some(Success(p)) => assignProducer(p) case Some(Failure(e)) => failStage(e) @@ -50,9 +72,16 @@ private[kafka] trait DeferredProducer[K, V] { e } )(ExecutionContexts.sameThreadExecutionContext) + changeProducerAssignmentLifecycle(AsyncCreateRequestSent) } } + protected def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = { + val oldState = producerAssignmentLifecycle + producerAssignmentLifecycle = state + log.debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", oldState, state) + } + protected def closeProducerImmediately(): Unit = if (producer != null) { // Discard unsent ProducerRecords after encountering a send-failure in ProducerStage @@ -61,7 +90,7 @@ private[kafka] trait DeferredProducer[K, V] { } protected def closeProducer(): Unit = - if (producerSettings.closeProducerOnStop && producer != null) { + if (producerSettings.closeProducerOnStop && producerAssignmentLifecycle == Assigned) { try { // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case producer.flush() diff --git a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala index 847b19232..dded9150c 100644 --- a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala +++ b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala @@ -45,6 +45,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB def committedMarker: CommittedMarker def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit + + def fromPartitionedSource: Boolean } /** Internal API */ @@ -60,7 +62,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V] partition = rec.partition ), offset = rec.offset, - committedMarker + committedMarker, + fromPartitionedSource ) ConsumerMessage.TransactionalMessage(rec, offset) } @@ -79,7 +82,8 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V] partition = rec.partition ), offset = rec.offset, - committedMarker + committedMarker, + fromPartitionedSource ) (rec, offset) } diff --git a/core/src/main/scala/akka/kafka/internal/PlainSources.scala 
b/core/src/main/scala/akka/kafka/internal/PlainSources.scala index 709732d91..51de61643 100644 --- a/core/src/main/scala/akka/kafka/internal/PlainSources.scala +++ b/core/src/main/scala/akka/kafka/internal/PlainSources.scala @@ -4,14 +4,16 @@ */ package akka.kafka.internal + import akka.NotUsed import akka.actor.ActorRef import akka.annotation.InternalApi import akka.kafka.scaladsl.Consumer.Control import akka.kafka.{AutoSubscription, ConsumerSettings, ManualSubscription, Subscription} +import akka.kafka.internal.SubSourceLogic._ import akka.stream.SourceShape import akka.stream.scaladsl.Source -import akka.stream.stage.GraphStageLogic +import akka.stream.stage.{AsyncCallback, GraphStageLogic} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition @@ -51,8 +53,30 @@ private[kafka] final class PlainSubSource[K, V]( ) { override protected def logic( shape: SourceShape[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed])] - ): GraphStageLogic with Control = - new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke) - with PlainMessageBuilder[K, V] - with MetricsControl + ): GraphStageLogic with Control = { + + val factory = new SubSourceStageLogicFactory[K, V, ConsumerRecord[K, V]] { + def create( + shape: SourceShape[ConsumerRecord[K, V]], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int + ): SubSourceStageLogic[K, V, ConsumerRecord[K, V]] = + new SubSourceStageLogic[K, V, ConsumerRecord[K, V]](shape, + tp, + consumerActor, + subSourceStartedCb, + subSourceCancelledCb, + actorNumber) with PlainMessageBuilder[K, V] + } + + new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape, + settings, + subscription, + getOffsetsOnAssign, + onRevoke, + subSourceStageLogicFactory = factory) + } } diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index fdf827cd2..489fd8524 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -11,8 +11,10 @@ import akka.actor.{ActorRef, ExtendedActorSystem, Terminated} import akka.annotation.InternalApi import akka.kafka.Subscriptions.{TopicSubscription, TopicSubscriptionPattern} import akka.kafka.internal.KafkaConsumerActor.Internal.RegisterSubStage +import akka.kafka.internal.SubSourceLogic._ import akka.kafka.{AutoSubscription, ConsumerFailed, ConsumerSettings} import akka.kafka.scaladsl.Consumer.Control +import akka.kafka.scaladsl.PartitionAssignmentHandler import akka.pattern.{ask, AskTimeoutException} import akka.stream.scaladsl.Source import akka.stream.stage.GraphStageLogic.StageActor @@ -31,19 +33,27 @@ import scala.util.{Failure, Success} /** * Internal API. * - * Anonymous sub-class instance is created in [[CommittableSubSource]]. + * A `SubSourceLogic` is used to create partitioned sources from the various types of sub sources available: + * [[CommittableSubSource]], [[PlainSubSource]] and [[TransactionalSubSource]]. + * + * A `SubSourceLogic` emits a source of `SubSourceStage` per subscribed Kafka partition. + * + * The `SubSourceLogic.subSourceStageLogicFactory` parameter is passed to each `SubSourceStage` so that a new + * `SubSourceStageLogic` can be created for each stage. 
Context parameters from the `SubSourceLogic` are passed down to + * `SubSourceStage` and on to the `SubSourceStageLogicFactory` when the stage creates a `GraphStageLogic`. + * */ @InternalApi -private abstract class SubSourceLogic[K, V, Msg]( +private class SubSourceLogic[K, V, Msg]( val shape: SourceShape[(TopicPartition, Source[Msg, NotUsed])], settings: ConsumerSettings[K, V], subscription: AutoSubscription, getOffsetsOnAssign: Option[Set[TopicPartition] => Future[Map[TopicPartition, Long]]] = None, - onRevoke: Set[TopicPartition] => Unit = _ => () + onRevoke: Set[TopicPartition] => Unit = _ => (), + subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg] ) extends TimerGraphStageLogic(shape) with PromiseControl with MetricsControl - with MessageBuilder[K, V, Msg] with StageIdLogging { import SubSourceLogic._ @@ -59,7 +69,7 @@ private abstract class SubSourceLogic[K, V, Msg]( /** We have created a source for these partitions, but it has not started up and is not in subSources yet. */ var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty - var subSources: Map[TopicPartition, Control] = immutable.Map.empty + var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty /** Kafka has signalled these partitions are revoked, but some may be re-assigned just after revoking. */ var partitionsToRevoke: Set[TopicPartition] = Set.empty @@ -173,45 +183,54 @@ private abstract class SubSourceLogic[K, V, Msg]( onRevoke(partitionsToRevoke) pendingPartitions --= partitionsToRevoke partitionsInStartup --= partitionsToRevoke - partitionsToRevoke.flatMap(subSources.get).foreach(_.shutdown()) + partitionsToRevoke.flatMap(subSources.get).map(_.control).foreach(_.shutdown()) subSources --= partitionsToRevoke partitionsToRevoke = Set.empty } - val subsourceCancelledCB: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])] = - getAsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])] { - case (tp, firstUnconsumed) => + val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] = + getAsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] { + case (tp, cancellationStrategy: SubSourceCancellationStrategy) => subSources -= tp partitionsInStartup -= tp - pendingPartitions += tp - firstUnconsumed match { - case Some(record) => + + cancellationStrategy match { + case SeekToOffsetAndReEmit(offset) => + // re-add this partition to pending partitions so it can be re-emitted + pendingPartitions += tp if (log.isDebugEnabled) { - log.debug("#{} Seeking {} to {} after partition SubSource cancelled", actorNumber, tp, record.offset()) + log.debug("#{} Seeking {} to {} after partition SubSource cancelled", actorNumber, tp, offset) } - seekAndEmitSubSources(formerlyUnknown = Set.empty, Map(tp -> record.offset())) - case None => emitSubSourcesForPendingPartitions() + seekAndEmitSubSources(formerlyUnknown = Set.empty, Map(tp -> offset)) + case ReEmit => + // re-add this partition to pending partitions so it can be re-emitted + pendingPartitions += tp + emitSubSourcesForPendingPartitions() + case DoNothing => } } - val subsourceStartedCB: AsyncCallback[(TopicPartition, Control)] = getAsyncCallback[(TopicPartition, Control)] { - case (tp, control) => - if (!partitionsInStartup.contains(tp)) { - // Partition was revoked while - // starting up. Kill! 
- control.shutdown() - } else { - subSources += (tp -> control) - partitionsInStartup -= tp - } - } + val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] = + getAsyncCallback[(TopicPartition, ControlAndStageActor)] { + case (tp, value @ ControlAndStageActor(control, _)) => + if (!partitionsInStartup.contains(tp)) { + // Partition was revoked while + // starting up. Kill! + control.shutdown() + } else { + subSources += (tp -> value) + partitionsInStartup -= tp + } + } - setHandler(shape.out, new OutHandler { - override def onPull(): Unit = - emitSubSourcesForPendingPartitions() - override def onDownstreamFinish(): Unit = - performShutdown() - }) + setHandler( + shape.out, + new OutHandler { + override def onPull(): Unit = + emitSubSourcesForPendingPartitions() + override def onDownstreamFinish(): Unit = performShutdown() + } + ) private def updatePendingPartitionsAndEmitSubSources(formerlyUnknownPartitions: Set[TopicPartition]): Unit = { pendingPartitions ++= formerlyUnknownPartitions.filter(!partitionsInStartup.contains(_)) @@ -230,8 +249,8 @@ private abstract class SubSourceLogic[K, V, Msg]( consumerActor, subsourceStartedCB, subsourceCancelledCB, - messageBuilder = this, - actorNumber) + actorNumber, + subSourceStageLogicFactory) ) push(shape.out, (tp, subSource)) emitSubSourcesForPendingPartitions() @@ -246,7 +265,7 @@ private abstract class SubSourceLogic[K, V, Msg]( override def performStop(): Unit = { setKeepGoing(true) subSources.foreach { - case (_, control) => control.stop() + case (_, ControlAndStageActor(control, _)) => control.stop() } complete(shape.out) onStop() @@ -256,9 +275,8 @@ private abstract class SubSourceLogic[K, V, Msg]( setKeepGoing(true) //todo we should wait for subsources to be shutdown and next shutdown main stage subSources.foreach { - case (_, control) => control.shutdown() + case (_, ControlAndStageActor(control, _)) => control.shutdown() } - if (!isClosed(shape.out)) { complete(shape.out) } @@ -273,98 +291,165 @@ private abstract class SubSourceLogic[K, V, Msg]( }) } + /** + * Opportunity for subclasses to add their logic to the partition assignment callbacks. + */ + protected def addToPartitionAssignmentHandler(handler: PartitionAssignmentHandler): PartitionAssignmentHandler = + handler + } /** Internal API */ private object SubSourceLogic { case object CloseRevokedPartitions + + /** Internal API + * + * SubSourceStageLogic [[akka.kafka.scaladsl.Consumer.Control]] and the stage actor [[ActorRef]] + */ + @InternalApi + final case class ControlAndStageActor(control: Control, stageActor: ActorRef) + + /** Internal API + * Used to determine how the [[SubSourceLogic]] will handle the cancellation of a sub source stage. The default + * behavior requested by the [[SubSourceStageLogic]] is to ask the consumer to seek to the last committed offset and + * then re-emit the sub source stage downstream. + */ + sealed trait SubSourceCancellationStrategy + final case class SeekToOffsetAndReEmit(offset: Long) extends SubSourceCancellationStrategy + case object ReEmit extends SubSourceCancellationStrategy + case object DoNothing extends SubSourceCancellationStrategy + + /** Internal API + * + * Encapsulates a factory method to create a [[SubSourceStageLogic]] within [[SubSourceLogic]] where the context + * parameters exist. 
+ */ + @InternalApi + trait SubSourceStageLogicFactory[K, V, Msg] { + def create( + shape: SourceShape[Msg], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int + ): SubSourceStageLogic[K, V, Msg] + } } -/** Internal API */ +/** Internal API + * + * A [[SubSourceStage]] is created per partition in [[SubSourceLogic]]. + */ +@InternalApi private final class SubSourceStage[K, V, Msg]( tp: TopicPartition, consumerActor: ActorRef, - subSourceStartedCb: AsyncCallback[(TopicPartition, Control)], - subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])], - messageBuilder: MessageBuilder[K, V, Msg], - actorNumber: Int + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int, + subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg] ) extends GraphStage[SourceShape[Msg]] { stage => + val out = Outlet[Msg]("out") - val shape = new SourceShape(out) + val shape: SourceShape[Msg] = new SourceShape(out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with PromiseControl with MetricsControl with StageIdLogging { - override def executionContext: ExecutionContext = materializer.executionContext - override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor) - val shape = stage.shape - val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp)) - var requested = false - var subSourceActor: StageActor = _ - var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty - - override def preStart(): Unit = { - log.debug("#{} Starting SubSource for partition {}", actorNumber, tp) - super.preStart() - subSourceStartedCb.invoke(tp -> this.asInstanceOf[Control]) - subSourceActor = getStageActor { - case (_, msg: KafkaConsumerActor.Internal.Messages[K, V]) => - requested = false - // do not use simple ++ because of https://issues.scala-lang.org/browse/SI-9766 - if (buffer.hasNext) { - buffer = buffer ++ msg.messages - } else { - buffer = msg.messages - } - pump() - case (_, Status.Failure(e)) => - failStage(e) - case (_, Terminated(ref)) if ref == consumerActor => - failStage(new ConsumerFailed) - } - subSourceActor.watch(consumerActor) - consumerActor.tell(RegisterSubStage, subSourceActor.ref) - } + subSourceStageLogicFactory.create(shape, tp, consumerActor, subSourceStartedCb, subSourceCancelledCb, actorNumber) +} - override def postStop(): Unit = { - onShutdown() - super.postStop() +/** Internal API + * + * A [[SubSourceStageLogic]] is the [[GraphStageLogic]] of a [[SubSourceStage]]. + * This emits Kafka messages downstream (not sources). 
+ */ +@InternalApi +private abstract class SubSourceStageLogic[K, V, Msg]( + val shape: SourceShape[Msg], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int +) extends GraphStageLogic(shape) + with PromiseControl + with MetricsControl + with MessageBuilder[K, V, Msg] + with StageIdLogging { + override def executionContext: ExecutionContext = materializer.executionContext + override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor) + private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp)) + var requested = false + var subSourceActor: StageActor = _ + var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty + + override def preStart(): Unit = { + log.debug("#{} Starting SubSource for partition {}", actorNumber, tp) + super.preStart() + subSourceActor = getStageActor(messageHandling) + subSourceActor.watch(consumerActor) + val controlAndActor = ControlAndStageActor(this.asInstanceOf[Control], subSourceActor.ref) + subSourceStartedCb.invoke(tp -> controlAndActor) + consumerActor.tell(RegisterSubStage, subSourceActor.ref) + } + + protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = { + case (_, msg: KafkaConsumerActor.Internal.Messages[K, V]) => + requested = false + // do not use simple ++ because of https://issues.scala-lang.org/browse/SI-9766 + if (buffer.hasNext) { + buffer = buffer ++ msg.messages + } else { + buffer = msg.messages } + pump() + case (_, Status.Failure(e)) => + failStage(e) + case (_, Terminated(ref)) if ref == consumerActor => + failStage(new ConsumerFailed) + } - setHandler( - out, - new OutHandler { - override def onPull(): Unit = - pump() - - override def onDownstreamFinish(): Unit = { - val firstUnconsumed = if (buffer.hasNext) { - Some(buffer.next()) - } else { - None - } + protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = + if (buffer.hasNext) { + SeekToOffsetAndReEmit(buffer.next().offset()) + } else { + ReEmit + } - subSourceCancelledCb.invoke(tp -> firstUnconsumed) - super.onDownstreamFinish() - } - } - ) + override def postStop(): Unit = { + onShutdown() + super.postStop() + } - def performShutdown() = { - log.debug("#{} Completing SubSource for partition {}", actorNumber, tp) - completeStage() + setHandler( + shape.out, + new OutHandler { + override def onPull(): Unit = + pump() + + override def onDownstreamFinish(): Unit = { + subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy()) + super.onDownstreamFinish() } + } + ) - @tailrec - private def pump(): Unit = - if (isAvailable(out)) { - if (buffer.hasNext) { - val msg = buffer.next() - push(out, messageBuilder.createMessage(msg)) - pump() - } else if (!requested) { - requested = true - consumerActor.tell(requestMessages, subSourceActor.ref) - } - } + def performShutdown() = { + log.debug("#{} Completing SubSource for partition {}", actorNumber, tp) + completeStage() + } + + @tailrec + private def pump(): Unit = + if (isAvailable(shape.out)) { + if (buffer.hasNext) { + val msg = buffer.next() + push(shape.out, createMessage(msg)) + pump() + } else if (!requested) { + requested = true + consumerActor.tell(requestMessages, subSourceActor.ref) + } } } diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala 
b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala index efd0532ce..821fffb9a 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala @@ -9,11 +9,13 @@ import akka.Done import akka.annotation.InternalApi import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker} import akka.kafka.ProducerMessage.{Envelope, Results} +import akka.kafka.internal.DeferredProducer._ import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState} import akka.kafka.{ConsumerMessage, ProducerSettings} import akka.stream.stage._ import akka.stream.{Attributes, FlowShape} import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition import scala.concurrent.Future @@ -25,12 +27,13 @@ import scala.jdk.CollectionConverters._ */ @InternalApi private[kafka] final class TransactionalProducerStage[K, V, P]( - val settings: ProducerSettings[K, V] + val settings: ProducerSettings[K, V], + transactionalId: String ) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]] with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TransactionalProducerStageLogic(this, inheritedAttributes) + new TransactionalProducerStageLogic(this, transactionalId, inheritedAttributes) } /** Internal API */ @@ -96,6 +99,7 @@ private object TransactionalProducerStage { */ private final class TransactionalProducerStageLogic[K, V, P]( stage: TransactionalProducerStage[K, V, P], + transactionalId: String, inheritedAttributes: Attributes ) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, inheritedAttributes) with StageIdLogging @@ -111,27 +115,36 @@ private final class TransactionalProducerStageLogic[K, V, P]( private var demandSuspended = false + private var firstMessage: Option[Envelope[K, V, P]] = None + override protected def logSource: Class[_] = classOf[TransactionalProducerStage[_, _, _]] - override def preStart(): Unit = super.preStart() + // we need to peek at the first message to generate the producer transactional id for partitioned sources + override def preStart(): Unit = resumeDemand() override protected def producerAssigned(): Unit = { initTransactions() beginTransaction() + produceFirstMessage() resumeDemand() scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval) } - // suspend demand until a Producer has been created - suspendDemand() + private def produceFirstMessage(): Unit = firstMessage match { + case Some(msg) => + produce(msg) + firstMessage = None + case _ => + throw new IllegalStateException("Should never attempt to produce first message if it does not exist.") + } override protected def resumeDemand(tryToPull: Boolean = true): Unit = { super.resumeDemand(tryToPull) demandSuspended = false } - override protected def suspendDemand(): Unit = { - if (!demandSuspended) super.suspendDemand() + override protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = { + if (!demandSuspended) super.suspendDemand(fromStageLogicConstructor) demandSuspended = true } @@ -157,6 +170,47 @@ private final class TransactionalProducerStageLogic[K, V, P]( } } + override def filterSend(msg: Envelope[K, V, P]): Boolean = + producerAssignmentLifecycle match { + case Assigned => true + case 
Unassigned => + if (firstMessage.nonEmpty) { + // this should never happen because demand should be suspended until the producer is assigned + throw new IllegalStateException("Cannot reapply first message") + } + // stash the first message so it can be sent after the producer is assigned + firstMessage = Some(msg) + // initiate async async producer request _after_ first message is stashed in case future eagerly resolves + // instead of asynccallback + resolveProducer(generatedTransactionalConfig(msg)) + // suspend demand after we receive the first message until the producer is assigned + suspendDemand() + false + case AsyncCreateRequestSent => + throw new IllegalStateException(s"Should never receive new messages while in state '$AsyncCreateRequestSent'") + } + + private def generatedTransactionalConfig(msg: Envelope[K, V, P]): ProducerSettings[K, V] = { + val txId = msg.passThrough match { + case committedMarker: PartitionOffsetCommittedMarker if committedMarker.fromPartitionedSource => + val gtp = committedMarker.key + val txId = s"$transactionalId-${gtp.groupId}-${gtp.topic}-${gtp.partition}" + log.debug("Generated transactional id from partitioned source '{}'", txId) + txId + case _ => transactionalId + } + + stage.settings.withEnrichAsync { settings => + Future.successful( + settings.withProperties( + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString, + ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId, + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString + ) + ) + } + } + override def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match { case o: ConsumerMessage.PartitionOffsetCommittedMarker => batchOffsets = batchOffsets.updated(o) } @@ -177,11 +231,17 @@ private final class TransactionalProducerStageLogic[K, V, P]( private def commitTransaction(batch: NonemptyTransactionBatch, beginNewTransaction: Boolean): Unit = { val group = batch.group - log.debug("Committing transaction for consumer group '{}' with offsets: {}", group, batch.offsets) + log.debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}", + transactionalId, + group, + batch.offsets) val offsetMap = batch.offsetMap() producer.sendOffsetsToTransaction(offsetMap.asJava, group) producer.commitTransaction() - log.debug("Committed transaction for consumer group '{}' with offsets: {}", group, batch.offsets) + log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", + transactionalId, + group, + batch.offsets) batchOffsets = TransactionBatch.empty batch .internalCommit() @@ -212,6 +272,6 @@ private final class TransactionalProducerStageLogic[K, V, P]( private def abortTransaction(): Unit = { log.debug("Aborting transaction") - producer.abortTransaction() + if (producerAssignmentLifecycle == Assigned) producer.abortTransaction() } } diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalSource.scala b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala similarity index 54% rename from core/src/main/scala/akka/kafka/internal/TransactionalSource.scala rename to core/src/main/scala/akka/kafka/internal/TransactionalSources.scala index f54db4694..5f91f1cd8 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalSource.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala @@ -4,19 +4,22 @@ */ package akka.kafka.internal + import java.util.Locale -import akka.Done +import akka.{Done, NotUsed} import akka.actor.{ActorRef, Status, Terminated} import 
akka.actor.Status.Failure import akka.annotation.InternalApi import akka.kafka.ConsumerMessage.{PartitionOffset, TransactionalMessage} import akka.kafka.internal.KafkaConsumerActor.Internal.Revoked +import akka.kafka.internal.SubSourceLogic._ import akka.kafka.scaladsl.Consumer.Control import akka.kafka.scaladsl.PartitionAssignmentHandler -import akka.kafka.{ConsumerFailed, ConsumerSettings, RestrictedConsumer, Subscription} +import akka.kafka.{AutoSubscription, ConsumerFailed, ConsumerSettings, RestrictedConsumer, Subscription} import akka.stream.SourceShape -import akka.stream.stage.GraphStageLogic +import akka.stream.scaladsl.Source +import akka.stream.stage.{AsyncCallback, GraphStageLogic} import akka.util.Timeout import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition @@ -38,8 +41,9 @@ private[kafka] final class TransactionalSource[K, V](consumerSettings: ConsumerS override protected def logic(shape: SourceShape[TransactionalMessage[K, V]]): GraphStageLogic with Control = new TransactionalSourceLogic(shape, TransactionalSource.txConsumerSettings(consumerSettings), subscription) - with TransactionalMessageBuilder[K, V] - + with TransactionalMessageBuilder[K, V] { + override val fromPartitionedSource: Boolean = false + } } /** Internal API */ @@ -74,8 +78,9 @@ private[kafka] final class TransactionalSourceWithOffsetContext[K, V](consumerSe shape: SourceShape[(ConsumerRecord[K, V], PartitionOffset)] ): GraphStageLogic with Control = new TransactionalSourceLogic(shape, TransactionalSource.txConsumerSettings(consumerSettings), subscription) - with TransactionalOffsetContextBuilder[K, V] - + with TransactionalOffsetContextBuilder[K, V] { + override val fromPartitionedSource: Boolean = false + } } /** Internal API */ @@ -178,6 +183,91 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour } } +/** Internal API */ +@InternalApi +private[kafka] final class TransactionalSubSource[K, V]( + consumerSettings: ConsumerSettings[K, V], + subscription: AutoSubscription +) extends KafkaSourceStage[K, V, (TopicPartition, Source[TransactionalMessage[K, V], NotUsed])]( + s"TransactionalSubSource ${subscription.renderStageAttribute}" + ) { + import TransactionalSourceLogic._ + + require(consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG).nonEmpty, "You must define a Consumer group.id.") + + /** + * We set the isolation.level config to read_committed to make sure that any consumed messages are from + * committed transactions. Note that the consuming partitions may be produced by multiple producers, and these + * producers may either use transactional messaging or not at all. So the fetching partitions may have both + * transactional and non-transactional messages, and by setting isolation.level config to read_committed consumers + * will still consume non-transactional messages. 
+ */ + private val txConsumerSettings = consumerSettings.withProperty( + ConsumerConfig.ISOLATION_LEVEL_CONFIG, + IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH) + ) + + override protected def logic( + shape: SourceShape[(TopicPartition, Source[TransactionalMessage[K, V], NotUsed])] + ): GraphStageLogic with Control = { + val factory = new SubSourceStageLogicFactory[K, V, TransactionalMessage[K, V]] { + def create( + shape: SourceShape[TransactionalMessage[K, V]], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int + ): SubSourceStageLogic[K, V, TransactionalMessage[K, V]] = + new TransactionalSubSourceStageLogic(shape, + tp, + consumerActor, + subSourceStartedCb, + subSourceCancelledCb, + actorNumber, + txConsumerSettings) + } + + new SubSourceLogic(shape, txConsumerSettings, subscription, subSourceStageLogicFactory = factory) { + + override protected def addToPartitionAssignmentHandler( + handler: PartitionAssignmentHandler + ): PartitionAssignmentHandler = { + val blockingRevokedCall = new PartitionAssignmentHandler { + override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () + + // This is invoked in the KafkaConsumerActor thread when doing poll. + override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = + if (revokedTps.isEmpty) () + else if (waitForDraining(revokedTps)) { + subSources.values.map(_.stageActor).foreach(_ ! Revoked(revokedTps.toList)) + } else { + sourceActor.ref ! Status.Failure(new Error("Timeout while draining")) + consumerActor ! KafkaConsumerActor.Internal.Stop + } + + override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () + } + new PartitionAssignmentHelpers.Chain(handler, blockingRevokedCall) + } + + def waitForDraining(partitions: Set[TopicPartition]): Boolean = { + import akka.pattern.ask + implicit val timeout = Timeout(txConsumerSettings.commitTimeout) + try { + val drainCommandFutures = subSources.values.map(_.stageActor).map(ask(_, Drain(partitions, None, Drained))) + implicit val ec = executionContext + Await.result(Future.sequence(drainCommandFutures), timeout.duration) + true + } catch { + case t: Throwable => + false + } + } + } + } +} + /** Internal API */ @InternalApi private object TransactionalSourceLogic { @@ -190,8 +280,7 @@ private object TransactionalSourceLogic { case class Committed(offsets: Map[TopicPartition, OffsetAndMetadata]) case object CommittingFailure - private[TransactionalSourceLogic] final case class CommittedMarkerRef(sourceActor: ActorRef, - commitTimeout: FiniteDuration)( + private[internal] final case class CommittedMarkerRef(sourceActor: ActorRef, commitTimeout: FiniteDuration)( implicit ec: ExecutionContext ) extends CommittedMarker { override def committed(offsets: Map[TopicPartition, OffsetAndMetadata]): Future[Done] = { @@ -205,7 +294,7 @@ private object TransactionalSourceLogic { sourceActor ! CommittingFailure } - private[TransactionalSourceLogic] trait InFlightRecords { + private[internal] trait InFlightRecords { // Assumes that offsets per topic partition are added in the increasing order // The assumption is true for Kafka consumer that guarantees that elements are emitted // per partition in offset-increasing order. 
@@ -218,7 +307,7 @@ private object TransactionalSourceLogic { def empty(partitions: Set[TopicPartition]): Boolean } - private[TransactionalSourceLogic] object InFlightRecords { + private[internal] object InFlightRecords { def empty = new Impl class Impl extends InFlightRecords { @@ -246,3 +335,90 @@ private object TransactionalSourceLogic { } } } + +@InternalApi +private class TransactionalSubSourceStageLogic[K, V]( + shape: SourceShape[TransactionalMessage[K, V]], + tp: TopicPartition, + consumerActor: ActorRef, + subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)], + subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)], + actorNumber: Int, + consumerSettings: ConsumerSettings[K, V] +) extends SubSourceStageLogic[K, V, TransactionalMessage[K, V]](shape, + tp, + consumerActor, + subSourceStartedCb, + subSourceCancelledCb, + actorNumber) + with TransactionalMessageBuilder[K, V] { + + import TransactionalSourceLogic._ + + val inFlightRecords = InFlightRecords.empty + + override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG) + + override def onMessage(rec: ConsumerRecord[K, V]): Unit = + inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition()) -> rec.offset())) + + override val fromPartitionedSource: Boolean = true + + override protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = + super.messageHandling.orElse(drainHandling).orElse { + case (_, Revoked(tps)) => + inFlightRecords.revoke(tps.toSet) + } + + override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = DoNothing + + def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] = + drainHandling + .orElse { + case (_, Status.Failure(e)) => + failStage(e) + case (_, Terminated(ref)) if ref == consumerActor => + failStage(new ConsumerFailed()) + } + + override def performShutdown(): Unit = { + log.debug("#{} Completing SubSource for partition {}", actorNumber, tp) + setKeepGoing(true) + if (!isClosed(shape.out)) { + complete(shape.out) // initiate shutdown of SubSource + } + subSourceActor.become(shuttingDownReceive) + drainAndComplete() + } + + def drainAndComplete(): Unit = + subSourceActor.ref.tell(Drain(inFlightRecords.assigned(), None, "complete"), subSourceActor.ref) + + def drainHandling: PartialFunction[(ActorRef, Any), Unit] = { + case (sender, Committed(offsets)) => + inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap) + sender ! Done + case (sender, CommittingFailure) => { + log.info("Committing failed, resetting in flight offsets") + inFlightRecords.reset() + } + case (sender, Drain(partitions, ack, msg)) => + if (inFlightRecords.empty(partitions)) { + log.debug(s"Partitions drained ${partitions.mkString(",")}") + ack.getOrElse(sender) ! msg + } else { + log.debug(s"Draining partitions {}", partitions) + materializer.scheduleOnce(consumerSettings.drainingCheckInterval, new Runnable { + override def run(): Unit = + subSourceActor.ref ! 
Drain(partitions, ack.orElse(Some(sender)), msg) + }) + } + case (sender, "complete") => + completeStage() + } + + lazy val committedMarker: CommittedMarker = { + val ec = materializer.executionContext + CommittedMarkerRef(subSourceActor.ref, consumerSettings.commitTimeout)(ec) + } +} diff --git a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala index 049e65f86..60d935a54 100644 --- a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala @@ -7,7 +7,8 @@ package akka.kafka.javadsl import java.util.concurrent.CompletionStage -import akka.annotation.ApiMayChange +import org.apache.kafka.common.TopicPartition +import akka.annotation.{ApiMayChange, InternalApi} import akka.japi.Pair import akka.kafka.ConsumerMessage.{PartitionOffset, TransactionalMessage} import akka.kafka.ProducerMessage._ @@ -54,6 +55,31 @@ object Transactional { .map(_._1) .asJava + /** + * Internal API. Work in progress. + * + * The `partitionedSource` is a way to track automatic partition assignment from kafka. + * Each source is setup for for Exactly Only Once (EoS) kafka message semantics. + * To enable EoS it's necessary to use the [[Transactional.sink]] or [[Transactional.flow]] (for passthrough). + * When Kafka rebalances partitions, all sources complete before the remaining sources are issued again. + * + * By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run + * without having to manually assign partitions to each instance. + */ + @ApiMayChange + @InternalApi + private[kafka] def partitionedSource[K, V]( + consumerSettings: ConsumerSettings[K, V], + subscription: AutoSubscription + ): Source[Pair[TopicPartition, Source[TransactionalMessage[K, V], NotUsed]], Control] = + scaladsl.Transactional + .partitionedSource(consumerSettings, subscription) + .map { + case (tp, source) => Pair(tp, source.asJava) + } + .mapMaterializedValue(ConsumerControlAsJava.apply) + .asJava + /** * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will * initialize, begin, produce, and commit the consumer offset as part of a transaction. 
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala index 518702a51..bb4c6c0db 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala @@ -5,17 +5,22 @@ package akka.kafka.scaladsl -import akka.annotation.ApiMayChange +import akka.annotation.{ApiMayChange, InternalApi} import akka.kafka.ConsumerMessage.{PartitionOffset, TransactionalMessage} import akka.kafka.ProducerMessage._ -import akka.kafka.internal.{TransactionalProducerStage, TransactionalSource, TransactionalSourceWithOffsetContext} +import akka.kafka.internal.{ + TransactionalProducerStage, + TransactionalSource, + TransactionalSourceWithOffsetContext, + TransactionalSubSource +} import akka.kafka.scaladsl.Consumer.Control -import akka.kafka.{ConsumerMessage, ConsumerSettings, ProducerSettings, Subscription} +import akka.kafka.{AutoSubscription, ConsumerMessage, ConsumerSettings, ProducerSettings, Subscription} import akka.stream.ActorAttributes import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, Sink, Source, SourceWithContext} import akka.{Done, NotUsed} import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.TopicPartition import scala.concurrent.Future @@ -48,6 +53,25 @@ object Transactional { .asSourceWithContext(_._2) .map(_._1) + /** + * Internal API. Work in progress. + * + * The `partitionedSource` is a way to track automatic partition assignment from kafka. + * Each source is setup for for Exactly Only Once (EoS) kafka message semantics. + * To enable EoS it's necessary to use the [[Transactional.sink]] or [[Transactional.flow]] (for passthrough). + * When Kafka rebalances partitions, all sources complete before the remaining sources are issued again. + * + * By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run + * without having to manually assign partitions to each instance. + */ + @ApiMayChange + @InternalApi + private[kafka] def partitionedSource[K, V]( + settings: ConsumerSettings[K, V], + subscription: AutoSubscription + ): Source[(TopicPartition, Source[TransactionalMessage[K, V], NotUsed]), Control] = + Source.fromGraph(new TransactionalSubSource[K, V](settings, subscription)) + /** * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will * initialize, begin, produce, and commit the consumer offset as part of a transaction. 
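The scaladoc above describes `partitionedSource` only in the abstract, so here is a minimal sketch of how it is intended to be combined with `Transactional.sink`. The bootstrap servers, topic names, group id, transactional id and `maxPartitions` below are placeholder assumptions, and the source is still `private[kafka]` and `@InternalApi` in this patch, so the snippet assumes the API as it would look once exposed (compare the `#partitionedTransactionalSink` snippets referenced in `transactions.md`).

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.{Consumer, Transactional}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object PartitionedTransactionalSketch extends App {
  implicit val system: ActorSystem = ActorSystem("transactions-sketch")
  implicit val materializer: Materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // assumption: local broker
    .withGroupId("partitioned-tx-group")

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val maxPartitions = 8 // upper bound on partitions processed concurrently

  val control: Consumer.Control =
    Transactional
      .partitionedSource(consumerSettings, Subscriptions.topics("source-topic"))
      .mapAsyncUnordered(maxPartitions) {
        case (_, source) =>
          // each per-partition sub source runs its own consume-transform-produce cycle;
          // the producer stage derives the transactional.id suffix from group/topic/partition
          source
            .map { msg =>
              ProducerMessage.single(
                new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
                msg.partitionOffset
              )
            }
            .runWith(Transactional.sink(producerSettings, "partitioned-tx-sketch"))
      }
      .toMat(Sink.ignore)(Keep.left)
      .run()
}
```

Because the stage generates a distinct `transactional.id` per assigned partition, several identical instances of such an application can share this configuration without fencing each other's producers.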
@@ -87,19 +111,13 @@ object Transactional { require(transactionalId != null && transactionalId.length > 0, "You must define a Transactional id.") require(settings.producerFactorySync.isEmpty, "You cannot use a shared or external producer factory.") - val txSettings = settings.withProperties( - ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString, - ProducerConfig.TRANSACTIONAL_ID_CONFIG -> transactionalId, - ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString - ) - val flow = Flow .fromGraph( - new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](txSettings) + new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](settings, transactionalId) ) - .mapAsync(txSettings.parallelism)(identity) + .mapAsync(settings.parallelism)(identity) - flowWithDispatcher(txSettings, flow) + flowWithDispatcher(settings, flow) } /** diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md index f13caa7da..fcf318512 100644 --- a/docs/src/main/paradox/transactions.md +++ b/docs/src/main/paradox/transactions.md @@ -17,6 +17,39 @@ A consumer group ID must be provided. Only use this source if you have the intention to connect it to a @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$). + + ## Transactional Sink and Flow The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow. @@ -45,6 +78,14 @@ Scala Java : @@ snip [snip](/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java) { #transactionalSink } +### Partitioned Source Example + +Scala +: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala) { #partitionedTransactionalSink } + +Java +: @@ snip [snip](/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java) { #partitionedTransactionalSink } + ### Recovery From Failure When any stage in the stream fails the whole stream will be torn down. In the general case it's desirable to allow transient errors to fail the whole stream because they cannot be recovered from within the application. Transient errors can be caused by network partitions, Kafka broker failures, @javadoc[ProducerFencedException](org.apache.kafka.common.errors.ProducerFencedException)'s from other application instances, and so on. When the stream encounters transient errors then the current transaction will be aborted before the stream is torn down. Any produced messages that were not committed will not be available to downstream consumers as long as those consumers are configured with `isolation.level = read_committed`. @@ -65,6 +106,10 @@ All of the scenarios covered in the @ref[At-Least-Once Delivery documentation](a Only one application instance per `transactional.id` is allowed. If two application instances with the same `transactional.id` are run at the same time then the instance that registers with Kafka's transaction coordinator second will throw a @javadoc[ProducerFencedException](org.apache.kafka.common.errors.ProducerFencedException) so it doesn't interfere with transactions in process by the first instance. 
To distribute multiple transactional workflows for the same subscription the user must manually subdivide the subscription across multiple instances of the application. This may be handled internally in future versions. + + Any state in the transformation logic is not part of a transaction. It's left to the user to rebuild state when applying stateful operations with transaction. It's possible to encode state into messages produced to topics during a transaction. For example you could produce messages to a topic that represents an event log as part of a transaction. This event log can be replayed to reconstitute the correct state before the stateful stream resumes consuming again at startup. Any side effects that occur in the transformation logic is not part of a transaction (i.e. writes to an database). @@ -77,5 +122,6 @@ For more information on exactly once and transactions in Kafka please consult th * [KIP-98: Exactly Once Delivery and Transactional Messaging](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) ([Design Document](https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o)) * [KIP-129: Streams Exactly-Once Semantics](https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics) ([Design Document](https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#heading=h.vkrkjfth3p8p)) +* [KIP-447: EOS Scalability Design](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics) ([Design Document](https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit)) * [You Cannot Have Exactly-Once Delivery Redux](http://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/) by Tyler Treat * [Exactly-once Semantics are Possible: Here’s How Kafka Does it](https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/) diff --git a/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala b/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala index 69b0a5a8f..5be589aca 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala @@ -189,6 +189,8 @@ abstract class KafkaTestKitClass(override val system: ActorSystem, override val object KafkaTestKitClass { val topicCounter = new AtomicInteger() def createReplicationFactorBrokerProps(replicationFactor: Int): Map[String, String] = Map( - "offsets.topic.replication.factor" -> s"$replicationFactor" + "offsets.topic.replication.factor" -> s"$replicationFactor", + "transaction.state.log.replication.factor" -> s"$replicationFactor", + "transaction.state.log.min.isr" -> s"$replicationFactor" ) } diff --git a/tests/src/it/resources/logback-test.xml b/tests/src/it/resources/logback-test.xml index 742a2ebf1..87ee21503 100644 --- a/tests/src/it/resources/logback-test.xml +++ b/tests/src/it/resources/logback-test.xml @@ -29,6 +29,6 @@ - + diff --git a/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala b/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala new file mode 100644 index 000000000..c57f0567c --- /dev/null +++ b/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala @@ -0,0 +1,127 @@ +package akka.kafka + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import 
akka.kafka.scaladsl.SpecBase +import akka.kafka.testkit.KafkaTestkitTestcontainersSettings +import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike +import akka.stream._ +import akka.stream.scaladsl.{Keep, RestartSource, Sink} +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{Ignore, Matchers, WordSpecLike} + +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future, TimeoutException} +import scala.util.{Failure, Success} + +@Ignore +class TransactionsPartitionedSourceSpec extends SpecBase + with TestcontainersKafkaPerClassLike + with WordSpecLike + with ScalaFutures + with Matchers + with TransactionsOps + with Repeated { + + val replicationFactor = 2 + + implicit val pc = PatienceConfig(45.seconds, 1.second) + + override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system) + .withNumBrokers(3) + .withInternalTopicsReplicationFactor(replicationFactor) + + "A multi-broker consume-transform-produce cycle" must { + "provide consistency when multiple partitioned transactional streams are being restarted" in assertAllStagesStopped { + val sourcePartitions = 4 + val destinationPartitions = 4 + val consumers = 3 + val replication = replicationFactor + + val sourceTopic = createTopic(1, sourcePartitions, replication) + val sinkTopic = createTopic(2, destinationPartitions, replication) + val group = createGroupId(1) + val transactionalId = createTransactionalId() + + val elements = 100 * 1000 // 100 * 1,000 = 100,000 + val restartAfter = (10 * 1000) / sourcePartitions // (10 * 1,000) / 10 = 100 + + val producers: immutable.Seq[Future[Done]] = + (0 until sourcePartitions).map { part => + produce(sourceTopic, range = 1 to elements, partition = part) + } + + Await.result(Future.sequence(producers), 4.minute) + + val consumerSettings = consumerDefaults.withGroupId(group) + + val completedCopy = new AtomicInteger(0) + val completedWithTimeout = new AtomicInteger(0) + + def runStream(id: String): UniqueKillSwitch = + RestartSource + .onFailuresWithBackoff(10.millis, 100.millis, 0.2)( + () => { + transactionalPartitionedCopyStream( + consumerSettings, + txProducerDefaults, + sourceTopic, + sinkTopic, + transactionalId, + idleTimeout = 10.seconds, + maxPartitions = sourcePartitions, + restartAfter = Some(restartAfter) + ) + .recover { + case e: TimeoutException => + if (completedWithTimeout.incrementAndGet() > 10) + "no more messages to copy" + else + throw new Error("Continue restarting copy stream") + } + } + ) + .viaMat(KillSwitches.single)(Keep.right) + .toMat(Sink.onComplete { + case Success(_) => + completedCopy.incrementAndGet() + case Failure(_) => // restart + })(Keep.left) + .run() + + val controls: Seq[UniqueKillSwitch] = (0 until consumers) + .map(_.toString) + .map(runStream) + + while (completedCopy.get() < consumers) { + Thread.sleep(2000) + } + + val consumer = consumePartitionOffsetValues( + probeConsumerSettings(createGroupId(2)), + sinkTopic, + elementsToTake = (elements * destinationPartitions).toLong + ) + + val actualValues = Await.result(consumer, 10.minutes) + + log.debug("Expected elements: {}, actual elements: {}", elements, actualValues.length) + + assertPartitionedConsistency(elements, destinationPartitions, actualValues) + + controls.foreach(_.shutdown()) + } + } + + private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = + withProbeConsumerSettings(consumerDefaults, 
groupId) + + override def producerDefaults: ProducerSettings[String, String] = + withTestProducerSettings(super.producerDefaults) + + def txProducerDefaults: ProducerSettings[String, String] = + withTransactionalProducerSettings(super.producerDefaults) +} diff --git a/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala b/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala new file mode 100644 index 000000000..5a203e7b4 --- /dev/null +++ b/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala @@ -0,0 +1,189 @@ +package akka.kafka + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.kafka.scaladsl.Consumer.Control +import akka.kafka.scaladsl.{Consumer, SpecBase, Transactional} +import akka.kafka.testkit.KafkaTestkitTestcontainersSettings +import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike +import akka.stream._ +import akka.stream.scaladsl.{Flow, Keep, RestartSource, Sink} +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{Matchers, WordSpecLike} + +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future, TimeoutException} +import scala.util.{Failure, Success} + +class TransactionsSourceSpec extends SpecBase + with TestcontainersKafkaPerClassLike + with WordSpecLike + with ScalaFutures + with Matchers + with TransactionsOps + with Repeated { + + implicit val pc = PatienceConfig(45.seconds, 1.second) + + override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system) + .withNumBrokers(3) + .withInternalTopicsReplicationFactor(2) + + "A multi-broker consume-transform-produce cycle" must { + "provide consistency when multiple transactional streams are being restarted" in assertAllStagesStopped { + val sourcePartitions = 10 + val destinationPartitions = 4 + val consumers = 3 + val replication = 2 + + val sourceTopic = createTopic(1, sourcePartitions, replication) + val sinkTopic = createTopic(2, destinationPartitions, replication) + val group = createGroupId(1) + + val elements = 100 * 1000 + val restartAfter = 10 * 1000 + + val partitionSize = elements / sourcePartitions + val producers: immutable.Seq[Future[Done]] = + (0 until sourcePartitions).map( + part => produce(sourceTopic, ((part * partitionSize) + 1) to (partitionSize * (part + 1)), part) + ) + + Await.result(Future.sequence(producers), 1.minute) + + val consumerSettings = consumerDefaults.withGroupId(group) + + val completedCopy = new AtomicInteger(0) + val completedWithTimeout = new AtomicInteger(0) + + def runStream(id: String): UniqueKillSwitch = + RestartSource + .onFailuresWithBackoff(10.millis, 100.millis, 0.2)( + () => { + val transactionId = s"$group-$id" + transactionalCopyStream(consumerSettings, txProducerDefaults, sourceTopic, sinkTopic, transactionId, 10.seconds, Some(restartAfter)) + .recover { + case e: TimeoutException => + if (completedWithTimeout.incrementAndGet() > 10) + "no more messages to copy" + else + throw new Error("Continue restarting copy stream") + } + } + ) + .viaMat(KillSwitches.single)(Keep.right) + .toMat(Sink.onComplete { + case Success(_) => + completedCopy.incrementAndGet() + case Failure(_) => // restart + })(Keep.left) + .run() + + val controls: Seq[UniqueKillSwitch] = (0 until consumers) + .map(_.toString) + .map(runStream) + + val probeConsumerGroup = createGroupId(2) + + while (completedCopy.get() < consumers) { + 
Thread.sleep(2000) + } + + val consumer = offsetValueSource(probeConsumerSettings(probeConsumerGroup), sinkTopic) + .take(elements.toLong) + .idleTimeout(30.seconds) + .alsoTo( + Flow[(Long, String)] + .scan(0) { case (count, _) => count + 1 } + .filter(_ % 10000 == 0) + .log("received") + .to(Sink.ignore) + ) + .recover { + case t => (0L, "no-more-elements") + } + .filter(_._2 != "no-more-elements") + .runWith(Sink.seq) + + val values = Await.result(consumer, 10.minutes) + + val expected = (1 to elements).map(_.toString) + + log.info("Expected elements: {}, actual elements: {}", elements, values.length) + + checkForMissing(values, expected) + checkForDuplicates(values, expected) + + controls.foreach(_.shutdown()) + } + + "drain stream on partitions rebalancing" in assertAllStagesStopped { + // Runs a copying transactional flows that delay writing to the output partition using a `delay` stage. + // Creates more flows than ktps to trigger partition rebalancing. + // The output topic should contain the same elements as the input topic. + + val sourceTopic = createTopic(1, partitions = 2) + val sinkTopic = createTopic(2, partitions = 4) + val group = createGroupId(1) + + val elements = 100 + val batchSize = 10 + Await.result(produce(sourceTopic, 1 to elements), remainingOrDefault) + + val elementsWrote = new AtomicInteger(0) + + val consumerSettings = consumerDefaults.withGroupId(group) + + def runStream(id: String): Consumer.Control = { + val control: Control = + Transactional + .source(consumerSettings, Subscriptions.topics(sourceTopic)) + .map { msg => + ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), + msg.partitionOffset) + } + .take(batchSize.toLong) + .delay(3.seconds, strategy = DelayOverflowStrategy.backpressure) + .addAttributes(Attributes.inputBuffer(batchSize, batchSize + 1)) + .via(Transactional.flow(producerDefaults, s"$group-$id")) + .map(_ => elementsWrote.incrementAndGet()) + .toMat(Sink.ignore)(Keep.left) + .run() + control + } + + val controls: Seq[Control] = (0 until elements / batchSize) + .map(_.toString) + .map(runStream) + + val probeConsumerGroup = createGroupId(2) + val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic) + + periodicalCheck("Wait for elements written to Kafka", maxTries = 30, 1.second) { () => + elementsWrote.get() + }(_ > 10) + + probeConsumer + .request(elements.toLong) + .expectNextUnorderedN((1 to elements).map(_.toString)) + + probeConsumer.cancel() + + val futures: Seq[Future[Done]] = controls.map(_.shutdown()) + Await.result(Future.sequence(futures), remainingOrDefault) + } + } + + private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = + withProbeConsumerSettings(consumerDefaults, groupId) + + override def producerDefaults: ProducerSettings[String, String] = + withTestProducerSettings(super.producerDefaults) + + def txProducerDefaults: ProducerSettings[String, String] = + withTransactionalProducerSettings(super.producerDefaults) +} diff --git a/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java b/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java index bb583d6fe..05b45de17 100644 --- a/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java +++ b/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java @@ -21,6 +21,7 @@ import org.junit.AfterClass; import org.junit.Test; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletionStage; import 
java.util.concurrent.ExecutorService; @@ -53,6 +54,11 @@ protected Flow business() { return Flow.create(); } + /** Overridden to set a different default timeout for [[#resultOf]]. Default is 5 seconds. */ + protected Duration resultOfTimeout() { + return Duration.ofSeconds(15); + } + @Test public void sourceSink() throws Exception { ConsumerSettings consumerSettings = @@ -164,4 +170,48 @@ public void usingRestartSource() throws Exception { assertEquals(messages, resultOf(consumer.drainAndShutdown(ec)).size()); assertDone(streamCompletion); } + + // @Test + // public void partitionedSourceSink() throws Exception { + // ConsumerSettings consumerSettings = + // consumerDefaults().withGroupId(createGroupId(1)); + // String sourceTopic = createTopic(1, 2, 1); + // String targetTopic = createTopic(2, 1, 1); + // String transactionalId = createTransactionalId(1); + // // #partitionedTransactionalSink + // Consumer.DrainingControl control = + // Transactional.partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic)) + // .mapAsync( + // 8, + // pair -> { + // Source, NotUsed> source = + // pair.second(); + // return source + // .via(business()) + // .map( + // msg -> + // ProducerMessage.single( + // new ProducerRecord<>( + // targetTopic, msg.record().key(), msg.record().value()), + // msg.partitionOffset())) + // .runWith(Transactional.sink(producerSettings, transactionalId), + // materializer); + // }) + // .toMat(Sink.ignore(), Keep.both()) + // .mapMaterializedValue(Consumer::createDrainingControl) + // .run(materializer); + // + // // ... + // + // // #partitionedTransactionalSink + // Consumer.DrainingControl>> consumer = + // consumeString(targetTopic, 10); + // produceString(sourceTopic, 10, partition0); + // assertDone(consumer.isShutdown()); + // // #partitionedTransactionalSink + // control.drainAndShutdown(ec); + // // #partitionedTransactionalSink + // assertDone(control.isShutdown()); + // assertEquals(10, resultOf(consumer.drainAndShutdown(ec)).size()); + // } } diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf index ef53920e8..404d59b7e 100644 --- a/tests/src/test/resources/application.conf +++ b/tests/src/test/resources/application.conf @@ -17,6 +17,9 @@ akka { } } +# default is 10 seconds +# akka.kafka.testkit.consumer-group-timeout = 20 seconds + # #consumer-config-inheritance our-kafka-consumer: ${akka.kafka.consumer} { kafka-clients { diff --git a/tests/src/test/resources/logback-test.xml b/tests/src/test/resources/logback-test.xml index 7cef73fb5..c68dcca95 100644 --- a/tests/src/test/resources/logback-test.xml +++ b/tests/src/test/resources/logback-test.xml @@ -43,7 +43,7 @@ - + diff --git a/tests/src/test/scala/akka/kafka/Repeated.scala b/tests/src/test/scala/akka/kafka/Repeated.scala new file mode 100644 index 000000000..7f237ce23 --- /dev/null +++ b/tests/src/test/scala/akka/kafka/Repeated.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka + +import org.scalatest._ + +/** + * Repeat test suite n times. Default: 1. 
+ * Define number of times to repeat by overriding `timesToRepeat` or passing `-DtimesToRepeat=n` + * + * Ex) To run a single test 10 times from the terminal + * + * {{{ + * sbt "tests/testOnly *.TransactionsSpec -- -z \"must support copy stream with merging and multi message\" -DtimesToRepeat=2" + * }}} + */ +trait Repeated extends TestSuiteMixin { this: TestSuite => + def timesToRepeat: Int = 1 + + protected abstract override def runTest(testName: String, args: Args): Status = { + def run0(times: Int): Status = { + val status = super.runTest(testName, args) + if (times <= 1) status else status.thenRun(run0(times - 1)) + } + + run0(args.configMap.getWithDefault("timesToRepeat", timesToRepeat.toString).toInt) + } + + /** + * Retry a code block n times or until Success + */ + @annotation.tailrec + final def retry[T](n: Int)(fn: Int => T): T = + util.Try { fn(n + 1) } match { + case util.Success(x) => x + case _ if n > 1 => retry(n - 1)(fn) + case util.Failure(e) => throw e + } +} diff --git a/tests/src/test/scala/akka/kafka/TransactionsOps.scala b/tests/src/test/scala/akka/kafka/TransactionsOps.scala new file mode 100644 index 000000000..5eeabd74c --- /dev/null +++ b/tests/src/test/scala/akka/kafka/TransactionsOps.scala @@ -0,0 +1,212 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka +import akka.{Done, NotUsed} +import akka.actor.ActorSystem +import akka.kafka.ConsumerMessage.PartitionOffset +import akka.kafka.ProducerMessage.MultiMessage +import akka.kafka.scaladsl.Consumer.Control +import akka.kafka.scaladsl.{Consumer, Producer, Transactional} +import akka.stream.Materializer +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.scalatest.{Matchers, TestSuite} + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration._ + +trait TransactionsOps extends TestSuite with Matchers { + def transactionalCopyStream( + consumerSettings: ConsumerSettings[String, String], + producerSettings: ProducerSettings[String, String], + sourceTopic: String, + sinkTopic: String, + transactionalId: String, + idleTimeout: FiniteDuration, + restartAfter: Option[Int] = None + ): Source[ProducerMessage.Results[String, String, PartitionOffset], Control] = + Transactional + .source(consumerSettings, Subscriptions.topics(sourceTopic)) + .zip(Source.unfold(1)(count => Some((count + 1, count)))) + .map { + case (msg, count) => + if (restartAfter.exists(restartAfter => count >= restartAfter)) + throw new Error("Restarting transactional copy stream") + msg + } + .idleTimeout(idleTimeout) + .map { msg => + ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset) + } + .via(Transactional.flow(producerSettings, transactionalId)) + + /** + * Copy messages from a source to sink topic. Source and sink must have exactly the same number of partitions. 
+ */ + def transactionalPartitionedCopyStream( + consumerSettings: ConsumerSettings[String, String], + producerSettings: ProducerSettings[String, String], + sourceTopic: String, + sinkTopic: String, + transactionalId: String, + idleTimeout: FiniteDuration, + maxPartitions: Int, + restartAfter: Option[Int] = None + ): Source[ProducerMessage.Results[String, String, PartitionOffset], Control] = + Transactional + .partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic)) + .flatMapMerge( + maxPartitions, { + case (_, source) => + val results: Source[ProducerMessage.Results[String, String, PartitionOffset], NotUsed] = source + .zip(Source.unfold(1)(count => Some((count + 1, count)))) + .map { + case (msg, count) => + if (restartAfter.exists(restartAfter => count >= restartAfter)) + throw new Error("Restarting transactional copy stream") + msg + } + .idleTimeout(idleTimeout) + .map { msg => + ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, + msg.record.partition(), + msg.record.key(), + msg.record.value), + msg.partitionOffset) + } + .via(Transactional.flow(producerSettings, transactionalId)) + results + } + ) + + def produceToAllPartitions(producerSettings: ProducerSettings[String, String], + topic: String, + partitions: Int, + range: Range)(implicit mat: Materializer): Future[Done] = + Source(range) + .map { n => + val msgs = (0 until partitions).map(p => new ProducerRecord(topic, p, n.toString, n.toString)) + MultiMessage(msgs, n) + } + .via(Producer.flexiFlow(producerSettings)) + .runWith(Sink.ignore) + + def checkForDuplicates(values: immutable.Seq[(Long, String)], expected: immutable.IndexedSeq[String]): Unit = + withClue("Checking for duplicates: ") { + val duplicates = values.map(_._2) diff expected + if (duplicates.nonEmpty) { + val duplicatesWithDifferentOffsets = values + .filter { + case (_, value) => duplicates.contains(value) + } + .groupBy(_._2) // message + // workaround for Scala collection refactoring of `mapValues` to remain compat with 2.12/2.13 cross build + .map { case (k, v) => (k, v.map(_._1)) } // keep offset + .filter { + case (_, offsets) => offsets.distinct.size > 1 + } + + if (duplicatesWithDifferentOffsets.nonEmpty) { + fail(s"Got ${duplicates.size} duplicates. Messages and their offsets: $duplicatesWithDifferentOffsets") + } else { + println("Got duplicates, but all of them were due to rebalance replay when counting") + } + } + } + + def checkForMissing(values: immutable.Seq[(Long, String)], expected: immutable.IndexedSeq[String]): Unit = + withClue("Checking for missing: ") { + val missing = expected diff values.map(_._2) + if (missing.nonEmpty) { + val continuousBlocks = missing + .scanLeft(("-1", 0)) { + case ((last, block), curr) => if (last.toInt + 1 == curr.toInt) (curr, block) else (curr, block + 1) + } + .tail + .groupBy(_._2) + val blockDescription = continuousBlocks + .map { block => + val msgs = block._2.map(_._1) + s"Missing ${msgs.size} in continuous block, first ten: ${msgs.take(10)}" + } + .mkString(" ") + fail(s"Did not get ${missing.size} expected messages. 
$blockDescription") + } + } + + def valuesProbeConsumer( + settings: ConsumerSettings[String, String], + topic: String + )(implicit actorSystem: ActorSystem, mat: Materializer): TestSubscriber.Probe[String] = + offsetValueSource(settings, topic) + .map(_._2) + .runWith(TestSink.probe) + + def offsetValueSource(settings: ConsumerSettings[String, String], + topic: String): Source[(Long, String), Consumer.Control] = + Consumer + .plainSource(settings, Subscriptions.topics(topic)) + .map(r => (r.offset(), r.value())) + + def consumePartitionOffsetValues(settings: ConsumerSettings[String, String], topic: String, elementsToTake: Long)( + implicit mat: Materializer + ): Future[immutable.Seq[(Int, Long, String)]] = + Consumer + .plainSource(settings, Subscriptions.topics(topic)) + .map(r => (r.partition(), r.offset(), r.value())) + .take(elementsToTake) + .idleTimeout(30.seconds) + .alsoTo( + Flow[(Int, Long, String)] + .scan(0) { case (count, _) => count + 1 } + .filter(_ % 100 == 0) + .log("received") + .to(Sink.ignore) + ) + .recover { + case t => (0, 0L, "no-more-elements") + } + .filter(_._3 != "no-more-elements") + .runWith(Sink.seq) + + def assertPartitionedConsistency( + elements: Int, + maxPartitions: Int, + values: immutable.Seq[(Int, Long, String)] + ): Unit = { + val expectedValues: immutable.Seq[String] = (1 to elements).map(_.toString) + + for (partition <- 0 until maxPartitions) { + println(s"Asserting values for partition: $partition") + + val partitionMessages: immutable.Seq[String] = + values.filter(_._1 == partition).map { case (_, _, value) => value } + + assert(partitionMessages.length == elements) + expectedValues should contain theSameElementsInOrderAs partitionMessages + } + } + + def withProbeConsumerSettings(settings: ConsumerSettings[String, String], + groupId: String): ConsumerSettings[String, String] = + settings + .withGroupId(groupId) + .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed") + + def withTestProducerSettings(settings: ProducerSettings[String, String]): ProducerSettings[String, String] = + settings + .withCloseTimeout(Duration.Zero) + .withProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + + def withTransactionalProducerSettings(settings: ProducerSettings[String, String]): ProducerSettings[String, String] = + settings + .withParallelism(20) + .withCloseTimeout(Duration.Zero) +} diff --git a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala index 072cdce74..3732f778d 100644 --- a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala @@ -8,7 +8,7 @@ package akka.kafka.internal import java.util.concurrent.CompletableFuture import akka.actor.ActorSystem -import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffset} +import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffset, PartitionOffsetCommittedMarker} import akka.kafka.ProducerMessage._ import akka.kafka.scaladsl.Producer import akka.kafka.{ConsumerMessage, ProducerMessage, ProducerSettings} @@ -70,13 +70,20 @@ class ProducerSpec(_system: ActorSystem) -1) def toMessage(tuple: (Record, RecordMetadata)) = Message(tuple._1, NotUsed) - private[kafka] def toTxMessage(tuple: (Record, RecordMetadata), committer: CommittedMarker) = + private[kafka] def toTxMessage(tuple: (Record, RecordMetadata), committer: CommittedMarker) = { + val consumerMessage = ConsumerMessage + .PartitionOffset(GroupTopicPartition(group, 
tuple._1.topic(), 1), tuple._2.offset()) + val partitionOffsetCommittedMarker = + PartitionOffsetCommittedMarker(consumerMessage.key, + consumerMessage.offset, + committer, + fromPartitionedSource = false) ProducerMessage.Message( tuple._1, - ConsumerMessage - .PartitionOffset(GroupTopicPartition(group, tuple._1.topic(), 1), tuple._2.offset()) - .withCommittedMarker(committer) + partitionOffsetCommittedMarker ) + } + def result(r: Record, m: RecordMetadata) = Result(m, ProducerMessage.Message(r, NotUsed)) val toResult = (result _).tupled @@ -106,7 +113,7 @@ class ProducerSpec(_system: ActorSystem) val pSettings = settings.withProducerFactory(_ => mock.mock).withCloseProducerOnStop(closeOnStop) Flow .fromGraph( - new TransactionalProducerStage[K, V, P](pSettings) + new TransactionalProducerStage[K, V, P](pSettings, "transactionalId") ) .mapAsync(1)(identity) } @@ -367,6 +374,23 @@ class ProducerSpec(_system: ActorSystem) } } + it should "not initialize and begin transaction when there are no messages" in { + assertAllStagesStopped { + val client = new ProducerMock[K, V](ProducerMock.handlers.fail) + + val probe = Source + .empty[Msg] + .via(testTransactionProducerFlow(client)) + .runWith(TestSink.probe) + + probe + .request(1) + .expectComplete() + + client.verifyNoMoreInteractions() + } + } + it should "initialize and begin a transaction when first run" in { assertAllStagesStopped { val input = recordAndMetadata(1) @@ -387,8 +411,7 @@ class ProducerSpec(_system: ActorSystem) source.sendNext(txMsg) sink.requestNext() - // we must wait for the producer to be asynchronously assigned before observing interactions with the mock - awaitAssert(client.verifyTxInitialized()) + awaitAssert(client.verifyTxInitialized(), 2.second) source.sendComplete() sink.expectComplete() @@ -498,10 +521,14 @@ class ProducerSpec(_system: ActorSystem) val txMsg = toTxMessage(input, committedMarker.mock) source.sendNext(txMsg) + + awaitAssert(client.verifyTxInitialized()) + source.sendError(new Exception()) // Here we can not be sure that all messages from source delivered to producer // because of buffers in akka-stream and faster error pushing that ignores buffers + // TODO: we can await a tx to be initialized before sending the error (which means the producer was assigned and first msg processed). does that invalidate this test? 
Await.ready(sink, remainingOrDefault) sink.value should matchPattern { diff --git a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala index deb655223..c49f80832 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala @@ -13,6 +13,7 @@ import akka.kafka._ import akka.kafka.scaladsl.Consumer.DrainingControl import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike import akka.pattern.ask +import akka.stream.OverflowStrategy import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink @@ -328,6 +329,91 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside } } + "rebalance safely using transactional partitioned flow" in assertAllStagesStopped { + val partitions = 4 + val totalMessages = 200L + + val topic = createTopic(1, partitions) + val outTopic = createTopic(2, partitions) + val group = createGroupId(1) + val transactionalId = createTransactionalId() + val sourceSettings = consumerDefaults + .withGroupId(group) + + val topicSubscription = Subscriptions.topics(topic) + + def createAndRunTransactionalFlow(subscription: AutoSubscription) = + Transactional + .partitionedSource(sourceSettings, subscription) + .map { + case (tp, source) => + source + .map { msg => + ProducerMessage.single(new ProducerRecord[String, String](outTopic, + msg.record.partition(), + msg.record.key(), + msg.record.value() + "-out"), + msg.partitionOffset) + } + .to(Transactional.sink(producerDefaults, transactionalId)) + .run() + } + .toMat(Sink.ignore)(Keep.both) + .mapMaterializedValue(DrainingControl.apply) + .run() + + def createAndRunProducer(elements: immutable.Iterable[Long]) = + Source(elements) + .map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString)) + .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer))) + + val control = createAndRunTransactionalFlow(topicSubscription) + + // waits until all partitions are assigned to the single consumer + waitUntilConsumerSummary(group) { + case singleConsumer :: Nil => singleConsumer.assignment.topicPartitions.size == partitions + } + + createAndRunProducer(0L until totalMessages / 2).futureValue + + // create another consumer with the same groupId to trigger re-balancing + val control2 = createAndRunTransactionalFlow(topicSubscription) + + // waits until partitions are assigned across both consumers + waitUntilConsumerSummary(group) { + case consumer1 :: consumer2 :: Nil => + val half = partitions / 2 + consumer1.assignment.topicPartitions.size == half && consumer2.assignment.topicPartitions.size == half + } + + createAndRunProducer(totalMessages / 2 until totalMessages).futureValue + + val checkingGroup = createGroupId(2) + + val (counterQueue, counterCompletion) = Source + .queue[String](8, OverflowStrategy.fail) + .scan(0L)((c, _) => c + 1) + .takeWhile(_ < totalMessages, inclusive = true) + .toMat(Sink.last)(Keep.both) + .run() + + val streamMessages = Consumer + .plainSource[String, String](consumerDefaults + .withGroupId(checkingGroup) + .withProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + Subscriptions.topics(outTopic)) + .mapAsync(1)(el => counterQueue.offer(el.value()).map(_ => el)) + .scan(0L)((c, _) => c + 1) + .toMat(Sink.last)(Keep.both) + .mapMaterializedValue(DrainingControl.apply) + .run() 
+ + counterCompletion.futureValue shouldBe totalMessages + + control.drainAndShutdown().futureValue + control2.drainAndShutdown().futureValue + streamMessages.drainAndShutdown().futureValue shouldBe totalMessages + } } "Consumer control" must { diff --git a/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala index f7452723a..e9d15c8fc 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala @@ -30,10 +30,6 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with implicit val patience = PatienceConfig(15.seconds, 500.millis) override def sleepAfterProduce: FiniteDuration = 500.millis - override def consumerDefaults: ConsumerSettings[String, String] = - super.consumerDefaults - .withStopTimeout(10.millis) - "Partitioned source" must { "begin consuming from the beginning of the topic" in assertAllStagesStopped { diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala index 7cd224479..f0f1e0495 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala @@ -5,28 +5,23 @@ package akka.kafka.scaladsl -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicBoolean import akka.Done import akka.kafka.ConsumerMessage.PartitionOffset -import akka.kafka.{ProducerMessage, _} import akka.kafka.scaladsl.Consumer.Control -import akka.kafka.testkit.scaladsl.{TestcontainersKafkaLike} -import akka.stream.{Attributes, DelayOverflowStrategy, KillSwitches, UniqueKillSwitch} -import akka.stream.scaladsl.{Flow, Keep, RestartSource, Sink, Source} -import akka.stream.testkit.TestSubscriber +import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike +import akka.kafka.{ProducerMessage, _} +import akka.stream.scaladsl.{Keep, RestartSource, Sink} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped -import akka.stream.testkit.scaladsl.TestSink -import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.RecoverMethods._ import scala.collection.immutable -import scala.concurrent.{Await, Future, TimeoutException} import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -class TransactionsSpec extends SpecBase with TestcontainersKafkaLike { +import scala.concurrent.{Await, Future} +class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with TransactionsOps with Repeated { "A consume-transform-produce cycle" must { "complete in happy-path scenario" in { @@ -39,9 +34,10 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike { val consumerSettings = consumerDefaults.withGroupId(group) - val control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, group, Int.MaxValue, 10.seconds) - .toMat(Sink.ignore)(Keep.left) - .run() + val control = + transactionalCopyStream(consumerSettings, txProducerDefaults, sourceTopic, sinkTopic, group, 10.seconds) + .toMat(Sink.ignore)(Keep.left) + .run() val probeConsumerGroup = createGroupId(2) @@ -214,7 +210,12 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike { def runStream(id: String): Consumer.Control = { val control: Control = - transactionalCopyStream(consumerSettings, sourceTopic, 
sinkTopic, s"$group-$id", Int.MaxValue, 10.seconds) + transactionalCopyStream(consumerSettings, + txProducerDefaults, + sourceTopic, + sinkTopic, + s"$group-$id", + 10.seconds) .toMat(Sink.ignore)(Keep.left) .run() control @@ -238,180 +239,6 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike { Await.result(Future.sequence(futures), remainingOrDefault) } - "provide consistency when multiple transactional streams are being restarted" in assertAllStagesStopped { - val sourcePartitions = 10 - val destinationPartitions = 4 - val consumers = 3 - - val sourceTopic = createTopic(1, sourcePartitions) - val sinkTopic = createTopic(2, destinationPartitions) - val group = createGroupId(1) - - val elements = 100 * 1000 - val restartAfter = 10 * 1000 - - val partitionSize = elements / sourcePartitions - val producers = - (0 until sourcePartitions).map( - part => produce(sourceTopic, ((part * partitionSize) + 1) to (partitionSize * (part + 1)), part) - ) - Await.result(Future.sequence(producers), 1.minute) - - val consumerSettings = consumerDefaults.withGroupId(group) - - val completedCopy = new AtomicInteger(0) - val completedWithTimeout = new AtomicInteger(0) - - def runStream(id: String): UniqueKillSwitch = - RestartSource - .onFailuresWithBackoff(10.millis, 100.millis, 0.2)( - () => { - val transactionId = s"$group-$id" - transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, transactionId, restartAfter, 10.seconds) - .recover { - case e: TimeoutException => - if (completedWithTimeout.incrementAndGet() > 10) - "no more messages to copy" - else - throw new Error("Continue restarting copy stream") - } - } - ) - .viaMat(KillSwitches.single)(Keep.right) - .toMat(Sink.onComplete { - case Success(_) => - completedCopy.incrementAndGet() - case Failure(_) => // restart - })(Keep.left) - .run() - - val controls: Seq[UniqueKillSwitch] = (0 until consumers) - .map(_.toString) - .map(runStream) - - val probeConsumerGroup = createGroupId(2) - - while (completedCopy.get() < consumers) { - Thread.sleep(2000) - } - - val consumer = offsetValueSource(probeConsumerSettings(probeConsumerGroup), sinkTopic) - .take(elements.toLong) - .idleTimeout(30.seconds) - .alsoTo( - Flow[(Long, String)] - .scan(0) { case (count, _) => count + 1 } - .filter(_ % 10000 == 0) - .log("received") - .to(Sink.ignore) - ) - .recover { - case t => (0, "no-more-elements") - } - .filter(_._2 != "no-more-elements") - .runWith(Sink.seq) - val values = Await.result(consumer, 10.minutes) - - val expected = (1 to elements).map(_.toString) - withClue("Checking for duplicates: ") { - val duplicates = values.map(_._2) diff expected - if (duplicates.nonEmpty) { - val duplicatesWithDifferentOffsets = values - .filter { - case (_, value) => duplicates.contains(value) - } - .groupBy(_._2) // message - // workaround for Scala collection refactoring of `mapValues` to remain compat with 2.12/2.13 cross build - .map { case (k, v) => (k, v.map(_._1)) } // keep offset - .filter { - case (_, offsets) => offsets.distinct.size > 1 - } - - if (duplicatesWithDifferentOffsets.nonEmpty) { - fail(s"Got ${duplicates.size} duplicates. 
Messages and their offsets: $duplicatesWithDifferentOffsets") - } else { - println("Got duplicates, but all of them were due to rebalance replay when counting") - } - } - } - withClue("Checking for missing: ") { - val missing = expected diff values.map(_._2) - if (missing.nonEmpty) { - val continuousBlocks = missing - .scanLeft(("-1", 0)) { - case ((last, block), curr) => if (last.toInt + 1 == curr.toInt) (curr, block) else (curr, block + 1) - } - .tail - .groupBy(_._2) - val blockDescription = continuousBlocks - .map { block => - val msgs = block._2.map(_._1) - s"Missing ${msgs.size} in continuous block, first ten: ${msgs.take(10)}" - } - .mkString(" ") - fail(s"Did not get ${missing.size} expected messages. $blockDescription") - } - } - - controls.map(_.shutdown()) - } - - "drain stream on partitions rebalancing" in assertAllStagesStopped { - // Runs a copying transactional flows that delay writing to the output partition using a `delay` stage. - // Creates more flows than ktps to trigger partition rebalancing. - // The output topic should contain the same elements as the input topic. - - val sourceTopic = createTopic(1, partitions = 2) - val sinkTopic = createTopic(2, partitions = 4) - val group = createGroupId(1) - - val elements = 100 - val batchSize = 10 - Await.result(produce(sourceTopic, 1 to elements), remainingOrDefault) - - val elementsWrote = new AtomicInteger(0) - - val consumerSettings = consumerDefaults.withGroupId(group) - - def runStream(id: String): Consumer.Control = { - val control: Control = - Transactional - .source(consumerSettings, Subscriptions.topics(sourceTopic)) - .map { msg => - ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), - msg.partitionOffset) - } - .take(batchSize.toLong) - .delay(3.seconds, strategy = DelayOverflowStrategy.backpressure) - .addAttributes(Attributes.inputBuffer(batchSize, batchSize + 1)) - .via(Transactional.flow(producerDefaults, s"$group-$id")) - .map(_ => elementsWrote.incrementAndGet()) - .toMat(Sink.ignore)(Keep.left) - .run() - control - } - - val controls: Seq[Control] = (0 until elements / batchSize) - .map(_.toString) - .map(runStream) - - val probeConsumerGroup = createGroupId(2) - val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic) - - periodicalCheck("Wait for elements written to Kafka", maxTries = 30, 1.second) { () => - elementsWrote.get() - }(_ > 10) - - probeConsumer - .request(elements.toLong) - .expectNextUnorderedN((1 to elements).map(_.toString)) - - probeConsumer.cancel() - - val futures: Seq[Future[Done]] = controls.map(_.shutdown()) - Await.result(Future.sequence(futures), remainingOrDefault) - } - "support copy stream with merging and multi messages" in assertAllStagesStopped { val sourceTopic = createTopic(1) val sumsTopic = createTopic(2) @@ -427,6 +254,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike { .source(consumerSettings, Subscriptions.topics(sourceTopic)) .groupedWithin(10, 5.seconds) .map { msgs => + val values = msgs.map(_.record.value().toInt) + log.debug(s"msg group length {}. 
values: {}", values.length, values) + val sum = msgs.map(_.record.value().toInt).sum.toString val concat = msgs.map(_.record.value()).reduce(_ + _) @@ -459,49 +289,154 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike { concatsConsumer.cancel() Await.result(control.shutdown(), remainingOrDefault) } - } - private def transactionalCopyStream( - consumerSettings: ConsumerSettings[String, String], - sourceTopic: String, - sinkTopic: String, - transactionalId: String, - restartAfter: Int, - idleTimeout: FiniteDuration - ): Source[ProducerMessage.Results[String, String, PartitionOffset], Control] = - Transactional - .source(consumerSettings, Subscriptions.topics(sourceTopic)) - .zip(Source.unfold(1)(count => Some((count + 1, count)))) - .map { - case (msg, count) => - if (count >= restartAfter) throw new Error("Restarting transactional copy stream") - msg - } - .idleTimeout(idleTimeout) - .map { msg => - ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset) + "complete partitioned source in happy-path scenario" in { + assertAllStagesStopped { + val elements = 200 + val maxPartitions = 2 + val sourceTopic = createTopic(1, maxPartitions) + val sinkTopic = createTopic(2, maxPartitions) + val group = createGroupId(1) + val transactionalId = createTransactionalId() + + val consumerSettings = consumerDefaults.withGroupId(group) + + def runTransactional = + Transactional + .partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic)) + .mapAsyncUnordered(maxPartitions) { + case (_, source) => + source + .map { msg => + ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, + msg.record.partition(), + msg.record.key(), + msg.record.value), + msg.partitionOffset) + } + .runWith(Transactional.sink(producerDefaults, transactionalId)) + } + .toMat(Sink.ignore)(Keep.left) + .run() + + val control = runTransactional + val control2 = runTransactional + + waitUntilConsumerSummary(group) { + case consumer1 :: consumer2 :: Nil => + val half = maxPartitions / 2 + consumer1.assignment.topicPartitions.size == half && consumer2.assignment.topicPartitions.size == half + } + + val testProducerSettings = producerDefaults.withProducer(testProducer) + Await.result( + produceToAllPartitions(testProducerSettings, sourceTopic, maxPartitions, 1 to elements), + remainingOrDefault + ) + + val consumer = consumePartitionOffsetValues( + probeConsumerSettings(createGroupId(2)), + sinkTopic, + elementsToTake = (elements * maxPartitions).toLong + ) + + val actualValues: immutable.Seq[(Int, Long, String)] = Await.result(consumer, 60.seconds) + assertPartitionedConsistency(elements, maxPartitions, actualValues) + + Await.result(control.shutdown(), remainingOrDefault) + Await.result(control2.shutdown(), remainingOrDefault) } - .via(Transactional.flow(producerDefaults, transactionalId)) + } - private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = - consumerDefaults - .withGroupId(groupId) - .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed") + "complete partitioned source with a sub source failure and rebalance of partitions to second instance" in { + assertAllStagesStopped { + val elements = 200 + val maxPartitions = 2 + val sourceTopic = createTopic(1, maxPartitions) + val sinkTopic = createTopic(2, maxPartitions) + val group = createGroupId(1) + val transactionalId = createTransactionalId() + + val testProducerSettings = producerDefaults.withProducer(testProducer) + val 
consumerSettings = consumerDefaults + .withGroupId(group) + .withStopTimeout(0.millis) // time to wait to schedule Stop to be sent to consumer actor + .withCloseTimeout(0.millis) // timeout while waiting for KafkaConsumer.close + + val failureOccurred = new AtomicBoolean() - private def valuesProbeConsumer(settings: ConsumerSettings[String, String], - topic: String): TestSubscriber.Probe[String] = - offsetValueSource(settings, topic) - .map(_._2) - .runWith(TestSink.probe) + def runTransactional(): (Control, Future[Done]) = + Transactional + .partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic)) + .mapAsyncUnordered(maxPartitions) { + case (tp, source) => + source + .map { msg => + if (tp.partition() == 1 && msg.record.value.toInt == elements / 2 && !failureOccurred.get()) { + failureOccurred.set(true) + throw new Exception("sub source failure") + } + ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, + msg.record.partition(), + msg.record.key(), + msg.record.value), + msg.partitionOffset) + } + .runWith(Transactional.sink(producerDefaults, transactionalId)) + } + .toMat(Sink.ignore)(Keep.both) + .run() - private def offsetValueSource(settings: ConsumerSettings[String, String], - topic: String): Source[(Long, String), Consumer.Control] = - Consumer - .plainSource(settings, Subscriptions.topics(topic)) - .map(r => (r.offset(), r.value())) + log.info("Running 2 transactional workloads with prefix transactional id: {}", transactionalId) + val (control1, streamResult1) = runTransactional() + val (control2, streamResult2) = runTransactional() + + log.info("Waiting until partitions are assigned across both consumers") + waitUntilConsumerSummary(group) { + case consumer1 :: consumer2 :: Nil => + val half = maxPartitions / 2 + consumer1.assignment.topicPartitions.size == half && consumer2.assignment.topicPartitions.size == half + } + + log.info("Seeding topic with '{}' elements for all partitions ({})", elements, maxPartitions) + Await.result(produceToAllPartitions(testProducerSettings, sourceTopic, maxPartitions, 1 to elements), + remainingOrDefault) + + val consumer = consumePartitionOffsetValues( + probeConsumerSettings(createGroupId(2)), + sinkTopic, + elementsToTake = (elements * maxPartitions).toLong + ) + + log.info("Retrieve actual values") + val actualValues: immutable.Seq[(Int, Long, String)] = Await.result(consumer, 60.seconds) + + log.info("Waiting until partitions are assigned to one non-failed consumer") + waitUntilConsumerSummary(group) { + case singleConsumer :: Nil => singleConsumer.assignment.topicPartitions.size == maxPartitions + } + + log.info("Assert that one of the stream results has failed") + if (streamResult1.isCompleted) + recoverToSucceededIf[Exception](streamResult1) + else if (streamResult2.isCompleted) + recoverToSucceededIf[Exception](streamResult2) + else fail("Expected one of the stream results to have failed") + + assertPartitionedConsistency(elements, maxPartitions, actualValues) + + Await.result(control1.shutdown(), remainingOrDefault) + Await.result(control2.shutdown(), remainingOrDefault) + } + } + } + + def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = + withProbeConsumerSettings(consumerDefaults, groupId) override def producerDefaults: ProducerSettings[String, String] = - super.producerDefaults - .withParallelism(20) - .withCloseTimeout(Duration.Zero) + withTestProducerSettings(super.producerDefaults) + + def txProducerDefaults: ProducerSettings[String, String] = + 
withTransactionalProducerSettings(super.producerDefaults) } diff --git a/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala b/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala index 9fa552bb2..4854b46b3 100644 --- a/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala +++ b/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala @@ -127,4 +127,45 @@ class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike { result.futureValue should have size (10) } +// "Partitioned transactional sink" should "work" in { +// val consumerSettings = consumerDefaults.withGroupId(createGroupId()) +// val producerSettings = producerDefaults +// val maxPartitions = 2 +// val sourceTopic = createTopic(1, maxPartitions, 1) +// val sinkTopic = createTopicName(2) +// val transactionalId = createTransactionalId() +// // #partitionedTransactionalSink +// val control = +// Transactional +// .partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic)) +// .mapAsyncUnordered(maxPartitions) { +// case (tp, source) => +// source +// .via(businessFlow) +// .map { msg => +// ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), +// msg.partitionOffset) +// } +// .runWith(Transactional.sink(producerSettings, transactionalId)) +// } +// .toMat(Sink.ignore)(Keep.both) +// .mapMaterializedValue(DrainingControl.apply) +// .run() +// // ... +// +// // #partitionedTransactionalSink +// val (control2, result) = Consumer +// .plainSource(consumerSettings, Subscriptions.topics(sinkTopic)) +// .toMat(Sink.seq)(Keep.both) +// .run() +// +// awaitProduce(produce(sourceTopic, 1 to 10)) +// control.drainAndShutdown().futureValue should be(Done) +// control2.shutdown().futureValue should be(Done) +// // #partitionedTransactionalSink +// control.drainAndShutdown() +// // #partitionedTransactionalSink +// result.futureValue should have size (10) +// } + } From df5bf2e925b7922b64714f58c84da6e6b620bcc3 Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Thu, 28 Nov 2019 11:30:23 -0500 Subject: [PATCH 2/2] Transactional partitioned: Adapt to current state, more tests --- .travis.yml | 6 +- .../kafka/internal/CommittableSources.scala | 2 +- .../kafka/internal/DefaultProducerStage.scala | 61 ++++++------ .../kafka/internal/DeferredProducer.scala | 4 +- .../akka/kafka/internal/SubSourceLogic.scala | 28 +++--- .../internal/TransactionalProducerStage.scala | 56 ++++++----- .../kafka/internal/TransactionalSources.scala | 46 +++++---- .../akka/kafka/javadsl/Transactional.scala | 48 +++++----- docs/src/main/paradox/transactions.md | 3 + .../TransactionsPartitionedSourceSpec.scala | 5 +- .../akka/kafka/TransactionsSourceSpec.scala | 5 +- .../akka/kafka/scaladsl/IntegrationSpec.scala | 87 ----------------- .../kafka/scaladsl/TransactionsSpec.scala | 95 ++++++++++++++++++- 13 files changed, 232 insertions(+), 214 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9b47a5e3d..92b0aeda0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -61,7 +61,7 @@ jobs: - stage: integration env: CMD="tests/it:test" - name: "Run multi-broker integration tests" + name: "Run multi-broker and long running integration tests" - env: CMD="benchmarks/it:compile" name: "Compile benchmark tests" @@ -75,10 +75,6 @@ jobs: name: "Publish API and reference documentation" stages: - # runs on master commits and PRs - - name: debug - if: NOT tag =~ ^v - # runs on master commits and PRs - name: check if: NOT tag =~ ^v diff --git a/core/src/main/scala/akka/kafka/internal/CommittableSources.scala 
b/core/src/main/scala/akka/kafka/internal/CommittableSources.scala index 33f1d2da8..1cc662d17 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittableSources.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittableSources.scala @@ -211,7 +211,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A } @InternalApi -private class CommittableSubSourceStageLogic[K, V]( +private final class CommittableSubSourceStageLogic[K, V]( shape: SourceShape[CommittableMessage[K, V]], tp: TopicPartition, consumerActor: ActorRef, diff --git a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala index 0184f8437..ea4066a05 100644 --- a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala @@ -59,6 +59,22 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: final override val producerSettings: ProducerSettings[K, V] = stage.settings + protected class DefaultInHandler extends InHandler { + override def onPush(): Unit = produce(grab(stage.in)) + + override def onUpstreamFinish(): Unit = { + inIsClosed = true + completionState = Some(Success(Done)) + checkForCompletion() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + inIsClosed = true + completionState = Some(Failure(ex)) + checkForCompletion() + } + } + override def preStart(): Unit = { super.preStart() resolveProducer(stage.settings) @@ -86,9 +102,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: failStage(ex) } - def filterSend(msg: Envelope[K, V, P]): Boolean = true - - def postSend(msg: Envelope[K, V, P]): Unit = () + protected def postSend(msg: Envelope[K, V, P]): Unit = () override protected def producerAssigned(): Unit = resumeDemand() @@ -103,9 +117,13 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: } } - protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = { - // not permitted to access stage logic members from constructor - if (!fromStageLogicConstructor) log.debug("Suspend demand") + protected def suspendDemand(): Unit = { + log.debug("Suspend demand") + suspendDemandOutHandler() + } + + // factored out of suspendDemand because logging is not permitted when called from the stage logic constructor + private def suspendDemandOutHandler(): Unit = { setHandler( stage.out, new OutHandler { @@ -114,33 +132,14 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: ) } - // suspend demand until a Producer has been created - suspendDemand(fromStageLogicConstructor = true) - - setHandler( - stage.in, - new InHandler { - override def onPush(): Unit = { - val msg = grab(stage.in) - if (filterSend(msg)) - produce(msg) - } - - override def onUpstreamFinish(): Unit = { - inIsClosed = true - completionState = Some(Success(Done)) - checkForCompletion() - } + protected def initialInHandler(): Unit = producingInHandler() + protected def producingInHandler(): Unit = setHandler(stage.in, new DefaultInHandler()) - override def onUpstreamFailure(ex: Throwable): Unit = { - inIsClosed = true - completionState = Some(Failure(ex)) - checkForCompletion() - } - } - ) + // suspend demand until a Producer has been created + suspendDemandOutHandler() + initialInHandler() - def produce(in: Envelope[K, V, P]): Unit = + protected def produce(in: Envelope[K, V, P]): Unit = in match { case msg: 
Message[K, V, P] => val r = Promise[Result[K, V, P]] diff --git a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala index 24300dac9..e03cc970b 100644 --- a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala +++ b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala @@ -22,7 +22,7 @@ import scala.util.{Failure, Success} private[kafka] object DeferredProducer { /** - * The [[ProducerAssignmentLifecycle]] allows us to change track the status of the aynchronous producer assignment + * The [[ProducerAssignmentLifecycle]] allows us to track the state of the asynchronous producer assignment * within the stage. This is useful when we need to manage different behavior during the assignment process. For * example, in [[TransactionalProducerStageLogic]] we match on the lifecycle when extracting the transactional.id * of the first message received from a partitioned source. @@ -76,7 +76,7 @@ private[kafka] trait DeferredProducer[K, V] { } } - protected def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = { + private def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = { val oldState = producerAssignmentLifecycle producerAssignmentLifecycle = state log.debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", oldState, state) diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index 489fd8524..854f7755f 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -57,22 +57,22 @@ private class SubSourceLogic[K, V, Msg]( with StageIdLogging { import SubSourceLogic._ - val consumerPromise = Promise[ActorRef] + private val consumerPromise = Promise[ActorRef] final val actorNumber = KafkaConsumerActor.Internal.nextNumber() override def executionContext: ExecutionContext = materializer.executionContext override def consumerFuture: Future[ActorRef] = consumerPromise.future - var consumerActor: ActorRef = _ - var sourceActor: StageActor = _ + protected var consumerActor: ActorRef = _ + protected var sourceActor: StageActor = _ /** Kafka has notified us that we have these partitions assigned, but we have not created a source for them yet. */ - var pendingPartitions: immutable.Set[TopicPartition] = immutable.Set.empty + private var pendingPartitions: immutable.Set[TopicPartition] = immutable.Set.empty /** We have created a source for these partitions, but it has not started up and is not in subSources yet. */ - var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty - var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty + private var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty + protected var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty /** Kafka has signalled these partitions are revoked, but some may be re-assigned just after revoking. 
*/ - var partitionsToRevoke: Set[TopicPartition] = Set.empty + private var partitionsToRevoke: Set[TopicPartition] = Set.empty override def preStart(): Unit = { super.preStart() @@ -121,7 +121,7 @@ private class SubSourceLogic[K, V, Msg]( failStage(ex) } - val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned => + private val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned => val formerlyUnknown = assigned -- partitionsToRevoke if (log.isDebugEnabled && formerlyUnknown.nonEmpty) { @@ -170,7 +170,7 @@ private class SubSourceLogic[K, V, Msg]( } } - val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revoked => + private val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revoked => partitionsToRevoke ++= revoked scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition) } @@ -188,7 +188,7 @@ private class SubSourceLogic[K, V, Msg]( partitionsToRevoke = Set.empty } - val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] = + private val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] = getAsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] { case (tp, cancellationStrategy: SubSourceCancellationStrategy) => subSources -= tp @@ -210,7 +210,7 @@ private class SubSourceLogic[K, V, Msg]( } } - val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] = + private val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] = getAsyncCallback[(TopicPartition, ControlAndStageActor)] { case (tp, value @ ControlAndStageActor(control, _)) => if (!partitionsInStartup.contains(tp)) { @@ -380,9 +380,9 @@ private abstract class SubSourceStageLogic[K, V, Msg]( override def executionContext: ExecutionContext = materializer.executionContext override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor) private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp)) - var requested = false - var subSourceActor: StageActor = _ - var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty + private var requested = false + protected var subSourceActor: StageActor = _ + private var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty override def preStart(): Unit = { log.debug("#{} Starting SubSource for partition {}", actorNumber, tp) diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala index 821fffb9a..26db9b90d 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition import scala.concurrent.Future import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success} /** * INTERNAL API @@ -123,6 +124,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( override def preStart(): Unit = resumeDemand() override protected def producerAssigned(): Unit = { + producingInHandler() initTransactions() beginTransaction() produceFirstMessage() @@ -143,11 +145,16 @@ private final class TransactionalProducerStageLogic[K, V, P]( demandSuspended = false } - override protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = { - if (!demandSuspended) super.suspendDemand(fromStageLogicConstructor) + override protected def suspendDemand(): Unit = 
{ + if (!demandSuspended) super.suspendDemand() demandSuspended = true } + override protected def initialInHandler(): Unit = + setHandler(stage.in, new DefaultInHandler { + override def onPush(): Unit = parseFirstMessage(grab(stage.in)) + }) + override protected def onTimer(timerKey: Any): Unit = if (timerKey == commitSchedulerKey) { maybeCommitTransaction() @@ -160,8 +167,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( case batch: NonemptyTransactionBatch if awaitingConf == 0 => commitTransaction(batch, beginNewTransaction) case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete => - log.debug("Aborting empty transaction because we're completing.") - abortTransaction() + abortTransaction("Transaction is empty and stage is completing") case _ if awaitingConf > 0 => suspendDemand() scheduleOnce(commitSchedulerKey, messageDrainInterval) @@ -170,14 +176,17 @@ private final class TransactionalProducerStageLogic[K, V, P]( } } - override def filterSend(msg: Envelope[K, V, P]): Boolean = + /** + * When using partitioned sources we extract the transactional id, group id, and topic partition information from + * the first message in order to define a `transactional.id` before constructing the [[org.apache.kafka.clients.producer.KafkaProducer]]. + */ + private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean = producerAssignmentLifecycle match { case Assigned => true + case Unassigned if firstMessage.nonEmpty => + // this should never happen because demand should be suspended until the producer is assigned + throw new IllegalStateException("Cannot reapply first message") case Unassigned => - if (firstMessage.nonEmpty) { - // this should never happen because demand should be suspended until the producer is assigned - throw new IllegalStateException("Cannot reapply first message") - } // stash the first message so it can be sent after the producer is assigned firstMessage = Some(msg) // initiate async producer request _after_ first message is stashed in case future eagerly resolves @@ -187,7 +196,9 @@ private final class TransactionalProducerStageLogic[K, V, P]( suspendDemand() false case AsyncCreateRequestSent => - throw new IllegalStateException(s"Should never receive new messages while in state '$AsyncCreateRequestSent'") + throw new IllegalStateException( + s"Should never receive new messages while in producer assignment state '$AsyncCreateRequestSent'" + ) } private def generatedTransactionalConfig(msg: Envelope[K, V, P]): ProducerSettings[K, V] = { @@ -200,18 +211,14 @@ private final class TransactionalProducerStageLogic[K, V, P]( case _ => transactionalId } - stage.settings.withEnrichAsync { settings => - Future.successful( - settings.withProperties( - ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString, - ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId, - ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString - ) - ) - } + stage.settings.withProperties( + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString, + ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId, + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString + ) } - override def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match { + override protected def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match { case o: ConsumerMessage.PartitionOffsetCommittedMarker => batchOffsets = batchOffsets.updated(o) } @@ -223,8 +230,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( } override def onCompletionFailure(ex:
Throwable): Unit = { - log.debug("Aborting transaction due to stage failure") - abortTransaction() + abortTransaction("Stage failure") batchOffsets.committingFailed() super.onCompletionFailure(ex) } @@ -254,7 +260,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( } } - val onInternalCommitAckCb: AsyncCallback[Unit] = { + private val onInternalCommitAckCb: AsyncCallback[Unit] = { getAsyncCallback[Unit]( _ => scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval) ) @@ -270,8 +276,8 @@ private final class TransactionalProducerStageLogic[K, V, P]( producer.beginTransaction() } - private def abortTransaction(): Unit = { - log.debug("Aborting transaction") + private def abortTransaction(reason: String): Unit = { + log.debug("Aborting transaction: {}", reason) if (producerAssignmentLifecycle == Assigned) producer.abortTransaction() } } diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala index 5f91f1cd8..deb8837ef 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala @@ -14,6 +14,7 @@ import akka.annotation.InternalApi import akka.kafka.ConsumerMessage.{PartitionOffset, TransactionalMessage} import akka.kafka.internal.KafkaConsumerActor.Internal.Revoked import akka.kafka.internal.SubSourceLogic._ +import akka.kafka.internal.TransactionalSubSourceStageLogic.DrainingComplete import akka.kafka.scaladsl.Consumer.Control import akka.kafka.scaladsl.PartitionAssignmentHandler import akka.kafka.{AutoSubscription, ConsumerFailed, ConsumerSettings, RestrictedConsumer, Subscription} @@ -240,10 +241,10 @@ private[kafka] final class TransactionalSubSource[K, V]( override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = if (revokedTps.isEmpty) () else if (waitForDraining(revokedTps)) { - subSources.values.map(_.stageActor).foreach(_ ! Revoked(revokedTps.toList)) + subSources.values.map(_.stageActor).foreach(_.tell(Revoked(revokedTps.toList), stageActor.ref)) } else { - sourceActor.ref ! Status.Failure(new Error("Timeout while draining")) - consumerActor ! 
KafkaConsumerActor.Internal.Stop + sourceActor.ref.tell(Status.Failure(new Error("Timeout while draining")), stageActor.ref) + consumerActor.tell(KafkaConsumerActor.Internal.Stop, stageActor.ref) } override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () @@ -251,7 +252,7 @@ private[kafka] final class TransactionalSubSource[K, V]( new PartitionAssignmentHelpers.Chain(handler, blockingRevokedCall) } - def waitForDraining(partitions: Set[TopicPartition]): Boolean = { + private def waitForDraining(partitions: Set[TopicPartition]): Boolean = { import akka.pattern.ask implicit val timeout = Timeout(txConsumerSettings.commitTimeout) try { @@ -274,10 +275,10 @@ private object TransactionalSourceLogic { type Offset = Long case object Drained - case class Drain[T](partitions: Set[TopicPartition], - drainedConfirmationRef: Option[ActorRef], - drainedConfirmationMsg: T) - case class Committed(offsets: Map[TopicPartition, OffsetAndMetadata]) + final case class Drain[T](partitions: Set[TopicPartition], + drainedConfirmationRef: Option[ActorRef], + drainedConfirmationMsg: T) + final case class Committed(offsets: Map[TopicPartition, OffsetAndMetadata]) case object CommittingFailure private[internal] final case class CommittedMarkerRef(sourceActor: ActorRef, commitTimeout: FiniteDuration)( @@ -337,7 +338,7 @@ private object TransactionalSourceLogic { } @InternalApi -private class TransactionalSubSourceStageLogic[K, V]( +private final class TransactionalSubSourceStageLogic[K, V]( shape: SourceShape[TransactionalMessage[K, V]], tp: TopicPartition, consumerActor: ActorRef, @@ -355,7 +356,7 @@ private class TransactionalSubSourceStageLogic[K, V]( import TransactionalSourceLogic._ - val inFlightRecords = InFlightRecords.empty + private val inFlightRecords = InFlightRecords.empty override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG) @@ -372,7 +373,7 @@ private class TransactionalSubSourceStageLogic[K, V]( override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = DoNothing - def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] = + private def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] = drainHandling .orElse { case (_, Status.Failure(e)) => @@ -391,10 +392,10 @@ private class TransactionalSubSourceStageLogic[K, V]( drainAndComplete() } - def drainAndComplete(): Unit = - subSourceActor.ref.tell(Drain(inFlightRecords.assigned(), None, "complete"), subSourceActor.ref) + private def drainAndComplete(): Unit = + subSourceActor.ref.tell(Drain(inFlightRecords.assigned(), None, DrainingComplete), subSourceActor.ref) - def drainHandling: PartialFunction[(ActorRef, Any), Unit] = { + private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = { case (sender, Committed(offsets)) => inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap) sender ! Done @@ -408,12 +409,15 @@ private class TransactionalSubSourceStageLogic[K, V]( ack.getOrElse(sender) ! msg } else { log.debug(s"Draining partitions {}", partitions) - materializer.scheduleOnce(consumerSettings.drainingCheckInterval, new Runnable { - override def run(): Unit = - subSourceActor.ref ! 
Drain(partitions, ack.orElse(Some(sender)), msg) - }) + materializer.scheduleOnce( + consumerSettings.drainingCheckInterval, + new Runnable { + override def run(): Unit = + subSourceActor.ref.tell(Drain(partitions, ack.orElse(Some(sender)), msg), stageActor.ref) + } + ) } - case (sender, "complete") => + case (sender, DrainingComplete) => completeStage() } @@ -422,3 +426,7 @@ private class TransactionalSubSourceStageLogic[K, V]( CommittedMarkerRef(subSourceActor.ref, consumerSettings.commitTimeout)(ec) } } + +private object TransactionalSubSourceStageLogic { + case object DrainingComplete +} diff --git a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala index 60d935a54..8b92188ac 100644 --- a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala @@ -55,30 +55,30 @@ object Transactional { .map(_._1) .asJava - /** - * Internal API. Work in progress. - * - * The `partitionedSource` is a way to track automatic partition assignment from kafka. - * Each source is setup for for Exactly Only Once (EoS) kafka message semantics. - * To enable EoS it's necessary to use the [[Transactional.sink]] or [[Transactional.flow]] (for passthrough). - * When Kafka rebalances partitions, all sources complete before the remaining sources are issued again. - * - * By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run - * without having to manually assign partitions to each instance. - */ - @ApiMayChange - @InternalApi - private[kafka] def partitionedSource[K, V]( - consumerSettings: ConsumerSettings[K, V], - subscription: AutoSubscription - ): Source[Pair[TopicPartition, Source[TransactionalMessage[K, V], NotUsed]], Control] = - scaladsl.Transactional - .partitionedSource(consumerSettings, subscription) - .map { - case (tp, source) => Pair(tp, source.asJava) - } - .mapMaterializedValue(ConsumerControlAsJava.apply) - .asJava +// /** +// * Internal API. Work in progress. +// * +// * The `partitionedSource` is a way to track automatic partition assignment from kafka. +// * Each source is setup for for Exactly Only Once (EoS) kafka message semantics. +// * To enable EoS it's necessary to use the [[Transactional.sink]] or [[Transactional.flow]] (for passthrough). +// * When Kafka rebalances partitions, all sources complete before the remaining sources are issued again. +// * +// * By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run +// * without having to manually assign partitions to each instance. +// */ +// @ApiMayChange +// @InternalApi +// private[kafka] def partitionedSource[K, V]( +// consumerSettings: ConsumerSettings[K, V], +// subscription: AutoSubscription +// ): Source[Pair[TopicPartition, Source[TransactionalMessage[K, V], NotUsed]], Control] = +// scaladsl.Transactional +// .partitionedSource(consumerSettings, subscription) +// .map { +// case (tp, source) => Pair(tp, source.asJava) +// } +// .mapMaterializedValue(ConsumerControlAsJava.apply) +// .asJava /** * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. 
It will diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md index fcf318512..1fb54f141 100644 --- a/docs/src/main/paradox/transactions.md +++ b/docs/src/main/paradox/transactions.md @@ -78,6 +78,8 @@ Scala Java : @@ snip [snip](/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java) { #transactionalSink } + + ### Recovery From Failure diff --git a/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala b/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala index c57f0567c..e028c71ea 100644 --- a/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala +++ b/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala @@ -9,6 +9,7 @@ import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike import akka.stream._ import akka.stream.scaladsl.{Keep, RestartSource, Sink} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import org.scalatest.concurrent.PatienceConfiguration.Interval import org.scalatest.concurrent.ScalaFutures import org.scalatest.{Ignore, Matchers, WordSpecLike} @@ -96,8 +97,8 @@ class TransactionsPartitionedSourceSpec extends SpecBase .map(_.toString) .map(runStream) - while (completedCopy.get() < consumers) { - Thread.sleep(2000) + eventually(Interval(2.seconds)) { + completedCopy.get() should be < consumers } val consumer = consumePartitionOffsetValues( diff --git a/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala b/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala index 5a203e7b4..d1328d646 100644 --- a/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala +++ b/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala @@ -11,6 +11,7 @@ import akka.stream._ import akka.stream.scaladsl.{Flow, Keep, RestartSource, Sink} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.concurrent.PatienceConfiguration.Interval import org.scalatest.concurrent.ScalaFutures import org.scalatest.{Matchers, WordSpecLike} @@ -89,8 +90,8 @@ class TransactionsSourceSpec extends SpecBase val probeConsumerGroup = createGroupId(2) - while (completedCopy.get() < consumers) { - Thread.sleep(2000) + eventually(Interval(2.seconds)) { + completedCopy.get() should be < consumers } val consumer = offsetValueSource(probeConsumerSettings(probeConsumerGroup), sinkTopic) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala index c49f80832..2131e3889 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala @@ -13,7 +13,6 @@ import akka.kafka._ import akka.kafka.scaladsl.Consumer.DrainingControl import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike import akka.pattern.ask -import akka.stream.OverflowStrategy import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink @@ -328,92 +327,6 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside probe.cancel() } } - - "rebalance safely using transactional partitioned flow" in assertAllStagesStopped { - val partitions = 4 - val totalMessages = 200L - - val topic = createTopic(1, partitions) - val outTopic = createTopic(2, partitions) - val group = createGroupId(1) - val transactionalId = createTransactionalId() 
- val sourceSettings = consumerDefaults - .withGroupId(group) - - val topicSubscription = Subscriptions.topics(topic) - - def createAndRunTransactionalFlow(subscription: AutoSubscription) = - Transactional - .partitionedSource(sourceSettings, subscription) - .map { - case (tp, source) => - source - .map { msg => - ProducerMessage.single(new ProducerRecord[String, String](outTopic, - msg.record.partition(), - msg.record.key(), - msg.record.value() + "-out"), - msg.partitionOffset) - } - .to(Transactional.sink(producerDefaults, transactionalId)) - .run() - } - .toMat(Sink.ignore)(Keep.both) - .mapMaterializedValue(DrainingControl.apply) - .run() - - def createAndRunProducer(elements: immutable.Iterable[Long]) = - Source(elements) - .map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString)) - .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer))) - - val control = createAndRunTransactionalFlow(topicSubscription) - - // waits until all partitions are assigned to the single consumer - waitUntilConsumerSummary(group) { - case singleConsumer :: Nil => singleConsumer.assignment.topicPartitions.size == partitions - } - - createAndRunProducer(0L until totalMessages / 2).futureValue - - // create another consumer with the same groupId to trigger re-balancing - val control2 = createAndRunTransactionalFlow(topicSubscription) - - // waits until partitions are assigned across both consumers - waitUntilConsumerSummary(group) { - case consumer1 :: consumer2 :: Nil => - val half = partitions / 2 - consumer1.assignment.topicPartitions.size == half && consumer2.assignment.topicPartitions.size == half - } - - createAndRunProducer(totalMessages / 2 until totalMessages).futureValue - - val checkingGroup = createGroupId(2) - - val (counterQueue, counterCompletion) = Source - .queue[String](8, OverflowStrategy.fail) - .scan(0L)((c, _) => c + 1) - .takeWhile(_ < totalMessages, inclusive = true) - .toMat(Sink.last)(Keep.both) - .run() - - val streamMessages = Consumer - .plainSource[String, String](consumerDefaults - .withGroupId(checkingGroup) - .withProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), - Subscriptions.topics(outTopic)) - .mapAsync(1)(el => counterQueue.offer(el.value()).map(_ => el)) - .scan(0L)((c, _) => c + 1) - .toMat(Sink.last)(Keep.both) - .mapMaterializedValue(DrainingControl.apply) - .run() - - counterCompletion.futureValue shouldBe totalMessages - - control.drainAndShutdown().futureValue - control2.drainAndShutdown().futureValue - streamMessages.drainAndShutdown().futureValue shouldBe totalMessages - } } "Consumer control" must { diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala index f0f1e0495..df37112b2 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala @@ -9,11 +9,13 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.Done import akka.kafka.ConsumerMessage.PartitionOffset -import akka.kafka.scaladsl.Consumer.Control +import akka.kafka.scaladsl.Consumer.{Control, DrainingControl} import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike import akka.kafka.{ProducerMessage, _} -import akka.stream.scaladsl.{Keep, RestartSource, Sink} +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import 
org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.scalatest.RecoverMethods._ @@ -22,6 +24,9 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with TransactionsOps with Repeated { + + implicit val patience = PatienceConfig(5.seconds, 15.millis) + "A consume-transform-produce cycle" must { "complete in happy-path scenario" in { @@ -429,6 +434,92 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa Await.result(control2.shutdown(), remainingOrDefault) } } + + "rebalance safely using transactional partitioned flow" in assertAllStagesStopped { + val partitions = 4 + val totalMessages = 200L + + val topic = createTopic(1, partitions) + val outTopic = createTopic(2, partitions) + val group = createGroupId(1) + val transactionalId = createTransactionalId() + val sourceSettings = consumerDefaults + .withGroupId(group) + + val topicSubscription = Subscriptions.topics(topic) + + def createAndRunTransactionalFlow(subscription: AutoSubscription) = + Transactional + .partitionedSource(sourceSettings, subscription) + .map { + case (tp, source) => + source + .map { msg => + ProducerMessage.single(new ProducerRecord[String, String](outTopic, + msg.record.partition(), + msg.record.key(), + msg.record.value() + "-out"), + msg.partitionOffset) + } + .to(Transactional.sink(producerDefaults, transactionalId)) + .run() + } + .toMat(Sink.ignore)(Keep.both) + .mapMaterializedValue(DrainingControl.apply) + .run() + + def createAndRunProducer(elements: immutable.Iterable[Long]) = + Source(elements) + .map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString)) + .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer))) + + val control = createAndRunTransactionalFlow(topicSubscription) + + // waits until all partitions are assigned to the single consumer + waitUntilConsumerSummary(group) { + case singleConsumer :: Nil => singleConsumer.assignment.topicPartitions.size == partitions + } + + createAndRunProducer(0L until totalMessages / 2).futureValue + + // create another consumer with the same groupId to trigger re-balancing + val control2 = createAndRunTransactionalFlow(topicSubscription) + + // waits until partitions are assigned across both consumers + waitUntilConsumerSummary(group) { + case consumer1 :: consumer2 :: Nil => + val half = partitions / 2 + consumer1.assignment.topicPartitions.size == half && consumer2.assignment.topicPartitions.size == half + } + + createAndRunProducer(totalMessages / 2 until totalMessages).futureValue + + val checkingGroup = createGroupId(2) + + val (counterQueue, counterCompletion) = Source + .queue[String](8, OverflowStrategy.fail) + .scan(0L)((c, _) => c + 1) + .takeWhile(_ < totalMessages, inclusive = true) + .toMat(Sink.last)(Keep.both) + .run() + + val streamMessages = Consumer + .plainSource[String, String](consumerDefaults + .withGroupId(checkingGroup) + .withProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + Subscriptions.topics(outTopic)) + .mapAsync(1)(el => counterQueue.offer(el.value()).map(_ => el)) + .scan(0L)((c, _) => c + 1) + .toMat(Sink.last)(Keep.both) + .mapMaterializedValue(DrainingControl.apply) + .run() + + counterCompletion.futureValue shouldBe totalMessages + + control.drainAndShutdown().futureValue + control2.drainAndShutdown().futureValue + streamMessages.drainAndShutdown().futureValue shouldBe totalMessages 
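+      // The verification consumer reads `outTopic` with isolation.level=read_committed, so the counts
+      // above only include records from committed transactions; matching `totalMessages` verifies the
+      // transactional handoff across the rebalance between the two consumers.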
+ } } def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] =