Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ object AddPartitionsCommand extends Logging {
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
.ofType(classOf[String])
.defaultsTo("")
val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack")
.withRequiredArg
.describedAs("max # of replicas per rack")
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)

val options = parser.parse(args : _*)

Expand All @@ -62,10 +67,11 @@ object AddPartitionsCommand extends Logging {
val zkConnect = options.valueOf(zkConnectOpt)
val nPartitions = options.valueOf(nPartitionsOpt).intValue
val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
val rackReplication = options.valueOf(rackReplicationOpt).intValue
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, rackReplication)
println("adding partitions succeeded!")
} catch {
case e: Throwable =>
Expand All @@ -77,7 +83,7 @@ object AddPartitionsCommand extends Logging {
}
}

def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", rackReplication: Int = -1) {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdministrationException("The topic %s does not exist".format(topic))
Expand All @@ -87,7 +93,7 @@ object AddPartitionsCommand extends Logging {
// create the new partition replication list
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val newPartitionReplicaList = if (replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
AdminUtils.assignReplicasToBrokers(zkClient, brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size, rackReplication)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)

Expand Down
59 changes: 48 additions & 11 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ object AdminUtils extends Logging {
* p3 p4 p0 p1 p2 (3rd replica)
* p7 p8 p9 p5 p6 (3rd replica)
*/
def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
fixedStartIndex: Int = -1, startPartitionId: Int = -1)
def assignReplicasToBrokers(zkClient: ZkClient, brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
fixedStartIndex: Int = -1, startPartitionId: Int = -1, maxReplicaPerRack: Int = -1)
: Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdministrationException("number of partitions must be larger than 0")
Expand All @@ -62,15 +62,52 @@ object AdminUtils extends Logging {
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0

var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
ret.put(currentPartitionId, replicaList.reverse)
currentPartitionId = currentPartitionId + 1
if (maxReplicaPerRack <= 0) {
for (i <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
ret.put(currentPartitionId, replicaList.reverse)
currentPartitionId = currentPartitionId + 1
}
} else {
val brokerToRackMap: Map[Int, Int] = brokerList.map(brokerId => (brokerId -> (ZkUtils.getBrokerInfo(zkClient, brokerId) match {
case Some(broker) => broker.rack
case None => throw new AdministrationException("broker " + brokerId + " must have rack id")
}) )).toMap
for (i <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
var rackReplicaCount: mutable.Map[Int, Int] = mutable.Map(brokerToRackMap(brokerList(firstReplicaIndex)) -> 1)
var k = 0
for (j <- 0 until replicationFactor - 1) {
var done = false;
while (!done && k < brokerList.size) {
val broker = brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, k, brokerList.size))
val rack = brokerToRackMap(broker)
if (!(rackReplicaCount contains rack)) {
replicaList ::= broker
rackReplicaCount += (rack -> 1)
done = true;
} else if (rackReplicaCount(rack) < maxReplicaPerRack) {
rackReplicaCount(rack) = rackReplicaCount(rack) + 1
replicaList ::= broker
done = true;
}
k = k + 1
}
if (!done) {
throw new AdministrationException("not enough brokers available in unique racks to meet maxReplicaPerRack limit of " + maxReplicaPerRack)
}
}
ret.put(currentPartitionId, replicaList.reverse)
currentPartitionId = currentPartitionId + 1
}
}
ret.toMap
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/scala/kafka/admin/CreateTopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ object CreateTopicCommand extends Logging {
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
.ofType(classOf[String])
.defaultsTo("")
val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack")
.withRequiredArg
.describedAs("max # of replicas per rack")
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)

val options = parser.parse(args : _*)

Expand All @@ -68,10 +73,11 @@ object CreateTopicCommand extends Logging {
val nPartitions = options.valueOf(nPartitionsOpt).intValue
val replicationFactor = options.valueOf(replicationFactorOpt).intValue
val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
val rackReplication = options.valueOf(rackReplicationOpt).intValue
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, rackReplication)
println("creation succeeded!")
} catch {
case e: Throwable =>
Expand All @@ -83,13 +89,13 @@ object CreateTopicCommand extends Logging {
}
}

def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "", rackReplication: Int = -1) {
Topic.validate(topic)

val brokerList = ZkUtils.getSortedBrokerList(zkClient)

val partitionReplicaAssignment = if (replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
AdminUtils.assignReplicasToBrokers(zkClient, brokerList, numPartitions, replicationFactor, -1, -1, rackReplication)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ object ReassignPartitionsCommand extends Logging {
.describedAs("partition reassignment json file path")
.ofType(classOf[String])

val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack")
.withRequiredArg
.describedAs("max # of replicas per rack")
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)

val options = parser.parse(args : _*)

for(arg <- List(zkConnectOpt)) {
Expand Down Expand Up @@ -110,11 +116,12 @@ object ReassignPartitionsCommand extends Logging {
val brokerListToReassign = brokerList.split(',') map (_.toInt)
val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
val rackReplication = options.valueOf(rackReplicationOpt).intValue

val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
groupedByTopic.foreach { topicInfo =>
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
topicInfo._2.head._2.size)
val assignedReplicas = AdminUtils.assignReplicasToBrokers(zkClient, brokerListToReassign, topicInfo._2.size,
topicInfo._2.head._2.size, -1, -1, rackReplication)
partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/client/ClientUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object ClientUtils extends Logging{
}

/**
* Parse a list of broker urls in the form host1:port1, host2:port2, ...
* Parse a list of broker urls in the form host1:port1:rack1, host2:port2:rack2, ...
*/
def parseBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.parseCsvList(brokerListStr)
Expand All @@ -100,7 +100,8 @@ object ClientUtils extends Logging{
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
new Broker(brokerId, hostName, port)
val rack = brokerInfos(2).toInt
new Broker(brokerId, hostName, port, rack)
})
}

Expand Down
19 changes: 11 additions & 8 deletions core/src/main/scala/kafka/cluster/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ private[kafka] object Broker {
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
new Broker(id, host, port)
val rack = brokerInfo.get("rack").get.asInstanceOf[Int]
new Broker(id, host, port, rack)
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
Expand All @@ -50,32 +51,34 @@ private[kafka] object Broker {
val id = buffer.getInt
val host = readShortString(buffer)
val port = buffer.getInt
new Broker(id, host, port)
val rack = buffer.getInt
new Broker(id, host, port, rack)
}
}

private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
private[kafka] case class Broker(val id: Int, val host: String, val port: Int, val rack: Int) {

override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port + ",rack:" + rack)

def getConnectionString(): String = host + ":" + port
def getConnectionString(): String = host + ":" + port + ":" + rack

def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
writeShortString(buffer, host)
buffer.putInt(port)
buffer.putInt(rack)
}

def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + 4 /* rack id*/

override def equals(obj: Any): Boolean = {
obj match {
case null => false
case n: Broker => id == n.id && host == n.host && port == n.port
case n: Broker => id == n.id && host == n.host && port == n.port && rack == n.rack
case _ => false
}
}

override def hashCode(): Int = hashcode(id, host, port)
override def hashCode(): Int = hashcode(id, host, port, rack)

}
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the broker id for this server */
val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))

/* the rack id for this server */
val rackId: Int = props.getIntInRange("broker.rack", (0, Int.MaxValue))

/* the maximum size of message that the server can receive */
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaZooKeeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
else
config.hostName
val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.zkSessionTimeoutMs, jmxPort)
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.rackId, config.zkSessionTimeoutMs, jmxPort)
}

/**
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}

def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, rack: Int, timeout: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val timestamp = "\"" + SystemTime.milliseconds.toString + "\""
val brokerInfo =
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "rack" -> rack.toString, "timestamp" -> timestamp),
valueInQuotes = false))
val expectedBroker = new Broker(id, host, port)
val expectedBroker = new Broker(id, host, port, rack)

try {
createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/other/kafka/TestLogPerformance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object TestLogPerformance {
val messageSize = args(1).toInt
val batchSize = args(2).toInt
val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
val props = TestUtils.createBrokerConfig(0, -1)
val props = TestUtils.createBrokerConfig(0, -1, 1)
val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
Expand Down
15 changes: 10 additions & 5 deletions core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
val port3 = TestUtils.choosePort()
val port4 = TestUtils.choosePort()

val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
val rack1 = 1
val rack2 = 2
val rack3 = 3
val rack4 = 4

val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rack1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, rack2)
val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, rack3)
val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, rack4)

var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var brokers: Seq[Broker] = Seq.empty[Broker]
Expand All @@ -61,7 +66,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
val server4 = TestUtils.createServer(new KafkaConfig(configProps4))

servers ++= List(server1, server2, server3, server4)
brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port, s.config.rackId))

// create topics with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
Expand Down
9 changes: 4 additions & 5 deletions core/src/test/scala/unit/kafka/admin/AdminTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import kafka.server.KafkaConfig
import kafka.utils.{Logging, ZkUtils, TestUtils}
import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}


class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {

@Test
Expand All @@ -33,7 +32,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {

// test 0 replication factor
try {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 0)
fail("shouldn't allow replication factor 0")
}
catch {
Expand All @@ -43,7 +42,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {

// test wrong replication factor
try {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 6)
fail("shouldn't allow replication factor larger than # of brokers")
}
catch {
Expand All @@ -66,7 +65,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
9 -> List(4, 1, 2)
)

val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
val actualAssignment = AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 3, 0)
val e = (expectedAssignment.toList == actualAssignment.toList)
assertTrue(expectedAssignment.toList == actualAssignment.toList)
}
Expand Down Expand Up @@ -155,7 +154,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
11 -> 1
)
val topic = "test"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
TestUtils.createBrokersInZk(zkClient, List((0, 0), (1, 1), (2, 2), (3, 3), (4, 4)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// create leaders for all partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object SerializationTestUtils{
TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
)

private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
private val brokers = List(new Broker(0, "localhost", 1011, 1), new Broker(1, "localhost", 1012, 2), new Broker(2, "localhost", 1013, 3))
private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
Expand Down
Loading