Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert commitAsync callback handling to ZIO sooner #1404

Merged
merged 77 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
fd0c49d
Refactor stream completion status in preparation of additional logging
svroonland Nov 3, 2024
0d6258b
Add logging + fix condition
svroonland Nov 3, 2024
399dcac
Also some debug logging while waiting
svroonland Nov 3, 2024
c56dc69
Fix lint
svroonland Nov 3, 2024
11b3238
Increase timeout
svroonland Nov 3, 2024
dc38739
Correct race condition
svroonland Nov 5, 2024
2ee212c
Remove sequential
svroonland Nov 5, 2024
9caee50
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
e034b77
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
5b2d13e
Merge branch 'master' into more-rebalance-safe-commits-logging
svroonland Nov 6, 2024
221b283
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
bad1def
Stricter comparison of pending commit offset to last pulled offset
svroonland Nov 9, 2024
a7f7cd2
Merge remote-tracking branch 'origin/master' into more-rebalance-safe…
svroonland Nov 9, 2024
cc377dd
More rebalance safe commits logging alt (#1367)
erikvanoosten Nov 10, 2024
7994954
Remove withFilter usage
svroonland Nov 10, 2024
d3e0270
Fix timeToDeadlineMillis
svroonland Nov 10, 2024
54494f3
Extract a Committer and RunloopRebalanceListener from the Runloop
svroonland Nov 10, 2024
1e47de2
Fix unstable test, seems unrelated
svroonland Nov 10, 2024
ac4e978
Fix timeToDeadlineMillis
svroonland Nov 10, 2024
5917c1d
Small convenience method
svroonland Nov 10, 2024
dc614da
Adjust unrepresentative test
svroonland Nov 10, 2024
02532da
Document parameter
svroonland Nov 10, 2024
6ad6eaf
Inline RebalanceEvent modifications
svroonland Nov 10, 2024
9c36a5d
Use startTime in Commit
svroonland Nov 10, 2024
07189e2
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 10, 2024
d6db595
Merge commit '727e7d7acce4cb0a6f301451e1b25df3dd887e2b' into separate…
svroonland Nov 10, 2024
02a36ee
Merge remote-tracking branch 'origin/master' into separate-rebalance-…
svroonland Nov 10, 2024
d1957d8
Do not depend on entire ConsumerSettings in Committer
svroonland Nov 12, 2024
807fdf9
Make Committer better testable
svroonland Nov 12, 2024
98a1031
Tests for Committer
svroonland Nov 12, 2024
810362b
Let RunloopRebalanceListener own its last event
svroonland Nov 12, 2024
2310ea1
Extract Committer trait
svroonland Nov 13, 2024
51d3a06
Add last committed offset to stream completion status
svroonland Nov 13, 2024
3439e10
Some tests for the rebalance listener
svroonland Nov 13, 2024
8fb3622
Renames
svroonland Nov 13, 2024
b60bd8e
More tests + cleanup
svroonland Nov 13, 2024
c995799
Merge commit '1f8d5904' into separate-rebalance-listener-file
svroonland Nov 13, 2024
7744d66
Merge commit '18aa941d' into separate-rebalance-listener-file
svroonland Nov 13, 2024
05d710c
Merge commit 'e1a41448' into separate-rebalance-listener-file
svroonland Nov 13, 2024
ae15221
Merge commit '504074f6' into separate-rebalance-listener-file
svroonland Nov 13, 2024
4acefdf
Move CommittOffsets to Committer as it is part of its interface
svroonland Nov 13, 2024
0fd4114
Cleanup
svroonland Nov 13, 2024
3915b08
cFix doc
svroonland Nov 13, 2024
e86d757
Formatting
svroonland Nov 13, 2024
ea59926
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 13, 2024
4072dae
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 16, 2024
b6c9a26
Merge branch 'master' into separate-rebalance-listener-file
erikvanoosten Nov 23, 2024
77e5426
Fix log message
svroonland Nov 24, 2024
b0e0aae
WIP
svroonland Nov 24, 2024
7cd3f52
Process review comments
svroonland Nov 24, 2024
7e21ebb
Separate file for mocked metrics
svroonland Nov 24, 2024
cc17fab
Merge branch 'separate-rebalance-listener-file' into async-commit-zio
svroonland Nov 24, 2024
c517f46
Fix
svroonland Nov 24, 2024
66f3504
Merge branch 'separate-rebalance-listener-file' into async-commit-zio
svroonland Nov 24, 2024
4889cd1
Make it work
svroonland Nov 24, 2024
094258a
Touchups
svroonland Nov 24, 2024
564e498
WIP
svroonland Nov 24, 2024
b170243
Styling
svroonland Nov 24, 2024
0e9812f
Fix flaky test
svroonland Nov 24, 2024
a126ea4
Merge remote-tracking branch 'origin/separate-rebalance-listener-file…
svroonland Nov 24, 2024
2dfb744
Fix discarded unit warning
svroonland Nov 24, 2024
585d741
Fixup
svroonland Nov 24, 2024
7859e4d
Fix flaky tests
svroonland Nov 24, 2024
ca199ac
Improve processQueuedCommits
svroonland Nov 25, 2024
9a519a0
Fix flaky test
svroonland Nov 25, 2024
d3ce328
Merge remote-tracking branch 'origin/master' into separate-rebalance-…
svroonland Nov 26, 2024
fa1ebdd
Merge remote-tracking branch 'origin/master' into async-commit-zio
svroonland Nov 26, 2024
9018341
Merge remote-tracking branch 'origin/separate-rebalance-listener-file…
svroonland Nov 26, 2024
a29c96f
Merge remote-tracking branch 'origin/master' into async-commit-zio
svroonland Nov 26, 2024
60a67ea
Only one commitAsyncZio. Callers are ensuring locking and execution t…
svroonland Nov 26, 2024
7050acc
Refactor commit completion
svroonland Nov 26, 2024
208cd46
Reduce diff
svroonland Nov 26, 2024
1149458
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommi…
svroonland Nov 27, 2024
63af5a7
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 27, 2024
efa2441
Handle commit completion on the kafka thread
svroonland Nov 27, 2024
be7a333
Don't need this anymore
svroonland Nov 28, 2024
7467e0f
Merge branch 'master' into async-commit-zio
erikvanoosten Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ timeout(2.minutes)
) @@ withLiveClock @@ timeout(2.minutes) @@ TestAspect.timed

}
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.{ MockConsumer, OffsetAndMetadata, OffsetCommitCallback, OffsetResetStrategy }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.test._
import zio.{ durationInt, Promise, Ref, ZIO }
import zio.{ durationInt, Promise, Queue, Ref, Task, Unsafe, ZIO }

import java.util.{ Map => JavaMap }
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.{ MapHasAsJava, MapHasAsScala }

object CommitterSpec extends ZIOSpecDefault {
override def spec = suite("Committer")(
test("signals that a new commit is available") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter
.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
_ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
Expand All @@ -31,98 +29,97 @@ object CommitterSpec extends ZIOSpecDefault {
},
test("handles a successful commit by completing the commit effect") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
_ <- commitFiber.join
consumer <- createMockConsumer(offsets => ZIO.succeed(offsets))
_ <- committer.processQueuedCommits(consumer)
_ <- commitFiber.join
} yield assertCompletes
},
test("handles a failed commit by completing the commit effect with a failure") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) =>
ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed")))
)
result <- commitFiber.await
consumer <- createMockConsumer(_ => ZIO.fail(new RuntimeException("Commit failed")))
_ <- committer.processQueuedCommits(consumer)
result <- commitFiber.await
} yield assertTrue(result.isFailure)
},
test("retries when rebalancing") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
commitAvailable <- Queue.bounded[Unit](1)
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.offer(()).unit
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) =>
ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress")))
)
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
result <- commitFiber.await
} yield assertTrue(result.isSuccess)
_ <- commitAvailable.take
callCount <- Ref.make(0)
consumer <-
createMockConsumer { offsets =>
callCount.updateAndGet(_ + 1).flatMap { count =>
if (count == 1) {
ZIO.fail(new RebalanceInProgressException("Rebalance in progress"))
} else {
ZIO.succeed(offsets)
}
}
}
_ <- committer.processQueuedCommits(consumer)
_ <- commitAvailable.take
_ <- committer.processQueuedCommits(consumer)
_ <- commitFiber.join
} yield assertCompletes
},
test("adds 1 to the committed last offset") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
_ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped
_ <- commitAvailable.await
committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]]
_ <- committer.processQueuedCommits((offsets, callback) =>
committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null))
)
consumer <- createMockConsumer(offsets => committedOffsets.succeed(offsets.asJava).as(offsets))
_ <- committer.processQueuedCommits(consumer)
offsetsCommitted <- committedOffsets.await
} yield assertTrue(
offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava
)
},
test("batches commits from multiple partitions and offsets") {
for {
runtime <- ZIO.runtime[Any]
commitsAvailable <- Promise.make[Nothing, Unit]
nrCommitsDone <- Ref.make(0)
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable =
ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit,
sameThreadRuntime = runtime
ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit
)
tp = new TopicPartition("topic", 0)
tp2 = new TopicPartition("topic", 1)
Expand All @@ -131,9 +128,8 @@ object CommitterSpec extends ZIOSpecDefault {
commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped
_ <- commitsAvailable.await
committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]]
_ <- committer.processQueuedCommits((offsets, callback) =>
committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null))
)
consumer <- createMockConsumer(offsets => committedOffsets.succeed(offsets.asJava).as(offsets))
_ <- committer.processQueuedCommits(consumer)
_ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join
offsetsCommitted <- committedOffsets.await
} yield assertTrue(
Expand All @@ -142,63 +138,85 @@ object CommitterSpec extends ZIOSpecDefault {
},
test("keeps track of pending commits") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
consumer <- createMockConsumer(offsets => ZIO.succeed(offsets))
_ <- committer.processQueuedCommits(consumer)
pendingCommitsDuringCommit <- committer.pendingCommitCount
_ <- commitFiber.join
_ <- committer.cleanupPendingCommits
pendingCommitsAfterCommit <- committer.pendingCommitCount
_ <- commitFiber.join
} yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit == 0)
},
test("keep track of committed offsets") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
committedOffsets <- committer.getCommittedOffsets
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
consumer <- createMockConsumer(offsets => ZIO.succeed(offsets))
_ <- committer.processQueuedCommits(consumer)
_ <- commitFiber.join
committedOffsets <- committer.getCommittedOffsets
} yield assertTrue(committedOffsets.offsets == Map(tp -> 0L))
},
test("clean committed offsets of no-longer assigned partitions") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
onCommitAvailable = commitAvailable.succeed(()).unit
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
_ <- committer.keepCommitsForPartitions(Set.empty)
committedOffsets <- committer.getCommittedOffsets
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
consumer <- createMockConsumer(offsets => ZIO.succeed(offsets))
_ <- committer.processQueuedCommits(consumer)
_ <- commitFiber.join
_ <- committer.keepCommitsForPartitions(Set.empty)
committedOffsets <- committer.getCommittedOffsets
} yield assertTrue(committedOffsets.offsets.isEmpty)
}
) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100)

private def createMockConsumer(
onCommitAsync: Map[TopicPartition, OffsetAndMetadata] => Task[Map[TopicPartition, OffsetAndMetadata]]
): ZIO[Any, Nothing, MockConsumer[Array[Byte], Array[Byte]]] =
ZIO.runtime[Any].map { runtime =>
new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.LATEST) {
override def commitAsync(
offsets: JavaMap[TopicPartition, OffsetAndMetadata],
callback: OffsetCommitCallback
): Unit =
Unsafe.unsafe { implicit unsafe =>
runtime.unsafe
.run(
onCommitAsync(offsets.asScala.toMap)
.tapBoth(
e => ZIO.attempt(callback.onComplete(offsets, e.asInstanceOf[Exception])),
offsets => ZIO.attempt(callback.onComplete(offsets.asJava, null))
)
.ignore
)
.getOrThrowFiberFailure()
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.kafka.common.TopicPartition
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Committer.CommitOffsets
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.LiveCommitter.Commit
import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
Expand Down Expand Up @@ -96,8 +97,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
records = createTestRecords(3)
recordsPulled <- Promise.make[Nothing, Unit]
_ <- streamControl.offerRecords(records)
runtime <- ZIO.runtime[Any]
committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit, runtime)
committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit)

streamDrain <-
streamControl.stream
Expand Down Expand Up @@ -173,16 +173,19 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
settings: ConsumerSettings = ConsumerSettings(List("")).withCommitTimeout(1.second),
rebalanceSafeCommits: Boolean = false
): ZIO[Scope, Throwable, RebalanceCoordinator] =
Semaphore.make(1).map(new ConsumerAccess(mockConsumer, _)).map { consumerAccess =>
new RebalanceCoordinator(
lastEvent,
settings.withRebalanceSafeCommits(rebalanceSafeCommits),
consumerAccess,
5.seconds,
ZIO.succeed(assignedStreams),
committer
)
}
Semaphore
.make(1)
.map(new ConsumerAccess(mockConsumer, _))
.map { consumerAccess =>
new RebalanceCoordinator(
lastEvent,
settings.withRebalanceSafeCommits(rebalanceSafeCommits),
consumerAccess,
5.seconds,
ZIO.succeed(assignedStreams),
committer
)
}

private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] =
Chunk.fromIterable(
Expand All @@ -205,10 +208,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
abstract class MockCommitter extends Committer {
override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit

override def processQueuedCommits(
commitAsync: (java.util.Map[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => zio.Task[Unit],
executeOnEmpty: Boolean
): zio.Task[Unit] = ZIO.unit
override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit

override def queueSize: UIO[Int] = ZIO.succeed(0)
override def pendingCommitCount: UIO[Int] = ZIO.succeed(0)
override def getPendingCommits: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.internal.Committer.CommitOffsets
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.LiveCommitter.Commit
import zio.{ Chunk, Task, UIO }

import java.lang.Math.max
import java.util.{ Map => JavaMap }
import scala.collection.mutable

private[internal] trait Committer {
Expand All @@ -21,14 +21,13 @@ private[internal] trait Committer {
* WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations
* may be used. Please see [[RebalanceCoordinator]] for more information.
*
* @param commitAsync
* Function 'commitAsync' on the KafkaConsumer. This is isolated from the whole KafkaConsumer for testing purposes.
* The caller should ensure exclusive access to the KafkaConsumer.
* @param consumer
* KafkaConsumer to use. The caller is responsible or guaranteeing exclusive access.
* @param executeOnEmpty
* Execute commitAsync() even if there are no commits
*/
def processQueuedCommits(
commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit],
consumer: ByteArrayKafkaConsumer,
executeOnEmpty: Boolean = false
): Task[Unit]

Expand Down
Loading
Loading