From bdde6e3f089e5841e4286ea32065708b9f8f6ac8 Mon Sep 17 00:00:00 2001 From: svroonland Date: Tue, 26 Nov 2024 19:05:45 +0100 Subject: [PATCH] Extract Committer and RebalanceCoordinator classes from Runloop + unit tests (#1375) The code is mostly just moved to a different place; the logic was left largely intact. The 'interface' between the components has been decoupled further, i.e. the rebalance listener no longer accesses the Runloop's full State, and the pending commits are stored internally in the Committer. Care has been taken to make the Committer usable during rebalancing as well, for example by giving it proper access to the Consumer. The part that waits for the end of the streams and their commits has been changed to use the Committer. Includes unit tests for the two new components. --------- Co-authored-by: Erik van Oosten --- .../zio/kafka/consumer/ConsumerSpec.scala | 31 +- ...setsSpec.scala => CommitOffsetsSpec.scala} | 34 +- .../consumer/internal/CommitterSpec.scala | 204 +++++++ .../consumer/internal/DummyMetrics.scala | 23 + .../internal/RebalanceCoordinatorSpec.scala | 218 ++++++++ .../kafka/consumer/internal/Committer.scala | 90 +++ .../consumer/internal/ConsumerMetrics.scala | 16 +- .../consumer/internal/LiveCommitter.scala | 167 ++++++ .../internal/RebalanceCoordinator.scala | 288 ++++++++++ .../zio/kafka/consumer/internal/Runloop.scala | 521 ++---------------- 10 files changed, 1099 insertions(+), 493 deletions(-) rename zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/{RunloopCommitOffsetsSpec.scala => CommitOffsetsSpec.scala} (75%) create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 2df17a78d..13d51c789 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -28,6 +28,7 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import java.util.concurrent.ExecutionException import scala.reflect.ClassTag //noinspection SimplifyAssertInspection @@ -323,21 +324,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assert(offset.map(_.offset))(isSome(equalTo(9L))) }, test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") { - val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i")) for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - _ <- produceMany(topic, kvs) - messagesReceived <- Ref.make[Int](0) + topic <- randomTopic + group <- randomGroup + client <- randomClient + _ <- scheduledProduce(topic, Schedule.fixed(50.millis).jittered).runDrain.forkScoped + lastProcessedOffset <- Ref.make(0L) offset <- ( Consumer .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .mapConcatZIO { record => + .mapZIO { record => for { - nr <- messagesReceived.updateAndGet(_ + 
1) + nr <- lastProcessedOffset.updateAndGet(_ + 1) _ <- Consumer.stopConsumption.when(nr == 10) - } yield if (nr < 10) Seq(record.offset) else Seq.empty + } yield record.offset } .aggregateAsync(Consumer.offsetBatches) .mapZIO(_.commit) @@ -353,7 +353,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { maxRebalanceDuration = 6.seconds ) ) - } yield assertTrue(offset.map(_.offset).contains(9L)) + lastOffset <- lastProcessedOffset.get + } yield assertTrue(offset.map(_.offset).contains(lastOffset)) } @@ TestAspect.nonFlaky(2), test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: @@ -1399,9 +1400,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }: _*) @@ TestAspect.nonFlaky(2), test("running streams don't stall after a poll timeout") { for { - topic <- randomTopic - clientId <- randomClient - _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic)) + topic <- randomTopic + clientId <- randomClient + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome { + case e: ExecutionException + if e.getCause.isInstanceOf[org.apache.kafka.common.errors.TopicExistsException] => + ZIO.unit + } settings <- consumerSettings(clientId) consumer <- Consumer.make(settings.withPollTimeout(50.millis)) recordsOut <- Queue.unbounded[Unit] diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala similarity index 75% rename from zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala rename to zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala index 77dbc77b0..ffc79101e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala @@ -1,11 +1,13 @@ package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio._ -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit import zio.test._ -object RunloopCommitOffsetsSpec extends ZIOSpecDefault { +object CommitOffsetsSpec extends ZIOSpecDefault { private val tp10 = new TopicPartition("t1", 0) private val tp11 = new TopicPartition("t1", 1) @@ -14,9 +16,9 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { private val tp22 = new TopicPartition("t2", 2) override def spec: Spec[TestEnvironment with Scope, Any] = - suite("Runloop.CommitOffsets spec")( + suite("CommitOffsets spec")( test("addCommits adds to empty CommitOffsets") { - val s1 = Runloop.CommitOffsets(Map.empty) + val s1 = CommitOffsets(Map.empty) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) assertTrue( inc == 0, @@ -24,7 +26,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits updates offset when it is higher") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L)) + val s1 = CommitOffsets(Map(tp10 -> 4L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) assertTrue( inc == 10 - 4, @@ -32,7 +34,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits ignores an offset when it is lower") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = 
CommitOffsets(Map(tp10 -> 10L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5)))) assertTrue( inc == 0, @@ -40,7 +42,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits keeps unrelated partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11)))) assertTrue( inc == 0, @@ -48,7 +50,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits does it all at once") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L)))) assertTrue( inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0, @@ -56,7 +58,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits adds multiple commits") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) val (inc, s2) = s1.addCommits( Chunk( makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)), @@ -69,35 +71,35 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("keepPartitions removes some partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val s2 = s1.keepPartitions(Set(tp10)) assertTrue(s2.offsets == Map(tp10 -> 10L)) }, test("does not 'contain' offset when tp is not present") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val result = s1.contains(tp20, 10) assertTrue(!result) }, test("does not 'contain' a higher offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp10, 11) assertTrue(!result) }, test("does 'contain' equal offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp10, 10) assertTrue(result) }, test("does 'contain' lower offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp20, 19) assertTrue(result) } ) - private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { + private def makeCommit(offsets: Map[TopicPartition, Long]): Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - Runloop.Commit(0L, o, p) + Commit(0L, o, p) } } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala new file mode 100644 index 000000000..9e0832201 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala @@ -0,0 +1,204 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RebalanceInProgressException +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.test._ +import zio.{ 
durationInt, Promise, Ref, ZIO } + +import java.util.{ Map => JavaMap } +import scala.jdk.CollectionConverters.MapHasAsJava + +object CommitterSpec extends ZIOSpecDefault { + override def spec = suite("Committer")( + test("signals that a new commit is available") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter + .make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + _ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + } yield assertCompletes + }, + test("handles a successful commit by completing the commit effect") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + _ <- commitFiber.join + } yield assertCompletes + }, + test("handles a failed commit by completing the commit effect with a failure") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed"))) + ) + result <- commitFiber.await + } yield assertTrue(result.isFailure) + }, + test("retries when rebalancing") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress"))) + ) + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + result <- commitFiber.await + } yield assertTrue(result.isSuccess) + }, + test("adds 1 to the committed last offset") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + _ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped + _ <- commitAvailable.await + committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] + _ <- committer.processQueuedCommits((offsets, callback) => + committedOffsets.succeed(offsets) *> 
ZIO.attempt(callback.onComplete(offsets, null)) + ) + offsetsCommitted <- committedOffsets.await + } yield assertTrue( + offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava + ) + }, + test("batches commits from multiple partitions and offsets") { + for { + runtime <- ZIO.runtime[Any] + commitsAvailable <- Promise.make[Nothing, Unit] + nrCommitsDone <- Ref.make(0) + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = + ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + commitFiber1 <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped + commitFiber2 <- committer.commit(Map(tp -> new OffsetAndMetadata(2))).forkScoped + commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped + _ <- commitsAvailable.await + committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] + _ <- committer.processQueuedCommits((offsets, callback) => + committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) + ) + _ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join + offsetsCommitted <- committedOffsets.await + } yield assertTrue( + offsetsCommitted == Map(tp -> new OffsetAndMetadata(3), tp2 -> new OffsetAndMetadata(4)).asJava + ) + }, + test("keeps track of pending commits") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + pendingCommitsDuringCommit <- committer.pendingCommitCount + _ <- committer.cleanupPendingCommits + pendingCommitsAfterCommit <- committer.pendingCommitCount + _ <- commitFiber.join + } yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit == 0) + }, + test("keep track of committed offsets") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + committedOffsets <- committer.getCommittedOffsets + _ <- commitFiber.join + } yield assertTrue(committedOffsets.offsets == Map(tp -> 0L)) + }, + test("clean committed offsets of no-longer assigned partitions") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, 
callback) => ZIO.attempt(callback.onComplete(offsets, null))) + _ <- committer.keepCommitsForPartitions(Set.empty) + committedOffsets <- committer.getCommittedOffsets + _ <- commitFiber.join + } yield assertTrue(committedOffsets.offsets.isEmpty) + } + ) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100) +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala new file mode 100644 index 000000000..5879a1a1c --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala @@ -0,0 +1,23 @@ +package zio.kafka.consumer.internal +import zio.{ UIO, ZIO } + +private[internal] class DummyMetrics extends ConsumerMetrics { + override def observePoll(resumedCount: Int, pausedCount: Int, latency: zio.Duration, pollSize: Int): UIO[Unit] = + ZIO.unit + + override def observeCommit(latency: zio.Duration): UIO[Unit] = ZIO.unit + override def observeAggregatedCommit(latency: zio.Duration, commitSize: NanoTime): UIO[Unit] = ZIO.unit + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = ZIO.unit + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = ZIO.unit + override def observePollAuthError(): UIO[Unit] = ZIO.unit +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala new file mode 100644 index 000000000..562db3f2c --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -0,0 +1,218 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition +import zio.kafka.ZIOSpecDefaultSlf4j +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent +import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings } +import zio.test._ +import zio.{ durationInt, Chunk, Promise, Ref, Scope, Semaphore, Task, UIO, ZIO } + +import java.util + +object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { + type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]] + + private val mockMetrics = new ConsumerMetrics { + override def observePoll(resumedCount: Int, pausedCount: Int, latency: zio.Duration, pollSize: Int): UIO[Unit] = + ZIO.unit + + override def observeCommit(latency: zio.Duration): UIO[Unit] = ZIO.unit + override def observeAggregatedCommit(latency: zio.Duration, commitSize: NanoTime): UIO[Unit] = ZIO.unit + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = ZIO.unit + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = ZIO.unit + override def observePollAuthError(): UIO[Unit] = ZIO.unit + } + + def spec = suite("RunloopRebalanceListener")( + test("should track assigned, revoked and lost partitions") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + 
consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + tp3 = new TopicPartition("topic", 2) + tp4 = new TopicPartition("topic", 3) + listener <- makeCoordinator(lastEvent, consumer) + _ <- listener.toRebalanceListener.onAssigned(Set(tp)) + _ <- listener.toRebalanceListener.onAssigned(Set(tp4)) + _ <- listener.toRebalanceListener.onRevoked(Set(tp2)) + _ <- listener.toRebalanceListener.onLost(Set(tp3)) + event <- lastEvent.get + } yield assertTrue( + event.wasInvoked && event.assignedTps == Set(tp, tp4) && event.revokedTps == Set(tp2) && event.lostTps == Set( + tp3 + ) + ) + }, + test("should end streams for revoked and lost partitions") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + tp3 = new TopicPartition("topic", 2) + assignedStreams <- ZIO.foreach(Chunk(tp, tp2, tp3))(makeStreamControl) + listener <- makeCoordinator(lastEvent, consumer, assignedStreams = assignedStreams) + _ <- listener.toRebalanceListener.onAssigned(Set(tp)) + _ <- listener.toRebalanceListener.onRevoked(Set(tp2)) + _ <- listener.toRebalanceListener.onLost(Set(tp3)) + event <- lastEvent.get + // Lost and end partition's stream should be ended + _ <- assignedStreams(1).stream.runDrain + _ <- assignedStreams(2).stream.runDrain + } yield assertTrue(event.endedStreams.map(_.tp).toSet == Set(tp2, tp3)) + }, + suite("rebalanceSafeCommits")( + test("should wait for the last pulled offset to commit") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) { + override def commitAsync( + offsets: util.Map[TopicPartition, OffsetAndMetadata], + callback: OffsetCommitCallback + ): Unit = + // Do nothing during rebalancing + if (callback != null) callback.onComplete(offsets, null) + + override def commitSync(): Unit = () + } + tp = new TopicPartition("topic", 0) + streamControl <- makeStreamControl(tp) + records = createTestRecords(3) + recordsPulled <- Promise.make[Nothing, Unit] + _ <- streamControl.offerRecords(records) + runtime <- ZIO.runtime[Any] + committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit, runtime) + + streamDrain <- + streamControl.stream + .tap(_ => recordsPulled.succeed(())) + .tap(record => + committer + .commit( + Map( + new TopicPartition("topic", record.partition) -> new OffsetAndMetadata(record.offset.offset, null) + ) + ) + ) + .runDrain + .forkScoped + listener <- + makeCoordinator( + lastEvent, + consumer, + assignedStreams = Chunk(streamControl), + rebalanceSafeCommits = true, + committer = committer + ) + _ <- listener.toRebalanceListener.onRevoked(Set(tp)) + _ <- streamDrain.join + } yield assertCompletes + }, + test("should continue if waiting for the stream to continue has timed out") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + streamControl <- makeStreamControl(tp) + records = createTestRecords(3) + recordsPulled <- Promise.make[Nothing, Unit] + _ <- streamControl.offerRecords(records) + committedOffsets <- Ref.make(CommitOffsets.empty) + done <- Promise.make[Throwable, Unit] + committer = new MockCommitter { + override val commit = + offsets => + 
committedOffsets + .update(_.addCommits(Chunk(Commit(0L, offsets, done)))._2) + override def getCommittedOffsets = committedOffsets.get + } + streamDrain <- + streamControl.stream + .tap(_ => recordsPulled.succeed(())) + .runDrain + .forkScoped + listener <- + makeCoordinator( + lastEvent, + consumer, + assignedStreams = Chunk(streamControl), + rebalanceSafeCommits = true, + committer = committer + ) + _ <- listener.toRebalanceListener.onRevoked(Set(tp)) + _ <- streamDrain.join + } yield assertCompletes + } + ) + ) @@ TestAspect.withLiveClock + + private def makeStreamControl(tp: TopicPartition): UIO[PartitionStreamControl] = + PartitionStreamControl.newPartitionStream(tp, ZIO.unit, Diagnostics.NoOp, 30.seconds) + + private def makeCoordinator( + lastEvent: Ref.Synchronized[RebalanceEvent], + mockConsumer: BinaryMockConsumer, + assignedStreams: Chunk[PartitionStreamControl] = Chunk.empty, + committer: Committer = new MockCommitter {}, + settings: ConsumerSettings = ConsumerSettings(List("")).withCommitTimeout(1.second), + rebalanceSafeCommits: Boolean = false + ): ZIO[Scope, Throwable, RebalanceCoordinator] = + Semaphore.make(1).map(new ConsumerAccess(mockConsumer, _)).map { consumerAccess => + new RebalanceCoordinator( + lastEvent, + settings.withRebalanceSafeCommits(rebalanceSafeCommits), + consumerAccess, + 5.seconds, + ZIO.succeed(assignedStreams), + committer + ) + } + + private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] = + Chunk.fromIterable( + (1 to count).map(i => + CommittableRecord( + record = new ConsumerRecord[Array[Byte], Array[Byte]]( + "test-topic", + 0, + i.toLong, + Array[Byte](), + Array[Byte]() + ), + commitHandle = _ => ZIO.unit, + consumerGroupMetadata = None + ) + ) + ) +} + +abstract class MockCommitter extends Committer { + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + + override def processQueuedCommits( + commitAsync: (java.util.Map[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => zio.Task[Unit], + executeOnEmpty: Boolean + ): zio.Task[Unit] = ZIO.unit + override def queueSize: UIO[Int] = ZIO.succeed(0) + override def pendingCommitCount: UIO[Int] = ZIO.succeed(0) + override def getPendingCommits: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) + override def cleanupPendingCommits: UIO[Unit] = ZIO.unit + override def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit] = ZIO.unit + override def getCommittedOffsets: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala new file mode 100644 index 000000000..ab8b98ac5 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala @@ -0,0 +1,90 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.{ Chunk, Task, UIO } + +import java.lang.Math.max +import java.util.{ Map => JavaMap } +import scala.collection.mutable + +private[internal] trait Committer { + val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] + + /** + * Takes commits from the queue, commits them and adds them to pending commits + * + * If the queue is empty, nothing happens, unless executeOnEmpty is true. 
+ * + * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations + * may be used. Please see [[RebalanceCoordinator]] for more information. + * + * @param commitAsync + * Function 'commitAsync' on the KafkaConsumer. This is isolated from the whole KafkaConsumer for testing purposes. + * The caller should ensure exclusive access to the KafkaConsumer. + * @param executeOnEmpty + * Execute commitAsync() even if there are no commits + */ + def processQueuedCommits( + commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + executeOnEmpty: Boolean = false + ): Task[Unit] + + def queueSize: UIO[Int] + + def pendingCommitCount: UIO[Int] + + def getPendingCommits: UIO[CommitOffsets] + + /** Removes all completed commits from `pendingCommits`. */ + def cleanupPendingCommits: UIO[Unit] + + def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit] + + def getCommittedOffsets: UIO[CommitOffsets] +} + +private[internal] object Committer { + final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { + + /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */ + def addCommits(c: Chunk[Commit]): (Long, CommitOffsets) = { + val updatedOffsets = mutable.Map.empty[TopicPartition, Long] + updatedOffsets.sizeHint(offsets.size) + updatedOffsets ++= offsets + var offsetIncrease = 0L + c.foreach { commit => + commit.offsets.foreach { case (tp, offsetAndMeta) => + val offset = offsetAndMeta.offset() + val maxOffset = updatedOffsets.get(tp) match { + case Some(existingOffset) => + offsetIncrease += max(0L, offset - existingOffset) + max(existingOffset, offset) + case None => + // This partition was not committed to from this consumer yet. Therefore we do not know the offset + // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. 
+ // Let's go with the simplest for now: ```offsetIncrease += 0``` + offset + } + updatedOffsets += tp -> maxOffset + } + } + (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) + } + + def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = + CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) + + def contains(tp: TopicPartition, offset: Long): Boolean = + offsets.get(tp).exists(_ >= offset) + + def get(tp: TopicPartition): Option[Long] = offsets.get(tp) + } + + private[internal] object CommitOffsets { + val empty: CommitOffsets = CommitOffsets(Map.empty) + } + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala index f43329a4f..aff76f7cd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala @@ -16,7 +16,12 @@ private[internal] trait ConsumerMetrics { def observeCommit(latency: Duration): UIO[Unit] def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit] def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] - def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] + def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] def observePollAuthError(): UIO[Unit] } @@ -330,14 +335,19 @@ private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) exten .contramap[Int](_.toDouble) .tagged(metricLabels) - override def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] = + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = for { _ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram) queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize) _ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs)) _ <- allQueueSizeHistogram.update(queueSizes.sum) _ <- pendingRequestsHistogram.update(state.pendingRequests.size) - _ <- pendingCommitsHistogram.update(state.pendingCommits.size) + _ <- pendingCommitsHistogram.update(pendingCommits) _ <- subscriptionStateGauge.update(state.subscriptionState) _ <- commandQueueSizeHistogram.update(commandQueueSize) _ <- commitQueueSizeHistogram.update(commitQueueSize) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala new file mode 100644 index 000000000..f976c1f9e --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -0,0 +1,167 @@ +package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RebalanceInProgressException +import zio.kafka.consumer.Consumer.CommitTimeout +import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.{ durationLong, Chunk, Duration, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO } + +import java.util +import 
java.util.{ Map => JavaMap } +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +private[consumer] final class LiveCommitter( + commitQueue: Queue[Commit], + commitTimeout: Duration, + diagnostics: Diagnostics, + consumerMetrics: ConsumerMetrics, + onCommitAvailable: UIO[Unit], + committedOffsetsRef: Ref[CommitOffsets], + sameThreadRuntime: Runtime[Any], + pendingCommits: Ref.Synchronized[Chunk[Commit]] +) extends Committer { + + /** This is the implementation behind the user facing api `Offset.commit`. */ + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = + offsets => + for { + p <- Promise.make[Throwable, Unit] + startTime = java.lang.System.nanoTime() + _ <- commitQueue.offer(Commit(startTime, offsets, p)) + _ <- onCommitAvailable + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + endTime = java.lang.System.nanoTime() + latency = (endTime - startTime).nanoseconds + _ <- consumerMetrics.observeCommit(latency) + } yield () + + /** + * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations + * may be used. Please see [[RebalanceCoordinator]] for more information. + */ + override def processQueuedCommits( + commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + executeOnEmpty: Boolean = false + ): Task[Unit] = for { + commits <- commitQueue.takeAll + _ <- ZIO.logDebug(s"Processing ${commits.size} commits") + _ <- ZIO.unless(commits.isEmpty && !executeOnEmpty) { + val (offsets, callback, onFailure) = asyncCommitParameters(commits) + pendingCommits.update(_ ++ commits) *> + // We don't wait for the completion of the commit here, because it + // will only complete once we poll again. + commitAsync(offsets, callback) + .catchAll(onFailure) + } + } yield () + + /** + * Merge commits and prepare parameters for calling `consumer.commitAsync`. + * + * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations + * may be used. Please see [[RebalanceCoordinator]] for more information. + */ + private def asyncCommitParameters( + commits: Chunk[Commit] + ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { + val offsets = commits + .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => + commit.offsets.foreach { case (tp, offset) => + acc += (tp -> acc + .get(tp) + .map(current => if (current.offset() > offset.offset()) current else offset) + .getOrElse(offset)) + } + acc + } + .toMap + val offsetsWithMetaData = offsets.map { case (tp, offset) => + tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) + } + val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) + // We assume the commit is started immediately after returning from this method. 
+ val startTime = java.lang.System.nanoTime() + val onSuccess = { + val endTime = java.lang.System.nanoTime() + val latency = (endTime - startTime).nanoseconds + for { + offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) + _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) + result <- cont(Exit.unit) + _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) + } yield result + } + val onFailure: Throwable => UIO[Unit] = { + case _: RebalanceInProgressException => + for { + _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") + _ <- commitQueue.offerAll(commits) + _ <- onCommitAvailable + } yield () + case err: Throwable => + cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) + } + val callback = + new OffsetCommitCallback { + override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = + Unsafe.unsafe { implicit u => + sameThreadRuntime.unsafe.run { + if (exception eq null) onSuccess else onFailure(exception) + } + .getOrThrowFiberFailure() + } + } + (offsetsWithMetaData.asJava, callback, onFailure) + } + + override def queueSize: UIO[Int] = commitQueue.size + + override def pendingCommitCount: UIO[Int] = pendingCommits.get.map(_.size) + + override def getPendingCommits: UIO[CommitOffsets] = + pendingCommits.get.map(CommitOffsets.empty.addCommits(_)._2) + + override def cleanupPendingCommits: UIO[Unit] = + pendingCommits.updateZIO(_.filterZIO(_.isPending)) + + override def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit] = + committedOffsetsRef.update(_.keepPartitions(assignedPartitions)) + + override def getCommittedOffsets: UIO[CommitOffsets] = committedOffsetsRef.get +} + +private[internal] object LiveCommitter { + def make( + commitTimeout: Duration, + diagnostics: Diagnostics, + consumerMetrics: ConsumerMetrics, + onCommitAvailable: UIO[Unit], + sameThreadRuntime: Runtime[Any] + ): ZIO[Scope, Nothing, LiveCommitter] = for { + pendingCommits <- Ref.Synchronized.make(Chunk.empty[Commit]) + commitQueue <- ZIO.acquireRelease(Queue.unbounded[Commit])(_.shutdown) + committedOffsetsRef <- Ref.make(CommitOffsets.empty) + } yield new LiveCommitter( + commitQueue, + commitTimeout, + diagnostics, + consumerMetrics, + onCommitAvailable, + committedOffsetsRef, + sameThreadRuntime, + pendingCommits + ) + + private[internal] final case class Commit( + createdAt: NanoTime, + offsets: Map[TopicPartition, OffsetAndMetadata], + cont: Promise[Throwable, Unit] + ) { + @inline def isPending: UIO[Boolean] = cont.isDone.negate + } + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala new file mode 100644 index 000000000..43be57980 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala @@ -0,0 +1,288 @@ +package zio.kafka.consumer.internal +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer +import zio.kafka.consumer.internal.RebalanceCoordinator.{ + EndOffsetCommitPending, + EndOffsetCommitted, + EndOffsetNotCommitted, + RebalanceEvent, + StreamCompletionStatus +} +import zio.kafka.consumer.{ ConsumerSettings, RebalanceListener } +import zio.stream.ZStream +import zio.{ durationInt, Chunk, Duration, Ref, Task, UIO, ZIO } + +/** + * The 
Runloop's RebalanceListener gets notified of partitions that are assigned, revoked and lost + * + * Because this happens during the call to `poll()`, we communicate any results to the Runloop via a `Ref` + * + * When rebalanceSafeCommits is enabled, we await completion of all revoked partitions' streams and their commits before + * continuing. + */ +private[internal] class RebalanceCoordinator( + lastRebalanceEvent: Ref.Synchronized[RebalanceEvent], + settings: ConsumerSettings, + consumer: ConsumerAccess, + maxRebalanceDuration: Duration, + getCurrentAssignedStreams: UIO[Chunk[PartitionStreamControl]], + committer: Committer +) { + private val commitTimeoutNanos = settings.commitTimeout.toNanos + + private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing + private val rebalanceSafeCommits = settings.rebalanceSafeCommits + private val commitTimeout = settings.commitTimeout + + // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This + // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the + // rebalance listener. + // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, + // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to + // another thread cannot be used. + + // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. + private val commitQueuePollInterval = 100.millis + + def getAndResetLastEvent: UIO[RebalanceEvent] = + lastRebalanceEvent.getAndSet(RebalanceEvent.None) + + // End streams from the rebalance listener. + // When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. 
+ private def endStreams(streamsToEnd: Chunk[PartitionStreamControl]): Task[Any] = + ZIO.unless(streamsToEnd.isEmpty) { + for { + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, streamsToEnd)).when(rebalanceSafeCommits) + } yield () + } + + private def doAwaitStreamCommits( + consumer: ByteArrayKafkaConsumer, + streamsToEnd: Chunk[PartitionStreamControl] + ): Task[Unit] = { + val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos + + def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L + + def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = + "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") + + def getStreamCompletionStatuses: UIO[Chunk[StreamCompletionStatus]] = + for { + committedOffsets <- committer.getCommittedOffsets + latestPendingCommitOffsets <- committer.getPendingCommits.map(_.offsets) + streamResults <- + ZIO.foreach(streamsToEnd) { stream => + for { + isDone <- stream.completedPromise.isDone + lastPulledOffset <- stream.lastPulledOffset + endOffset <- if (isDone) stream.completedPromise.await else ZIO.none + + endOffsetCommitStatus = + endOffset match { + case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => + EndOffsetCommitted + case Some(endOffset) if latestPendingCommitOffsets.get(stream.tp).contains(endOffset.offset) => + EndOffsetCommitPending + case _ => EndOffsetNotCommitted + } + } yield StreamCompletionStatus( + stream.tp, + isDone, + lastPulledOffset.map(_.offset), + committedOffsets.get(stream.tp), + endOffsetCommitStatus + ) + } + } yield streamResults + + @inline + def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { + val statusStrings = completionStatusesAsString(completionStatuses) + ZIO.logDebug( + s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + + s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" + ) + } + + def logInitialStreamCompletionStatuses: UIO[Unit] = + for { + completionStatuses <- getStreamCompletionStatuses + _ <- logStreamCompletionStatuses(completionStatuses) + } yield () + + def endingStreamsCompletedAndCommitsExist: UIO[Boolean] = + for { + completionStatuses <- getStreamCompletionStatuses + _ <- logStreamCompletionStatuses(completionStatuses) + } yield completionStatuses.forall { status => + // A stream is complete when it never got any records, or when it committed the offset of the last consumed record + status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) + } + + def logFinalStreamCompletionStatuses(completed: Boolean): UIO[Unit] = + if (completed) + ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") + else + for { + completionStatuses <- getStreamCompletionStatuses + statusStrings = completionStatusesAsString(completionStatuses) + _ <- + ZIO.logWarning( + s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + + s"the records they consumed; the rebalance will continue. " + + s"This might cause another consumer to process some records again. 
$statusStrings" + ) + } yield () + + def commitSync: Task[Unit] = + ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) + + // Outline: + // - Every `commitQueuePollInterval` until the deadline has been reached: + // - Get all commits from the commit queue. + // - Start an async commit for these commits. + // - Collect all these new (pending) commits. + // - repeat the above until: + // - All streams that were ended have completed their work, and + // - we have seen a completed or pending commit for all end-offsets. + // An end-offset of a stream is the offset of the last record given to that stream. + // - Do a single sync commit without any offsets, this has the side-effect of blocking until all + // preceding async commits are complete (this requires kafka-client 3.6.0 or later). + // Because all commits created here (including those from non-ending streams) are now complete, we do not + // have to add them to the pending commits of the runloop state. + // + // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. + // Instead, we poll the queue in a loop. + for { + _ <- logInitialStreamCompletionStatuses + completed <- + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval)) + .forever + // Even if there is nothing to commit, continue to drive communication with the broker + // so that commits can complete and the streams can make progress, by setting + // executeOnEmpty = true + .tap { _ => + committer.processQueuedCommits( + (offsets, callback) => ZIO.attempt(consumer.commitAsync(offsets, callback)), + executeOnEmpty = true + ) + } + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .mapZIO(_ => endingStreamsCompletedAndCommitsExist) + .takeUntil(completed => completed) + .runLast + .map(_.getOrElse(false)) + _ <- logFinalStreamCompletionStatuses(completed) + _ <- commitSync + _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } yield () + } + + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. + // We do not know the order in which the call-back methods are invoked. + // + // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the + // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, + // the ref is updated. 
+ // + // Each method: + // - emits a diagnostic event + // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to + // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll + // - ends streams that need to be ended + // - updates `lastRebalanceEvent` + // + def toRebalanceListener: RebalanceListener = RebalanceListener( + onAssigned = assignedTps => + lastRebalanceEvent.updateZIO { rebalanceEvent => + for { + _ <- ZIO.logDebug { + val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" + s"${assignedTps.size} partitions are assigned$sameRebalance" + } + assignedStreams <- getCurrentAssignedStreams + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) assignedStreams + else Chunk.empty + _ <- endStreams(streamsToEnd) + _ <- ZIO.logTrace("onAssigned done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps ++ assignedTps, + endedStreams = rebalanceEvent.endedStreams ++ streamsToEnd + ) + }, + onRevoked = revokedTps => + lastRebalanceEvent.updateZIO { rebalanceEvent => + for { + _ <- ZIO.logDebug { + val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" + s"${revokedTps.size} partitions are revoked$sameRebalance" + } + assignedStreams <- getCurrentAssignedStreams + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) assignedStreams + else assignedStreams.filter(control => revokedTps.contains(control.tp)) + _ <- endStreams(streamsToEnd) + _ <- ZIO.logTrace("onRevoked done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps -- revokedTps, + revokedTps = rebalanceEvent.revokedTps ++ revokedTps, + endedStreams = rebalanceEvent.endedStreams ++ streamsToEnd + ) + }, + onLost = lostTps => + lastRebalanceEvent.updateZIO { rebalanceEvent => + for { + _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") + assignedStreams <- getCurrentAssignedStreams + lostStreams = assignedStreams.filter(control => lostTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(lostStreams)(_.lost) + _ <- ZIO.logTrace(s"onLost done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps -- lostTps, + lostTps = rebalanceEvent.lostTps ++ lostTps, + endedStreams = rebalanceEvent.endedStreams ++ lostStreams + ) + } + ) +} + +private[internal] object RebalanceCoordinator { + + sealed trait EndOffsetCommitStatus + case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } + case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } + case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } + + final case class StreamCompletionStatus( + tp: TopicPartition, + streamEnded: Boolean, + lastPulledOffset: Option[Long], + lastCommittedOffset: Option[Long], + endOffsetCommitStatus: EndOffsetCommitStatus + ) { + override def toString: String = + s"$tp: " + + s"${if (streamEnded) "stream ended" else "stream is running"}, " + + s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + + s"last committed offset=${lastCommittedOffset.getOrElse("none")}, " + + endOffsetCommitStatus + } + + final case class RebalanceEvent( + wasInvoked: Boolean, + assignedTps: Set[TopicPartition], + revokedTps: Set[TopicPartition], + lostTps: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] 
+ ) + + object RebalanceEvent { + val None: RebalanceEvent = + RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 945a9ce11..b4ceddbb2 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -2,21 +2,18 @@ package zio.kafka.consumer.internal import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException, RebalanceInProgressException } +import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } import zio._ -import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval } +import zio.kafka.consumer.Consumer.OffsetRetrieval import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.Runloop._ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent import zio.stream._ -import java.lang.Math.max -import java.util -import java.util.{ Map => JavaMap } -import scala.collection.mutable import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection @@ -25,24 +22,15 @@ private[consumer] final class Runloop private ( topLevelExecutor: Executor, sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, - commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], - lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, - maxRebalanceDuration: Duration, currentStateRef: Ref[State], - committedOffsetsRef: Ref[CommitOffsets] + rebalanceCoordinator: RebalanceCoordinator, + consumerMetrics: ConsumerMetrics, + committer: Committer ) { - private val commitTimeout = settings.commitTimeout - private val commitTimeoutNanos = settings.commitTimeout.toNanos - - private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing - private val rebalanceSafeCommits = settings.rebalanceSafeCommits - - private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels) - private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream( tp, @@ -81,236 +69,6 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private def makeRebalanceListener: ConsumerRebalanceListener = { - // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This - // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the - // rebalance listener. - // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, - // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to - // another thread cannot be used. 
- - // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. - val commitQueuePollInterval = 100.millis - - // End streams from the rebalance listener. - // When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. - def endStreams(state: State, streamsToEnd: Chunk[PartitionStreamControl]): Task[Unit] = - if (streamsToEnd.isEmpty) ZIO.unit - else { - for { - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- if (rebalanceSafeCommits) - consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) - else ZIO.unit - } yield () - } - - def doAwaitStreamCommits( - consumer: ByteArrayKafkaConsumer, - state: State, - streamsToEnd: Chunk[PartitionStreamControl] - ): Task[Unit] = { - val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos - - def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L - - val endingTps = streamsToEnd.map(_.tp).toSet - - def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = - commits.filter(commit => (commit.offsets.keySet intersect endingTps).nonEmpty) - - lazy val previousPendingCommits: Chunk[Commit] = - commitsOfEndingStreams(state.pendingCommits) - - def commitAsync(commits: Chunk[Commit]): UIO[Unit] = - if (commits.nonEmpty) { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - ZIO.logDebug(s"Async commit of ${offsets.size} offsets for ${commits.size} commits") *> - ZIO.attempt(consumer.commitAsync(offsets, callback)).catchAll(onFailure) - } else { - // Continue to drive communication with the broker so that commits can complete and the streams can - // make progress. - ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie - } - - sealed trait EndOffsetCommitStatus - case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } - case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } - case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } - - final case class StreamCompletionStatus( - tp: TopicPartition, - streamEnded: Boolean, - lastPulledOffset: Option[Long], - endOffsetCommitStatus: EndOffsetCommitStatus - ) { - override def toString: String = - s"${tp}: " + - s"${if (streamEnded) "stream ended" else "stream is running"}, " + - s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + - endOffsetCommitStatus - } - - def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = - "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") - - def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] = - for { - committedOffsets <- committedOffsetsRef.get - allPendingCommitOffsets = - (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map { - case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset()) - } - streamResults <- - ZIO.foreach(streamsToEnd) { stream => - for { - isDone <- stream.completedPromise.isDone - lastPulledOffset <- stream.lastPulledOffset - endOffset <- if (isDone) stream.completedPromise.await else ZIO.none - - endOffsetCommitStatus = - endOffset match { - case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => - EndOffsetCommitted - case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, 
endOffset.offset)) => - EndOffsetCommitPending - case _ => EndOffsetNotCommitted - } - } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus) - } - } yield streamResults - - @inline - def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { - val statusStrings = completionStatusesAsString(completionStatuses) - ZIO.logInfo( - s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + - s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" - ) - } - - def logInitialStreamCompletionStatuses: UIO[Unit] = - for { - completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty) - _ <- logStreamCompletionStatuses(completionStatuses) - } yield () - - def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] = - for { - completionStatuses <- getStreamCompletionStatuses(newCommits) - _ <- logStreamCompletionStatuses(completionStatuses) - } yield completionStatuses.forall { status => - // A stream is complete when it never got any records, or when it committed the offset of the last consumed record - status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) - } - - def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] = - if (completed) - ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") - else - for { - completionStatuses <- getStreamCompletionStatuses(newCommits) - statusStrings = completionStatusesAsString(completionStatuses) - _ <- - ZIO.logWarning( - s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + - s"the records they consumed; the rebalance will continue. " + - s"This might cause another consumer to process some records again. $statusStrings" - ) - } yield () - - def commitSync: Task[Unit] = - ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) - - // Outline: - // - Every `commitQueuePollInterval` until the deadline has been reached: - // - Get all commits from the commit queue. - // - Start an async commit for these commits. - // - Collect all these new (pending) commits. - // - repeat the above until: - // - All streams that were ended have completed their work, and - // - we have seen a completed or pending commit for all end-offsets. - // An end-offset of a stream is the offset of the last record given to that stream. - // - Do a single sync commit without any offsets, this has the side-effect of blocking until all - // preceding async commits are complete (this requires kafka-client 3.6.0 or later). - // Because all commits created here (including those from non-ending streams) are now complete, we do not - // have to add them to the pending commits of the runloop state. - // - // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. - // Instead, we poll the queue in a loop. 
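Before the removed implementation that follows, a minimal sketch of the "single sync commit without any offsets" barrier mentioned in the outline above (assuming kafka-clients 3.6.0 or later, as that outline notes; the consumer, offsets and timeout here are placeholders, not zio-kafka code):

import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.{ Consumer, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Sketch only: an empty commitSync acts as a completion barrier for earlier commitAsync calls.
def flushAsyncCommits(
  consumer: Consumer[Array[Byte], Array[Byte]],
  pending: Map[TopicPartition, OffsetAndMetadata],
  timeout: Duration
): Unit = {
  // Fire-and-forget; the commit callback only runs once the client drives the network again.
  consumer.commitAsync(pending.asJava, null)
  // Committing an empty offset map commits nothing new, but blocks until all preceding
  // async commits (and their callbacks) have completed.
  consumer.commitSync(Collections.emptyMap[TopicPartition, OffsetAndMetadata](), timeout)
}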
- for { - _ <- logInitialStreamCompletionStatuses - completedAndCommits <- - ZStream - .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) - .tap(commitAsync) - .forever - .takeWhile(_ => java.lang.System.nanoTime() <= deadline) - .scan(Chunk.empty[Runloop.Commit])(_ ++ _) - .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) - .takeUntil { case (completed, _) => completed } - .runLast - .map(_.getOrElse((false, Chunk.empty))) - _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2) - _ <- commitSync - _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") - } yield () - } - - // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. - // We do not know the order in which the call-back methods are invoked. - // - // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the - // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, - // the ref is updated. - // - // Each method: - // - emits a diagnostic event - // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to - // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll - // - ends streams that need to be ended - // - updates `lastRebalanceEvent` - // - val recordRebalanceRebalancingListener = RebalanceListener( - onAssigned = assignedTps => - for { - rebalanceEvent <- lastRebalanceEvent.get - _ <- ZIO.logDebug { - val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" - s"${assignedTps.size} partitions are assigned$sameRebalance" - } - state <- currentStateRef.get - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams - else Chunk.empty - _ <- endStreams(state, streamsToEnd) - _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) - _ <- ZIO.logTrace("onAssigned done") - } yield (), - onRevoked = revokedTps => - for { - rebalanceEvent <- lastRebalanceEvent.get - _ <- ZIO.logDebug { - val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" - s"${revokedTps.size} partitions are revoked$sameRebalance" - } - state <- currentStateRef.get - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams - else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) - _ <- endStreams(state, streamsToEnd) - _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) - _ <- ZIO.logTrace("onRevoked done") - } yield (), - onLost = lostTps => - for { - _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") - rebalanceEvent <- lastRebalanceEvent.get - state <- currentStateRef.get - lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp)) - _ <- ZIO.foreachDiscard(lostStreams)(_.lost) - _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams)) - _ <- ZIO.logTrace(s"onLost done") - } yield () - ) - // Here we just want to avoid any executor shift if the user provided listener is the noop listener. 
val userRebalanceListener = settings.rebalanceListener match { @@ -318,93 +76,9 @@ private[consumer] final class Runloop private ( case _ => settings.rebalanceListener.runOnExecutor(topLevelExecutor) } - RebalanceListener.toKafka(recordRebalanceRebalancingListener ++ userRebalanceListener, sameThreadRuntime) + RebalanceListener.toKafka(rebalanceCoordinator.toRebalanceListener ++ userRebalanceListener, sameThreadRuntime) } - /** This is the implementation behind the user facing api `Offset.commit`. */ - private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = - offsets => - for { - p <- Promise.make[Throwable, Unit] - startTime = java.lang.System.nanoTime() - _ <- commitQueue.offer(Runloop.Commit(startTime, offsets, p)) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) - endTime = java.lang.System.nanoTime() - latency = (endTime - startTime).nanoseconds - _ <- consumerMetrics.observeCommit(latency) - } yield () - - /** Merge commits and prepare parameters for calling `consumer.commitAsync`. */ - private def asyncCommitParameters( - commits: Chunk[Runloop.Commit] - ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { - val offsets = commits - .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => - commit.offsets.foreach { case (tp, offset) => - acc += (tp -> acc - .get(tp) - .map(current => if (current.offset() > offset.offset()) current else offset) - .getOrElse(offset)) - } - acc - } - .toMap - val offsetsWithMetaData = offsets.map { case (tp, offset) => - tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) - } - val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) - // We assume the commit is started immediately after returning from this method. 
- val startTime = java.lang.System.nanoTime() - val onSuccess = { - val endTime = java.lang.System.nanoTime() - val latency = (endTime - startTime).nanoseconds - for { - offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) - _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) - result <- cont(Exit.unit) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) - } yield result - } - val onFailure: Throwable => UIO[Unit] = { - case _: RebalanceInProgressException => - for { - _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commitQueue.offerAll(commits) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - } yield () - case err: Throwable => - cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) - } - val callback = - new OffsetCommitCallback { - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = - Unsafe.unsafe { implicit u => - sameThreadRuntime.unsafe.run { - if (exception eq null) onSuccess else onFailure(exception) - } - .getOrThrowFiberFailure() - } - } - (offsetsWithMetaData.asJava, callback, onFailure) - } - - private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] = - if (commits.isEmpty) { - ZIO.succeed(state) - } else { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - val newState = state.addPendingCommits(commits) - consumer.runloopAccess { c => - // We don't wait for the completion of the commit here, because it - // will only complete once we poll again. - ZIO.attempt(c.commitAsync(offsets, callback)) - } - .catchAll(onFailure) - .as(newState) - } - /** * Does all needed to end revoked partitions: * 1. Complete the revoked assigned streams 2. Remove from the list of pending requests @@ -468,7 +142,7 @@ private[consumer] final class Runloop private ( builder += CommittableRecord[Array[Byte], Array[Byte]]( record = consumerRecord, - commitHandle = commit, + commitHandle = committer.commit, consumerGroupMetadata = consumerGroupMetadata ) } @@ -535,7 +209,7 @@ private[consumer] final class Runloop private ( partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + - s" ${state.pendingCommits.size} pending commits," + + s" ${committer.pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) _ <- currentStateRef.set(state) @@ -559,7 +233,7 @@ private[consumer] final class Runloop private ( tpWithoutData = requestedPartitions -- providedTps ) } - pollresult <- lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { + pollresult <- rebalanceCoordinator.getAndResetLastEvent.flatMap { case RebalanceEvent(false, _, _, _, _) => // The fast track, rebalance listener was not invoked: // no assignment changes, no new commits, only new records. @@ -613,8 +287,7 @@ private[consumer] final class Runloop private ( // Remove committed offsets for partitions that are no longer assigned: // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. 
_ <- - committedOffsetsRef - .update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit] + committer.keepCommitsForPartitions(updatedAssignedStreams.map(_.tp).toSet): Task[Unit] _ <- consumerMetrics.observeRebalance( currentAssigned.size, @@ -656,11 +329,10 @@ private[consumer] final class Runloop private ( pollResult.ignoreRecordsForTps, pollResult.records ) - updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) - _ <- checkStreamPullInterval(pollResult.assignedStreams) + _ <- committer.cleanupPendingCommits + _ <- checkStreamPullInterval(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, - pendingCommits = updatedPendingCommits, assignedStreams = pollResult.assignedStreams ) } @@ -819,19 +491,21 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - commitCommands <- commitQueue.takeAll - _ <- ZIO.logDebug( - s"Processing ${commitCommands.size} commits," + - s" ${commands.size} commands: ${commands.mkString(",")}" - ) - stateAfterCommits <- handleCommits(state, commitCommands) + _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") + _ <- consumer.runloopAccess { consumer => + committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(consumer.commitAsync(offsets, callback)) + ) + } streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } - stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand) + stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) - updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands) - else ZIO.succeed(stateAfterCommands) + updatedStateAfterPoll <- shouldPoll(stateAfterCommands).flatMap { + case true => handlePoll(stateAfterCommands) + case false => ZIO.succeed(stateAfterCommands) + } // Immediately poll again, after processing all new queued commands - _ <- if (updatedStateAfterPoll.shouldPoll) commandQueue.offer(RunloopCommand.Poll) else ZIO.unit + _ <- ZIO.whenZIO(shouldPoll(updatedStateAfterPoll))(commandQueue.offer(RunloopCommand.Poll)) // Save the current state for other parts of Runloop (read-only, for metrics only) _ <- currentStateRef.set(updatedStateAfterPoll) } yield updatedStateAfterPoll @@ -840,13 +514,19 @@ private[consumer] final class Runloop private ( .onError(cause => partitionsHub.offer(Take.failCause(cause))) } + def shouldPoll(state: State): UIO[Boolean] = + committer.pendingCommitCount.map { pendingCommitCount => + state.subscriptionState.isSubscribed && (state.pendingRequests.nonEmpty || pendingCommitCount > 0 || state.assignedStreams.isEmpty) + } + private def observeRunloopMetrics(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ZIO[Any, Nothing, Unit] = { val observe = for { - currentState <- currentStateRef.get - commandQueueSize <- commandQueue.size - commitQueueSize <- commitQueue.size + currentState <- currentStateRef.get + commandQueueSize <- commandQueue.size + commitQueueSize <- committer.queueSize + pendingCommitCount <- committer.pendingCommitCount _ <- consumerMetrics - .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize) + .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize, pendingCommitCount) } yield () observe @@ -892,57 +572,6 @@ object Runloop { pendingRequests: Chunk[RunloopCommand.Request] ) - private final case class RebalanceEvent( - 
wasInvoked: Boolean, - assignedTps: Set[TopicPartition], - revokedTps: Set[TopicPartition], - lostTps: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ) { - def onAssigned( - assigned: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps ++ assigned, - endedStreams = this.endedStreams ++ endedStreams - ) - - def onRevoked( - revoked: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps -- revoked, - revokedTps = revokedTps ++ revoked, - endedStreams = this.endedStreams ++ endedStreams - ) - - def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps -- lost, - lostTps = lostTps ++ lost, - endedStreams = this.endedStreams ++ endedStreams - ) - } - - private object RebalanceEvent { - val None: RebalanceEvent = - RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) - } - - private[internal] final case class Commit( - createdAt: NanoTime, - offsets: Map[TopicPartition, OffsetAndMetadata], - cont: Promise[Throwable, Unit] - ) { - @inline def isDone: UIO[Boolean] = cont.isDone - @inline def isPending: UIO[Boolean] = isDone.negate - } - private[consumer] def make( settings: ConsumerSettings, maxStreamPullInterval: Duration, @@ -953,28 +582,42 @@ object Runloop { ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial - currentStateRef <- Ref.make(initialState) - committedOffsetsRef <- Ref.make(CommitOffsets.empty) - sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) - executor <- ZIO.executor + currentStateRef <- Ref.make(initialState) + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) + executor <- ZIO.executor + metrics = new ZioConsumerMetrics(settings.metricLabels) + committer <- LiveCommitter + .make( + settings.commitTimeout, + diagnostics, + metrics, + commandQueue.offer(RunloopCommand.CommitAvailable).unit, + sameThreadRuntime + ) + rebalanceCoordinator = new RebalanceCoordinator( + lastRebalanceEvent, + settings, + consumer, + maxRebalanceDuration, + currentStateRef.get.map(_.assignedStreams), + committer + ) runloop = new Runloop( settings = settings, topLevelExecutor = executor, sameThreadRuntime = sameThreadRuntime, consumer = consumer, - commitQueue = commitQueue, commandQueue = commandQueue, - lastRebalanceEvent = lastRebalanceEvent, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval, - maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, - committedOffsetsRef = committedOffsetsRef + consumerMetrics = metrics, + rebalanceCoordinator = rebalanceCoordinator, + committer = committer ) _ <- ZIO.logDebug("Starting Runloop") @@ -995,62 +638,18 @@ object Runloop { private[internal] final case class State( pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[Runloop.Commit], assignedStreams: 
Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { - def addPendingCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) - - def shouldPoll: Boolean = - subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) } private object State { val initial: State = State( pendingRequests = Chunk.empty, - pendingCommits = Chunk.empty, assignedStreams = Chunk.empty, subscriptionState = SubscriptionState.NotSubscribed ) } - // package private for unit testing - private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - - /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */ - def addCommits(c: Chunk[Runloop.Commit]): (Long, CommitOffsets) = { - val updatedOffsets = mutable.Map.empty[TopicPartition, Long] - updatedOffsets.sizeHint(offsets.size) - updatedOffsets ++= offsets - var offsetIncrease = 0L - c.foreach { commit => - commit.offsets.foreach { case (tp, offsetAndMeta) => - val offset = offsetAndMeta.offset() - val maxOffset = updatedOffsets.get(tp) match { - case Some(existingOffset) => - offsetIncrease += max(0L, offset - existingOffset) - max(existingOffset, offset) - case None => - // This partition was not committed to from this consumer yet. Therefore we do not know the offset - // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. - // Lets go with the simplest for now: ```offsetIncrease += 0``` - offset - } - updatedOffsets += tp -> maxOffset - } - } - (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) - } - - def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = - CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) - - def contains(tp: TopicPartition, offset: Long): Boolean = - offsets.get(tp).exists(_ >= offset) - } - - private[internal] object CommitOffsets { - val empty: CommitOffsets = CommitOffsets(Map.empty) - } }
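For readers skimming only this file of the patch: the Runloop now delegates all commit handling to the new Committer component. The trait below is a sketch inferred solely from the call sites in this diff (committer.commit, processQueuedCommits, queueSize, pendingCommitCount, cleanupPendingCommits, keepCommitsForPartitions); the actual trait lives in the new Committer.scala and its signatures may differ.

import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import zio._

// Inferred sketch of the extracted commit interface, based only on how this Runloop uses it.
trait CommitterSketch {
  // Backs the user-facing Offset.commit: enqueue the offsets and wait for the broker's acknowledgement.
  def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): Task[Unit]

  // Drain the internal commit queue, merge the offsets, and hand them plus a completion
  // callback to the given function, which performs the actual consumer.commitAsync call.
  def processQueuedCommits(
    commitAsync: (java.util.Map[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit]
  ): Task[Unit]

  // Bookkeeping used by the run loop and its metrics.
  def queueSize: UIO[Int]
  def pendingCommitCount: UIO[Int]
  def cleanupPendingCommits: UIO[Unit]
  def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): Task[Unit]
}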