Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import java.util.{Optional, OptionalInt}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
import com.yammer.metrics.core.Gauge
import kafka.common.OffsetAndMetadata
import kafka.coordinator.group.GroupMetadataManager.{MetricNames, NumGroupsCompletingRebalanceMetricName, NumGroupsDeadMetricName, NumGroupsEmptyMetricName, NumGroupsMetricName, NumGroupsPreparingRebalanceMetricName, NumGroupsStableMetricName, NumOffsetsMetricName}
import kafka.server.{ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
Expand Down Expand Up @@ -124,55 +124,54 @@ class GroupMetadataManager(brokerId: Int,

this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "

private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
metricsGroup.removeMetric(name)
metricsGroup.newGauge(name, gauge)
}

recreateGauge("NumOffsets",
metricsGroup.newGauge(NumOffsetsMetricName,
() => groupMetadataCache.values.map { group =>
group.inLock { group.numOffsets }
}.sum
)
}.sum)

recreateGauge("NumGroups",
metricsGroup.newGauge(NumGroupsMetricName,
() => groupMetadataCache.size
)

recreateGauge("NumGroupsPreparingRebalance",
metricsGroup.newGauge(NumGroupsPreparingRebalanceMetricName,
() => groupMetadataCache.values.count { group =>
group synchronized {
group.is(PreparingRebalance)
}
})
}
)

recreateGauge("NumGroupsCompletingRebalance",
metricsGroup.newGauge(NumGroupsCompletingRebalanceMetricName,
() => groupMetadataCache.values.count { group =>
group synchronized {
group.is(CompletingRebalance)
}
})
}
)

recreateGauge("NumGroupsStable",
metricsGroup.newGauge(NumGroupsStableMetricName,
() => groupMetadataCache.values.count { group =>
group synchronized {
group.is(Stable)
}
})
}
)

recreateGauge("NumGroupsDead",
metricsGroup.newGauge(NumGroupsDeadMetricName,
() => groupMetadataCache.values.count { group =>
group synchronized {
group.is(Dead)
}
})
}
)

recreateGauge("NumGroupsEmpty",
metricsGroup.newGauge(NumGroupsEmptyMetricName,
() => groupMetadataCache.values.count { group =>
group synchronized {
group.is(Empty)
}
})
}
)

def startup(retrieveGroupMetadataTopicPartitionCount: () => Int, enableMetadataExpiration: Boolean): Unit = {
groupMetadataTopicPartitionCount = retrieveGroupMetadataTopicPartitionCount()
Expand Down Expand Up @@ -985,7 +984,7 @@ class GroupMetadataManager(brokerId: Int,
metrics.removeSensor(GroupMetadataManager.LoadTimeSensor)
metrics.removeSensor(GroupMetadataManager.OffsetCommitsSensor)
metrics.removeSensor(GroupMetadataManager.OffsetExpiredSensor)

MetricNames.foreach(metricsGroup.removeMetric)
// TODO: clear the caches
}

Expand Down Expand Up @@ -1049,6 +1048,25 @@ object GroupMetadataManager {
val OffsetCommitsSensor: String = "OffsetCommits"
val OffsetExpiredSensor: String = "OffsetExpired"

private val NumOffsetsMetricName = "NumOffsets"
private val NumGroupsMetricName = "NumGroups"
private val NumGroupsPreparingRebalanceMetricName = "NumGroupsPreparingRebalance"
private val NumGroupsCompletingRebalanceMetricName = "NumGroupsCompletingRebalance"
private val NumGroupsStableMetricName = "NumGroupsStable"
private val NumGroupsDeadMetricName = "NumGroupsDead"
private val NumGroupsEmptyMetricName = "NumGroupsEmpty"

// Visible for test
private[group] val MetricNames = Set(
NumOffsetsMetricName,
NumGroupsMetricName,
NumGroupsPreparingRebalanceMetricName,
NumGroupsCompletingRebalanceMetricName,
NumGroupsStableMetricName,
NumGroupsDeadMetricName,
NumGroupsEmptyMetricName
)

/**
* Generates the key for offset commit message for given (group, topic, partition)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogAppendInfo, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import org.mockito.Mockito.{mock, mockConstruction, reset, times, verify, verifyNoMoreInteractions, when}

import scala.jdk.CollectionConverters._
import scala.collection._
Expand Down Expand Up @@ -108,6 +108,28 @@ class GroupMetadataManagerTest {
groupMetadataManager.shutdown()
}

@Test
def testRemoveMetricsOnClose(): Unit = {
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val gmm = new GroupMetadataManager(1, MetadataVersion.latest, offsetConfig, replicaManager,
time, metrics)

// shutdown GroupMetadataManager so that metrics are removed
gmm.shutdown()
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
// verify that each `metricsGroup` metric in `GroupMetadataManager` is registered when initialized.
GroupMetadataManager.MetricNames.foreach(metricName =>
verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
// verify that each `metricsGroup` metric in `GroupMetadataManager` is removed when shutdown.
GroupMetadataManager.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
} finally {
mockMetricsGroupCtor.close()
}
}

@Test
def testLogInfoFromCleanupGroupMetadata(): Unit = {
var expiredOffsets: Int = 0
Expand Down