Extract Committer and RebalanceCoordinator classes from Runloop + unit tests #1375

Merged
merged 53 commits into from
Nov 26, 2024
Commits
fd0c49d
Refactor stream completion status in preparation of additional logging
svroonland Nov 3, 2024
0d6258b
Add logging + fix condition
svroonland Nov 3, 2024
399dcac
Also some debug logging while waiting
svroonland Nov 3, 2024
c56dc69
Fix lint
svroonland Nov 3, 2024
11b3238
Increase timeout
svroonland Nov 3, 2024
dc38739
Correct race condition
svroonland Nov 5, 2024
2ee212c
Remove sequential
svroonland Nov 5, 2024
9caee50
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
e034b77
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
5b2d13e
Merge branch 'master' into more-rebalance-safe-commits-logging
svroonland Nov 6, 2024
221b283
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
bad1def
Stricter comparison of pending commit offset to last pulled offset
svroonland Nov 9, 2024
a7f7cd2
Merge remote-tracking branch 'origin/master' into more-rebalance-safe…
svroonland Nov 9, 2024
cc377dd
More rebalance safe commits logging alt (#1367)
erikvanoosten Nov 10, 2024
7994954
Remove withFilter usage
svroonland Nov 10, 2024
d3e0270
Fix timeToDeadlineMillis
svroonland Nov 10, 2024
54494f3
Extract a Committer and RunloopRebalanceListener from the Runloop
svroonland Nov 10, 2024
1e47de2
Fix unstable test, seems unrelated
svroonland Nov 10, 2024
ac4e978
Fix timeToDeadlineMillis
svroonland Nov 10, 2024
5917c1d
Small convenience method
svroonland Nov 10, 2024
dc614da
Adjust unrepresentative test
svroonland Nov 10, 2024
02532da
Document parameter
svroonland Nov 10, 2024
6ad6eaf
Inline RebalanceEvent modifications
svroonland Nov 10, 2024
9c36a5d
Use startTime in Commit
svroonland Nov 10, 2024
07189e2
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 10, 2024
d6db595
Merge commit '727e7d7acce4cb0a6f301451e1b25df3dd887e2b' into separate…
svroonland Nov 10, 2024
02a36ee
Merge remote-tracking branch 'origin/master' into separate-rebalance-…
svroonland Nov 10, 2024
d1957d8
Do not depend on entire ConsumerSettings in Committer
svroonland Nov 12, 2024
807fdf9
Make Committer better testable
svroonland Nov 12, 2024
98a1031
Tests for Committer
svroonland Nov 12, 2024
810362b
Let RunloopRebalanceListener own its last event
svroonland Nov 12, 2024
2310ea1
Extract Committer trait
svroonland Nov 13, 2024
51d3a06
Add last committed offset to stream completion status
svroonland Nov 13, 2024
3439e10
Some tests for the rebalance listener
svroonland Nov 13, 2024
8fb3622
Renames
svroonland Nov 13, 2024
b60bd8e
More tests + cleanup
svroonland Nov 13, 2024
c995799
Merge commit '1f8d5904' into separate-rebalance-listener-file
svroonland Nov 13, 2024
7744d66
Merge commit '18aa941d' into separate-rebalance-listener-file
svroonland Nov 13, 2024
05d710c
Merge commit 'e1a41448' into separate-rebalance-listener-file
svroonland Nov 13, 2024
ae15221
Merge commit '504074f6' into separate-rebalance-listener-file
svroonland Nov 13, 2024
4acefdf
Move CommittOffsets to Committer as it is part of its interface
svroonland Nov 13, 2024
0fd4114
Cleanup
svroonland Nov 13, 2024
3915b08
cFix doc
svroonland Nov 13, 2024
e86d757
Formatting
svroonland Nov 13, 2024
ea59926
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 13, 2024
4072dae
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 16, 2024
b6c9a26
Merge branch 'master' into separate-rebalance-listener-file
erikvanoosten Nov 23, 2024
7cd3f52
Process review comments
svroonland Nov 24, 2024
7e21ebb
Separate file for mocked metrics
svroonland Nov 24, 2024
c517f46
Fix
svroonland Nov 24, 2024
b170243
Styling
svroonland Nov 24, 2024
0e9812f
Fix flaky test
svroonland Nov 24, 2024
d3ce328
Merge remote-tracking branch 'origin/master' into separate-rebalance-…
svroonland Nov 26, 2024
@@ -28,6 +28,7 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.ExecutionException
import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
@@ -323,21 +324,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
},
test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") {
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
_ <- produceMany(topic, kvs)
messagesReceived <- Ref.make[Int](0)
topic <- randomTopic
group <- randomGroup
client <- randomClient
_ <- scheduledProduce(topic, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
lastProcessedOffset <- Ref.make(0L)
offset <- (
Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.mapConcatZIO { record =>
.mapZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
nr <- lastProcessedOffset.updateAndGet(_ + 1)
_ <- Consumer.stopConsumption.when(nr == 10)
} yield if (nr < 10) Seq(record.offset) else Seq.empty
} yield record.offset
}
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
@@ -353,7 +353,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
maxRebalanceDuration = 6.seconds
)
)
} yield assertTrue(offset.map(_.offset).contains(9L))
lastOffset <- lastProcessedOffset.get
} yield assertTrue(offset.map(_.offset).contains(lastOffset))
} @@ TestAspect.nonFlaky(2),
test("a consumer timeout interrupts the stream and shuts down the consumer") {
// Setup of this test:
@@ -1399,9 +1400,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
}: _*) @@ TestAspect.nonFlaky(2),
test("running streams don't stall after a poll timeout") {
for {
topic <- randomTopic
clientId <- randomClient
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic))
topic <- randomTopic
clientId <- randomClient
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome {
Collaborator

Why is the retrying needed here, and not in other tests?

Collaborator Author

I couldn't figure that out either..

case e: ExecutionException
if e.getCause.isInstanceOf[org.apache.kafka.common.errors.TopicExistsException] =>
ZIO.unit
}
settings <- consumerSettings(clientId)
consumer <- Consumer.make(settings.withPollTimeout(50.millis))
recordsOut <- Queue.unbounded[Unit]
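
Note on the `createCustomTopic` change in the hunk above: the review thread leaves open why the retry is needed in this test only. What the combinator chain does can be sketched without ZIO. The following is a minimal standard-library sketch, not the code from this PR: `TopicExistsSketch`, `retryN` and `createTopicTolerantly` are hypothetical names, and `TopicExistsSketch` merely stands in for `org.apache.kafka.common.errors.TopicExistsException`. It assumes the same order as the diff: retry any failure up to 3 times first, and only then treat a final "topic already exists" failure as success.

```scala
import java.util.concurrent.ExecutionException
import scala.annotation.tailrec
import scala.util.{ Failure, Success, Try }

// Hypothetical stand-in for org.apache.kafka.common.errors.TopicExistsException
final class TopicExistsSketch(msg: String) extends RuntimeException(msg)

object RetrySketch {

  /** Runs `attempt` once and, on failure, up to `n` more times; returns the last result. */
  def retryN[A](n: Int)(attempt: () => Try[A]): Try[A] = {
    @tailrec
    def loop(remaining: Int): Try[A] =
      attempt() match {
        case s @ Success(_)              => s
        case Failure(_) if remaining > 0 => loop(remaining - 1)
        case failure                     => failure
      }
    loop(n)
  }

  /**
   * Mirrors the shape of the test setup: `create` may throw or return a failed Try,
   * every failure is retried, and a final "already exists" error is treated as success.
   */
  def createTopicTolerantly(create: () => Try[Unit]): Try[Unit] =
    retryN(3)(() => Try(create()).flatten).recover {
      case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsSketch] => ()
    }
}
```

One consequence of this ordering: if an earlier attempt did create the topic but still reported a failure, the remaining retries all fail with "already exists" and only the final one is swallowed.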
@@ -1,11 +1,13 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import zio._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import zio.kafka.consumer.internal.Committer.CommitOffsets
import zio.kafka.consumer.internal.LiveCommitter.Commit
import zio.test._

object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
object CommitOffsetsSpec extends ZIOSpecDefault {

private val tp10 = new TopicPartition("t1", 0)
private val tp11 = new TopicPartition("t1", 1)
@@ -14,49 +16,49 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
private val tp22 = new TopicPartition("t2", 2)

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("Runloop.CommitOffsets spec")(
suite("CommitOffsets spec")(
test("addCommits adds to empty CommitOffsets") {
val s1 = Runloop.CommitOffsets(Map.empty)
val s1 = CommitOffsets(Map.empty)
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits updates offset when it is higher") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L))
val s1 = CommitOffsets(Map(tp10 -> 4L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(
inc == 10 - 4,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits ignores an offset when it is lower") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s1 = CommitOffsets(Map(tp10 -> 10L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits keeps unrelated partitions") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s1 = CommitOffsets(Map(tp10 -> 10L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L)
)
},
test("addCommits does it all at once") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L))))
assertTrue(
inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L)
)
},
test("addCommits adds multiple commits") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
val (inc, s2) = s1.addCommits(
Chunk(
makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)),
@@ -69,35 +71,35 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
)
},
test("keepPartitions removes some partitions") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s2 = s1.keepPartitions(Set(tp10))
assertTrue(s2.offsets == Map(tp10 -> 10L))
},
test("does not 'contain' offset when tp is not present") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s1 = CommitOffsets(Map(tp10 -> 10L))
val result = s1.contains(tp20, 10)
assertTrue(!result)
},
test("does not 'contain' a higher offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp10, 11)
assertTrue(!result)
},
test("does 'contain' equal offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp10, 10)
assertTrue(result)
},
test("does 'contain' lower offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp20, 19)
assertTrue(result)
}
)

private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = {
private def makeCommit(offsets: Map[TopicPartition, Long]): Commit = {
val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None))
Runloop.Commit(0L, o, p)
Commit(0L, o, p)
}
}
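
For readers without the `Committer` source at hand, the behaviour these `CommitOffsets` tests pin down can be sketched as follows. This is a reconstruction from the assertions above only, not the implementation in this PR: `Tp` is a hypothetical stand-in for `TopicPartition`, commits are modelled as plain maps instead of `Commit` values, and the way the increment accumulates across several commits in one call is an assumption.

```scala
object CommitOffsetsSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
  final case class Tp(topic: String, partition: Int)

  final case class Offsets(offsets: Map[Tp, Long]) {

    /**
     * Merges the given commits into the state, keeping the highest offset per partition.
     * Returns the total offset increase for partitions that were already tracked
     * (a newly seen partition contributes 0) together with the new state.
     */
    def addCommits(commits: Seq[Map[Tp, Long]]): (Long, Offsets) = {
      val (inc, merged) = commits.flatten.foldLeft((0L, offsets)) {
        case ((accInc, acc), (tp, offset)) =>
          acc.get(tp) match {
            case Some(current) if offset > current =>
              (accInc + (offset - current), acc.updated(tp, offset))
            case Some(_) => (accInc, acc) // lower or equal offset: ignored
            case None    => (accInc, acc.updated(tp, offset))
          }
      }
      (inc, Offsets(merged))
    }

    /** Drops all partitions that are not in `tps`. */
    def keepPartitions(tps: Set[Tp]): Offsets =
      Offsets(offsets.filter { case (tp, _) => tps.contains(tp) })

    /** True when `tp` is tracked and its stored offset is at least `offset`. */
    def contains(tp: Tp, offset: Long): Boolean =
      offsets.get(tp).exists(_ >= offset)
  }
}
```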
@@ -0,0 +1,204 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.test._
import zio.{ durationInt, Promise, Ref, ZIO }

import java.util.{ Map => JavaMap }
import scala.jdk.CollectionConverters.MapHasAsJava

object CommitterSpec extends ZIOSpecDefault {
override def spec = suite("Committer")(
test("signals that a new commit is available") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter
.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
_ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
} yield assertCompletes
},
test("handles a successful commit by completing the commit effect") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
_ <- commitFiber.join
} yield assertCompletes
},
test("handles a failed commit by completing the commit effect with a failure") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) =>
ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed")))
)
result <- commitFiber.await
} yield assertTrue(result.isFailure)
},
test("retries when rebalancing") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) =>
ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress")))
)
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
result <- commitFiber.await
} yield assertTrue(result.isSuccess)
},
test("adds 1 to the committed last offset") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
_ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped
_ <- commitAvailable.await
committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]]
_ <- committer.processQueuedCommits((offsets, callback) =>
committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null))
)
offsetsCommitted <- committedOffsets.await
} yield assertTrue(
offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava
)
},
test("batches commits from multiple partitions and offsets") {
for {
runtime <- ZIO.runtime[Any]
commitsAvailable <- Promise.make[Nothing, Unit]
nrCommitsDone <- Ref.make(0)
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable =
ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
tp2 = new TopicPartition("topic", 1)
commitFiber1 <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped
commitFiber2 <- committer.commit(Map(tp -> new OffsetAndMetadata(2))).forkScoped
commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped
_ <- commitsAvailable.await
committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]]
_ <- committer.processQueuedCommits((offsets, callback) =>
committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null))
)
_ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join
offsetsCommitted <- committedOffsets.await
} yield assertTrue(
offsetsCommitted == Map(tp -> new OffsetAndMetadata(3), tp2 -> new OffsetAndMetadata(4)).asJava
)
},
test("keeps track of pending commits") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
pendingCommitsDuringCommit <- committer.pendingCommitCount
_ <- committer.cleanupPendingCommits
pendingCommitsAfterCommit <- committer.pendingCommitCount
_ <- commitFiber.join
} yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit == 0)
},
test("keep track of committed offsets") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
committedOffsets <- committer.getCommittedOffsets
_ <- commitFiber.join
} yield assertTrue(committedOffsets.offsets == Map(tp -> 0L))
},
test("clean committed offsets of no-longer assigned partitions") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
_ <- committer.keepCommitsForPartitions(Set.empty)
committedOffsets <- committer.getCommittedOffsets
_ <- commitFiber.join
} yield assertTrue(committedOffsets.offsets.isEmpty)
}
) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100)
}
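
The tests "adds 1 to the committed last offset" and "batches commits from multiple partitions and offsets" rely on the Kafka convention that a committed offset names the next record to consume, so the offset of the last processed record is sent as `offset + 1`. A minimal sketch of that merge step, assuming queued commits are modelled as plain maps; `Tp` and `offsetsToCommit` are hypothetical names and this is not the `LiveCommitter` code:

```scala
object CommitBatchSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
  final case class Tp(topic: String, partition: Int)

  /**
   * Collapses queued commits into one request: per partition the highest
   * processed offset wins, and 1 is added because Kafka expects the offset
   * of the next record to read. Requires Scala 2.13+ for groupMapReduce.
   */
  def offsetsToCommit(queued: Seq[Map[Tp, Long]]): Map[Tp, Long] =
    queued.flatten
      .groupMapReduce(_._1)(_._2)(_ max _)
      .map { case (tp, offset) => tp -> (offset + 1) }
}
```

With the inputs from the batching test (offsets 1 and 2 for partition 0, offset 3 for partition 1), this yields 3 for partition 0 and 4 for partition 1, matching the assertion above.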