Skip to content

Commit

Permalink
Do not start streams for immediately revoked/lost partitions
Browse files Browse the repository at this point in the history
As discovered by @ytalashko (see #1294), it is possible that a partition is assigned and immediately revoked in the same poll. No stream should be started for these partitions.

Unlike #1294, this PR does _not_ hide the assigned+revoked partitions from the diagnostic events and rebalance metrics. In addition, this PR also supports the unlikely event of a assigned+lost partition.
  • Loading branch information
erikvanoosten committed Aug 14, 2024
1 parent f18a00a commit 6144585
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ object DiagnosticEvent {
final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
}

/**
* The partitions that were involved in a rebalance.
*
* Note: when a partition was assigned and immediately revoked/lost, it will occur in multiple sets.
*
* @param revoked
* the partitions that were revoked during the rebalance
* @param assigned
* the partitions that were assigned during the rebalance
* @param lost
* the partitions that were lost during the rebalance
* @param ended
* the partition streams that were ended during the rebalance, a partition stream can be ended because it was
* revoked/lost or because `restartStreamsOnRebalancing` is used
*/
final case class Rebalance(
revoked: Set[TopicPartition],
assigned: Set[TopicPartition],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,16 @@ private[consumer] final class Runloop private (

val currentAssigned = c.assignment().asScala.toSet
val endedTps = endedStreams.map(_.tp).toSet
// Some partitions might have been both assigned and revoked/lost.
val actualAssigned = assignedTps -- revokedTps -- lostTps
for {
ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps)
ignoreRecordsForTps <- doSeekForNewPartitions(c, actualAssigned)

// The topic partitions that need a new stream are:
// 1. Those that are freshly assigned
// 2. Those that are still assigned but were ended in the rebalance listener because
// of `restartStreamsOnRebalancing` being true
startingTps = assignedTps ++ (currentAssigned intersect endedTps)
startingTps = actualAssigned ++ (currentAssigned intersect endedTps)

startingStreams <-
ZIO.foreach(Chunk.fromIterable(startingTps))(newPartitionStream).tap { newStreams =>
Expand Down

0 comments on commit 6144585

Please sign in to comment.