Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ class Partition(val topic: String,
}

private def updateIsr(newIsr: Set[Replica]) {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
newLeaderAndIsr, controllerEpoch, zkVersion)

Expand All @@ -538,6 +538,7 @@ class Partition(val topic: String,
zkVersion = newVersion
trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
} else {
replicaManager.failedIsrUpdatesRate.mark()
info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package kafka.controller
import java.net.SocketTimeoutException
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.Broker
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients._
Expand All @@ -38,11 +40,26 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.{Set, mutable}

class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
/** Constants shared by `ControllerChannelManager` instances. */
object ControllerChannelManager {

  /** Metric name under which the per-broker queue size gauges are registered and removed. */
  val QueueSizeMetricName: String = "QueueSize"

}

class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
import ControllerChannelManager._
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "

// Gauge reporting the combined size of the message queues of all brokers currently tracked in
// `brokerStateInfo`. The total is computed while holding `brokerLock`.
newGauge(
  "TotalQueueSize",
  new Gauge[Int] {
    def value: Int = brokerLock.synchronized {
      brokerStateInfo.values.foldLeft(0) { (total, stateInfo) => total + stateInfo.messageQueue.size }
    }
  }
)

controllerContext.liveBrokers.foreach(addNewBroker)

def startup() = {
Expand Down Expand Up @@ -133,9 +150,21 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
brokerNode, config, time, threadName)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))

val queueSizeGauge = newGauge(
QueueSizeMetricName,
new Gauge[Int] {
def value: Int = messageQueue.size
},
queueSizeTags(broker.id)
)

brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
requestThread, queueSizeGauge))
}

/**
 * Builds the metric tags identifying the queue size gauge of a single broker.
 *
 * @param brokerId id of the broker the gauge belongs to
 * @return a single-entry map from "broker-id" to the decimal string form of `brokerId`
 */
private def queueSizeTags(brokerId: Int): Map[String, String] = {
  val brokerIdTag = "broker-id" -> brokerId.toString
  Map(brokerIdTag)
}

private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
try {
// Shutdown the RequestSendThread before closing the NetworkClient to avoid the concurrent use of the
Expand All @@ -145,6 +174,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
brokerState.requestSendThread.shutdown()
brokerState.networkClient.close()
brokerState.messageQueue.clear()
removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id))
brokerStateInfo.remove(brokerState.brokerNode.id)
} catch {
case e: Throwable => error("Error while removing broker by the controller", e)
Expand Down Expand Up @@ -465,7 +495,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
case class ControllerBrokerStateInfo(networkClient: NetworkClient,
brokerNode: Node,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread)
requestSendThread: RequestSendThread,
queueSizeGauge: Gauge[Int])

/**
 * Parameters describing a stop-replica request for one replica.
 *
 * @param replica         the partition replica the request refers to
 * @param deletePartition flag carried with the request
 * @param callback        function taking the `AbstractResponse`; defaults to `null`, i.e. no
 *                        callback was supplied, so users of this value must tolerate `null`
 */
case class StopReplicaRequestInfo(
  replica: PartitionAndReplica,
  deletePartition: Boolean,
  callback: AbstractResponse => Unit = null
)

Expand Down
65 changes: 65 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerEventManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.controller

import java.util.concurrent.LinkedBlockingQueue

import scala.collection._

import kafka.metrics.KafkaTimer
import kafka.utils.ShutdownableThread

/**
 * Queues `ControllerEvent`s and processes them one at a time on a single dedicated thread
 * named "controller-event-thread".
 *
 * @param rateAndTimeMetrics     timers keyed by controller state; the timer for an event's state
 *                               wraps the call to the event's `process()`
 * @param eventProcessedListener invoked with each event after the processing attempt, whether or
 *                               not processing threw
 */
class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
eventProcessedListener: ControllerEvent => Unit) {

// State of the event currently being handled, or Idle when none is. Written by the event thread
// in `doWork` and exposed to callers through `state`.
@volatile private var _state: ControllerState = ControllerState.Idle

// Created without a capacity argument, so the queue uses LinkedBlockingQueue's default capacity.
private val queue = new LinkedBlockingQueue[ControllerEvent]
private val thread = new ControllerEventThread("controller-event-thread")

/** The state of the event being processed right now, or `ControllerState.Idle`. */
def state: ControllerState = _state

/** Starts the event-processing thread. */
def start(): Unit = thread.start()

/** Delegates to the event thread's `shutdown()`. */
def close(): Unit = thread.shutdown()

/** Appends `event` to the queue for later processing by the event thread. */
def put(event: ControllerEvent): Unit = queue.put(event)

/** Thread whose unit of work is: take one event from the queue, process it, notify the listener. */
class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
override def doWork(): Unit = {
// Blocks until an event is available.
val controllerEvent = queue.take()
_state = controllerEvent.state

// NOTE(review): the timer lookup happens inside the try, so if `rateAndTimeMetrics` has no entry
// for this state the lookup failure is logged and `process()` is not invoked — confirm that
// every state an event can report has a timer.
try {
rateAndTimeMetrics(state).time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This is exactly what I was hoping the loop would look like.

controllerEvent.process()
}
} catch {
case e: Throwable => error(s"Error processing event $controllerEvent", e)
}

// The listener runs in its own try so that a listener failure is logged separately and the
// state is still reset to Idle below.
try eventProcessedListener(controllerEvent)
catch {
case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
}

_state = ControllerState.Idle
}
}

}
83 changes: 83 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerState.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.controller

import scala.collection.Seq

/**
 * Base type of the controller states. Every state carries a numeric `value` and may expose the
 * name of a rate-and-time metric.
 */
sealed abstract class ControllerState {

  /** Numeric identifier of this state. */
  def value: Byte

  /**
   * Name of the rate-and-time metric for this state: the state's `toString` followed by
   * "RateAndTimeMs", or `None` when `hasRateAndTimeMetric` is false.
   */
  def rateAndTimeMetricName: Option[String] = {
    if (!hasRateAndTimeMetric) None
    else Some(toString + "RateAndTimeMs")
  }

  /** Whether this state has a rate-and-time metric; true unless a subclass overrides it. */
  protected def hasRateAndTimeMetric: Boolean = true
}

object ControllerState {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The names below are a little inconsistent, although they match the KIP. More concretely: Idle, ControllerChange, BrokerChange, TopicChange, IsrChange, ControlledShutdown, TopicDeletion versus PartitionReassigning, AutoLeaderBalancing, ManualLeaderBalancing. It seems like we should do one of the following:

  1. Change the former to be: Idling, ControllerChanging, BrokerChanging, etc.
  2. Change the latter to be: PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that #2 is better?


// Note: `rateAndTimeMetricName` is based on the case object name by default. Changing a name is a breaking change
// unless `rateAndTimeMetricName` is overridden.

/** State with value 0. It opts out of the rate-and-time metric, so its metric name is `None`. */
case object Idle extends ControllerState {
  override protected def hasRateAndTimeMetric: Boolean = false
  override def value: Byte = 0
}

/** State with value 1; uses the default metric name derived from the case object name. */
case object ControllerChange extends ControllerState {
  override def value: Byte = 1
}

/**
 * State with value 2. Unlike the other states, its rate-and-time metric is not named after the
 * case object: `rateAndTimeMetricName` is overridden to return "LeaderElectionRateAndTimeMs".
 */
case object BrokerChange extends ControllerState {
def value = 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have an existing metric ControllerStats.leaderElectionTimer, for this state, perhaps it's better not to measure it again in ControllerEventThread?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was unsure about consistency and clarity over duplication. I'll do as you suggest for now, as we can add a BrokerChange metric in the future, if this proves to be confusing.

// The LeaderElectionRateAndTimeMs metric existed before `ControllerState` was introduced and we keep the name
// for backwards compatibility. The alternative would be to have the same metric under two different names.
override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
}

/** State with value 3; uses the default metric name derived from the case object name. */
case object TopicChange extends ControllerState {
  override def value: Byte = 3
}

/** State with value 4; uses the default metric name derived from the case object name. */
case object TopicDeletion extends ControllerState {
  override def value: Byte = 4
}

/** State with value 5; uses the default metric name derived from the case object name. */
case object PartitionReassignment extends ControllerState {
  override def value: Byte = 5
}

/** State with value 6; uses the default metric name derived from the case object name. */
case object AutoLeaderBalance extends ControllerState {
  override def value: Byte = 6
}

/** State with value 7; uses the default metric name derived from the case object name. */
case object ManualLeaderBalance extends ControllerState {
  override def value: Byte = 7
}

/** State with value 8; uses the default metric name derived from the case object name. */
case object ControlledShutdown extends ControllerState {
  override def value: Byte = 8
}

/** State with value 9; uses the default metric name derived from the case object name. */
case object IsrChange extends ControllerState {
  override def value: Byte = 9
}

/** All controller states, listed in ascending order of their `value` (0 through 9). */
val values: Seq[ControllerState] = Seq(
  Idle,
  ControllerChange,
  BrokerChange,
  TopicChange,
  TopicDeletion,
  PartitionReassignment,
  AutoLeaderBalance,
  ManualLeaderBalance,
  ControlledShutdown,
  IsrChange
)
}
Loading