diff --git a/README.md b/README.md
index ef3a5a405..f54721ebc 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ A tool for managing [Apache Kafka](http://kafka.apache.org).
It supports the following :
- Manage multiple clusters
- - Easy inspection of cluster state (topics, brokers, replica distribution, partition distribution)
+ - Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
- Run preferred replica election
- Generate partition assignments with option to select brokers to use
- Run reassignment of partition (based on generated assignments)
@@ -18,6 +18,7 @@ It supports the following :
- Add partitions to existing topic
- Update config for existing topic
- Optionally enable JMX polling for broker level and topic level metrics.
+ - Optionally filter out consumers that do not have ids/, owners/, and offsets/ directories in zookeeper.
Cluster Management
@@ -37,9 +38,21 @@ Topic View
***
+Consumer List View
+
+![consumer](/img/consumer-list.png)
+
+***
+
+Consumed Topic View
+
+![consumed topic](/img/consumed-topic.png)
+
+***
+
Broker List
-![topic](/img/broker-list.png)
+![broker](/img/broker-list.png)
***
@@ -52,9 +65,9 @@ Broker View
Requirements
------------
-1. [Kafka 0.8.1.1 or 0.8.2.0](http://kafka.apache.org/downloads.html)
+1. [Kafka 0.8.1.1 or 0.8.2.1](http://kafka.apache.org/downloads.html)
2. [sbt 0.13.x](http://www.scala-sbt.org/download.html)
-3. Java 7+
+3. Java 8+
Configuration
-------------
diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala
index bbb44e86f..96c691fae 100644
--- a/app/controllers/Cluster.scala
+++ b/app/controllers/Cluster.scala
@@ -70,7 +70,9 @@ object Cluster extends Controller {
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean,
- "logkafkaEnabled" -> boolean
+ "filterConsumers" -> boolean,
+ "logkafkaEnabled" -> boolean,
+ "activeOffsetCacheEnabled" -> boolean
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)
@@ -93,7 +95,9 @@ object Cluster extends Controller {
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean,
- "logkafkaEnabled" -> boolean
+ "filterConsumers" -> boolean,
+ "logkafkaEnabled" -> boolean,
+ "activeOffsetCacheEnabled" -> boolean
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)
@@ -154,7 +158,9 @@ object Cluster extends Controller {
cc.curatorConfig.zkConnect,
cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled,
- cc.logkafkaEnabled))
+ cc.filterConsumers,
+ cc.logkafkaEnabled,
+ cc.activeOffsetCacheEnabled))
}))
}
}
@@ -166,7 +172,7 @@ object Cluster extends Controller {
clusterConfigForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.addCluster(formWithErrors))),
clusterConfig => {
- kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled, clusterConfig.logkafkaEnabled).map { errorOrSuccess =>
+ kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled, clusterConfig.filterConsumers, clusterConfig.logkafkaEnabled, clusterConfig.activeOffsetCacheEnabled).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withView("Add Cluster"),
@@ -225,7 +231,9 @@ object Cluster extends Controller {
clusterOperation.clusterConfig.version.toString,
clusterOperation.clusterConfig.curatorConfig.zkConnect,
clusterOperation.clusterConfig.jmxEnabled,
- clusterOperation.clusterConfig.logkafkaEnabled
+ clusterOperation.clusterConfig.filterConsumers,
+ clusterOperation.clusterConfig.logkafkaEnabled,
+ clusterOperation.clusterConfig.activeOffsetCacheEnabled
).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
diff --git a/app/controllers/Consumer.scala b/app/controllers/Consumer.scala
new file mode 100644
index 000000000..5753acc33
--- /dev/null
+++ b/app/controllers/Consumer.scala
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
+package controllers
+
+import features.ApplicationFeatures
+import play.api.mvc._
+
+/**
+ * @author cvcal
+ */
+object Consumer extends Controller{
+ import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+ private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
+ private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features
+
+ def consumers(cluster: String) = Action.async {
+ kafkaManager.getConsumerListExtended(cluster).map { errorOrConsumerList =>
+ Ok(views.html.consumer.consumerList(cluster, errorOrConsumerList))
+ }
+ }
+
+ def consumer(cluster: String, consumerGroup: String) = Action.async {
+ kafkaManager.getConsumerIdentity(cluster,consumerGroup).map { errorOrConsumerIdentity =>
+ Ok(views.html.consumer.consumerView(cluster,consumerGroup,errorOrConsumerIdentity))
+ }
+ }
+
+ def consumerAndTopic(cluster: String, consumerGroup: String, topic: String) = Action.async {
+ kafkaManager.getConsumedTopicState(cluster,consumerGroup,topic).map { errorOrConsumedTopicState =>
+ Ok(views.html.consumer.consumedTopicView(cluster,consumerGroup,topic,errorOrConsumedTopicState))
+ }
+ }
+}
diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala
index 34c47f075..52cd6b163 100644
--- a/app/controllers/Logkafka.scala
+++ b/app/controllers/Logkafka.scala
@@ -233,4 +233,40 @@ object Logkafka extends Controller{
)
}
}
+
+ def handleEnableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request =>
+ clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext =>
+ implicit val clusterFeatures = clusterContext.clusterFeatures
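+      // enable this logkafka config by setting its "valid" flag to true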
+      val props = new Properties()
+      props.put("valid", true.toString)
+ kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)),
+ models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"),
+ errorOrSuccess,
+ "Enable Config",
+ FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()),
+ FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString())
+ ))
+ }
+ }
+ }
+
+ def handleDisableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request =>
+ clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext =>
+ implicit val clusterFeatures = clusterContext.clusterFeatures
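+      // disable this logkafka config by setting its "valid" flag to false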
+      val props = new Properties()
+      props.put("valid", false.toString)
+ kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)),
+ models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"),
+ errorOrSuccess,
+ "Disable Config",
+ FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()),
+ FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString())
+ ))
+ }
+ }
+ }
}
diff --git a/app/controllers/ReassignPartitions.scala b/app/controllers/ReassignPartitions.scala
index e61288334..46fb1705c 100644
--- a/app/controllers/ReassignPartitions.scala
+++ b/app/controllers/ReassignPartitions.scala
@@ -17,6 +17,8 @@ import play.api.data.Forms._
import play.api.data.validation.{Valid, Invalid, Constraint}
import play.api.mvc._
+import scala.collection.mutable
+
import scala.concurrent.Future
import scalaz.{\/, \/-, -\/}
@@ -204,7 +206,8 @@ object ReassignPartitions extends Controller{
bl <- blOrError
} yield {
Ok(views.html.topic.manualAssignments(
- c, t, manualReassignmentForm.fill(List(flattenTopicIdentity(ti))), bl, bv, manualReassignmentForm.errors
+ //c, t, manualReassignmentForm.fill(List(flattenTopicIdentity(ti))), bl, bv, manualReassignmentForm.errors
+ c, t, List(flattenTopicIdentity(ti)), bl, bv, manualReassignmentForm.errors
))
}
errorOrResult.fold(err => {
@@ -227,18 +230,19 @@ object ReassignPartitions extends Controller{
}, { topics: TopicListExtended =>
kafkaManager.getBrokerList(c).flatMap { errOrCV =>
errOrCV.fold(
- { err: ApiError =>
- Future.successful(Ok(views.html.topic.confirmMultipleAssignments(c, -\/(err))))
- }, { brokers: BrokerListExtended => {
- brokersViews.flatMap { errorOrBVs =>
- errorOrBVs.fold(
- { err: ApiError => Future.successful(Ok(views.html.topic.confirmMultipleAssignments(c, -\/(err))))}, { bVs: Seq[BVView] => Future {
- Ok(views.html.topic.manualMultipleAssignments(
- c, manualReassignmentForm.fill(flattenedTopicListExtended(topics)), brokers, bVs, manualReassignmentForm.errors
- ))
- }
- }
- )
+ {err: ApiError =>
+ Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))
+ },
+ { brokers: BrokerListExtended => {
+ brokersViews.flatMap { errorOrBVs =>
+ errorOrBVs.fold (
+ {err: ApiError => Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))},
+ {bVs => Future {
+ Ok(views.html.topic.manualMultipleAssignments(
+ c, flattenedTopicListExtended(topics), brokers , bVs, manualReassignmentForm.errors
+ ))
+ }}
+ )
}
}
}
@@ -283,7 +287,7 @@ object ReassignPartitions extends Controller{
errors => kafkaManager.getClusterList.flatMap { errorOrClusterList =>
responseScreen(
"Manual Reassign Partitions Failure",
- -\/(IndexedSeq(ApiError("There is something really wrong with your submitted data!")))
+ -\/(IndexedSeq(ApiError("There is something really wrong with your submitted data!\n\n" + errors.toString)))
)
},
assignment => {
@@ -397,7 +401,7 @@ object ReassignPartitions extends Controller{
),
cc =>
reassignPartitionsForm.bindFromRequest.fold(
- formWithErrors => Future.successful(BadRequest(views.html.topic.topicView(c, t, -\/(ApiError("Unknown operation!"))))),
+ formWithErrors => Future.successful(BadRequest(views.html.topic.topicView(c, t, -\/(ApiError("Unknown operation!")), None))),
op => op match {
case RunAssignment =>
implicit val clusterFeatures = cc.clusterFeatures
diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala
index 0f0c07a34..41201d20a 100644
--- a/app/controllers/Topic.scala
+++ b/app/controllers/Topic.scala
@@ -139,8 +139,11 @@ object Topic extends Controller{
}
def topic(c: String, t: String) = Action.async {
- kafkaManager.getTopicIdentity(c,t).map { errorOrTopicIdentity =>
- Ok(views.html.topic.topicView(c,t,errorOrTopicIdentity))
+ val futureErrorOrTopicIdentity = kafkaManager.getTopicIdentity(c,t)
+ val futureErrorOrConsumerList = kafkaManager.getConsumersForTopic(c,t)
+
+ futureErrorOrTopicIdentity.zip(futureErrorOrConsumerList).map {case (errorOrTopicIdentity,errorOrConsumerList) =>
+ Ok(views.html.topic.topicView(c,t,errorOrTopicIdentity,errorOrConsumerList))
}
}
@@ -197,7 +200,8 @@ object Topic extends Controller{
BadRequest(views.html.topic.topicView(
clusterName,
topic,
- -\/(ApiError(formWithErrors.error("topic").map(_.toString).getOrElse("Unknown error deleting topic!")))))
+ -\/(ApiError(formWithErrors.error("topic").map(_.toString).getOrElse("Unknown error deleting topic!"))),
+ None))
),
deleteTopic => {
kafkaManager.deleteTopic(clusterName, deleteTopic.topic).map { errorOrSuccess =>
diff --git a/app/features/ApplicationFeature.scala b/app/features/ApplicationFeature.scala
index 019f4b818..dd5d45ba6 100644
--- a/app/features/ApplicationFeature.scala
+++ b/app/features/ApplicationFeature.scala
@@ -50,6 +50,13 @@ case class ApplicationFeatures(features: Set[ApplicationFeature])
object ApplicationFeatures {
import play.api.Play.current
+ private lazy val log = LoggerFactory.getLogger(classOf[ApplicationFeatures])
+
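+  // features assumed when application.features is missing from the configuration file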
+ lazy val default : List[String] = List(
+ KMClusterManagerFeature,
+ KMTopicManagerFeature,
+ KMPreferredReplicaElectionFeature,
+ KMReassignPartitionsFeature).map(_.getClass.getSimpleName)
lazy val features = {
getApplicationFeatures(play.api.Play.configuration.underlying)
@@ -57,9 +64,13 @@ object ApplicationFeatures {
def getApplicationFeatures(config: Config) : ApplicationFeatures = {
import scala.collection.JavaConverters._
- val configFeatures: List[String] = config.getStringList("application.features").asScala.toList
+ val configFeatures: Option[List[String]] = Try(config.getStringList("application.features").asScala.toList).toOption
+
+ if(configFeatures.isEmpty) {
+ log.warn(s"application.features not found in conf file, using default values $default")
+ }
- val f = configFeatures.map(ApplicationFeature.from).flatten
+ val f = configFeatures.getOrElse(default).map(ApplicationFeature.from).flatten
ApplicationFeatures(f.toSet)
}
}
diff --git a/app/kafka/manager/ActorModel.scala b/app/kafka/manager/ActorModel.scala
index 731b39ef8..12ae1009e 100644
--- a/app/kafka/manager/ActorModel.scala
+++ b/app/kafka/manager/ActorModel.scala
@@ -8,12 +8,14 @@ package kafka.manager
import java.util.{Date, Properties}
import org.joda.time.DateTime
-import kafka.manager.utils.TopicAndPartition
+import kafka.common.TopicAndPartition
import org.slf4j.LoggerFactory
import scheduler.models.form.Failover
import scala.collection.immutable.Queue
-import scala.util.Try
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Try, Success, Failure}
import scalaz.{NonEmptyList, Validation}
/**
@@ -35,6 +37,8 @@ object ActorModel {
case object BVForceUpdate extends CommandRequest
case object BVGetTopicIdentities extends BVRequest
+ case object BVGetTopicConsumerMap extends BVRequest
+ case object BVGetConsumerIdentities extends BVRequest
case class BVGetView(id: Int) extends BVRequest
case object BVGetViews extends BVRequest
case class BVGetTopicMetrics(topic: String) extends BVRequest
@@ -53,7 +57,11 @@ object ActorModel {
case class CMGetTopicIdentity(topic: String) extends QueryRequest
case object CMGetClusterContext extends QueryRequest
case class CMView(topicsCount: Int, brokersCount: Int, clusterContext: ClusterContext) extends QueryResponse
+ case class CMGetConsumerIdentity(consumer: String) extends QueryRequest
+ case class CMGetConsumedTopicState(consumer: String, topic: String) extends QueryRequest
case class CMTopicIdentity(topicIdentity: Try[TopicIdentity]) extends QueryResponse
+ case class CMConsumerIdentity(consumerIdentity: Try[ConsumerIdentity]) extends QueryResponse
+ case class CMConsumedTopic(ctIdentity: Try[ConsumedTopicState]) extends QueryResponse
case object CMShutdown extends CommandRequest
case class CMCreateTopic(topic: String,
partitions: Int,
@@ -85,7 +93,9 @@ object ActorModel {
) extends CommandRequest
case class CMUpdateLogkafkaConfig(hostname: String,
log_path: String,
- config: Properties) extends CommandRequest
+ config: Properties,
+ checkConfig: Boolean = true
+ ) extends CommandRequest
case class CMDeleteLogkafka(hostname: String, log_path: String) extends CommandRequest
//##########
@@ -99,8 +109,8 @@ object ActorModel {
config: Properties) extends CommandRequest
case class KCAddTopicPartitions(topic: String,
brokers: Seq[Int],
- partitions: Int,
- partitionReplicaList: Map[Int, Seq[Int]],
+ partitions: Int,
+ partitionReplicaList: Map[Int, Seq[Int]],
readVersion: Int) extends CommandRequest
case class KCAddMultipleTopicsPartitions(topicsAndReplicas: Seq[(String, Map[Int, Seq[Int]])],
brokers: Seq[Int],
@@ -136,10 +146,15 @@ object ActorModel {
sealed trait KSRequest extends QueryRequest
case object KSGetTopics extends KSRequest
+ case object KSGetConsumers extends KSRequest
case class KSGetTopicConfig(topic: String) extends KSRequest
case class KSGetTopicDescription(topic: String) extends KSRequest
case class KSGetAllTopicDescriptions(lastUpdateMillis: Option[Long]= None) extends KSRequest
case class KSGetTopicDescriptions(topics: Set[String]) extends KSRequest
+ case class KSGetConsumerDescription(consumer: String) extends KSRequest
+ case class KSGetConsumedTopicDescription(consumer: String, topic: String) extends KSRequest
+ case class KSGetAllConsumerDescriptions(lastUpdateMillis: Option[Long]= None) extends KSRequest
+ case class KSGetConsumerDescriptions(consumers: Set[String]) extends KSRequest
case object KSGetTopicsLastUpdateMillis extends KSRequest
case object KSGetPreferredLeaderElection extends KSRequest
case object KSGetReassignPartition extends KSRequest
@@ -153,10 +168,12 @@ object ActorModel {
case class TopicList(list: IndexedSeq[String], deleteSet: Set[String], clusterContext: ClusterContext) extends QueryResponse
case class TopicConfig(topic: String, config: Option[(Int,String)]) extends QueryResponse
+ case class ConsumerList(list: IndexedSeq[String], clusterContext: ClusterContext) extends QueryResponse
case class TopicDescription(topic: String,
description: (Int,String),
partitionState: Option[Map[String, String]],
+ partitionOffsets: Future[PartitionOffsetsCapture],
config:Option[(Int,String)]) extends QueryResponse
case class TopicDescriptions(descriptions: IndexedSeq[TopicDescription], lastUpdateMillis: Long) extends QueryResponse
@@ -171,6 +188,16 @@ object ActorModel {
endTime: Option[DateTime],
clusterContext: ClusterContext) extends QueryResponse
+ case class ConsumedTopicDescription(consumer: String,
+ topic: String,
+ numPartitions: Int,
+ topicDescription: Option[TopicDescription],
+ partitionOwners: Option[Map[Int, String]],
+ partitionOffsets: Option[Map[Int, Long]])
+ case class ConsumerDescription(consumer: String,
+ topics: Map[String, ConsumedTopicDescription]) extends QueryResponse
+ case class ConsumerDescriptions(descriptions: IndexedSeq[ConsumerDescription], lastUpdateMillis: Long) extends QueryResponse
+
case object DCUpdateState extends CommandRequest
case class BrokerIdentity(id: Int, host: String, port: Int, jmxPort: Int)
@@ -191,17 +218,28 @@ object ActorModel {
}
}
- case class TopicPartitionIdentity(partNum: Int, leader:Int, isr: Seq[Int], replicas: Seq[Int], isPreferredLeader: Boolean = false, isUnderReplicated: Boolean = false)
+ case class TopicPartitionIdentity(partNum: Int,
+ leader: Int,
+ latestOffset: Option[Long],
+ rateOfChange: Option[Double],
+ isr: Seq[Int],
+ replicas: Seq[Int],
+ isPreferredLeader: Boolean = false,
+ isUnderReplicated: Boolean = false)
object TopicPartitionIdentity {
-
+
lazy val logger = LoggerFactory.getLogger(this.getClass)
-
+
import scalaz.syntax.applicative._
import org.json4s.jackson.JsonMethods._
import org.json4s.scalaz.JsonScalaz._
import scala.language.reflectiveCalls
- implicit def from(partition: Int, state:Option[String], replicas: Seq[Int]) : TopicPartitionIdentity = {
+ implicit def from(partition: Int,
+ state:Option[String],
+ offset: Option[Long],
+ rateOfChange: Option[Double],
+ replicas: Seq[Int]) : TopicPartitionIdentity = {
val leaderAndIsr = for {
json <- state
parsedJson = parse(json)
@@ -210,20 +248,49 @@ object ActorModel {
(leader: Int, isr: Seq[Int]) => leader -> isr
}
}
- val default = TopicPartitionIdentity(partition,-2,Seq.empty,replicas)
+ val default = TopicPartitionIdentity(partition,
+ -2,
+ offset,
+ rateOfChange,
+ Seq.empty,
+ replicas)
leaderAndIsr.fold(default) { parsedLeaderAndIsrOrError =>
parsedLeaderAndIsrOrError.fold({ e =>
logger.error(s"Failed to parse topic state $e")
default
}, {
case (leader, isr) =>
- TopicPartitionIdentity(partition, leader, isr, replicas, leader == replicas.head, isr.size != replicas.size)
+ TopicPartitionIdentity(partition, leader, offset, rateOfChange, isr, replicas, leader == replicas.head, isr.size != replicas.size)
})
}
}
}
case class BrokerTopicPartitions(id: Int, partitions: IndexedSeq[Int], isSkewed: Boolean)
+
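+  // snapshot of the latest offset per partition together with the capture time, used to compute produce rates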
+ case class PartitionOffsetsCapture(updateTimeMillis: Long, offsetsMap: Map[Int, Long])
+
+ object PartitionOffsetsCapture {
+ val ZERO : Option[Double] = Option(0D)
+
+ val EMPTY : PartitionOffsetsCapture = PartitionOffsetsCapture(0, Map.empty)
+
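+    // rate of offset change per second for a partition between two captures; None if either capture lacks the partition, zero if the captures are not time-ordered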
+ def getRate(part: Int, currentOffsets: PartitionOffsetsCapture, previousOffsets: PartitionOffsetsCapture): Option[Double] = {
+ val timeDiffMillis = currentOffsets.updateTimeMillis - previousOffsets.updateTimeMillis
+ val offsetDif = for {
+ currentOffset <- currentOffsets.offsetsMap.get(part)
+ previousOffset <- previousOffsets.offsetsMap.get(part)
+ } yield {
+ currentOffset - previousOffset
+ }
+ if(timeDiffMillis > 0) {
+ //multiply by 1000 since we have millis
+ offsetDif.map( od => od * 1000 * 1D / timeDiffMillis)
+ } else {
+ PartitionOffsetsCapture.ZERO
+ }
+ }
+ }
case class TopicIdentity(topic:String,
readVersion: Int,
@@ -251,6 +318,8 @@ object ActorModel {
}.toIndexedSeq.sortBy(_.id)
}
+  // a topic's log size is the sum of its partitions' log sizes; we sum only the partitions whose latest offset is known.
+ val summedTopicOffsets : Long = partitionsIdentity.map(_._2.latestOffset).collect{case Some(offset) => offset}.sum
val preferredReplicasPercentage : Int = (100 * partitionsIdentity.count(_._2.isPreferredLeader)) / partitions
@@ -270,6 +339,7 @@ object ActorModel {
100 // everthing is spreaded if nothing has to be spreaded
}
+ val producerRate: String = BigDecimal(partitionsIdentity.map(_._2.rateOfChange.getOrElse(0D)).sum).setScale(2, BigDecimal.RoundingMode.HALF_UP).toString()
}
object TopicIdentity {
@@ -280,17 +350,69 @@ object ActorModel {
import org.json4s.scalaz.JsonScalaz._
import scala.language.reflectiveCalls
- implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics], clusterContext: ClusterContext) : TopicIdentity = {
+ private[this] def getPartitionReplicaMap(td: TopicDescription) : Map[String, List[Int]] = {
+ // Get the topic description information
val descJson = parse(td.description._2)
- //val partMap = (descJson \ "partitions").as[Map[String,Seq[Int]]]
- val partMap = field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
+ field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
logger.error(s"[topic=${td.topic}] Failed to get partitions from topic json ${td.description._2}")
Map.empty
}, identity)
+ }
+
+ private[this] def getTopicPartitionIdentity(td: TopicDescription,
+ partMap: Map[String, List[Int]],
+ tdPrevious: Option[TopicDescription]) : Map[Int, TopicPartitionIdentity] = {
+
val stateMap = td.partitionState.getOrElse(Map.empty)
- val tpi : Map[Int,TopicPartitionIdentity] = partMap.map { case (part, replicas) =>
- (part.toInt,TopicPartitionIdentity.from(part.toInt,stateMap.get(part),replicas))
- }.toMap
+ // Assign the partition data to the TPI format
+ partMap.map { case (partition, replicas) =>
+ val partitionNum = partition.toInt
+ // block on the futures that hold the latest produced offset in each partition
+ val partitionOffsets: Option[PartitionOffsetsCapture] = Await.ready(td.partitionOffsets, Duration.Inf).value.get match {
+ case Success(offsetMap) =>
+ Option(offsetMap)
+ case Failure(e) =>
+ None
+ }
+
+ val previousPartitionOffsets: Option[PartitionOffsetsCapture] = tdPrevious.flatMap {
+ ptd => Await.ready(ptd.partitionOffsets, Duration.Inf).value.get match {
+ case Success(offsetMap) =>
+ Option(offsetMap)
+ case Failure(e) =>
+ None
+ }
+ }
+
+ val currentOffsetOption = partitionOffsets.flatMap(_.offsetsMap.get(partitionNum))
+ val rateOfChange = for {
+ currentOffsets <- partitionOffsets
+ previousOffsets <- previousPartitionOffsets
+ result <- PartitionOffsetsCapture.getRate(partitionNum, currentOffsets, previousOffsets)
+ } yield result
+
+ (partitionNum,TopicPartitionIdentity.from(partitionNum,
+ stateMap.get(partition),
+ currentOffsetOption,
+ rateOfChange,
+ replicas))
+ }
+ }
+
+ def getTopicPartitionIdentity(td: TopicDescription, tdPrevious: Option[TopicDescription]) : Map[Int, TopicPartitionIdentity] = {
+ // Get the topic description information
+ val partMap = getPartitionReplicaMap(td)
+
+ getTopicPartitionIdentity(td, partMap, tdPrevious)
+ }
+
+ implicit def from(brokers: Int,
+ td: TopicDescription,
+ tm: Option[BrokerMetrics],
+ clusterContext: ClusterContext, tdPrevious: Option[TopicDescription]) : TopicIdentity = {
+ // Get the topic description information
+ val partMap = getPartitionReplicaMap(td)
+ val tpi : Map[Int,TopicPartitionIdentity] = getTopicPartitionIdentity(td, partMap, tdPrevious)
val config : (Int,Map[String, String]) = {
try {
val resultOption: Option[(Int,Map[String, String])] = td.config.map { configString =>
@@ -311,8 +433,8 @@ object ActorModel {
TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList, clusterContext, tm)
}
- implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics], clusterContext: ClusterContext) : TopicIdentity = {
- from(bl.list.size, td, tm, clusterContext)
+ implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics], clusterContext: ClusterContext, tdPrevious: Option[TopicDescription]) : TopicIdentity = {
+ from(bl.list.size, td, tm, clusterContext, tdPrevious)
}
implicit def reassignReplicas(currentTopicIdentity: TopicIdentity,
@@ -322,7 +444,8 @@ object ActorModel {
val newReplicaSeq = assignedReplicas.get(part)
require(newReplicaSeq.isDefined, s"Missing replica assignment for partition $part for topic ${currentTopicIdentity.topic}")
val newReplicaSet = newReplicaSeq.get.toSet
- require(newReplicaSeq.get.size == newReplicaSet.size, s"Duplicates found in replica set ${newReplicaSeq.get} for partition $part for topic ${currentTopicIdentity.topic}")
+ require(newReplicaSeq.get.size == newReplicaSet.size,
+ s"Duplicates found in replica set ${newReplicaSeq.get} for partition $part for topic ${currentTopicIdentity.topic}")
(part,tpi.copy(replicas = newReplicaSeq.get))
}
TopicIdentity(
@@ -339,6 +462,79 @@ object ActorModel {
}
}
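+  // consumer group state for one topic: latest produced offsets vs. the group's committed offsets, per partition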
+ case class ConsumedTopicState(consumerGroup: String,
+ topic: String,
+ numPartitions: Int,
+ partitionLatestOffsets: Map[Int, Long],
+ partitionOwners: Map[Int, String],
+ partitionOffsets: Map[Int, Long],
+ clusterContext: ClusterContext) {
+ lazy val totalLag : Option[Long] = {
+ // only defined if every partition has a latest offset
+ if (partitionLatestOffsets.values.size == numPartitions && partitionLatestOffsets.size == numPartitions) {
+ Some(partitionLatestOffsets.values.sum - partitionOffsets.values.sum)
+ } else None
+ }
+ def topicOffsets(partitionNum: Int) : Option[Long] = partitionLatestOffsets.get(partitionNum)
+
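+    // lag for a partition: latest produced offset minus the consumer's committed offset, when both are known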
+ def partitionLag(partitionNum: Int) : Option[Long] = {
+ topicOffsets(partitionNum).flatMap{topicOffset =>
+ partitionOffsets.get(partitionNum).map(topicOffset - _)}
+ }
+
+ // Percentage of the partitions that have an owner
+ def percentageCovered : Int =
+ if (numPartitions != 0) {
+ val numCovered = partitionOwners.size
+ 100 * numCovered / numPartitions
+ } else {
+ 100 // if there are no partitions to cover, they are all covered!
+ }
+ }
+
+ object ConsumedTopicState {
+ def from(ctd: ConsumedTopicDescription, clusterContext: ClusterContext): ConsumedTopicState = {
+ val partitionOffsetsMap = ctd.partitionOffsets.getOrElse(Map.empty)
+ val partitionOwnersMap = ctd.partitionOwners.getOrElse(Map.empty)
+ // block on the futures that hold the latest produced offset in each partition
+ val topicOffsetsOptMap: Map[Int, Long]= ctd.topicDescription.map{td: TopicDescription =>
+ Await.ready(td.partitionOffsets, Duration.Inf).value.get match {
+ case Success(offsetMap) =>
+ offsetMap.offsetsMap
+ case Failure(e) =>
+ Map.empty[Int, Long]
+ }}.getOrElse(Map.empty)
+
+ ConsumedTopicState(
+ ctd.consumer,
+ ctd.topic,
+ ctd.numPartitions,
+ topicOffsetsOptMap,
+ partitionOwnersMap,
+ partitionOffsetsMap,
+ clusterContext)
+ }
+ }
+
+ case class ConsumerIdentity(consumerGroup:String,
+ topicMap: Map[String, ConsumedTopicState],
+ clusterContext: ClusterContext)
+ object ConsumerIdentity {
+ lazy val logger = LoggerFactory.getLogger(this.getClass)
+ import scala.language.reflectiveCalls
+
+ implicit def from(cd: ConsumerDescription,
+ clusterContext: ClusterContext) : ConsumerIdentity = {
+ val topicMap: Seq[(String, ConsumedTopicState)] = for {
+ (topic, ctd) <- cd.topics.toSeq
+ cts = ConsumedTopicState.from(ctd, clusterContext)
+ } yield (topic, cts)
+ ConsumerIdentity(cd.consumer,
+ topicMap.toMap,
+ clusterContext)
+ }
+
+ }
case class BrokerMessagesPerSecCount(date: DateTime,
count: Long)
@@ -373,7 +569,7 @@ object ActorModel {
MeterMetric(0, 0, 0, 0, 0),
OSMetric(0D, 0D))
}
-
+
case class BrokerClusterStats(perMessages: BigDecimal, perIncoming: BigDecimal, perOutgoing: BigDecimal)
sealed trait LKVRequest extends QueryRequest
@@ -391,7 +587,9 @@ object ActorModel {
case class LKCUpdateLogkafkaConfig(hostname: String,
log_path: String,
config: Properties,
- logkafkaConfig: Option[LogkafkaConfig]) extends CommandRequest
+ logkafkaConfig: Option[LogkafkaConfig],
+ checkConfig: Boolean = true
+ ) extends CommandRequest
case class LKCCommandResult(result: Try[Unit]) extends CommandResponse
diff --git a/app/kafka/manager/BrokerViewCacheActor.scala b/app/kafka/manager/BrokerViewCacheActor.scala
index 0a5f86cfe..02106b0b0 100644
--- a/app/kafka/manager/BrokerViewCacheActor.scala
+++ b/app/kafka/manager/BrokerViewCacheActor.scala
@@ -32,8 +32,16 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
private[this] var topicIdentities : Map[String, TopicIdentity] = Map.empty
+ private[this] var previousTopicDescriptionsOption : Option[TopicDescriptions] = None
+
private[this] var topicDescriptionsOption : Option[TopicDescriptions] = None
+ private[this] var topicConsumerMap : Map[String, Iterable[String]] = Map.empty
+
+ private[this] var consumerIdentities : Map[String, ConsumerIdentity] = Map.empty
+
+ private[this] var consumerDescriptionsOption : Option[ConsumerDescriptions] = None
+
private[this] var brokerListOption : Option[BrokerList] = None
private[this] var brokerMetrics : Map[Int, BrokerMetrics] = Map.empty
@@ -98,15 +106,15 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
}
- private def allBrokerViews(): Seq[BVView] = {
- var bvs = mutable.MutableList[BVView]()
+ private def allBrokerViews(): Map[Int, BVView] = {
+ var bvs = mutable.Map[Int, BVView]()
for (key <- brokerTopicPartitions.keySet.toSeq.sorted) {
val bv = brokerTopicPartitions.get(key).map { bv => produceBViewWithBrokerClusterState(bv, key) }
if (bv.isDefined) {
- bvs += bv.get
+ bvs.put(key, bv.get)
}
}
- bvs.asInstanceOf[Seq[BVView]]
+ bvs.toMap
}
override def processActorRequest(request: ActorRequest): Unit = {
@@ -116,6 +124,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
//ask for topic descriptions
val lastUpdateMillisOption: Option[Long] = topicDescriptionsOption.map(_.lastUpdateMillis)
context.actorSelection(config.kafkaStateActorPath).tell(KSGetAllTopicDescriptions(lastUpdateMillisOption), self)
+ context.actorSelection(config.kafkaStateActorPath).tell(KSGetAllConsumerDescriptions(lastUpdateMillisOption), self)
context.actorSelection(config.kafkaStateActorPath).tell(KSGetBrokers, self)
case BVGetViews =>
@@ -136,6 +145,12 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
case BVGetTopicIdentities =>
sender ! topicIdentities
+ case BVGetTopicConsumerMap =>
+ sender ! topicConsumerMap
+
+ case BVGetConsumerIdentities =>
+ sender ! consumerIdentities
+
case BVUpdateTopicMetricsForBroker(id, metrics) =>
metrics.foreach {
case (topic, bm) =>
@@ -166,9 +181,14 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
override def processActorResponse(response: ActorResponse): Unit = {
response match {
case td: TopicDescriptions =>
+ previousTopicDescriptionsOption = topicDescriptionsOption
topicDescriptionsOption = Some(td)
updateView()
+ case cd: ConsumerDescriptions =>
+ consumerDescriptionsOption = Some(cd)
+ updateView()
+
case bl: BrokerList =>
brokerListOption = Some(bl)
updateView()
@@ -180,12 +200,21 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
implicit def queue2finitequeue[A](q: Queue[A]): FiniteQueue[A] = new FiniteQueue[A](q)
private[this] def updateView(): Unit = {
+ updateViewForBrokersAndTopics()
+ updateViewsForConsumers()
+ }
+
+ private[this] def updateViewForBrokersAndTopics(): Unit = {
for {
brokerList <- brokerListOption
topicDescriptions <- topicDescriptionsOption
+ previousDescriptionsMap: Option[Map[String, TopicDescription]] = previousTopicDescriptionsOption.map(_.descriptions.map(td => (td.topic, td)).toMap)
} {
- val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(
- TopicIdentity.from(brokerList.list.size,_,None, config.clusterContext))
+ val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map {
+ tdCurrent =>
+ TopicIdentity.from(brokerList.list.size,tdCurrent,None, config.clusterContext, previousDescriptionsMap.flatMap(_.get(tdCurrent.topic)))
+
+ }
topicIdentities = topicIdentity.map(ti => (ti.topic, ti)).toMap
val topicPartitionByBroker = topicIdentity.flatMap(
ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)
@@ -258,4 +287,18 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
}
}
+
+ private[this] def updateViewsForConsumers(): Unit = {
+ for {
+ consumerDescriptions <- consumerDescriptionsOption
+ } {
+ val consumerIdentity : IndexedSeq[ConsumerIdentity] = consumerDescriptions.descriptions.map(
+ ConsumerIdentity.from(_, config.clusterContext))
+ consumerIdentities = consumerIdentity.map(ci => (ci.consumerGroup, ci)).toMap
+
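+      // build consumer -> topics, then invert it into topic -> consumers for the topic view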
+ val c2tMap = consumerDescriptions.descriptions.map{cd: ConsumerDescription =>
+ (cd.consumer, cd.topics.keys.toList)}.toMap
+ topicConsumerMap = c2tMap.values.flatten.map(v => (v, c2tMap.keys.filter(c2tMap(_).contains(v)))).toMap
+ }
+ }
}
diff --git a/app/kafka/manager/ClusterManagerActor.scala b/app/kafka/manager/ClusterManagerActor.scala
index be7cf9072..678b1751c 100644
--- a/app/kafka/manager/ClusterManagerActor.scala
+++ b/app/kafka/manager/ClusterManagerActor.scala
@@ -17,7 +17,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.zookeeper.CreateMode
-import kafka.manager.utils.{AdminUtils, TopicAndPartition}
+import kafka.common.TopicAndPartition
+import kafka.manager.utils.AdminUtils
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
@@ -55,7 +56,9 @@ case class ClusterManagerActorConfig(pinnedDispatcherName: String,
threadPoolSize: Int = 2,
maxQueueSize: Int = 100,
askTimeoutMillis: Long = 2000,
- mutexTimeoutMillis: Int = 4000)
+ mutexTimeoutMillis: Int = 4000,
+ partitionOffsetCacheTimeoutSecs : Int = 5,
+ simpleConsumerSocketTimeoutMillis: Int = 10000)
class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
extends BaseQueryCommandActor with CuratorAwareActor with BaseZkPath {
@@ -90,7 +93,13 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
private[this] val adminUtils = new AdminUtils(cmConfig.clusterConfig.version)
- private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, clusterContext)
+ private[this] val ksConfig = KafkaStateActorConfig(
+ sharedClusterCurator,
+ clusterContext,
+ LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000),
+ cmConfig.partitionOffsetCacheTimeoutSecs,
+ cmConfig.simpleConsumerSocketTimeoutMillis)
+ private[this] val ksProps = Props(classOf[KafkaStateActor],ksConfig)
private[this] val kafkaStateActor : ActorPath = context.actorOf(ksProps.withDispatcher(cmConfig.pinnedDispatcherName),"kafka-state").path
private[this] val bvConfig = BrokerViewCacheActorConfig(
@@ -244,7 +253,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
bl <- eventualBrokerList
tm <- eventualTopicMetrics
tdO <- eventualTopicDescription
- } yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm,clusterContext))))
+ } yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm,clusterContext,None))))
result pipeTo sender
case CMGetLogkafkaIdentity(hostname) =>
@@ -257,6 +266,25 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
} yield Some(CMLogkafkaIdentity(Try(LogkafkaIdentity.from(hostname,lcg,lct))))
result pipeTo sender
+ case CMGetConsumerIdentity(consumer) =>
+ implicit val ec = context.dispatcher
+ val eventualConsumerDescription = withKafkaStateActor(KSGetConsumerDescription(consumer))(identity[Option[ConsumerDescription]])
+ val result: Future[Option[CMConsumerIdentity]] = for {
+ cdO <- eventualConsumerDescription
+ ciO = cdO.map( cd => CMConsumerIdentity(Try(ConsumerIdentity.from(cd,clusterContext))))
+ } yield ciO
+ result pipeTo sender
+
+ case CMGetConsumedTopicState(consumer, topic) =>
+ implicit val ec = context.dispatcher
+ val eventualConsumedTopicDescription = withKafkaStateActor(
+ KSGetConsumedTopicDescription(consumer,topic)
+ )(identity[ConsumedTopicDescription])
+ val result: Future[CMConsumedTopic] = eventualConsumedTopicDescription.map{
+ ctd: ConsumedTopicDescription => CMConsumedTopic(Try(ConsumedTopicState.from(ctd, clusterContext)))
+ }
+ result pipeTo sender
+
case any: Any => log.warning("cma : processQueryResponse : Received unknown message: {}", any)
}
}
@@ -377,29 +405,31 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
case CMGeneratePartitionAssignments(topics, brokers) =>
implicit val ec = longRunningExecutionContext
- val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
- val eventualDescriptions = withKafkaStateActor(KSGetTopicDescriptions(topics))(identity[TopicDescriptions])
- val eventualReassignPartitions = withKafkaStateActor(KSGetReassignPartition)(identity[Option[ReassignPartitions]])
- val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = for {
- bl <- eventualBrokerList
- tds <- eventualDescriptions
- rp <- eventualReassignPartitions
- tis = tds.descriptions.map(TopicIdentity.from(bl, _, None,clusterContext))
- } yield {
- bl.list.map(_.id.toInt)
- // check if any topic undergoing reassignment got selected for reassignment
- val topicsUndergoingReassignment = getTopicsUnderReassignment(rp, topics)
- require(topicsUndergoingReassignment.isEmpty, "Topic(s) already undergoing reassignment(s): [%s]".format(topicsUndergoingReassignment.mkString(", ")))
- // check if any nonexistent broker got selected for reassignment
- val nonExistentBrokers = getNonExistentBrokers(bl, brokers)
- require(nonExistentBrokers.isEmpty, "Nonexistent broker(s) selected: [%s]".format(nonExistentBrokers.mkString(", ")))
- tis.map(ti => (ti.topic, adminUtils.assignReplicasToBrokers(
- brokers,
- ti.partitions,
- ti.replicationFactor)))
+ val topicCheckFutureBefore = checkTopicsUnderAssignment(topics)
+
+ val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = topicCheckFutureBefore.flatMap { _ =>
+ val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
+ val eventualDescriptions = withKafkaStateActor(KSGetTopicDescriptions(topics))(identity[TopicDescriptions])
+ for {
+ bl <- eventualBrokerList
+ tds <- eventualDescriptions
+ tis = tds.descriptions.map(TopicIdentity.from(bl, _, None,clusterContext, None))
+ } yield {
+ bl.list.map(_.id.toInt)
+ // check if any nonexistent broker got selected for reassignment
+ val nonExistentBrokers = getNonExistentBrokers(bl, brokers)
+ require(nonExistentBrokers.isEmpty, "Nonexistent broker(s) selected: [%s]".format(nonExistentBrokers.mkString(", ")))
+ tis.map(ti => (ti.topic, adminUtils.assignReplicasToBrokers(
+ brokers,
+ ti.partitions,
+ ti.replicationFactor)))
+ }
}
- val result: Future[IndexedSeq[Try[Unit]]] = generated.map { list =>
+ val result: Future[IndexedSeq[Try[Unit]]] = for {
+ list <- generated
+ _ <- checkTopicsUnderAssignment(topics) //check again
+ } yield {
modify {
list.map { case (topic, assignments: Map[Int, Seq[Int]]) =>
updateAssignmentInZk(topic, assignments)
@@ -426,7 +456,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val preferredLeaderElections = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
- tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, clusterContext))
+ tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, clusterContext, None))
toElect = tis.map(ti => ti.partitionsIdentity.values.filter(!_.isPreferredLeader).map(tpi => TopicAndPartition(ti.topic, tpi.partNum))).flatten.toSet
} yield toElect
preferredLeaderElections.map { toElect =>
@@ -442,7 +472,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val topicsAndReassignments = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
- tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, clusterContext))
+ tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, clusterContext, None))
} yield {
val reassignments = tis.map { ti =>
val topicZkPath = zkPathFrom(baseTopicsZkPath, ti.topic)
@@ -493,11 +523,11 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
} pipeTo sender()
- case CMUpdateLogkafkaConfig(hostname, log_path, config) =>
+ case CMUpdateLogkafkaConfig(hostname, log_path, config, checkConfig) =>
implicit val ec = longRunningExecutionContext
val eventualLogkafkaConfig = withLogkafkaStateActor(LKSGetLogkafkaConfig(hostname))(identity[Option[LogkafkaConfig]])
eventualLogkafkaConfig.map { logkafkaConfigOption =>
- withLogkafkaCommandActor(LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfigOption)) {
+ withLogkafkaCommandActor(LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfigOption, checkConfig)) {
lkcResponse: LKCCommandResult =>
CMCommandResult(lkcResponse.result)
}
@@ -548,17 +578,17 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
}
- def getNonExistentBrokers(availableBrokers: BrokerList, selectedBrokers: Seq[Int]): Seq[Int] = {
+ private[this] def getNonExistentBrokers(availableBrokers: BrokerList, selectedBrokers: Seq[Int]): Seq[Int] = {
val availableBrokerIds: Set[Int] = availableBrokers.list.map(_.id.toInt).toSet
selectedBrokers filter { b: Int => !availableBrokerIds.contains(b) }
}
- def getNonExistentBrokers(availableBrokers: BrokerList, assignments: Map[Int, Seq[Int]]): Seq[Int] = {
+ private[this] def getNonExistentBrokers(availableBrokers: BrokerList, assignments: Map[Int, Seq[Int]]): Seq[Int] = {
val brokersAssigned = assignments.flatMap({ case (pt, bl) => bl }).toSet.toSeq
getNonExistentBrokers(availableBrokers, brokersAssigned)
}
- def getTopicsUnderReassignment(reassignPartitions: Option[ReassignPartitions], topicsToBeReassigned: Set[String]): Set[String] = {
+ private[this] def getTopicsUnderReassignment(reassignPartitions: Option[ReassignPartitions], topicsToBeReassigned: Set[String]): Set[String] = {
val topicsUnderReassignment = reassignPartitions.map { asgn =>
asgn.endTime.map(_ => Set[String]()).getOrElse{
asgn.partitionsToBeReassigned.map { case (t,s) => t.topic}.toSet
@@ -566,4 +596,16 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}.getOrElse(Set[String]())
topicsToBeReassigned.intersect(topicsUnderReassignment)
}
+
+ private[this] def checkTopicsUnderAssignment(topicsToBeReassigned: Set[String])(implicit ec: ExecutionContext) : Future[Unit] = {
+ val eventualReassignPartitions = withKafkaStateActor(KSGetReassignPartition)(identity[Option[ReassignPartitions]])
+ for {
+ rp <- eventualReassignPartitions
+ } yield {
+ // check if any topic undergoing reassignment got selected for reassignment
+ val topicsUndergoingReassignment = getTopicsUnderReassignment(rp, topicsToBeReassigned)
+ require(topicsUndergoingReassignment.isEmpty, "Topic(s) already undergoing reassignment(s): [%s]"
+ .format(topicsUndergoingReassignment.mkString(", ")))
+ }
+ }
}
diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala
index f695f8d2f..571a242d6 100644
--- a/app/kafka/manager/KafkaManager.scala
+++ b/app/kafka/manager/KafkaManager.scala
@@ -23,10 +23,19 @@ import scala.util.{Success, Failure, Try}
/**
* @author hiral
*/
-case class TopicListExtended(list: IndexedSeq[(String, Option[TopicIdentity])], deleteSet: Set[String], underReassignments: IndexedSeq[String], clusterContext: ClusterContext)
-case class BrokerListExtended(list: IndexedSeq[BrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], clusterContext: ClusterContext)
+case class TopicListExtended(list: IndexedSeq[(String, Option[TopicIdentity])],
+ topicToConsumerMap: Map[String, Iterable[String]],
+ deleteSet: Set[String],
+ underReassignments: IndexedSeq[String],
+ clusterContext: ClusterContext)
+case class BrokerListExtended(list: IndexedSeq[BrokerIdentity],
+ metrics: Map[Int,BrokerMetrics],
+ combinedMetric: Option[BrokerMetrics],
+ clusterContext: ClusterContext)
+case class ConsumerListExtended(list: IndexedSeq[(String, Option[ConsumerIdentity])], clusterContext: ClusterContext)
case class LogkafkaListExtended(list: IndexedSeq[(String, Option[LogkafkaIdentity])], deleteSet: Set[String])
case class SchedulerBrokerListExtended(list: Seq[SchedulerBrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], schedulerConfig: SchedulerConfig)
+
case class ApiError(msg: String)
object ApiError {
private[this] val log : Logger = LoggerFactory.getLogger(classOf[ApiError])
@@ -58,6 +67,9 @@ object KafkaManager {
val MutexTimeoutMillis = "kafka-manager.mutex-timeout-millis"
val StartDelayMillis = "kafka-manager.start-delay-millis"
val ApiTimeoutMillis = "kafka-manager.api-timeout-millis"
+ val ClusterActorsAskTimeoutMillis = "kafka-manager.cluster-actors-ask-timeout-millis"
+ val PartitionOffsetCacheTimeoutSecs = "kafka-manager.partition-offset-cache-timeout-secs"
+ val SimpleConsumerSocketTimeoutMillis = "kafka-manager.simple-consumer-socket-timeout-millis"
val DefaultConfig: Config = {
val defaults: Map[String, _ <: AnyRef] = Map(
@@ -71,7 +83,10 @@ object KafkaManager {
ThreadPoolSize -> "2",
MutexTimeoutMillis -> "4000",
StartDelayMillis -> "1000",
- ApiTimeoutMillis -> "5000"
+ ApiTimeoutMillis -> "5000",
+ ClusterActorsAskTimeoutMillis -> "2000",
+ PartitionOffsetCacheTimeoutSecs -> "5",
+ SimpleConsumerSocketTimeoutMillis -> "10000"
)
import scala.collection.JavaConverters._
ConfigFactory.parseMap(defaults.asJava)
@@ -99,7 +114,10 @@ class KafkaManager(akkaConfig: Config)
maxQueueSize = configWithDefaults.getInt(MaxQueueSize),
kafkaManagerUpdatePeriod = FiniteDuration(configWithDefaults.getInt(KafkaManagerUpdateSeconds), SECONDS),
deleteClusterUpdatePeriod = FiniteDuration(configWithDefaults.getInt(DeleteClusterUpdateSeconds), SECONDS),
- deletionBatchSize = configWithDefaults.getInt(DeletionBatchSize)
+ deletionBatchSize = configWithDefaults.getInt(DeletionBatchSize),
+ clusterActorsAskTimeoutMillis = configWithDefaults.getInt(ClusterActorsAskTimeoutMillis),
+ partitionOffsetCacheTimeoutSecs = configWithDefaults.getInt(PartitionOffsetCacheTimeoutSecs),
+ simpleConsumerSocketTimeoutMillis = configWithDefaults.getInt(SimpleConsumerSocketTimeoutMillis)
)
}
@@ -172,19 +190,39 @@ class KafkaManager(akkaConfig: Config)
}
//--------------------Commands--------------------------
- def addCluster(clusterName: String, version: String, zkHosts: String, jmxEnabled: Boolean, logkafkaEnabled: Boolean = false): Future[ApiError \/
+ def addCluster(clusterName: String,
+ version: String,
+ zkHosts: String,
+ jmxEnabled: Boolean,
+ filterConsumers: Boolean,
+ logkafkaEnabled: Boolean = false,
+ activeOffsetCacheEnabled: Boolean = false): Future[ApiError \/
Unit] =
{
- val cc = ClusterConfig(clusterName, version, zkHosts, jmxEnabled = jmxEnabled, logkafkaEnabled = logkafkaEnabled)
+ val cc = ClusterConfig(
+ clusterName,
+ version,
+ zkHosts,
+ jmxEnabled = jmxEnabled,
+ filterConsumers = filterConsumers,
+ logkafkaEnabled = logkafkaEnabled,
+ activeOffsetCacheEnabled = activeOffsetCacheEnabled)
tryWithKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult =>
result.result.get
}
}
- def updateCluster(clusterName: String, version: String, zkHosts: String, jmxEnabled: Boolean, logkafkaEnabled: Boolean = false): Future[ApiError \/
+ def updateCluster(clusterName: String, version: String, zkHosts: String, jmxEnabled: Boolean, filterConsumers: Boolean, logkafkaEnabled: Boolean = false, activeOffsetCacheEnabled: Boolean = false): Future[ApiError \/
Unit] =
{
- val cc = ClusterConfig(clusterName, version, zkHosts, jmxEnabled = jmxEnabled, logkafkaEnabled = logkafkaEnabled)
+ val cc = ClusterConfig(
+ clusterName,
+ version,
+ zkHosts,
+ jmxEnabled = jmxEnabled,
+ filterConsumers = filterConsumers,
+ logkafkaEnabled = logkafkaEnabled,
+ activeOffsetCacheEnabled = activeOffsetCacheEnabled)
tryWithKafkaManagerActor(KMUpdateCluster(cc)) { result: KMCommandResult =>
result.result.get
}
@@ -404,14 +442,15 @@ class KafkaManager(akkaConfig: Config)
clusterName: String,
hostname: String,
log_path: String,
- config: Properties
+ config: Properties,
+ checkConfig: Boolean = true
): Future[ApiError \/ ClusterContext] =
{
implicit val ec = apiExecutionContext
withKafkaManagerActor(
KMClusterCommandRequest(
clusterName,
- CMUpdateLogkafkaConfig(hostname, log_path, config)
+ CMUpdateLogkafkaConfig(hostname, log_path, config, checkConfig)
)
) {
result: Future[CMCommandResult] =>
@@ -458,21 +497,44 @@ class KafkaManager(akkaConfig: Config)
}
def getTopicListExtended(clusterName: String): Future[ApiError \/ TopicListExtended] = {
- val futureTopicIdentities = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicIdentities))(identity[Map[String, TopicIdentity]])
+ val futureTopicIdentities = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicIdentities))(
+ identity[Map[String, TopicIdentity]])
val futureTopicList = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, KSGetTopics))(identity[TopicList])
+ val futureTopicToConsumerMap = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicConsumerMap))(
+ identity[Map[String, Iterable[String]]])
val futureTopicsReasgn = getTopicsUnderReassignment(clusterName)
implicit val ec = apiExecutionContext
for {
errOrTi <- futureTopicIdentities
errOrTl <- futureTopicList
+ errOrTCm <- futureTopicToConsumerMap
errOrRap <- futureTopicsReasgn
} yield {
for {
ti <- errOrTi
tl <- errOrTl
+ tcm <- errOrTCm
rap <- errOrRap
} yield {
- TopicListExtended(tl.list.map(t => (t, ti.get(t))).sortBy(_._1), tl.deleteSet, rap, tl.clusterContext)
+ TopicListExtended(tl.list.map(t => (t, ti.get(t))).sortBy(_._1), tcm, tl.deleteSet, rap, tl.clusterContext)
+ }
+ }
+ }
+
+ def getConsumerListExtended(clusterName: String): Future[ApiError \/ ConsumerListExtended] = {
+ val futureConsumerIdentities = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetConsumerIdentities))(
+ identity[Map[String, ConsumerIdentity]])
+ val futureConsumerList = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, KSGetConsumers))(identity[ConsumerList])
+ implicit val ec = apiExecutionContext
+ for {
+ errorOrCI <- futureConsumerIdentities
+ errorOrCL <- futureConsumerList
+ } yield {
+ for {
+ ci <- errorOrCI
+ cl <- errorOrCL
+ } yield {
+ ConsumerListExtended(cl.list.map(c => (c, ci.get(c))), cl.clusterContext)
}
}
}
@@ -517,7 +579,7 @@ class KafkaManager(akkaConfig: Config)
}
}
- def getBrokersView(clusterName: String): Future[\/[ApiError, Seq[BVView]]] = {
+ def getBrokersView(clusterName: String): Future[\/[ApiError, Map[Int, BVView]]] = {
implicit val ec = apiExecutionContext
tryWithKafkaManagerActor(
@@ -525,7 +587,7 @@ class KafkaManager(akkaConfig: Config)
clusterName,
BVGetViews
)
- )(identity[Seq[BVView]])
+ )(identity[Map[Int, BVView]])
}
def getBrokerView(clusterName: String, brokerId: Int): Future[ApiError \/ BVView] = {
@@ -576,6 +638,61 @@ class KafkaManager(akkaConfig: Config)
}
}
+ def getConsumersForTopic(clusterName: String, topic: String): Future[Option[Iterable[String]]] = {
+ val futureTopicConsumerMap = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicConsumerMap))(
+ identity[Map[String, Iterable[String]]])
+ implicit val ec = apiExecutionContext
+ futureTopicConsumerMap.map[Option[Iterable[String]]] { errOrTCM =>
+ errOrTCM.fold[Option[Iterable[String]]] (_ => None, _.get(topic))
+ }
+ }
+
+ def getConsumerIdentity(clusterName: String, consumer: String): Future[ApiError \/ ConsumerIdentity] = {
+ val futureCMConsumerIdentity = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, CMGetConsumerIdentity(consumer)))(
+ identity[Option[CMConsumerIdentity]]
+ )
+ implicit val ec = apiExecutionContext
+ futureCMConsumerIdentity.map[ApiError \/ ConsumerIdentity] { errOrCI =>
+ errOrCI.fold[ApiError \/ ConsumerIdentity](
+ { err: ApiError =>
+ -\/[ApiError](err)
+ }, { ciOption: Option[CMConsumerIdentity] =>
+ ciOption.fold[ApiError \/ ConsumerIdentity] {
+ -\/(ApiError(s"Consumer not found $consumer for cluster $clusterName"))
+ } { cmConsumerIdentity =>
+ cmConsumerIdentity.consumerIdentity match {
+ case scala.util.Failure(c) =>
+ -\/[ApiError](c)
+ case scala.util.Success(ci) =>
+ \/-(ci)
+ }
+ }
+ }
+ )
+ }
+ }
+
+ def getConsumedTopicState(clusterName: String, consumer: String, topic: String): Future[ApiError \/ ConsumedTopicState] = {
+ val futureCMConsumedTopic = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, CMGetConsumedTopicState(consumer,topic)))(
+ identity[CMConsumedTopic]
+ )
+ implicit val ec = apiExecutionContext
+ futureCMConsumedTopic.map[ApiError \/ ConsumedTopicState] { errOrCT =>
+ errOrCT.fold[ApiError \/ ConsumedTopicState](
+ { err: ApiError =>
+ -\/[ApiError](err)
+ }, { cmConsumedTopic: CMConsumedTopic =>
+ cmConsumedTopic.ctIdentity match {
+ case scala.util.Failure(c) =>
+ -\/[ApiError](c)
+ case scala.util.Success(ci) =>
+ \/-(ci)
+ }
+ }
+ )
+ }
+ }
+
def getTopicMetrics(clusterName: String, topic: String): Future[ApiError \/ Option[BrokerMetrics]] = {
tryWithKafkaManagerActor(
KMClusterQueryRequest(
diff --git a/app/kafka/manager/KafkaManagerActor.scala b/app/kafka/manager/KafkaManagerActor.scala
index 3d181ec84..d8f3a5730 100644
--- a/app/kafka/manager/KafkaManagerActor.scala
+++ b/app/kafka/manager/KafkaManagerActor.scala
@@ -83,18 +83,33 @@ object ClusterConfig {
require(zkHosts.length > 0, "cluster zk hosts is illegal, can't be empty!")
}
- def apply(name: String, version : String, zkHosts: String, zkMaxRetry: Int = 100, jmxEnabled: Boolean, logkafkaEnabled: Boolean = false) : ClusterConfig = {
+ def apply(name: String,
+ version : String,
+ zkHosts: String,
+ zkMaxRetry: Int = 100,
+ jmxEnabled: Boolean,
+ filterConsumers: Boolean,
+ logkafkaEnabled: Boolean = false,
+ activeOffsetCacheEnabled: Boolean = false) : ClusterConfig = {
val kafkaVersion = KafkaVersion(version)
//validate cluster name
validateName(name)
//validate zk hosts
validateZkHosts(zkHosts)
val cleanZkHosts = zkHosts.replaceAll(" ","")
- new ClusterConfig(name, CuratorConfig(cleanZkHosts, zkMaxRetry), true, kafkaVersion, jmxEnabled, logkafkaEnabled)
+ new ClusterConfig(
+ name,
+ CuratorConfig(cleanZkHosts, zkMaxRetry),
+ true,
+ kafkaVersion,
+ jmxEnabled,
+ filterConsumers,
+ logkafkaEnabled,
+ activeOffsetCacheEnabled)
}
- def customUnapply(cc: ClusterConfig) : Option[(String, String, String, Int, Boolean, Boolean)] = {
- Some((cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry, cc.jmxEnabled, cc.logkafkaEnabled))
+ def customUnapply(cc: ClusterConfig) : Option[(String, String, String, Int, Boolean, Boolean, Boolean, Boolean)] = {
+ Some((cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry, cc.jmxEnabled, cc.filterConsumers, cc.logkafkaEnabled, cc.activeOffsetCacheEnabled))
}
import scalaz.{Failure,Success}
@@ -125,7 +140,9 @@ object ClusterConfig {
:: ("enabled" -> toJSON(config.enabled))
:: ("kafkaVersion" -> toJSON(config.version.toString))
:: ("jmxEnabled" -> toJSON(config.jmxEnabled))
+ :: ("filterConsumers" -> toJSON(config.filterConsumers))
:: ("logkafkaEnabled" -> toJSON(config.logkafkaEnabled))
+ :: ("activeOffsetCacheEnabled" -> toJSON(config.activeOffsetCacheEnabled))
:: Nil)
compact(render(json)).getBytes(StandardCharsets.UTF_8)
}
@@ -140,8 +157,17 @@ object ClusterConfig {
val versionString = field[String]("kafkaVersion")(json)
val version = versionString.map(KafkaVersion.apply).getOrElse(Kafka_0_8_1_1)
val jmxEnabled = field[Boolean]("jmxEnabled")(json)
+ val filterConsumers = field[Boolean]("filterConsumers")(json)
val logkafkaEnabled = field[Boolean]("logkafkaEnabled")(json)
- ClusterConfig.apply(name,curatorConfig,enabled,version,jmxEnabled.getOrElse(false),logkafkaEnabled.getOrElse(false))
+ val activeOffsetCacheEnabled = field[Boolean]("activeOffsetCacheEnabled")(json)
+ ClusterConfig.apply(
+ name,
+ curatorConfig,
+ enabled,
+ version,
+ jmxEnabled.getOrElse(false),
+ filterConsumers.getOrElse(true),
+ logkafkaEnabled.getOrElse(false),
+ activeOffsetCacheEnabled.getOrElse(false))
}
result match {
@@ -157,7 +183,14 @@ object ClusterConfig {
}
case class ClusterContext(clusterFeatures: ClusterFeatures, config: ClusterConfig)
-case class ClusterConfig (name: String, curatorConfig : CuratorConfig, enabled: Boolean, version: KafkaVersion, jmxEnabled: Boolean, logkafkaEnabled: Boolean)
+case class ClusterConfig (name: String,
+ curatorConfig : CuratorConfig,
+ enabled: Boolean,
+ version: KafkaVersion,
+ jmxEnabled: Boolean,
+ filterConsumers: Boolean,
+ logkafkaEnabled: Boolean,
+ activeOffsetCacheEnabled: Boolean)
object SchedulerConfig {
@@ -255,7 +288,11 @@ case class KafkaManagerActorConfig(curatorConfig: CuratorConfig,
maxQueueSize: Int = 100,
kafkaManagerUpdatePeriod: FiniteDuration = 10 seconds,
deleteClusterUpdatePeriod: FiniteDuration = 10 seconds,
- deletionBatchSize : Int = 2)
+ deletionBatchSize : Int = 2,
+ clusterActorsAskTimeoutMillis: Int = 2000,
+ partitionOffsetCacheTimeoutSecs: Int = 5,
+ simpleConsumerSocketTimeoutMillis : Int = 10000
+ )
class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
extends BaseQueryCommandActor with CuratorAwareActor with BaseZkPath {
@@ -659,7 +696,13 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
getClusterZkPath(config),
kafkaManagerConfig.curatorConfig,
config,
- kafkaManagerConfig.brokerViewUpdatePeriod)
+ kafkaManagerConfig.brokerViewUpdatePeriod,
+ threadPoolSize = kafkaManagerConfig.threadPoolSize,
+ maxQueueSize = kafkaManagerConfig.maxQueueSize,
+ askTimeoutMillis = kafkaManagerConfig.clusterActorsAskTimeoutMillis,
+ mutexTimeoutMillis = kafkaManagerConfig.mutexTimeoutMillis,
+ partitionOffsetCacheTimeoutSecs = kafkaManagerConfig.partitionOffsetCacheTimeoutSecs,
+ simpleConsumerSocketTimeoutMillis = kafkaManagerConfig.simpleConsumerSocketTimeoutMillis)
val props = Props(classOf[ClusterManagerActor], clusterManagerConfig)
val newClusterManager = context.actorOf(props, config.name).path
clusterConfigMap += (config.name -> config)
@@ -701,7 +744,9 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
&& newConfig.enabled == currentConfig.enabled
&& newConfig.version == currentConfig.version
&& newConfig.jmxEnabled == currentConfig.jmxEnabled
- && newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled) {
+ && newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled
+ && newConfig.filterConsumers == currentConfig.filterConsumers
+ && newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled) {
//nothing changed
false
} else {
diff --git a/app/kafka/manager/KafkaStateActor.scala b/app/kafka/manager/KafkaStateActor.scala
index 90d09ce13..d8e6d1164 100644
--- a/app/kafka/manager/KafkaStateActor.scala
+++ b/app/kafka/manager/KafkaStateActor.scala
@@ -5,15 +5,24 @@
package kafka.manager
+import akka.pattern._
import kafka.manager.features.KMDeleteTopicFeature
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.consumer.SimpleConsumer
+import kafka.common.TopicAndPartition
import kafka.manager.utils.zero81.{ReassignPartitionCommand, PreferredReplicaLeaderElectionCommand}
+import kafka.manager.utils.ZkUtils
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.cache._
import org.apache.curator.framework.CuratorFramework
import org.joda.time.{DateTimeZone, DateTime}
-import kafka.manager.utils.{TopicAndPartition, ZkUtils}
+import org.slf4j.{LoggerFactory, Logger}
import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Failure, Try}
/**
@@ -22,19 +31,354 @@ import scala.util.{Success, Failure, Try}
import ActorModel._
import kafka.manager.utils._
import scala.collection.JavaConverters._
-class KafkaStateActor(curator: CuratorFramework,
- clusterContext: ClusterContext) extends BaseQueryCommandActor {
- // e.g. /brokers/topics/analytics_content/partitions/0/state
- private[this] val topicsTreeCache = new TreeCache(curator,ZkUtils.BrokerTopicsPath)
+trait OffsetCache {
+
+ def getCacheTimeoutSecs: Int
+
+ def getSimpleConsumerSocketTimeoutMillis: Int
+
+ protected[this] implicit def ec: ExecutionContext
+
+ protected[this] lazy val log : Logger = LoggerFactory.getLogger(this.getClass)
+
+ // Caches, per topic name, a map of partition number to latest offset.
+ private[this] val partitionOffsetsCache: LoadingCache[String, Future[PartitionOffsetsCapture]] = CacheBuilder.newBuilder()
+ .expireAfterWrite(getCacheTimeoutSecs,TimeUnit.SECONDS) // TODO - update more or less often maybe, or make it configurable
+ .build(
+ new CacheLoader[String,Future[PartitionOffsetsCapture]] {
+ def load(topic: String): Future[PartitionOffsetsCapture] = {
+ loadPartitionOffsets(topic)
+ }
+ }
+ )
+
+ // Gets the latest offsets for the partitions of the topic.
+ // Based on the GetOffsetShell tool in kafka.tools (Kafka 0.8.2.1).
+ private[this] def loadPartitionOffsets(topic: String): Future[PartitionOffsetsCapture] = {
+ // Get partition leader broker information
+ val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getTopicPartitionLeaders(topic)
+
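+ // time = -1 is OffsetRequest.LatestTime, so each leader broker is asked for the single latest offset (nOffsets = 1) of every partition it leads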
+ val clientId = "partitionOffsetGetter"
+ val time = -1
+ val nOffsets = 1
+ val simpleConsumerBufferSize = 256 * 1024
+
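+ // Group partitions by their leader broker so that each broker is queried only once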
+ val partitionsByBroker = optPartitionsWithLeaders.map {
+ listOfPartAndBroker => listOfPartAndBroker.collect {
+ case (part, broker) if broker.isDefined => (broker.get, part)
+ }.groupBy(_._1)
+ }
+
+ def getSimpleConsumer(bi: BrokerIdentity) =
+ new SimpleConsumer(bi.host, bi.port, getSimpleConsumerSocketTimeoutMillis, simpleConsumerBufferSize, clientId)
+
+ // Get the latest offset for each partition
+ val futureMap: Future[PartitionOffsetsCapture] = {
+ partitionsByBroker.fold[Future[PartitionOffsetsCapture]]{
+ Future.failed(new IllegalArgumentException(s"Do not have partitions and their leaders for topic $topic"))
+ } { partitionsWithLeaders =>
+ try {
+ val listOfFutures = partitionsWithLeaders.toList.map(tpl => (getSimpleConsumer(tpl._1), tpl._2)).map {
+ case (simpleConsumer, parts) =>
+ val f: Future[Map[Int, Option[Long]]] = Future {
+ try {
+ val topicAndPartitions = parts.map(tpl => (TopicAndPartition(topic, tpl._2), PartitionOffsetRequestInfo(time, nOffsets)))
+ val request = OffsetRequest(topicAndPartitions.toMap)
+ simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets.map(tpl => (tpl._1.asTuple._2, tpl._2.offsets.headOption))
+ } finally {
+ simpleConsumer.close()
+ }
+ }
+ f.recover { case t =>
+ log.error(s"[topic=$topic] An error has occurred while getting topic offsets from broker $parts", t)
+ Map.empty[Int, Option[Long]]
+ }
+ }
+ val result: Future[Map[Int, Option[Long]]] = Future.sequence(listOfFutures).map(_.foldRight(Map.empty[Int, Option[Long]])((b, a) => b ++ a))
+ result.map(m => PartitionOffsetsCapture(System.currentTimeMillis(), m.mapValues(_.getOrElse(0L))))
+ }
+ catch {
+ case e: Exception =>
+ log.error(s"Failed to get offsets for topic $topic", e)
+ Future.failed(e)
+ }
+ }
+ }
+
+ futureMap onFailure {
+ case t => log.error(s"[topic=$topic] An error has occurred while getting topic offsets", t)
+ }
+ futureMap
+ }
+
+ protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]]
+
+ protected def getTopicDescription(topic: String) : Option[TopicDescription]
+
+ def start()
+
+ def stop()
+
+ def getTopicPartitionOffsets(topic: String) : Future[PartitionOffsetsCapture] = partitionOffsetsCache.get(topic)
+
+ def lastUpdateMillis : Long
+
+ def getConsumerDescription(consumer: String) : Option[ConsumerDescription]
+
+ def getConsumedTopicDescription(consumer:String, topic:String) : ConsumedTopicDescription
+
+ def getConsumerList: ConsumerList
+}
+
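+// Active offset cache: keeps a Curator TreeCache over the entire /consumers subtree, so consumer, owner and offset lookups are served from memory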
+case class OffsetCacheActive(curator: CuratorFramework,
+ clusterContext: ClusterContext,
+ partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
+ topicDescriptions: String => Option[TopicDescription],
+ cacheTimeoutSecs: Int,
+ socketTimeoutMillis: Int)
+ (implicit protected[this] val ec: ExecutionContext) extends OffsetCache {
+
+ def getCacheTimeoutSecs: Int = cacheTimeoutSecs
+
+ def getSimpleConsumerSocketTimeoutMillis: Int = socketTimeoutMillis
+
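+ // Bumps the last-update timestamp whenever anything in the consumers subtree changes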
+ private[this] val consumersTreeCacheListener = new TreeCacheListener {
+ override def childEvent(client: CuratorFramework, event: TreeCacheEvent): Unit = {
+ event.getType match {
+ case TreeCacheEvent.Type.INITIALIZED | TreeCacheEvent.Type.NODE_ADDED |
+ TreeCacheEvent.Type.NODE_REMOVED | TreeCacheEvent.Type.NODE_UPDATED =>
+ consumersTreeCacheLastUpdateMillis = System.currentTimeMillis()
+ case _ =>
+ //do nothing
+ }
+ }
+ }
+
+ private[this] val consumersTreeCache = new TreeCache(curator, ZkUtils.ConsumersPath)
+
+ @volatile
+ private[this] var consumersTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
+
+ private[this] def withConsumersTreeCache[T](fn: TreeCache => T) : Option[T] = {
+ Option(fn(consumersTreeCache))
+ }
+
+ protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
+
+ protected def getTopicDescription(topic: String) : Option[TopicDescription] = topicDescriptions(topic)
+
+ def start(): Unit = {
+ log.info("Starting consumers tree cache...")
+ consumersTreeCache.start()
+
+ log.info("Adding consumers tree cache listener...")
+ consumersTreeCache.getListenable.addListener(consumersTreeCacheListener)
+ }
+
+ def stop(): Unit = {
+ log.info("Removing consumers tree cache listener...")
+ Try(consumersTreeCache.getListenable.removeListener(consumersTreeCacheListener))
+
+ log.info("Shutting down consumers tree cache...")
+ Try(consumersTreeCache.close())
+ }
+
+ def lastUpdateMillis : Long = consumersTreeCacheLastUpdateMillis
+
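+ // Builds a ConsumerDescription by listing the topics under the consumer's offsets/ node and describing each consumed topic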
+ def getConsumerDescription(consumer: String) : Option[ConsumerDescription] = {
+ val offsetPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"offsets")
+ val topicOffsetOption : Option[Map[String, ChildData]] = Option(consumersTreeCache.getCurrentChildren(offsetPath)).map(_.asScala.toMap)
+
+ val topicDescriptions: Option[Map[String, ConsumedTopicDescription]] =
+ topicOffsetOption.map[List[(String, ConsumedTopicDescription)]] { topics: Map[String, ChildData] =>
+ for {
+ topicAndData: (String, ChildData) <- topics.toList
+ topicDesc = getConsumedTopicDescription(consumer, topicAndData._1)
+ } yield (topicAndData._1, topicDesc)
+ }.map(_.toMap)
+
+ topicDescriptions.map(ConsumerDescription(consumer, _))
+ }
+
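+ // Reads per-partition offsets and owners for the given consumer and topic from the cached /consumers subtree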
+ def getConsumedTopicDescription(consumer:String, topic:String) : ConsumedTopicDescription = {
+ val offsetPath = "%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets", topic)
+ val ownerPath = "%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "owners", topic)
+ val partitionOffsets: Option[Map[Int, Long]] = for {
+ offsetsByPartition: Map[String, ChildData] <- Option(consumersTreeCache.getCurrentChildren(offsetPath)).map(_.asScala.toMap)
+ offsets : Map[Int, Long] = offsetsByPartition map {case (part, data) => (part.toInt, Option(data.getData).map(asString).getOrElse("-1").toLong)}
+ } yield offsets
+
+ val partitionOwners: Option[Map[Int, String]] = for {
+ ownersByPartition: Map[String, ChildData] <- Option(consumersTreeCache.getCurrentChildren(ownerPath)).map(_.asScala.toMap)
+ owners : Map[Int, String] = ownersByPartition map { case (part, data) => (part.toInt, Option(data.getData).map(asString).getOrElse("")) }
+ } yield owners
+
+ val optTopic = getTopicDescription(topic)
+ val numPartitions: Int = math.max(optTopic.flatMap(_.partitionState.map(_.size)).getOrElse(0),
+ partitionOffsets.map(_.size).getOrElse(0))
+ ConsumedTopicDescription(consumer, topic, numPartitions, optTopic, partitionOwners, partitionOffsets)
+ }
+
+ def getConsumerList: ConsumerList = {
+ withConsumersTreeCache { cache =>
+ cache.getCurrentChildren(ZkUtils.ConsumersPath)
+ }.fold {
+ ConsumerList(IndexedSeq.empty, clusterContext)
+ } { data: java.util.Map[String, ChildData] =>
+ val filteredList: IndexedSeq[String] = data.asScala.filter{
+ case (consumer, childData) =>
+ if (clusterContext.config.filterConsumers)
+ // A consumer is considered inactive if it is missing any of its three children: ids/, offsets/ or owners/
+ childData.getStat.getNumChildren > 2
+ else true
+ }.keySet.toIndexedSeq
+ ConsumerList(filteredList, clusterContext)
+ }
+ }
+}
+
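+// Passive offset cache: only the top-level consumer list is cached (PathChildrenCache); per-topic offsets and owners are read from ZooKeeper on demand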
+case class OffsetCachePassive(curator: CuratorFramework,
+ clusterContext: ClusterContext,
+ partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
+ topicDescriptions: String => Option[TopicDescription],
+ cacheTimeoutSecs: Int,
+ socketTimeoutMillis: Int)
+ (implicit protected[this] val ec: ExecutionContext) extends OffsetCache {
+
+ def getCacheTimeoutSecs: Int = cacheTimeoutSecs
+
+ def getSimpleConsumerSocketTimeoutMillis: Int = socketTimeoutMillis
+
+ private[this] val consumersPathChildrenCacheListener = new PathChildrenCacheListener {
+ override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
+ event.getType match {
+ case PathChildrenCacheEvent.Type.INITIALIZED | PathChildrenCacheEvent.Type.CHILD_ADDED |
+ PathChildrenCacheEvent.Type.CHILD_REMOVED | PathChildrenCacheEvent.Type.CHILD_UPDATED =>
+ consumersTreeCacheLastUpdateMillis = System.currentTimeMillis()
+ case _ =>
+ //do nothing
+ }
+ }
+ }
+
+ private[this] val consumersPathChildrenCache = new PathChildrenCache(curator, ZkUtils.ConsumersPath, true)
+
+ @volatile
+ private[this] var consumersTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
+
+ private[this] def withConsumersPathChildrenCache[T](fn: PathChildrenCache => T) : Option[T] = {
+ Option(fn(consumersPathChildrenCache))
+ }
+
+ protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
+
+ protected def getTopicDescription(topic: String) : Option[TopicDescription] = topicDescriptions(topic)
- private[this] val topicsConfigPathCache = new PathChildrenCache(curator,ZkUtils.TopicConfigPath,true)
+ def start(): Unit = {
+ log.info("Starting consumers path children cache...")
+ consumersPathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE)
+
+ log.info("Adding consumers path children cache listener...")
+ consumersPathChildrenCache.getListenable.addListener(consumersPathChildrenCacheListener)
+ }
- private[this] val brokersPathCache = new PathChildrenCache(curator,ZkUtils.BrokerIdsPath,true)
+ def stop(): Unit = {
+ log.info("Removing consumers path children cache listener...")
+ Try(consumersPathChildrenCache.getListenable.removeListener(consumersPathChildrenCacheListener))
+
+ log.info("Shutting down consumers path children cache...")
+ Try(consumersPathChildrenCache.close())
+ }
+
+ def lastUpdateMillis : Long = consumersTreeCacheLastUpdateMillis
+
+ def getConsumerDescription(consumer: String) : Option[ConsumerDescription] = {
+ val offsetPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"offsets")
+ val topicOffsetOption : Option[List[String]] = Try(Option(curator.getChildren.forPath(offsetPath)).map(_.asScala.toList)).toOption.flatten
+
+ val topicDescriptions: Option[Map[String, ConsumedTopicDescription]] = topicOffsetOption.map {
+ topics =>
+ topics.map { topic =>
+ val topicDesc = getConsumedTopicDescription(consumer, topic)
+ (topic, topicDesc)
+ }.toMap
+ }
+
+ topicDescriptions.map(ConsumerDescription(consumer, _))
+ }
+
+ private[this] def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long] = {
+ tpi.map {
+ case (p, _) =>
+ val offsetPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets", topic, p)
+ (p, ZkUtils.readDataMaybeNull(curator, offsetPath)._1.map(_.toLong).getOrElse(-1L))
+ }
+ }
- private[this] val adminPathCache = new PathChildrenCache(curator,ZkUtils.AdminPath,true)
+ private[this] def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String] = {
+ tpi.map {
+ case (p, _) =>
+ val ownerPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "owners", topic, p)
+ (p, ZkUtils.readDataMaybeNull(curator, ownerPath)._1.orNull)
+ }.filter(_._2 != null)
+ }
- private[this] val deleteTopicsPathCache = new PathChildrenCache(curator, ZkUtils.DeleteTopicsPath,true)
+ def getConsumedTopicDescription(consumer:String, topic:String) : ConsumedTopicDescription = {
+ val optTopic = getTopicDescription(topic)
+ val optTpi = optTopic.map(TopicIdentity.getTopicPartitionIdentity(_, None))
+ val partitionOffsets = for {
+ td <- optTopic
+ tpi <- optTpi
+ } yield {
+ readConsumerOffsetByTopicPartition(consumer, topic, tpi)
+ }
+ val partitionOwners = for {
+ td <- optTopic
+ tpi <- optTpi
+ } yield {
+ readConsumerOwnerByTopicPartition(consumer, topic, tpi)
+ }
+
+ val numPartitions: Int = math.max(optTopic.flatMap(_.partitionState.map(_.size)).getOrElse(0),
+ partitionOffsets.map(_.size).getOrElse(0))
+ ConsumedTopicDescription(consumer, topic, numPartitions, optTopic, partitionOwners, partitionOffsets)
+ }
+
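+ // Lists consumer group names from the path children cache; note that filterConsumers is not applied here, since this cache does not track each consumer's children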
+ def getConsumerList: ConsumerList = {
+ withConsumersPathChildrenCache { cache =>
+ val currentData = cache.getCurrentData
+ currentData
+ }.fold {
+ ConsumerList(IndexedSeq.empty, clusterContext)
+ } { data: java.util.List[ChildData] =>
+ ConsumerList(data.asScala.map(cd => cd.getPath.split("/").last).toIndexedSeq, clusterContext)
+ }
+ }
+}
+
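+// Bundles the dependencies of KafkaStateActor: Curator client, cluster context, long-running pool config and cache/socket timeouts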
+case class KafkaStateActorConfig(curator: CuratorFramework,
+ clusterContext: ClusterContext,
+ longRunningPoolConfig: LongRunningPoolConfig,
+ partitionOffsetCacheTimeoutSecs: Int,
+ simpleConsumerSocketTimeoutMillis: Int)
+class KafkaStateActor(config: KafkaStateActorConfig) extends BaseQueryCommandActor with LongRunningPoolActor {
+
+ override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig
+
+ override protected def longRunningQueueFull(): Unit = {
+ log.error("Long running pool queue full, skipping!")
+ }
+
+ // e.g. /brokers/topics/analytics_content/partitions/0/state
+ private[this] val topicsTreeCache = new TreeCache(config.curator,ZkUtils.BrokerTopicsPath)
+
+ private[this] val topicsConfigPathCache = new PathChildrenCache(config.curator,ZkUtils.TopicConfigPath,true)
+
+ private[this] val brokersPathCache = new PathChildrenCache(config.curator,ZkUtils.BrokerIdsPath,true)
+
+ private[this] val adminPathCache = new PathChildrenCache(config.curator,ZkUtils.AdminPath,true)
+
+ private[this] val deleteTopicsPathCache = new PathChildrenCache(config.curator, ZkUtils.DeleteTopicsPath,true)
@volatile
private[this] var topicsTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
@@ -46,7 +390,7 @@ class KafkaStateActor(curator: CuratorFramework,
TreeCacheEvent.Type.NODE_REMOVED | TreeCacheEvent.Type.NODE_UPDATED =>
topicsTreeCacheLastUpdateMillis = System.currentTimeMillis()
case _ =>
- //do nothing
+ //do nothing
}
}
}
@@ -73,7 +417,7 @@ class KafkaStateActor(curator: CuratorFramework,
endPreferredLeaderElection(event.getData)
endReassignPartition(event.getData)
case _ =>
- //do nothing
+ //do nothing
}
}
@@ -109,9 +453,29 @@ class KafkaStateActor(curator: CuratorFramework,
}
}
}
+
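+ // Choose the offset cache implementation based on the cluster's activeOffsetCacheEnabled flag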
+ private[this] val offsetCache: OffsetCache = {
+ if(config.clusterContext.config.activeOffsetCacheEnabled)
+ new OffsetCacheActive(
+ config.curator,
+ config.clusterContext,
+ getPartitionLeaders,
+ getTopicDescription,
+ config.partitionOffsetCacheTimeoutSecs,
+ config.simpleConsumerSocketTimeoutMillis)(longRunningExecutionContext)
+ else
+ new OffsetCachePassive(
+ config.curator,
+ config.clusterContext,
+ getPartitionLeaders,
+ getTopicDescription,
+ config.partitionOffsetCacheTimeoutSecs,
+ config.simpleConsumerSocketTimeoutMillis)(longRunningExecutionContext)
+ }
@scala.throws[Exception](classOf[Exception])
override def preStart() = {
+ log.info(config.toString)
log.info("Started actor %s".format(self.path))
log.info("Starting topics tree cache...")
topicsTreeCache.start()
@@ -128,6 +492,9 @@ class KafkaStateActor(curator: CuratorFramework,
topicsTreeCache.getListenable.addListener(topicsTreeCacheListener)
log.info("Adding admin path cache listener...")
adminPathCache.getListenable.addListener(adminPathCacheListener)
+
+ log.info("Starting offset cache...")
+ offsetCache.start()
}
@scala.throws[Exception](classOf[Exception])
@@ -141,6 +508,9 @@ class KafkaStateActor(curator: CuratorFramework,
@scala.throws[Exception](classOf[Exception])
override def postStop(): Unit = {
log.info("Stopped actor %s".format(self.path))
+
+ log.info("Stopping offset cache...")
+ Try(offsetCache.stop())
log.info("Removing admin path cache listener...")
Try(adminPathCache.getListenable.removeListener(adminPathCacheListener))
@@ -161,21 +531,53 @@ class KafkaStateActor(curator: CuratorFramework,
super.postStop()
}
- def getTopicDescription(topic: String) : Option[TopicDescription] = {
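+ // Returns the (zookeeper version, raw JSON) of the topic node, if it exists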
+ def getTopicZookeeperData(topic: String): Option[(Int,String)] = {
val topicPath = "%s/%s".format(ZkUtils.BrokerTopicsPath,topic)
- val descriptionOption : Option[(Int,String)] =
- Option(topicsTreeCache.getCurrentData(topicPath)).map( childData => (childData.getStat.getVersion,asString(childData.getData)))
+ Option(topicsTreeCache.getCurrentData(topicPath)).map( childData => (childData.getStat.getVersion,asString(childData.getData)))
+ }
+ def getTopicDescription(topic: String) : Option[TopicDescription] = {
for {
- description <- descriptionOption
+ description <- getTopicZookeeperData(topic)
partitionsPath = "%s/%s/partitions".format(ZkUtils.BrokerTopicsPath, topic)
partitions: Map[String, ChildData] <- Option(topicsTreeCache.getCurrentChildren(partitionsPath)).map(_.asScala.toMap)
states : Map[String, String] = partitions flatMap { case (part, _) =>
val statePath = s"$partitionsPath/$part/state"
Option(topicsTreeCache.getCurrentData(statePath)).map(cd => (part, asString(cd.getData)))
}
- config = getTopicConfigString(topic)
- } yield TopicDescription(topic, description, Option(states),config)
+ partitionOffsets = offsetCache.getTopicPartitionOffsets(topic)
+ topicConfig = getTopicConfigString(topic)
+ } yield TopicDescription(topic, description, Option(states), partitionOffsets, topicConfig)
+ }
+
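+ // Resolves each partition's leader broker by parsing the partition state JSON kept in the topics tree cache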
+ def getPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = {
+ val partitionsPath = "%s/%s/partitions".format(ZkUtils.BrokerTopicsPath, topic)
+ val partitions: Option[Map[String, ChildData]] = Option(topicsTreeCache.getCurrentChildren(partitionsPath)).map(_.asScala.toMap)
+ val states : Option[Iterable[(String, String)]] =
+ partitions.map[Iterable[(String,String)]]{ partMap: Map[String, ChildData] =>
+ partMap.flatMap { case (part, _) =>
+ val statePath = s"$partitionsPath/$part/state"
+ Option(topicsTreeCache.getCurrentData(statePath)).map(cd => (part, asString(cd.getData)))
+ }
+ }
+ val targetBrokers : IndexedSeq[BrokerIdentity] = getBrokers
+
+ import org.json4s.jackson.JsonMethods.parse
+ import org.json4s.scalaz.JsonScalaz.field
+ states.map(_.map{case (part, state) =>
+ val partition = part.toInt
+ val descJson = parse(state)
+ val leaderID = field[Int]("leader")(descJson).fold({ e =>
+ log.error(s"[topic=$topic] Failed to get partitions from topic json $state"); 0}, identity)
+ val leader = targetBrokers.find(_.id == leaderID)
+ (partition, leader)
+ }.toList)
+ }
+
+ private[this] def getTopicConfigString(topic: String) : Option[(Int,String)] = {
+ val data: mutable.Buffer[ChildData] = topicsConfigPathCache.getCurrentData.asScala
+ val result: Option[ChildData] = data.find(p => p.getPath.endsWith(topic))
+ result.map(cd => (cd.getStat.getVersion,asString(cd.getData)))
}
override def processActorResponse(response: ActorResponse): Unit = {
@@ -183,18 +585,37 @@ class KafkaStateActor(curator: CuratorFramework,
case any: Any => log.warning("ksa : processActorResponse : Received unknown message: {}", any.toString)
}
}
+
+
+ private[this] def getBrokers : IndexedSeq[BrokerIdentity] = {
+ val data: mutable.Buffer[ChildData] = brokersPathCache.getCurrentData.asScala
+ data.map { cd =>
+ BrokerIdentity.from(nodeFromPath(cd.getPath).toInt, asString(cd.getData))
+ }.filter { v =>
+ v match {
+ case scalaz.Failure(nel) =>
+ log.error(s"Failed to parse broker config $nel")
+ false
+ case _ => true
+ }
+ }.collect {
+ case scalaz.Success(bi) => bi
+ }.toIndexedSeq.sortBy(_.id)
+ }
- private[this] def getTopicConfigString(topic: String) : Option[(Int,String)] = {
- val data: mutable.Buffer[ChildData] = topicsConfigPathCache.getCurrentData.asScala
- val result: Option[ChildData] = data.find(p => p.getPath.endsWith(topic))
- result.map(cd => (cd.getStat.getVersion,asString(cd.getData)))
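+ // Runs the given computation on the long-running execution context and pipes the result back to the original sender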
+ private[this] def asyncPipeToSender[T](fn: => T): Unit = {
+ implicit val ec = longRunningExecutionContext
+ val result: Future[T] = Future {
+ fn
+ }
+ result pipeTo sender
}
override def processQueryRequest(request: QueryRequest): Unit = {
request match {
case KSGetTopics =>
val deleteSet: Set[String] = {
- if(clusterContext.clusterFeatures.features(KMDeleteTopicFeature)) {
+ if(config.clusterContext.clusterFeatures.features(KMDeleteTopicFeature)) {
val deleteTopicsData: mutable.Buffer[ChildData] = deleteTopicsPathCache.getCurrentData.asScala
deleteTopicsData.map { cd =>
nodeFromPath(cd.getPath)
@@ -206,9 +627,14 @@ class KafkaStateActor(curator: CuratorFramework,
withTopicsTreeCache { cache =>
cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
}.fold {
- sender ! TopicList(IndexedSeq.empty, deleteSet, clusterContext)
+ sender ! TopicList(IndexedSeq.empty, deleteSet, config.clusterContext)
} { data: java.util.Map[String, ChildData] =>
- sender ! TopicList(data.asScala.map(kv => kv._1).toIndexedSeq, deleteSet, clusterContext)
+ sender ! TopicList(data.asScala.keySet.toIndexedSeq, deleteSet, config.clusterContext)
+ }
+
+ case KSGetConsumers =>
+ asyncPipeToSender {
+ offsetCache.getConsumerList
}
case KSGetTopicConfig(topic) =>
@@ -218,39 +644,48 @@ class KafkaStateActor(curator: CuratorFramework,
sender ! getTopicDescription(topic)
case KSGetTopicDescriptions(topics) =>
- sender ! TopicDescriptions(topics.toIndexedSeq.map(getTopicDescription).flatten, topicsTreeCacheLastUpdateMillis)
+ sender ! TopicDescriptions(topics.toIndexedSeq.flatMap(getTopicDescription), topicsTreeCacheLastUpdateMillis)
+
+ case KSGetConsumerDescription(consumer) =>
+ asyncPipeToSender {
+ offsetCache.getConsumerDescription(consumer)
+ }
+
+ case KSGetConsumedTopicDescription(consumer, topic) =>
+ asyncPipeToSender {
+ offsetCache.getConsumedTopicDescription(consumer, topic)
+ }
case KSGetAllTopicDescriptions(lastUpdateMillisOption) =>
val lastUpdateMillis = lastUpdateMillisOption.getOrElse(0L)
- if (topicsTreeCacheLastUpdateMillis > lastUpdateMillis) {
+ //since we also want refreshed offsets, update when the last update time plus the offset cache timeout is earlier than the current time
+ if (topicsTreeCacheLastUpdateMillis > lastUpdateMillis || ((topicsTreeCacheLastUpdateMillis + (config.partitionOffsetCacheTimeoutSecs * 1000)) < System.currentTimeMillis())) {
//we have option here since there may be no topics at all!
withTopicsTreeCache { cache: TreeCache =>
cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
}.fold {
sender ! TopicDescriptions(IndexedSeq.empty, topicsTreeCacheLastUpdateMillis)
} { data: java.util.Map[String, ChildData] =>
- sender ! TopicDescriptions(data.asScala.keys.toIndexedSeq.map(getTopicDescription).flatten, topicsTreeCacheLastUpdateMillis)
+ sender ! TopicDescriptions(data.asScala.keys.toIndexedSeq.flatMap(getTopicDescription), topicsTreeCacheLastUpdateMillis)
}
} // else no updates to send
+ case KSGetAllConsumerDescriptions(lastUpdateMillisOption) =>
+ val lastUpdateMillis = lastUpdateMillisOption.getOrElse(0L)
+ if (offsetCache.lastUpdateMillis > lastUpdateMillis) {
+ asyncPipeToSender {
+ ConsumerDescriptions(offsetCache
+ .getConsumerList
+ .list
+ .flatMap(c => offsetCache.getConsumerDescription(c)), offsetCache.lastUpdateMillis)
+ }
+ }
+
case KSGetTopicsLastUpdateMillis =>
sender ! topicsTreeCacheLastUpdateMillis
case KSGetBrokers =>
- val data: mutable.Buffer[ChildData] = brokersPathCache.getCurrentData.asScala
- val result: IndexedSeq[BrokerIdentity] = data.map { cd =>
- BrokerIdentity.from(nodeFromPath(cd.getPath).toInt, asString(cd.getData))
- }.filter { v =>
- v match {
- case scalaz.Failure(nel) =>
- log.error(s"Failed to parse broker config $nel")
- false
- case _ => true
- }
- }.collect {
- case scalaz.Success(bi) => bi
- }.toIndexedSeq.sortBy(_.id)
- sender ! BrokerList(result, clusterContext)
+ sender ! BrokerList(getBrokers, config.clusterContext)
case KSGetPreferredLeaderElection =>
sender ! preferredLeaderElection
@@ -269,7 +704,7 @@ class KafkaStateActor(curator: CuratorFramework,
val s: Set[TopicAndPartition] = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(json)
preferredLeaderElection.fold {
//nothing there, add as new
- preferredLeaderElection = Some(PreferredReplicaElection(getDateTime(millis), s, None, clusterContext))
+ preferredLeaderElection = Some(PreferredReplicaElection(getDateTime(millis), s, None, config.clusterContext))
} {
existing =>
existing.endTime.fold {
@@ -277,7 +712,7 @@ class KafkaStateActor(curator: CuratorFramework,
preferredLeaderElection = Some(existing.copy(topicAndPartition = existing.topicAndPartition ++ s))
} { _ =>
//new op started
- preferredLeaderElection = Some(PreferredReplicaElection(getDateTime(millis), s, None, clusterContext))
+ preferredLeaderElection = Some(PreferredReplicaElection(getDateTime(millis), s, None, config.clusterContext))
}
}
}
@@ -286,7 +721,7 @@ class KafkaStateActor(curator: CuratorFramework,
val m : Map[TopicAndPartition, Seq[Int]] = ReassignPartitionCommand.parsePartitionReassignmentZkData(json)
reassignPartitions.fold {
//nothing there, add as new
- reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, clusterContext))
+ reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, config.clusterContext))
} {
existing =>
existing.endTime.fold {
@@ -294,7 +729,7 @@ class KafkaStateActor(curator: CuratorFramework,
reassignPartitions = Some(existing.copy(partitionsToBeReassigned = existing.partitionsToBeReassigned ++ m))
} { _ =>
//new op started
- reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, clusterContext))
+ reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, config.clusterContext))
}
}
}
@@ -321,7 +756,7 @@ class KafkaStateActor(curator: CuratorFramework,
case Failure(t) =>
log.error("Failed!",t)
case Success(_) =>
- //do nothing
+ //do nothing
}
}
diff --git a/app/kafka/manager/LogkafkaCommandActor.scala b/app/kafka/manager/LogkafkaCommandActor.scala
index 6dcf367e5..8751cf973 100644
--- a/app/kafka/manager/LogkafkaCommandActor.scala
+++ b/app/kafka/manager/LogkafkaCommandActor.scala
@@ -66,18 +66,12 @@ class LogkafkaCommandActor(logkafkaCommandActorConfig: LogkafkaCommandActorConfi
implicit val ec = longRunningExecutionContext
request match {
case LKCDeleteLogkafka(hostname, log_path, logkafkaConfig) =>
- if(logkafkaCommandActorConfig.clusterContext.clusterFeatures.features(KMDeleteTopicFeature)) {
- longRunning {
- Future {
- LKCCommandResult(Try {
- logkafkaAdminUtils.deleteLogkafka(logkafkaCommandActorConfig.curator, hostname, log_path, logkafkaConfig)
- })
- }
+ longRunning {
+ Future {
+ LKCCommandResult(Try {
+ logkafkaAdminUtils.deleteLogkafka(logkafkaCommandActorConfig.curator, hostname, log_path, logkafkaConfig)
+ })
}
- } else {
- val result : LKCCommandResult = LKCCommandResult(Failure(new UnsupportedOperationException(
- s"Delete logkafka not supported for kafka version ${logkafkaCommandActorConfig.clusterContext.config.version}")))
- sender ! result
}
case LKCCreateLogkafka(hostname, log_path, config, logkafkaConfig) =>
longRunning {
@@ -87,11 +81,11 @@ class LogkafkaCommandActor(logkafkaCommandActorConfig: LogkafkaCommandActorConfi
})
}
}
- case LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfig) =>
+ case LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfig, checkConfig) =>
longRunning {
Future {
LKCCommandResult(Try {
- logkafkaAdminUtils.changeLogkafkaConfig(logkafkaCommandActorConfig.curator, hostname, log_path, config, logkafkaConfig)
+ logkafkaAdminUtils.changeLogkafkaConfig(logkafkaCommandActorConfig.curator, hostname, log_path, config, logkafkaConfig, checkConfig)
})
}
}
diff --git a/app/kafka/manager/utils/Logkafka.scala b/app/kafka/manager/utils/Logkafka.scala
index 63e43c4e0..670c5bf63 100644
--- a/app/kafka/manager/utils/Logkafka.scala
+++ b/app/kafka/manager/utils/Logkafka.scala
@@ -46,6 +46,7 @@ object Logkafka {
case None =>
checkCondition(false, IllegalCharacterInName(hostname))
}
+ checkCondition(!hostname.matches("^localhost$"), HostnameIsLocalhost)
checkCondition(hostname.matches(validHostnameRegex), InvalidHostname)
}
@@ -55,10 +56,19 @@ object Logkafka {
checkCondition(log_path.length <= maxPathLength, InvalidLogPathLength)
illRgxPath.findFirstIn(log_path) match {
case Some(t) =>
- checkCondition(false, IllegalCharacterInName(log_path))
+ checkCondition(false, IllegalCharacterInPath(log_path))
case None =>
- checkCondition(true, IllegalCharacterInName(log_path))
+ checkCondition(true, IllegalCharacterInPath(log_path))
}
+
+ val f = new java.io.File(log_path)
+ val valid = try {
+ f.getCanonicalPath()
+ true
+ } catch {
+ case e: Exception => false
+ }
+ checkCondition(valid, InvalidLogPath)
}
def parseJsonStr(hostname: String, jsonStr: String): Map[String, Map[String, String]] = {
@@ -86,11 +96,13 @@ object Logkafka {
object LogkafkaErrors {
class HostnameEmpty private[LogkafkaErrors] extends UtilError("hostname is illegal, can't be empty")
+ class HostnameIsLocalhost private[LogkafkaErrors] extends UtilError("hostname is illegal, can't be localhost")
class LogPathEmpty private[LogkafkaErrors] extends UtilError("log path is illegal, can't be empty")
class LogPathNotAbsolute private[LogkafkaErrors] extends UtilError("log path is illegal, must be absolute")
class InvalidHostname private[LogkafkaErrors] extends UtilError(s"hostname is illegal, does not match regex ${Logkafka.validHostnameRegex}")
class InvalidHostnameLength private[LogkafkaErrors] extends UtilError(
"hostname is illegal, can't be longer than " + Logkafka.maxNameLength + " characters")
+ class InvalidLogPath private[LogkafkaErrors] extends UtilError("log path is illegal")
class InvalidLogPathLength private[LogkafkaErrors] extends UtilError(
"log path is illegal, can't be longer than " + Logkafka.maxPathLength + " characters")
class IllegalCharacterInName private[LogkafkaErrors] (hostname: String) extends UtilError(
@@ -100,10 +112,12 @@ object LogkafkaErrors {
class HostnameNotExists private[LogkafkaErrors] (hostname: String) extends UtilError(s"Hostname not exists : $hostname")
val HostnameEmpty = new HostnameEmpty
+ val HostnameIsLocalhost = new HostnameIsLocalhost
val LogPathEmpty = new LogPathEmpty
val LogPathNotAbsolute = new LogPathNotAbsolute
val InvalidHostname = new InvalidHostname
val InvalidHostnameLength = new InvalidHostnameLength
+ val InvalidLogPath = new InvalidLogPath
val InvalidLogPathLength = new InvalidLogPathLength
def IllegalCharacterInName(hostname: String) = new IllegalCharacterInName(hostname)
def IllegalCharacterInPath(log_path: String) = new IllegalCharacterInPath(log_path)
diff --git a/app/kafka/manager/utils/LogkafkaAdminUtils.scala b/app/kafka/manager/utils/LogkafkaAdminUtils.scala
index 8dfaa25fb..281a0ac3a 100644
--- a/app/kafka/manager/utils/LogkafkaAdminUtils.scala
+++ b/app/kafka/manager/utils/LogkafkaAdminUtils.scala
@@ -59,7 +59,7 @@ class LogkafkaAdminUtils(version: KafkaVersion) {
hostname: String,
log_path: String,
config: Properties = new Properties,
- logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig]
+ logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig] = None
): Unit = {
createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption)
}
@@ -70,11 +70,16 @@ class LogkafkaAdminUtils(version: KafkaVersion) {
config: Properties = new Properties,
logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig],
update: Boolean = false,
- readVersion: Int = -1
+ readVersion: Int = -1,
+ checkConfig: Boolean = true
) {
// validate arguments
Logkafka.validateHostname(hostname)
- LogkafkaNewConfigs.validate(version,config)
+ Logkafka.validatePath(log_path)
+
+ if (checkConfig) {
+ LogkafkaNewConfigs.validate(version, config)
+ }
val configMap: mutable.Map[String, String] = {
import scala.collection.JavaConverters._
@@ -112,9 +117,10 @@ class LogkafkaAdminUtils(version: KafkaVersion) {
hostname: String,
log_path: String,
config: Properties = new Properties,
- logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig]
+ logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig],
+ checkConfig: Boolean = true
): Unit = {
- createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption, true)
+ createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption, true, -1, checkConfig)
}
/**
diff --git a/app/kafka/manager/utils/TopicAndPartition.scala b/app/kafka/manager/utils/TopicAndPartition.scala
deleted file mode 100644
index 2707112fa..000000000
--- a/app/kafka/manager/utils/TopicAndPartition.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.manager.utils
-
-/**
- * Borrowed from kafka 0.8.1.1
- * https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/common/TopicAndPartition.scala
- */
-case class TopicAndPartition(topic: String, partition: Int)
diff --git a/app/kafka/manager/utils/ZkUtils.scala b/app/kafka/manager/utils/ZkUtils.scala
index 2c7173ebd..675d179f4 100644
--- a/app/kafka/manager/utils/ZkUtils.scala
+++ b/app/kafka/manager/utils/ZkUtils.scala
@@ -19,6 +19,7 @@ package kafka.manager.utils
import java.nio.charset.StandardCharsets
+import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.KeeperException.{NodeExistsException, NoNodeException}
@@ -104,6 +105,19 @@ object ZkUtils {
val dataStr: String = curator.getData.storingStatIn(stat).forPath(path)
(dataStr, stat)
}
+
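+ // Returns (None, stat) when the znode does not exist, instead of propagating NoNodeException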
+ def readDataMaybeNull(curator: CuratorFramework, path: String): (Option[String], Stat) = {
+ val stat: Stat = new Stat()
+ try {
+ val dataStr: String = curator.getData.storingStatIn(stat).forPath(path)
+ (Option(dataStr), stat)
+ } catch {
+ case e: NoNodeException =>
+ (None, stat)
+ }
+ }
def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
diff --git a/app/kafka/manager/utils/logkafka81/LogConfig.scala b/app/kafka/manager/utils/logkafka81/LogConfig.scala
index d7018f24b..bdefcd522 100644
--- a/app/kafka/manager/utils/logkafka81/LogConfig.scala
+++ b/app/kafka/manager/utils/logkafka81/LogConfig.scala
@@ -18,6 +18,7 @@
package kafka.manager.utils.logkafka81
import java.util.Properties
+import scala.util.matching.Regex
import kafka.manager.utils.LogkafkaNewConfigs
object Defaults {
@@ -30,6 +31,7 @@ object Defaults {
val CompressionCodec= "none"
val RequiredAcks = 1
val MessageTimeoutMs = 0
+ val RegexFilterPattern = ""
}
/**
@@ -48,6 +50,7 @@ object Defaults {
* @param messageTimeoutMs Local message timeout. This value is only enforced locally
and limits the time a produced message waits for successful delivery.
A time of 0 is infinite.
+ * @param regexFilterPattern The messages matching this pattern will be dropped.
*
*/
case class LogConfig(val valid: Boolean = Defaults.Valid,
@@ -58,7 +61,8 @@ case class LogConfig(val valid: Boolean = Defaults.Valid,
val partition: Int = Defaults.Partition,
val compressionCodec: String = Defaults.CompressionCodec,
val requiredAcks: Int = Defaults.RequiredAcks,
- val messageTimeoutMs: Long = Defaults.MessageTimeoutMs) {
+ val messageTimeoutMs: Long = Defaults.MessageTimeoutMs,
+ val regexFilterPattern: String = Defaults.RegexFilterPattern) {
def toProps: Properties = {
val props = new Properties()
@@ -72,6 +76,7 @@ case class LogConfig(val valid: Boolean = Defaults.Valid,
props.put(CompressionCodecProp, compressionCodec.toString)
props.put(RequiredAcksProp, requiredAcks.toString)
props.put(MessageTimeoutMsProp, messageTimeoutMs.toString)
+ props.put(RegexFilterPatternProp, regexFilterPattern.toString)
props
}
@@ -83,6 +88,11 @@ case class LogConfig(val valid: Boolean = Defaults.Valid,
}
object LogConfig extends LogkafkaNewConfigs {
+ import kafka.manager.utils.logkafka81.LogkafkaConfigErrors._
+ import kafka.manager.utils._
+
+ val maxRegexFilterPatternLength = 255
+
val ValidProp = "valid"
val FollowLastProp = "follow_last"
val BatchSizeProp = "batchsize"
@@ -92,6 +102,7 @@ object LogConfig extends LogkafkaNewConfigs {
val CompressionCodecProp = "compression_codec"
val RequiredAcksProp = "required_acks"
val MessageTimeoutMsProp = "message_timeout_ms"
+ val RegexFilterPatternProp = "regex_filter_pattern"
val ConfigMaps = Map(ValidProp -> Defaults.Valid.toString,
FollowLastProp -> Defaults.FollowLast.toString,
@@ -101,7 +112,8 @@ object LogConfig extends LogkafkaNewConfigs {
PartitionProp -> Defaults.Partition.toString,
CompressionCodecProp -> Defaults.CompressionCodec.toString,
RequiredAcksProp -> Defaults.RequiredAcks.toString,
- MessageTimeoutMsProp -> Defaults.MessageTimeoutMs.toString)
+ MessageTimeoutMsProp -> Defaults.MessageTimeoutMs.toString,
+ RegexFilterPatternProp -> Defaults.RegexFilterPattern.toString)
def configMaps = ConfigMaps
val ConfigNames = ConfigMaps.keySet
def configNames = ConfigNames
@@ -118,7 +130,8 @@ object LogConfig extends LogkafkaNewConfigs {
partition = props.getProperty(PartitionProp, Defaults.Partition.toString).toInt,
compressionCodec = props.getProperty(CompressionCodecProp, Defaults.CompressionCodec.toString).toString,
requiredAcks= props.getProperty(RequiredAcksProp, Defaults.RequiredAcks.toString).toInt,
- messageTimeoutMs = props.getProperty(MessageTimeoutMsProp, Defaults.MessageTimeoutMs.toString).toLong)
+ messageTimeoutMs = props.getProperty(MessageTimeoutMsProp, Defaults.MessageTimeoutMs.toString).toLong,
+ regexFilterPattern = props.getProperty(RegexFilterPatternProp, Defaults.RegexFilterPattern.toString).toString)
}
/**
@@ -145,6 +158,7 @@ object LogConfig extends LogkafkaNewConfigs {
def validate(props: Properties) {
validateNames(props)
validateTopic(props)
+ validateRegexFilterPattern(props)
LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
}
@@ -156,4 +170,30 @@ object LogConfig extends LogkafkaNewConfigs {
require(topic != null , "Topic is null")
}
+ /**
+ * Check that the RegexFilterPattern is reasonable
+ */
+ private def validateRegexFilterPattern(props: Properties) {
+ val regexFilterPattern = props.getProperty(RegexFilterPatternProp)
+ if (regexFilterPattern == null) return
+ checkCondition(regexFilterPattern.length <= maxRegexFilterPatternLength, LogkafkaConfigErrors.InvalidRegexFilterPatternLength)
+ val valid = try {
+ s"""$regexFilterPattern""".r
+ true
+ } catch {
+ case e: Exception => false
+ }
+ checkCondition(valid, LogkafkaConfigErrors.InvalidRegexFilterPattern)
+ }
+}
+
+object LogkafkaConfigErrors {
+ import kafka.manager.utils.UtilError
+ class InvalidRegexFilterPattern private[LogkafkaConfigErrors] extends UtilError(
+ "regex filter pattern is illegal, does not conform to pcre2")
+ class InvalidRegexFilterPatternLength private[LogkafkaConfigErrors] extends UtilError(
+ "regex filter pattern is illegal, can't be longer than " + LogConfig.maxRegexFilterPatternLength + " characters")
+
+ val InvalidRegexFilterPattern = new InvalidRegexFilterPattern
+ val InvalidRegexFilterPatternLength = new InvalidRegexFilterPatternLength
}
diff --git a/app/kafka/manager/utils/logkafka82/LogConfig.scala b/app/kafka/manager/utils/logkafka82/LogConfig.scala
index 4c0baae77..87f2a07d0 100644
--- a/app/kafka/manager/utils/logkafka82/LogConfig.scala
+++ b/app/kafka/manager/utils/logkafka82/LogConfig.scala
@@ -18,6 +18,7 @@
package kafka.manager.utils.logkafka82
import java.util.Properties
+import scala.util.matching.Regex
import kafka.manager.utils.LogkafkaNewConfigs
object Defaults {
@@ -30,6 +31,7 @@ object Defaults {
val CompressionCodec= "none"
val RequiredAcks = 1
val MessageTimeoutMs = 0
+ val RegexFilterPattern = ""
}
/**
@@ -48,6 +50,7 @@ object Defaults {
* @param messageTimeoutMs Local message timeout. This value is only enforced locally
and limits the time a produced message waits for successful delivery.
A time of 0 is infinite.
+ * @param regexFilterPattern The messages matching this pattern will be dropped.
*
*/
case class LogConfig(val valid: Boolean = Defaults.Valid,
@@ -58,7 +61,8 @@ case class LogConfig(val valid: Boolean = Defaults.Valid,
val partition: Int = Defaults.Partition,
val compressionCodec: String = Defaults.CompressionCodec,
val requiredAcks: Int = Defaults.RequiredAcks,
- val messageTimeoutMs: Long = Defaults.MessageTimeoutMs) {
+ val messageTimeoutMs: Long = Defaults.MessageTimeoutMs,
+ val regexFilterPattern: String = Defaults.RegexFilterPattern) {
def toProps: Properties = {
val props = new Properties()
@@ -72,6 +76,7 @@ case class LogConfig(val valid: Boolean = Defaults.Valid,
props.put(CompressionCodecProp, compressionCodec.toString)
props.put(RequiredAcksProp, requiredAcks.toString)
props.put(MessageTimeoutMsProp, messageTimeoutMs.toString)
+ props.put(RegexFilterPatternProp, regexFilterPattern.toString)
props
}
@@ -83,6 +88,11 @@ case class LogConfig(val valid: Boolean = Defaults.Valid,
}
object LogConfig extends LogkafkaNewConfigs {
+ import kafka.manager.utils.logkafka82.LogkafkaConfigErrors._
+ import kafka.manager.utils._
+
+ val maxRegexFilterPatternLength = 255
+
val ValidProp = "valid"
val FollowLastProp = "follow_last"
val BatchSizeProp = "batchsize"
@@ -92,6 +102,7 @@ object LogConfig extends LogkafkaNewConfigs {
val CompressionCodecProp = "compression_codec"
val RequiredAcksProp = "required_acks"
val MessageTimeoutMsProp = "message_timeout_ms"
+ val RegexFilterPatternProp = "regex_filter_pattern"
val ConfigMaps = Map(ValidProp -> Defaults.Valid.toString,
FollowLastProp -> Defaults.FollowLast.toString,
@@ -101,7 +112,8 @@ object LogConfig extends LogkafkaNewConfigs {
PartitionProp -> Defaults.Partition.toString,
CompressionCodecProp -> Defaults.CompressionCodec.toString,
RequiredAcksProp -> Defaults.RequiredAcks.toString,
- MessageTimeoutMsProp -> Defaults.MessageTimeoutMs.toString)
+ MessageTimeoutMsProp -> Defaults.MessageTimeoutMs.toString,
+ RegexFilterPatternProp -> Defaults.RegexFilterPattern.toString)
def configMaps = ConfigMaps
val ConfigNames = ConfigMaps.keySet
def configNames = ConfigNames
@@ -118,7 +130,8 @@ object LogConfig extends LogkafkaNewConfigs {
partition = props.getProperty(PartitionProp, Defaults.Partition.toString).toInt,
compressionCodec = props.getProperty(CompressionCodecProp, Defaults.CompressionCodec.toString).toString,
requiredAcks= props.getProperty(RequiredAcksProp, Defaults.RequiredAcks.toString).toInt,
- messageTimeoutMs = props.getProperty(MessageTimeoutMsProp, Defaults.MessageTimeoutMs.toString).toLong)
+ messageTimeoutMs = props.getProperty(MessageTimeoutMsProp, Defaults.MessageTimeoutMs.toString).toLong,
+ regexFilterPattern = props.getProperty(RegexFilterPatternProp, Defaults.RegexFilterPattern.toString).toString)
}
/**
@@ -145,6 +158,7 @@ object LogConfig extends LogkafkaNewConfigs {
def validate(props: Properties) {
validateNames(props)
validateTopic(props)
+ validateRegexFilterPattern(props)
LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
}
@@ -156,4 +170,30 @@ object LogConfig extends LogkafkaNewConfigs {
require(topic != null , "Topic is null")
}
+ /**
+ * Check that the RegexFilterPattern is reasonable
+ */
+ private def validateRegexFilterPattern(props: Properties) {
+ val regexFilterPattern = props.getProperty(RegexFilterPatternProp)
+ if (regexFilterPattern == null) return
+ checkCondition(regexFilterPattern.length <= maxRegexFilterPatternLength, LogkafkaConfigErrors.InvalidRegexFilterPatternLength)
+ val valid = try {
+ s"""$regexFilterPattern""".r
+ true
+ } catch {
+ case e: Exception => false
+ }
+ checkCondition(valid, LogkafkaConfigErrors.InvalidRegexFilterPattern)
+ }
+}
+
+object LogkafkaConfigErrors {
+ import kafka.manager.utils.UtilError
+ class InvalidRegexFilterPattern private[LogkafkaConfigErrors] extends UtilError(
+ "regex filter pattern is illegal, does not conform to pcre2")
+ class InvalidRegexFilterPatternLength private[LogkafkaConfigErrors] extends UtilError(
+ "regex filter pattern is illegal, can't be longer than " + LogConfig.maxRegexFilterPatternLength + " characters")
+
+ val InvalidRegexFilterPattern = new InvalidRegexFilterPattern
+ val InvalidRegexFilterPatternLength = new InvalidRegexFilterPatternLength
}
diff --git a/app/kafka/manager/utils/zero81/PreferredReplicaLeaderElectionCommand.scala b/app/kafka/manager/utils/zero81/PreferredReplicaLeaderElectionCommand.scala
index f20296780..dbe4a7a85 100644
--- a/app/kafka/manager/utils/zero81/PreferredReplicaLeaderElectionCommand.scala
+++ b/app/kafka/manager/utils/zero81/PreferredReplicaLeaderElectionCommand.scala
@@ -17,6 +17,7 @@
package kafka.manager.utils.zero81
+import kafka.common.TopicAndPartition
import kafka.manager.utils._
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.KeeperException.NodeExistsException
diff --git a/app/kafka/manager/utils/zero81/ReassignPartitionCommand.scala b/app/kafka/manager/utils/zero81/ReassignPartitionCommand.scala
index 2c9904f54..33af904cb 100644
--- a/app/kafka/manager/utils/zero81/ReassignPartitionCommand.scala
+++ b/app/kafka/manager/utils/zero81/ReassignPartitionCommand.scala
@@ -17,6 +17,7 @@
package kafka.manager.utils.zero81
+import kafka.common.TopicAndPartition
import kafka.manager.utils._
import kafka.manager.ActorModel.{TopicPartitionIdentity, TopicIdentity}
import org.apache.curator.framework.CuratorFramework
diff --git a/app/models/form/ClusterOperation.scala b/app/models/form/ClusterOperation.scala
index 2ac6e35d2..4d1b9c89a 100644
--- a/app/models/form/ClusterOperation.scala
+++ b/app/models/form/ClusterOperation.scala
@@ -30,12 +30,22 @@ object Operation {
}
object ClusterOperation {
- def apply(operation: String, name: String, version: String, zkHosts: String, zkMaxRetry: Int, jmxEnabled: Boolean, logkafkaEnabled: Boolean): ClusterOperation = {
- ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, logkafkaEnabled))
+ def apply(operation: String,
+ name: String,
+ version: String,
+ zkHosts: String,
+ zkMaxRetry: Int,
+ jmxEnabled: Boolean,
+ filterConsumers: Boolean,
+ logkafkaEnabled: Boolean,
+ activeOffsetCacheEnabled: Boolean): ClusterOperation = {
+ ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled))
}
- def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Boolean)] = {
- Option((co.op.toString,co.clusterConfig.name, co.clusterConfig.version.toString,co.clusterConfig.curatorConfig.zkConnect,co.clusterConfig.curatorConfig.zkMaxRetry,co.clusterConfig.jmxEnabled,co.clusterConfig.logkafkaEnabled))
+ def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Boolean, Boolean, Boolean)] = {
+ Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
+ co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
+ co.clusterConfig.jmxEnabled, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled, co.clusterConfig.activeOffsetCacheEnabled))
}
}
diff --git a/app/models/navigation/BreadCrumbs.scala b/app/models/navigation/BreadCrumbs.scala
index 385a2940c..f62d4a7eb 100644
--- a/app/models/navigation/BreadCrumbs.scala
+++ b/app/models/navigation/BreadCrumbs.scala
@@ -49,6 +49,7 @@ object BreadCrumbs {
BCDynamicNamedLink(identity,"Summary".clusterRoute),
"Brokers".clusterRouteBreadCrumb),
"Topics" -> IndexedSeq("Clusters".baseRouteBreadCrumb,BCDynamicNamedLink(identity,"Summary".clusterRoute)),
+ "Consumers" -> IndexedSeq("Clusters".baseRouteBreadCrumb,BCDynamicNamedLink(identity,"Summary".clusterRoute)),
"Create Topic" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
BCDynamicNamedLink(identity,"Summary".clusterRoute),
@@ -57,6 +58,14 @@ object BreadCrumbs {
"Clusters".baseRouteBreadCrumb,
BCDynamicNamedLink(identity,"Summary".clusterRoute),
"Topics".clusterRouteBreadCrumb),
+ "Consumer View" -> IndexedSeq(
+ "Clusters".baseRouteBreadCrumb,
+ BCDynamicNamedLink(identity,"Summary".clusterRoute),
+ "Consumers".clusterRouteBreadCrumb),
+ "Consumed Topic View" -> IndexedSeq(
+ "Clusters".baseRouteBreadCrumb,
+ BCDynamicNamedLink(identity,"Summary".clusterRoute),
+ "Consumers".clusterRouteBreadCrumb),
"Logkafkas" -> IndexedSeq("Clusters".baseRouteBreadCrumb,BCDynamicNamedLink(identity,"Summary".clusterRoute)),
"Create Logkafka" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
@@ -105,6 +114,14 @@ object BreadCrumbs {
)
)
+ val consumerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map(
+ "Consumer View" -> IndexedSeq(
+ "Clusters".baseRouteBreadCrumb,
+ BCDynamicNamedLink(identity,"Summary".clusterRoute),
+ "Consumers".clusterRouteBreadCrumb,
+ BCDynamicMultiNamedLink(identity,"Consumer View".consumerRoute)
+ )
+ )
val logkafkaBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map(
"Logkafka View" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
@@ -177,6 +194,16 @@ object BreadCrumbs {
}
}
+ private[this] def renderWithClusterAndConsumer(s: String, clusterName: String, consumer: String, topic: String = "") : IndexedSeq[BreadCrumbRendered] = {
+ consumerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map {
+ case BCStaticLink(n,c) => BCLink(n,c.toString())
+ case BCDynamicNamedLink(cn, cl) => BCLink(cn(clusterName),cl(clusterName).toString())
+ case BCDynamicMultiNamedLink(cn, cl) => BCLink(cn(consumer),cl(clusterName,consumer).toString())
+ case BCDynamicLink(cn, cl) => BCLink(cn,cl(clusterName).toString())
+ case BCDynamicText(cn) => BCText(cn(clusterName))
+ }
+ }
+
def withNamedViewAndClusterAndTopic(s: String, clusterName: String, topic: String, name: String) : IndexedSeq[BreadCrumbRendered] = {
renderWithClusterAndTopic(s, clusterName,topic) :+ BCActive(name)
}
@@ -195,4 +222,7 @@ object BreadCrumbs {
def withNamedViewAndClusterAndLogkafka(s: String, clusterName: String, hostname: String, log_path: String, name: String) : IndexedSeq[BreadCrumbRendered] = {
renderWithClusterAndLogkafka(s, clusterName, hostname, log_path) :+ BCActive(name)
}
+ def withNamedViewAndClusterAndConsumer(s: String, clusterName: String, consumer: String, name: String) : IndexedSeq[BreadCrumbRendered] = {
+ renderWithClusterAndConsumer(s, clusterName, consumer) :+ BCActive(name)
+ }
}
diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala
index c7b0a5b0b..69ceee85b 100644
--- a/app/models/navigation/Menus.scala
+++ b/app/models/navigation/Menus.scala
@@ -51,6 +51,10 @@ object Menus {
private[this] def reassignPartitionsMenu(cluster: String, applicationFeatures: ApplicationFeatures) : Option[Menu] = {
Option("Reassign Partitions".clusterMenu(cluster))
}
+
+ private[this] def consumersMenu(cluster: String, applicationFeatures: ApplicationFeatures) : Option[Menu] = {
+ Option("Consumers".clusterMenu(cluster))
+ }
private[this] def logKafkaMenu(cluster: String,
applicationFeatures: ApplicationFeatures,
@@ -72,6 +76,7 @@ object Menus {
topicMenu(cluster, applicationFeatures),
preferredReplicaElectionMenu(cluster, applicationFeatures),
reassignPartitionsMenu(cluster, applicationFeatures),
+ consumersMenu(cluster, applicationFeatures),
logKafkaMenu(cluster, applicationFeatures, clusterFeatures)
).flatten
}
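Each menu builder returns an Option so that entries can be switched off per feature set; the cluster menu is simply the flattened result. A trimmed, hypothetical sketch of that pattern:

    // Not the actual Menus code; just the Option-and-flatten idiom used above.
    def menuLabels(cluster: String): IndexedSeq[String] =
      IndexedSeq(
        Option("Topics"),
        Option("Consumers"),  // contributed by the new consumersMenu
        None                  // a feature-gated entry that is currently disabled
      ).flatten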
diff --git a/app/models/navigation/QuickRoutes.scala b/app/models/navigation/QuickRoutes.scala
index ca4e7bc54..2358184ea 100644
--- a/app/models/navigation/QuickRoutes.scala
+++ b/app/models/navigation/QuickRoutes.scala
@@ -25,6 +25,7 @@ object QuickRoutes {
"Summary" -> controllers.routes.Cluster.cluster,
"Brokers" -> controllers.routes.Cluster.brokers,
"Topics" -> controllers.routes.Topic.topics,
+ "Consumers" -> controllers.routes.Consumer.consumers,
"List" -> controllers.routes.Topic.topics,
"Create" -> controllers.routes.Topic.createTopic,
"Preferred Replica Election" -> controllers.routes.PreferredReplicaElection.preferredReplicaElection,
@@ -47,6 +48,9 @@ object QuickRoutes {
"Add Partitions" -> controllers.routes.Topic.addPartitions,
"Update Config" -> controllers.routes.Topic.addPartitions
)
+ val consumerRoutes : Map[String, (String, String) => Call] = Map(
+ "Consumer View" -> controllers.routes.Consumer.consumer
+ )
val logkafkaRoutes : Map[String, (String, String, String) => Call] = Map(
"Logkafka View" -> controllers.routes.Logkafka.logkafka,
"Update Config" -> controllers.routes.Logkafka.updateConfig
@@ -106,6 +110,15 @@ object QuickRoutes {
}
}
+ implicit class ConsumerRoute(s: String) {
+ def consumerRouteMenuItem(cluster: String, consumer: String): (String, Call) = {
+ s -> consumerRoutes(s)(cluster,consumer)
+ }
+ def consumerRoute(cluster: String, consumer: String): Call = {
+ consumerRoutes(s)(cluster,consumer)
+ }
+ }
+
implicit class LogkafkaRoute(s: String) {
def logkafkaRouteMenuItem(c: String, h: String, l:String): (String, Call) = {
s -> logkafkaRoutes(s)(c,h,l)
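With the ConsumerRoute wrapper in place, templates and breadcrumbs can resolve the consumer page by its label; a small usage sketch (cluster and group names are hypothetical):

    import models.navigation.QuickRoutes._

    // Resolves via consumerRoutes("Consumer View") to controllers.routes.Consumer.consumer
    val call = "Consumer View".consumerRoute("dev", "my-group")

    // Or as a (label, Call) pair when building a menu item
    val item = "Consumer View".consumerRouteMenuItem("dev", "my-group")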
diff --git a/app/views/cluster/addCluster.scala.html b/app/views/cluster/addCluster.scala.html
index e243c7564..8e1bed190 100644
--- a/app/views/cluster/addCluster.scala.html
+++ b/app/views/cluster/addCluster.scala.html
@@ -15,10 +15,12 @@
@b3.form(routes.Cluster.handleAddCluster) {
@b3.text(form("name"), '_label -> "Cluster Name", 'placeholder -> "", 'autofocus -> true )
- @b3.text(form("zkHosts"), '_label -> "Cluster Zookeeper Hosts", 'placeholder -> "zk1:2181,zk2:2181,zk3:2181")
+ @b3.text(form("zkHosts"), '_label -> "Cluster Zookeeper Hosts", 'placeholder -> "zk1:2181,zk2:2181,zk3:2181/NAMESPACE")
@b3.select( form("kafkaVersion"), options = kafka.manager.KafkaVersion.formSelectList, '_label -> "Kafka Version" )
- @b3.checkbox(form("jmxEnabled"), '_text -> "Enable JMX Polling")
+ @b3.checkbox(form("jmxEnabled"), '_text -> "Enable JMX Polling (Set JMX_PORT env variable before starting kafka server)")
@b3.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka")
+ @b3.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers")
+ @b3.checkbox(form("activeOffsetCacheEnabled"), '_text -> "Enable Active OffsetCache (Not recommended for large # of consumers)")
@b3.submit('class -> "submit-button btn btn-primary"){ Save }
Cancel
diff --git a/app/views/cluster/updateCluster.scala.html b/app/views/cluster/updateCluster.scala.html
index ae102a6aa..c089f4429 100644
--- a/app/views/cluster/updateCluster.scala.html
+++ b/app/views/cluster/updateCluster.scala.html
@@ -17,10 +17,12 @@
- @b3.text(form("zkHosts"), '_label -> "Cluster Zookeeper Hosts", 'placeholder -> "zk1:2181,zk2:2181,zk3:2181", 'autoFocus -> true)
+ @b3.text(form("zkHosts"), '_label -> "Cluster Zookeeper Hosts", 'placeholder -> "zk1:2181,zk2:2181,zk3:2181/NAMESPACE", 'autoFocus -> true)
@b3.select( form("kafkaVersion"), options = kafka.manager.KafkaVersion.formSelectList, '_label -> "Kafka Version" )
- @b3.checkbox(form("jmxEnabled"), '_text -> "Enable JMX Polling")
+ @b3.checkbox(form("jmxEnabled"), '_text -> "Enable JMX Polling (Set JMX_PORT env variable before starting kafka server)")
+ @b3.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers")
@b3.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka")
+ @b3.checkbox(form("activeOffsetCacheEnabled"), '_text -> "Enable Active OffsetCache (Not recommended for large # of consumers)")
@b3.submit('class -> "submit-button btn btn-primary btn"){ Save }
Cancel
diff --git a/app/views/consumer/consumedTopicView.scala.html b/app/views/consumer/consumedTopicView.scala.html
new file mode 100644
index 000000000..372df8b1c
--- /dev/null
+++ b/app/views/consumer/consumedTopicView.scala.html
@@ -0,0 +1,31 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import scalaz.{\/}
+@(cluster:String,
+ consumer: String,
+ topic: String,
+ errorOrConsumedTopicState: kafka.manager.ApiError \/ kafka.manager.ActorModel.ConsumedTopicState
+)(implicit af: features.ApplicationFeatures)
+
+@theMenu = {
+ @views.html.navigation.clusterMenu(cluster,"Topic Consumption","",models.navigation.Menus.clusterMenus(cluster)(
+ af,
+ errorOrConsumedTopicState.toOption.map(_.clusterContext.clusterFeatures).getOrElse(kafka.manager.features.ClusterFeatures.default)))
+}
+
+@main(
+ "Consumed Topic View",
+ menu = theMenu,
+ breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndClusterAndConsumer("Consumer View",cluster,consumer,topic))) {
+
+
+
+
+ @consumer / @topic
+
+ @errorOrConsumedTopicState.fold(views.html.errors.onApiError(_),views.html.consumer.consumedTopicViewContent(cluster,consumer,topic,_))
+
+
+}
+
diff --git a/app/views/consumer/consumedTopicViewContent.scala.html b/app/views/consumer/consumedTopicViewContent.scala.html
new file mode 100644
index 000000000..0d3cb61d7
--- /dev/null
+++ b/app/views/consumer/consumedTopicViewContent.scala.html
@@ -0,0 +1,64 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import b3.vertical.fieldConstructor
+@(cluster:String, consumer: String, topic: String, state: kafka.manager.ActorModel.ConsumedTopicState)
+
+
+@getTopicCoverage(percentage: Int) = {
+ @percentage match {
+ case i if i <= 99 => {danger}
+ case i => {}
+ }
+}
+
+@ifPartitionNotOwned(owner: Option[String]) = {
+ @owner match {
+ case None => {warning}
+ case Some(a) => {}
+ }
+}
+
+
+
+
+
+ Topic Summary
+
+
+
+ Total Lag
+ @state.totalLag.getOrElse(" ")
+
+
+ % of Partitions assigned to a consumer instance
+ @state.percentageCovered
+
+
+
+
+
+
+
+
+
+
+
+
+ Partition LogSize Consumer Offset Lag Consumer Instance Owner
+
+
+ @for(tp:Int <- 0 until state.numPartitions) {
+
+ @tp
+ @state.topicOffsets(tp).getOrElse(" ")
+ @state.partitionOffsets.get(tp)
+ @state.partitionLag(tp).getOrElse(" ")
+ @state.partitionOwners.get(tp).getOrElse("None")
+
+ }
+
+
+
+
+
\ No newline at end of file
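The table above touches only a few members of ConsumedTopicState; the following is a hedged outline of the shape it appears to assume, with types inferred from template usage rather than from the actual ActorModel source:

    // Inferred, simplified sketch; the real kafka.manager.ActorModel.ConsumedTopicState may differ.
    case class ConsumedTopicStateSketch(
      numPartitions: Int,
      percentageCovered: Int,             // % of partitions owned by some consumer instance
      totalLag: Option[Long],             // overall lag, when computable
      topicOffsets: Int => Option[Long],  // log end offset per partition
      partitionOffsets: Map[Int, Long],   // consumer offset per partition
      partitionLag: Int => Option[Long],  // lag per partition
      partitionOwners: Map[Int, String]   // owning consumer instance per partition
    )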
diff --git a/app/views/consumer/consumerList.scala.html b/app/views/consumer/consumerList.scala.html
new file mode 100644
index 000000000..f3e4cb06b
--- /dev/null
+++ b/app/views/consumer/consumerList.scala.html
@@ -0,0 +1,36 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import scalaz.{\/}
+@(cluster:String, errorOrConsumers: kafka.manager.ApiError \/ kafka.manager.ConsumerListExtended
+)(implicit af: features.ApplicationFeatures)
+
+@theMenu = {
+ @views.html.navigation.clusterMenu(cluster,"Consumer","List",models.navigation.Menus.clusterMenus(cluster)(
+ af,
+ errorOrConsumers.toOption.map(_.clusterContext.clusterFeatures).getOrElse(kafka.manager.features.ClusterFeatures.default)))
+}
+
+@consumerScripts = {
+
+}
+
+@main(
+ "Consumer List",
+ menu = theMenu,
+ breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndCluster("Consumers",cluster)),
+ scripts=consumerScripts) {
+
+
+
+ Consumers
+ @errorOrConsumers.fold(
+ views.html.errors.onApiError(_),
+ cl => views.html.consumer.consumerListContent(cluster,cl.list))
+
+
+}
diff --git a/app/views/consumer/consumerListContent.scala.html b/app/views/consumer/consumerListContent.scala.html
new file mode 100644
index 000000000..ed20f0055
--- /dev/null
+++ b/app/views/consumer/consumerListContent.scala.html
@@ -0,0 +1,41 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@(cluster: String, consumers: IndexedSeq[(String, Option[kafka.manager.ActorModel.ConsumerIdentity])])
+
+@getConsumedTopicSummary(state: kafka.manager.ActorModel.ConsumedTopicState) = {
+ @state.percentageCovered match {
+ case i if i <= 99 => {
+
+ (@state.percentageCovered% coverage, @state.totalLag.getOrElse("Lag unavailable") lag)
+
+ }
+ case i => {(@state.percentageCovered% coverage, @state.totalLag.getOrElse("Lag unavailable") lag)}
+ }
+}
+
+
+
+ Consumer
+ Topics it consumes from
+
+
+
+ @for( (consumer, consumerIdentityOpt) <- consumers) {
+
+ @consumer
+
+ @consumerIdentityOpt.fold{
+ No details available for this consumer at this time
+ }{ a:kafka.manager.ActorModel.ConsumerIdentity =>
+ @for((topic: String, state: kafka.manager.ActorModel.ConsumedTopicState) <- a.topicMap) {
+ @topic :
+ @getConsumedTopicSummary(state)
+ }
+ }
+
+
+ }
+
+
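The list page, in turn, expects each consumer name paired with an optional identity whose topicMap links topics to their consumed state. A hedged outline of those types as the templates use them (reusing the sketch above; not the actual definitions):

    // Inferred from template usage only; the real types live in kafka.manager / ActorModel.
    case class ConsumerIdentitySketch(
      topicMap: Map[String, ConsumedTopicStateSketch]  // topic -> consumption state
    )

    case class ConsumerListExtendedSketch(
      list: IndexedSeq[(String, Option[ConsumerIdentitySketch])],
      clusterContext: AnyRef  // exposes clusterFeatures, per consumerList.scala.html
    )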
diff --git a/app/views/consumer/consumerView.scala.html b/app/views/consumer/consumerView.scala.html
new file mode 100644
index 000000000..4e146ab63
--- /dev/null
+++ b/app/views/consumer/consumerView.scala.html
@@ -0,0 +1,29 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import scalaz.{\/}
+@(cluster:String,
+ consumer: String,
+ errorOrConsumerIdentity: kafka.manager.ApiError \/ kafka.manager.ActorModel.ConsumerIdentity
+)(implicit af: features.ApplicationFeatures)
+
+@theMenu = {
+ @views.html.navigation.clusterMenu(cluster,"Consumer","",models.navigation.Menus.clusterMenus(cluster)(
+ af,
+ errorOrConsumerIdentity.toOption.map(_.clusterContext.clusterFeatures).getOrElse(kafka.manager.features.ClusterFeatures.default)))
+}
+
+@main(
+ "Consumer View",
+ menu = theMenu,
+ breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndCluster("Consumer View",cluster,consumer))) {
+
+
+
+
+ @consumer
+
+ @errorOrConsumerIdentity.fold(views.html.errors.onApiError(_),views.html.consumer.consumerViewContent(cluster,consumer,_))
+
+
+}
diff --git a/app/views/consumer/consumerViewContent.scala.html b/app/views/consumer/consumerViewContent.scala.html
new file mode 100644
index 000000000..3d1446ff4
--- /dev/null
+++ b/app/views/consumer/consumerViewContent.scala.html
@@ -0,0 +1,37 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import b3.vertical.fieldConstructor
+@(cluster:String, consumer: String, consumerIdentity: kafka.manager.ActorModel.ConsumerIdentity)
+
+@getTopicCoverage(percentage: Int) = {
+ @percentage match {
+ case i if i <= 99 => {danger}
+ case i => {}
+ }
+}
+
+
+
+
+
+ Consumed Topic Information
+
+
+ Topic Partitions Covered % Total Lag
+
+
+ @for((topic: String, state: kafka.manager.ActorModel.ConsumedTopicState) <- consumerIdentity.topicMap) {
+
+ @topic
+
+ @state.percentageCovered
+
+ @state.totalLag.getOrElse("not available")
+
+ }
+
+
+
+
+
diff --git a/app/views/logkafka/logkafkaListContent.scala.html b/app/views/logkafka/logkafkaListContent.scala.html
index 3ae690ad0..274f11b71 100644
--- a/app/views/logkafka/logkafkaListContent.scala.html
+++ b/app/views/logkafka/logkafkaListContent.scala.html
@@ -4,27 +4,31 @@
*@
@(cluster: String, logkafkas: IndexedSeq[((String, Option[kafka.manager.ActorModel.LogkafkaIdentity]),Boolean)])
+@import b3.vertical.fieldConstructor
+
@getFilesizeStatus(filesize: Int) = {
-@filesize match {
- case i if i < 0 => {warning}
- case i => {}
-}
+ @filesize match {
+ case i if i < 0 => {warning}
+ case i => {}
+ }
}
-@getLogkafkaStatus(active: Boolean) = {
- @if(active) {} else {danger}
+@getLogkafkaStatus(flag: Boolean, s1: String, s2: String) = {
+ @if(flag) {@s1} else {@s2}
}
- Hostname # Log Path # Real Path # File Pos # File Size # Topic
+ Hostname Log Path # Real Path # File Pos # File Size # Topic Operations
@for( ((hostname, logkafkaIdentity), deleted) <- logkafkas) {
@logkafkaIdentity.map{ li =>
@for( (log_path, im) <- li.identityMap) {
- @hostname
+
+ @hostname
+
@im._1.map { c =>
@log_path
@@ -33,8 +37,12 @@
@im._2.map { c =>
@c.get("realpath").map { d =>
@d
- }.getOrElse{ no corresponding file to collect }
- }.getOrElse{ logkafka in @hostname is inactive }
+ }.getOrElse{ no corresponding file }
+ }.getOrElse{
+
+ @getLogkafkaStatus(li.active,"scanning for new file", "logkafka is inactive")
+
+ }
@im._2.map { c =>
@c.get("filepos").map { d =>
@@ -54,6 +62,31 @@
}.getOrElse{ }
}.getOrElse{ }
+
+
+ @im._1.map { c =>
+ @c.get("valid").map { enabled =>
+ @if(enabled.toBoolean) {
+
+ Modify
+ @b3.form(routes.Logkafka.handleDisableConfig(cluster, hostname, log_path)) {
+
+ @b3.submit('class -> "btn btn-warning ops-button"){ Disable }
+ }
+ } else {
+ @b3.form(routes.Logkafka.handleEnableConfig(cluster, hostname, log_path)) {
+
+ @b3.submit('class -> "btn btn-success ops-button"){ Enable }
+ }
+ @b3.form(routes.Logkafka.handleDeleteLogkafka(cluster, hostname, log_path)) {
+
+ @b3.submit('class -> "btn btn-danger ops-button"){ Delete }
+ }
+ }
+ }.getOrElse{
}
+ }.getOrElse{
}
+
+
+
}
}.getOrElse{}
diff --git a/app/views/main.scala.html b/app/views/main.scala.html
index b3926e0b4..c1a2ae4d9 100644
--- a/app/views/main.scala.html
+++ b/app/views/main.scala.html
@@ -52,6 +52,11 @@
$('.' + selectClass).css("display", display);
}
}
+
+ function disableSubmission() {
+ $('[type=submit]').prop('disabled', true);
+ $('[type=submit]').removeAttr('type');
+ }
@scripts