Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge with upstream and fixing build #8

Merged
merged 45 commits into from
Nov 18, 2015
Merged
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
883f54f
fix warning level of broker skew percentage
jisookim0513 Aug 7, 2015
a243276
fix warning level of under-replicated percentage
jisookim0513 Aug 7, 2015
eebd59e
Add consumer-level information, along with the latest-produced offset…
cvcal Aug 19, 2015
e9008d2
Refactoring
patelh Sep 4, 2015
6f843c5
Add passive/active offset cache and test
patelh Sep 20, 2015
d8858d6
Increment version
patelh Sep 20, 2015
52bf7c2
Fix cluster config
patelh Sep 20, 2015
6a99447
Increment version
patelh Sep 20, 2015
09d4215
Fix update of cluster config
patelh Sep 20, 2015
c8524cf
Increment version
patelh Sep 20, 2015
e6bf39c
Another bug fix for cluster config
patelh Sep 20, 2015
d8f9b12
Fix consumer list in active mode
patelh Sep 20, 2015
a98cc74
Merge pull request #131 from cvcal/cvcal
patelh Sep 23, 2015
1a9cb19
Merge pull request #110 from metamx/fix-warning-display
patelh Sep 23, 2015
b30eeb8
Move check before getting topic descriptions
patelh Aug 17, 2015
8e06123
Update to check before and after
patelh Aug 23, 2015
0daafd0
Merge pull request #116 from patelh/check-before
patelh Sep 23, 2015
9d09807
Increment version
patelh Sep 23, 2015
baec2a6
Parallel offset requests in bulk
patelh Sep 24, 2015
2f7710f
Cleanup, add configs
patelh Sep 24, 2015
0f5b85e
Update more often
patelh Sep 24, 2015
bf56d01
Add producer message rate to topic list
patelh Sep 24, 2015
ea146de
Merge pull request #132 from patelh/par-prod-offset
patelh Sep 24, 2015
7e13c00
Broker Down bugfix
Aug 26, 2015
ee720e3
Fix owners list
patelh Sep 25, 2015
4492659
Bump version
patelh Sep 25, 2015
b0fb7f2
Merge pull request #133 from patelh/broker-list-interface-bugfix
patelh Sep 25, 2015
bed7558
adding colors to manual assignments
yazgoo Sep 30, 2015
4b42c55
logkafka: check if hostname is localhost
zheolong Oct 13, 2015
8ce8207
logkafka: update log collecting state
zheolong Oct 21, 2015
61b26d2
logkafka: little fix about config deleting
zheolong Oct 23, 2015
b6bc8b9
Add default logging for akka
bjoernhaeuser Oct 24, 2015
4ab38d7
logkafka: add config operations column
zheolong Oct 25, 2015
e76c2ee
Merge pull request #144 from bjoernhaeuser/logging
patelh Oct 27, 2015
629868d
Merge pull request #140 from zheolong/master
patelh Oct 27, 2015
b6db5c0
Merge pull request #136 from yazgoo/master
patelh Oct 27, 2015
56c05a4
Add hints to addCluster and updateCluster
zheolong Nov 6, 2015
3728c11
Merge pull request #151 from zheolong/master
patelh Nov 6, 2015
1f4a03a
logkafka: add config item regex_filter_pattern
zheolong Nov 12, 2015
8a858a4
logkafka: check if regex_filter_pattern is legal
zheolong Nov 13, 2015
ace56fe
logkafka: add logkafka creation test
zheolong Nov 14, 2015
b4ba8d7
logkafka: change duplicated case name
zheolong Nov 14, 2015
a84296e
Merge pull request #153 from zheolong/master
patelh Nov 14, 2015
1b46d3e
Merge remote-tracking branch 'upstream/master'
sjoerdmulder Nov 18, 2015
39e2d0d
Fixing build by excluding oauth (that contains logging)
sjoerdmulder Nov 18, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update to check before and after
  • Loading branch information
patelh committed Sep 23, 2015

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 8e06123f73f98f232afc694e4a758c567e3be927
39 changes: 23 additions & 16 deletions app/kafka/manager/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
@@ -401,17 +401,9 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)

case CMGeneratePartitionAssignments(topics, brokers) =>
implicit val ec = longRunningExecutionContext
val eventualReassignPartitions = withKafkaStateActor(KSGetReassignPartition)(identity[Option[ReassignPartitions]])
val topicCheckFuture = for {
rp <- eventualReassignPartitions
} yield {
// check if any topic undergoing reassignment got selected for reassignment
val topicsUndergoingReassignment = getTopicsUnderReassignment(rp, topics)
require(topicsUndergoingReassignment.isEmpty, "Topic(s) already undergoing reassignment(s): [%s]"
.format(topicsUndergoingReassignment.mkString(", ")))
}

val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = topicCheckFuture.flatMap { _ =>
val topicCheckFutureBefore = checkTopicsUnderAssignment(topics)

val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = topicCheckFutureBefore.flatMap { _ =>
val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
val eventualDescriptions = withKafkaStateActor(KSGetTopicDescriptions(topics))(identity[TopicDescriptions])
for {
@@ -429,8 +421,11 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
ti.replicationFactor)))
}
}

val result: Future[IndexedSeq[Try[Unit]]] = generated.map { list =>

val result: Future[IndexedSeq[Try[Unit]]] = for {
list <- generated
_ <- checkTopicsUnderAssignment(topics) //check again
} yield {
modify {
list.map { case (topic, assignments: Map[Int, Seq[Int]]) =>
updateAssignmentInZk(topic, assignments)
@@ -579,22 +574,34 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
}

def getNonExistentBrokers(availableBrokers: BrokerList, selectedBrokers: Seq[Int]): Seq[Int] = {
private[this] def getNonExistentBrokers(availableBrokers: BrokerList, selectedBrokers: Seq[Int]): Seq[Int] = {
val availableBrokerIds: Set[Int] = availableBrokers.list.map(_.id.toInt).toSet
selectedBrokers filter { b: Int => !availableBrokerIds.contains(b) }
}

def getNonExistentBrokers(availableBrokers: BrokerList, assignments: Map[Int, Seq[Int]]): Seq[Int] = {
private[this] def getNonExistentBrokers(availableBrokers: BrokerList, assignments: Map[Int, Seq[Int]]): Seq[Int] = {
val brokersAssigned = assignments.flatMap({ case (pt, bl) => bl }).toSet.toSeq
getNonExistentBrokers(availableBrokers, brokersAssigned)
}

def getTopicsUnderReassignment(reassignPartitions: Option[ReassignPartitions], topicsToBeReassigned: Set[String]): Set[String] = {
private[this] def getTopicsUnderReassignment(reassignPartitions: Option[ReassignPartitions], topicsToBeReassigned: Set[String]): Set[String] = {
val topicsUnderReassignment = reassignPartitions.map { asgn =>
asgn.endTime.map(_ => Set[String]()).getOrElse{
asgn.partitionsToBeReassigned.map { case (t,s) => t.topic}.toSet
}
}.getOrElse(Set[String]())
topicsToBeReassigned.intersect(topicsUnderReassignment)
}

private[this] def checkTopicsUnderAssignment(topicsToBeReassigned: Set[String])(implicit ec: ExecutionContext) : Future[Unit] = {
val eventualReassignPartitions = withKafkaStateActor(KSGetReassignPartition)(identity[Option[ReassignPartitions]])
for {
rp <- eventualReassignPartitions
} yield {
// check if any topic undergoing reassignment got selected for reassignment
val topicsUndergoingReassignment = getTopicsUnderReassignment(rp, topicsToBeReassigned)
require(topicsUndergoingReassignment.isEmpty, "Topic(s) already undergoing reassignment(s): [%s]"
.format(topicsUndergoingReassignment.mkString(", ")))
}
}
}