Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(
ERROR_CODE,
new Field(GROUP_ID_KEY_NAME, STRING),
new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, AwaitingSync, " +
new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, CompletingRebalance, " +
"PreparingRebalance, or empty if there is no active group)"),
new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"),
new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided if the group is Stable)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -202,4 +203,17 @@ protected Struct toStruct(short version) {

return struct;
}

@Override
public String toString() {
    // Human-readable summary for logging; only member ids are listed (not their
    // metadata) to keep the output compact.
    final StringBuilder bld = new StringBuilder("JoinGroupResponse(");
    bld.append("throttleTimeMs=").append(throttleTimeMs)
       .append(", error=").append(error)
       .append(", generationId=").append(generationId)
       .append(", groupProtocol=").append(groupProtocol)
       .append(", memberId=").append(memberId)
       .append(", leaderId=").append(leaderId)
       .append(", members=");
    if (members == null)
        bld.append("null");
    else
        bld.append(Utils.join(members.keySet(), ","));
    return bld.append(")").toString();
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object ConsumerGroupCommand extends Logging {
case Some("Empty") =>
System.err.println(s"Consumer group '$groupId' has no active members.")
printAssignment(assignments, true)
case Some("PreparingRebalance") | Some("AwaitingSync") =>
case Some("PreparingRebalance") | Some("CompletingRebalance") =>
System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
printAssignment(assignments, true)
case Some("Stable") =>
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class GroupCoordinator(val brokerId: Int,
updateMemberAndRebalance(group, member, protocols, responseCallback)
}

case AwaitingSync =>
case CompletingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
} else {
Expand Down Expand Up @@ -261,7 +261,7 @@ class GroupCoordinator(val brokerId: Int,
case PreparingRebalance =>
responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)

case AwaitingSync =>
case CompletingRebalance =>
group.get(memberId).awaitingSyncCallback = responseCallback

// if this is the leader, then we can attempt to persist state and transition to stable
Expand All @@ -275,9 +275,9 @@ class GroupCoordinator(val brokerId: Int,
groupManager.storeGroup(group, assignment, (error: Errors) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the AwaitingSync state and the same generation
// so we must ensure we are still in the CompletingRebalance state and the same generation
// when it gets invoked. if we have transitioned to another state, then do nothing
if (group.is(AwaitingSync) && generationId == group.generationId) {
if (group.is(CompletingRebalance) && generationId == group.generationId) {
if (error != Errors.NONE) {
resetAndPropagateAssignmentError(group, error)
maybePrepareRebalance(group)
Expand Down Expand Up @@ -361,7 +361,7 @@ class GroupCoordinator(val brokerId: Int,
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)

case AwaitingSync =>
case CompletingRebalance =>
if (!group.has(memberId))
responseCallback(Errors.UNKNOWN_MEMBER_ID)
else
Expand Down Expand Up @@ -456,7 +456,7 @@ class GroupCoordinator(val brokerId: Int,
// the group is only using Kafka to store offsets
// Also, for transactional offset commits we don't need to validate group membership and the generation.
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
} else if (group.is(AwaitingSync)) {
} else if (group.is(CompletingRebalance)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
} else if (!group.has(memberId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
Expand Down Expand Up @@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
}
joinPurgatory.checkAndComplete(GroupKey(group.groupId))

case Stable | AwaitingSync =>
case Stable | CompletingRebalance =>
for (member <- group.allMemberMetadata) {
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
Expand Down Expand Up @@ -574,13 +574,13 @@ class GroupCoordinator(val brokerId: Int,
}

private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
assert(group.is(AwaitingSync))
assert(group.is(CompletingRebalance))
group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
propagateAssignment(group, Errors.NONE)
}

private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
assert(group.is(AwaitingSync))
assert(group.is(CompletingRebalance))
group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
propagateAssignment(group, error)
}
Expand Down Expand Up @@ -674,7 +674,7 @@ class GroupCoordinator(val brokerId: Int,

private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(AwaitingSync))
if (group.is(CompletingRebalance))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

val delayedRebalance = if (group.is(Empty))
Expand All @@ -700,7 +700,7 @@ class GroupCoordinator(val brokerId: Int,
group.remove(member.memberId)
group.currentState match {
case Dead | Empty =>
case Stable | AwaitingSync => maybePrepareRebalance(group)
case Stable | CompletingRebalance => maybePrepareRebalance(group)
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[group] sealed trait GroupState
* park join group requests from new or existing members until all expected members have joined
* allow offset commits from previous generation
* allow offset fetch requests
* transition: some members have joined by the timeout => AwaitingSync
* transition: some members have joined by the timeout => CompletingRebalance
* all members have left the group => Empty
* group is removed by partition emigration => Dead
*/
Expand All @@ -54,7 +54,7 @@ private[group] case object PreparingRebalance extends GroupState
* member failure detected => PreparingRebalance
* group is removed by partition emigration => Dead
*/
// The scrape fused the removed `AwaitingSync` declaration with its replacement;
// only the renamed state may remain, otherwise two states would exist.
private[group] case object CompletingRebalance extends GroupState

/**
* Group is stable
Expand Down Expand Up @@ -105,10 +105,10 @@ private[group] case object Empty extends GroupState

private object GroupMetadata {
  // Legal state machine for a group: key = target state, value = the set of
  // states it may legally be entered from. The diff's removed (AwaitingSync)
  // entries were fused into the scrape; only the renamed entries belong here.
  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
    Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
      CompletingRebalance -> Set(PreparingRebalance),
      Stable -> Set(CompletingRebalance),
      PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
      Empty -> Set(PreparingRebalance))
}

Expand Down Expand Up @@ -256,7 +256,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
if (members.nonEmpty) {
generationId += 1
protocol = selectProtocol
transitionTo(AwaitingSync)
transitionTo(CompletingRebalance)
} else {
generationId += 1
protocol = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,57 @@ class GroupMetadataManager(brokerId: Int,

this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "

newGauge("NumOffsets",
/**
 * Register a yammer gauge under `name`, first removing any metric already
 * registered under that name so that re-instantiating the manager (e.g. in
 * tests) does not keep a gauge bound to a stale instance.
 *
 * NOTE(review): a reviewer questioned whether remove-then-recreate is needed
 * for what looks like a one-time registration — confirm whether more than one
 * GroupMetadataManager can be created per JVM (tests do this).
 */
private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
  removeMetric(name)
  newGauge(name, gauge)
}

recreateGauge("NumOffsets",
  new Gauge[Int] {
    def value = groupMetadataCache.values.map(group => {
      group synchronized { group.numOffsets }
    }).sum
  })

recreateGauge("NumGroups",
  new Gauge[Int] {
    def value = groupMetadataCache.size
  })

recreateGauge("NumGroupsPreparingRebalance",
  new Gauge[Int] {
    def value(): Int = groupMetadataCache.values.count(group => {
      group synchronized { group.is(PreparingRebalance) }
    })
  })

recreateGauge("NumGroupsCompletingRebalance",
  new Gauge[Int] {
    def value(): Int = groupMetadataCache.values.count(group => {
      group synchronized { group.is(CompletingRebalance) }
    })
  })

recreateGauge("NumGroupsStable",
  new Gauge[Int] {
    def value(): Int = groupMetadataCache.values.count(group => {
      group synchronized { group.is(Stable) }
    })
  })

recreateGauge("NumGroupsDead",
  new Gauge[Int] {
    def value(): Int = groupMetadataCache.values.count(group => {
      group synchronized { group.is(Dead) }
    })
  })

recreateGauge("NumGroupsEmpty",
  new Gauge[Int] {
    def value(): Int = groupMetadataCache.values.count(group => {
      group synchronized { group.is(Empty) }
    })
  })

def enableMetadataExpiration() {
scheduler.startup()
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging {
* @param tags Additional attributes which mBean will have.
* @return Sanitized metric name object.
*/
protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")

// The scrape fused the removed `protected` signature with its replacement;
// keep only the widened (public) override so tests can resolve metric names.
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
  // Pin the metric under an explicit group/type instead of deriving them from the class name.
  explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ class GroupCoordinatorTest extends JUnitSuite {
}

@Test
def testCommitOffsetInAwaitingSync() {
def testCommitOffsetInCompletingRebalance() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
Expand Down Expand Up @@ -1232,7 +1232,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, error)
assertEquals(protocolType, summary.protocolType)
assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
assertEquals(AwaitingSync.toString, summary.state)
assertEquals(CompletingRebalance.toString, summary.state)
assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId))
assertTrue(summary.members.forall(_.metadata.isEmpty))
assertTrue(summary.members.forall(_.assignment.isEmpty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
import kafka.utils.TestUtils.fail
import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
import kafka.utils.{KafkaScheduler, Logging, MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
Expand All @@ -34,6 +34,8 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.{Before, Test}
import java.nio.ByteBuffer

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.internals.Topic

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -1394,4 +1396,29 @@ class GroupMetadataManagerTest {
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
}

// Fetch the gauge registered by `manager` under `name` from the default
// yammer registry (the cast is safe for the Int gauges this test installs).
private def getGauge(manager: GroupMetadataManager, name: String): Gauge[Int] = {
  val registeredName = manager.metricName(name, Map.empty)
  val metric = Metrics.defaultRegistry().allMetrics().get(registeredName)
  metric.asInstanceOf[Gauge[Int]]
}

// Assert the current values of the group-count gauges registered by `manager`.
private def expectMetrics(manager: GroupMetadataManager,
                          expectedNumGroups: Int,
                          expectedNumGroupsPreparingRebalance: Int,
                          expectedNumGroupsCompletingRebalance: Int): Unit = {
  val expectations = Seq(
    "NumGroups" -> expectedNumGroups,
    "NumGroupsPreparingRebalance" -> expectedNumGroupsPreparingRebalance,
    "NumGroupsCompletingRebalance" -> expectedNumGroupsCompletingRebalance)
  expectations.foreach { case (metricName, expected) =>
    assertEquals(expected, getGauge(manager, metricName).value)
  }
}

@Test
def testMetrics() {
  // Start from a clean registry state so the gauge counts are deterministic.
  groupMetadataManager.cleanupGroupMetadata()
  expectMetrics(groupMetadataManager, 0, 0, 0)

  // Adding a stable group bumps only the total group count.
  val stableGroup = new GroupMetadata("foo2", Stable)
  groupMetadataManager.addGroup(stableGroup)
  expectMetrics(groupMetadataManager, 1, 0, 0)

  // Walk the group through a rebalance and verify the per-state gauges follow.
  stableGroup.transitionTo(PreparingRebalance)
  expectMetrics(groupMetadataManager, 1, 1, 0)
  stableGroup.transitionTo(CompletingRebalance)
  expectMetrics(groupMetadataManager, 1, 0, 1)
}
}
Loading