Skip to content

Commit

Permalink
Merge pull request #103 from mesos/role_specific_resources
Browse files Browse the repository at this point in the history
role_specific_resources
  • Loading branch information
joestein committed Aug 19, 2015
2 parents 8918971 + df9bd94 commit 64b5a1d
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 133 deletions.
130 changes: 105 additions & 25 deletions src/scala/ly/stealth/mesos/kafka/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package ly.stealth.mesos.kafka

import java.util
import scala.collection.JavaConversions._
import scala.util.parsing.json.JSONObject
import scala.collection
import org.apache.mesos.Protos.{Resource, Offer}
import java.util.{TimeZone, Collections, Date, UUID}
import org.apache.mesos.Protos.{Value, Resource, Offer}
import java.util._
import ly.stealth.mesos.kafka.Broker.{Stickiness, Failover}
import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range, Str}
import java.text.SimpleDateFormat
import scala.List
import scala.collection.Map
import scala.util.parsing.json.JSONObject

class Broker(_id: String = "0") {
var id: String = _id
Expand Down Expand Up @@ -64,19 +66,10 @@ class Broker(_id: String = "0") {

def matches(offer: Offer, otherAttributes: Broker.OtherAttributes = Broker.NoAttributes): String = {
// check resources
val offerResources = new util.HashMap[String, Resource]()
for (resource <- offer.getResourcesList) offerResources.put(resource.getName, resource)

val port = getSuitablePort(offer)
if (port == -1) return "no suitable port"

val cpusResource = offerResources.get("cpus")
if (cpusResource == null) return "no cpus"
if (cpusResource.getScalar.getValue < cpus) return s"cpus ${cpusResource.getScalar.getValue} < $cpus"

val memResource = offerResources.get("mem")
if (memResource == null) return "no mem"
if (memResource.getScalar.getValue < mem) return s"mem ${memResource.getScalar.getValue.toLong} < $mem"
val reservation: Broker.Reservation = getReservation(offer)
if (reservation.cpus < cpus) return s"cpus < $cpus"
if (reservation.mem < mem) return s"mem < $mem"
if (reservation.port == -1) return "no suitable port"

// check attributes
val offerAttributes = new util.HashMap[String, String]()
Expand All @@ -93,21 +86,65 @@ class Broker(_id: String = "0") {
null
}

def getSuitablePort(offer: Offer): Int = {
val portsResource = offer.getResourcesList.find(_.getName == "ports").getOrElse(null)
if (portsResource == null) return -1
def getReservation(offer: Offer): Broker.Reservation = {
var sharedCpus: Double = 0
var roleCpus: Double = 0
var reservedSharedCpus: Double = 0
var reservedRoleCpus: Double = 0

var sharedMem: Long = 0
var roleMem: Long = 0
var reservedSharedMem: Long = 0
var reservedRoleMem: Long = 0

val sharedPorts: util.List[Range] = new util.ArrayList[Range]()
val rolePorts: util.List[Range] = new util.ArrayList[Range]()
var reservedSharedPort: Long = -1
var reservedRolePort: Long = -1

var role: String = null

for (resource <- offer.getResourcesList) {
if (resource.getRole == "*") {
if (resource.getName == "cpus") sharedCpus = resource.getScalar.getValue
if (resource.getName == "mem") sharedMem = resource.getScalar.getValue.toLong
if (resource.getName == "ports") sharedPorts.addAll(resource.getRanges.getRangeList.map(r => new Range(r.getBegin.toInt, r.getEnd.toInt)))
} else {
if (role != null && role != resource.getRole)
throw new IllegalArgumentException(s"Offer contains 2 non-default roles: $role, ${resource.getRole}")

role = resource.getRole
if (resource.getName == "cpus") roleCpus = resource.getScalar.getValue
if (resource.getName == "mem") roleMem = resource.getScalar.getValue.toLong
if (resource.getName == "ports") rolePorts.addAll(resource.getRanges.getRangeList.map(r => new Range(r.getBegin.toInt, r.getEnd.toInt)))
}
}

reservedRoleCpus = Math.min(cpus, roleCpus)
reservedSharedCpus = Math.min(cpus - reservedRoleCpus, sharedCpus)

reservedRoleMem = Math.min(mem, roleMem)
reservedSharedMem = Math.min(mem - reservedRoleMem, sharedMem)

val ports = portsResource.getRanges.getRangeList.map(r => new Range(r.getBegin.toInt, r.getEnd.toInt))
reservedRolePort = getSuitablePort(rolePorts)
if (reservedRolePort == -1)
reservedSharedPort = getSuitablePort(sharedPorts)

new Broker.Reservation(role, reservedSharedCpus, reservedRoleCpus, reservedSharedMem, reservedRoleMem, reservedSharedPort, reservedRolePort)
}

private[kafka] def getSuitablePort(ports: util.List[Range]): Int = {
if (ports.isEmpty) return -1

val ports_ = ports.sortBy(r => r.start)
if (port == null)
return ports.get(0).start
return ports_.get(0).start

for (range <- ports) {
for (range <- ports_) {
val overlap = range.overlap(port)
if (overlap != null)
return overlap.start
}
if (overlap != null)
return overlap.start
}

-1
}
Expand Down Expand Up @@ -408,6 +445,49 @@ object Broker {

override def toString: String = hostname + ":" + port
}

class Reservation(
_role: String = null,
_sharedCpus: Double = 0.0, _roleCpus: Double = 0.0,
_sharedMem: Long = 0, _roleMem: Long = 0,
_sharedPort: Long = -1, _rolePort: Long = -1
) {
val role: String = _role

val sharedCpus: Double = _sharedCpus
val roleCpus: Double = _roleCpus
def cpus: Double = sharedCpus + roleCpus

val sharedMem: Long = _sharedMem
val roleMem: Long = _roleMem
def mem: Long = sharedMem + roleMem

val sharedPort: Long = _sharedPort
val rolePort: Long = _rolePort
def port: Long = if (rolePort != -1) rolePort else sharedPort

def toResources: util.List[Resource] = {
def cpus(value: Double, role: String): Resource = Resource.newBuilder.setName("cpus").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder.setValue(value)).setRole(role).build()
def mem(value: Long, role: String): Resource = Resource.newBuilder.setName("mem").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder.setValue(value)).setRole(role).build()
def port(value: Long, role: String): Resource =
Resource.newBuilder.setName("ports").setType(Value.Type.RANGES).setRanges(
Value.Ranges.newBuilder.addRange(Value.Range.newBuilder().setBegin(value).setEnd(value))
).setRole(role).build()

val resources: util.List[Resource] = new util.ArrayList[Resource]()

if (sharedCpus > 0) resources.add(cpus(sharedCpus, "*"))
if (roleCpus > 0) resources.add(cpus(roleCpus, role))

if (sharedMem > 0) resources.add(mem(sharedMem, "*"))
if (roleMem > 0) resources.add(mem(roleMem, role))

if (sharedPort != -1) resources.add(port(sharedPort, "*"))
if (rolePort != -1) resources.add(port(rolePort, role))

resources
}
}

object State {
val STOPPED = "stopped"
Expand Down
18 changes: 5 additions & 13 deletions src/scala/ly/stealth/mesos/kafka/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ object Scheduler extends org.apache.mesos.Scheduler {
.build()
}

private[kafka] def newTask(broker: Broker, offer: Offer, port: Int): TaskInfo = {
private[kafka] def newTask(broker: Broker, offer: Offer, reservation: Broker.Reservation): TaskInfo = {
def taskData: ByteString = {
val defaults: Map[String, String] = Map(
"broker.id" -> broker.id,
"port" -> ("" + port),
"port" -> ("" + reservation.port),
"log.dirs" -> "kafka-logs",
"log.retention.bytes" -> ("" + 10l * 1024 * 1024 * 1024),

Expand All @@ -83,13 +83,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
.setData(taskData)
.setExecutor(newExecutor(broker))

taskBuilder
.addResources(Resource.newBuilder.setName("cpus").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder.setValue(broker.cpus)))
.addResources(Resource.newBuilder.setName("mem").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder.setValue(broker.mem)))
.addResources(Resource.newBuilder.setName("ports").setType(Value.Type.RANGES).setRanges(
Value.Ranges.newBuilder.addRange(Value.Range.newBuilder().setBegin(port).setEnd(port)))
)

taskBuilder.addAllResources(reservation.toResources)
taskBuilder.build
}

Expand Down Expand Up @@ -245,10 +239,8 @@ object Scheduler extends org.apache.mesos.Scheduler {
private def isReconciling: Boolean = cluster.getBrokers.exists(b => b.task != null && b.task.reconciling)

private[kafka] def launchTask(broker: Broker, offer: Offer): Unit = {
val port = broker.getSuitablePort(offer)
if (port == -1) throw new IllegalStateException("no suitable port")

val task_ = newTask(broker, offer, port)
val reservation = broker.getReservation(offer)
val task_ = newTask(broker, offer, reservation)
val id = task_.getTaskId.getValue

val attributes = new util.LinkedHashMap[String, String]()
Expand Down
5 changes: 4 additions & 1 deletion src/scala/ly/stealth/mesos/kafka/Util.scala
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ object Util {
val order: util.List[String] = "cpus mem disk ports".split(" ").toList
for (resource <- resources.sortBy(r => order.indexOf(r.getName))) {
if (!s.isEmpty) s += " "
s += resource.getName + ":"

s += resource.getName
if (resource.getRole != "*") s += "(" + resource.getRole + ")"
s += ":"

if (resource.hasScalar)
s += "%.2f".format(resource.getScalar.getValue)
Expand Down
127 changes: 95 additions & 32 deletions src/test/ly/stealth/mesos/kafka/BrokerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ package ly.stealth.mesos.kafka
import org.junit.{Before, Test}
import org.junit.Assert._
import ly.stealth.mesos.kafka.Util.{BindAddress, Period, parseMap}
import java.util.Date
import java.util.{Collections, Date}
import scala.collection.JavaConversions._
import ly.stealth.mesos.kafka.Broker.{Endpoint, Stickiness, State, Task, Failover}
import java.util

class BrokerTest extends MesosTestCase {
var broker: Broker = null
Expand Down Expand Up @@ -57,44 +58,19 @@ class BrokerTest extends MesosTestCase {
def matches {
// cpus
broker.cpus = 0.5
assertNull(broker.matches(offer(cpus = 0.5)))
assertEquals("cpus 0.49 < 0.5", broker.matches(offer(cpus = 0.49)))
assertNull(broker.matches(offer(resources = "cpus:0.2,cpus(role):0.3,ports:1000")))
assertEquals("cpus < 0.5", broker.matches(offer(resources = "cpus:0.2,cpus(role):0.2")))
broker.cpus = 0

// mem
broker.mem = 100
assertNull(broker.matches(offer(mem = 100)))
assertEquals("mem 99 < 100", broker.matches(offer(mem = 99)))
assertNull(broker.matches(offer(resources = "mem:70,mem(role):30,ports:1000")))
assertEquals("mem < 100", broker.matches(offer(resources = "mem:70,mem(role):29")))
broker.mem = 0

// port
assertNull(broker.matches(offer(ports = "100")))
assertEquals("no suitable port", broker.matches(offer(ports = "")))
}

@Test
def getSuitablePort {
// no port restrictions
assertEquals(-1, broker.getSuitablePort(offer(ports = "")))
assertEquals(100, broker.getSuitablePort(offer(ports = "100..100")))
assertEquals(100, broker.getSuitablePort(offer(ports = "100..200")))

// single port restriction
broker.port = new Util.Range(92)
assertEquals(-1, broker.getSuitablePort(offer(ports = "0..91")))
assertEquals(-1, broker.getSuitablePort(offer(ports = "93..100")))
assertEquals(92, broker.getSuitablePort(offer(ports = "90..100")))

// port range restriction
broker.port = new Util.Range("92..100")
assertEquals(-1, broker.getSuitablePort(offer(ports = "0..91")))
assertEquals(-1, broker.getSuitablePort(offer(ports = "101..200")))
assertEquals(92, broker.getSuitablePort(offer(ports = "0..100")))
assertEquals(92, broker.getSuitablePort(offer(ports = "0..92")))

assertEquals(100, broker.getSuitablePort(offer(ports = "100..200")))
assertEquals(95, broker.getSuitablePort(offer(ports = "0..90,95..96,101..200")))
assertEquals(96, broker.getSuitablePort(offer(ports = "0..90,96,101..200")))
assertNull(broker.matches(offer(resources = "ports:1000")))
assertEquals("no suitable port", broker.matches(offer(resources = "")))
}

@Test
Expand Down Expand Up @@ -141,6 +117,77 @@ class BrokerTest extends MesosTestCase {
assertEquals("rack doesn't match groupBy", broker.matches(offer(attributes = "rack=2"), _ => Array("1")))
}

@Test
def getReservations {
broker.cpus = 2
broker.mem = 100

// shared resources
var reservation = broker.getReservation(offer(resources = "cpus:3, mem:200, ports:1000..2000"))
assertEquals(resources("cpus:2, mem:100, ports:1000"), reservation.toResources)

// role resources
reservation = broker.getReservation(offer(resources = "cpus(role):3, mem(role):200, ports(role):1000..2000"))
assertEquals(resources("cpus(role):2, mem(role):100, ports(role):1000"), reservation.toResources)

// mixed resources
reservation = broker.getReservation(offer(resources = "cpus:2, cpus(role):1, mem:100, mem(role):99, ports:1000..2000, ports(role):3000"))
assertEquals(resources("cpus:1, cpus(role):1, mem:1, mem(role):99, ports(role):3000"), reservation.toResources)

// not enough resources
reservation = broker.getReservation(offer(resources = "cpus:0.5, cpus(role):0.5, mem:1, mem(role):1, ports:1000"))
assertEquals(resources("cpus:0.5, cpus(role):0.5, mem:1, mem(role):1, ports:1000"), reservation.toResources)

// no port
reservation = broker.getReservation(offer(resources = ""))
assertEquals(-1, reservation.port)

// two non-default roles
try {
broker.getReservation(offer(resources = "cpus(r1):0.5,mem(r2):100"))
fail()
} catch {
case e: IllegalArgumentException =>
val m: String = e.getMessage
assertTrue(m, m.contains("r1") && m.contains("r2"))
}
}


@Test
def getSuitablePort {
def ranges(s: String): util.List[Util.Range] = {
if (s.isEmpty) return Collections.emptyList()
s.split(",").toList.map(s => new Util.Range(s.trim))
}

// no port restrictions
assertEquals(-1, broker.getSuitablePort(ranges("")))
assertEquals(100, broker.getSuitablePort(ranges("100..100")))
assertEquals(100, broker.getSuitablePort(ranges("100..200")))

// order
assertEquals(10, broker.getSuitablePort(ranges("30,10,20,40")))
assertEquals(50, broker.getSuitablePort(ranges("100..200, 50..60")))

// single port restriction
broker.port = new Util.Range(92)
assertEquals(-1, broker.getSuitablePort(ranges("0..91")))
assertEquals(-1, broker.getSuitablePort(ranges("93..100")))
assertEquals(92, broker.getSuitablePort(ranges("90..100")))

// port range restriction
broker.port = new Util.Range("92..100")
assertEquals(-1, broker.getSuitablePort(ranges("0..91")))
assertEquals(-1, broker.getSuitablePort(ranges("101..200")))
assertEquals(92, broker.getSuitablePort(ranges("0..100")))
assertEquals(92, broker.getSuitablePort(ranges("0..92")))

assertEquals(100, broker.getSuitablePort(ranges("100..200")))
assertEquals(95, broker.getSuitablePort(ranges("0..90,95..96,101..200")))
assertEquals(96, broker.getSuitablePort(ranges("0..90,96,101..200")))
}

@Test
def shouldStart {
val host = "host"
Expand Down Expand Up @@ -266,6 +313,22 @@ class BrokerTest extends MesosTestCase {
assertEquals("0", Broker.idFromTaskId(Broker.nextTaskId(new Broker("0"))))
assertEquals("100", Broker.idFromTaskId(Broker.nextTaskId(new Broker("100"))))
}

// Reservation
@Test
def Reservation_toResources {
// shared
var reservation = new Broker.Reservation(null, _sharedCpus = 0.5, _sharedMem = 100, _sharedPort = 1000)
assertEquals(resources("cpus:0.5, mem:100, ports:1000"), reservation.toResources)

// role
reservation = new Broker.Reservation("role", _roleCpus = 0.5, _roleMem = 100, _rolePort = 1000)
assertEquals(resources("cpus(role):0.5, mem(role):100, ports(role):1000"), reservation.toResources)

// shared + role
reservation = new Broker.Reservation("role", _sharedCpus = 0.3, _roleCpus = 0.7, _sharedMem = 50, _roleMem = 100, _sharedPort = 1000, _rolePort = 2000)
assertEquals(resources("cpus:0.3, cpus(role):0.7, mem:50, mem(role):100, ports:1000, ports(role):2000"), reservation.toResources)
}

// Stickiness
@Test
Expand Down
Loading

0 comments on commit 64b5a1d

Please sign in to comment.