Skip to content

Commit

Permalink
refactoring and first test
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Aug 23, 2018
1 parent a6f9039 commit 8efac4b
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 70 deletions.
49 changes: 49 additions & 0 deletions core/src/main/scala/com/cloudera/spark/MemoryGetter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// (c) Copyright 2018 Cloudera, Inc. All rights reserved.
package com.cloudera.spark

import java.lang.management.{BufferPoolMXBean, MemoryMXBean, MemoryPoolMXBean}

/**
 * A source of memory metrics that can be polled into a flat metrics array.
 *
 * `namesAndReporting` lists one (metricName, PeakReporting) pair per value this getter
 * produces; its order must match the order in which `values` writes into `dest`.
 * `values` writes the current readings into `dest` starting at `offset`, using exactly
 * `namesAndReporting.length` consecutive slots.
 */
trait MemoryGetter {
def namesAndReporting: Seq[(String, PeakReporting)]
def values(dest: Array[Long], offset: Int): Unit
}

/**
 * Exposes the JVM-wide heap and off-heap usage from a [[java.lang.management.MemoryMXBean]]
 * as four metrics, in order: heap:used, heap:committed, offheap:used, offheap:committed.
 */
class MemoryMxBeanGetter(bean: MemoryMXBean) extends MemoryGetter {
  val namesAndReporting: Seq[(String, PeakReporting)] =
    Seq("heap", "offheap").flatMap { source =>
      Seq((s"$source:used", IncrementBytes), (s"$source:committed", Always))
    }

  def values(dest: Array[Long], offset: Int): Unit = {
    val heap = bean.getHeapMemoryUsage()
    dest(offset) = heap.getUsed()
    dest(offset + 1) = heap.getCommitted()
    val offheap = bean.getNonHeapMemoryUsage()
    dest(offset + 2) = offheap.getUsed()
    dest(offset + 3) = offheap.getCommitted()
  }
}

/**
 * Exposes used and committed bytes for one JVM memory pool (e.g. "PS Old Gen")
 * as "&lt;poolName&gt;:used" and "&lt;poolName&gt;:committed".
 */
class PoolGetter(bean: MemoryPoolMXBean) extends MemoryGetter {
  val namesAndReporting: Seq[(String, PeakReporting)] =
    Seq(("used", IncrementBytes), ("committed", Always)).map { case (n, r) =>
      // ":" separator keeps metric names consistent with MemoryMxBeanGetter ("heap:used")
      // and BufferPoolGetter; without it the name would read e.g. "PS Old Genused".
      (bean.getName() + ":" + n, r)
    }

  def values(dest: Array[Long], offset: Int): Unit = {
    // there are actually a bunch more things here I *could* get ...
    val usage = bean.getUsage()
    dest(offset) = usage.getUsed()
    dest(offset + 1) = usage.getCommitted()
  }
}

/**
 * Exposes total capacity and used bytes for one NIO buffer pool (e.g. "direct", "mapped")
 * as "&lt;poolName&gt;:capacity" and "&lt;poolName&gt;:used".
 */
class BufferPoolGetter(bean: BufferPoolMXBean) extends MemoryGetter {
  val namesAndReporting: Seq[(String, PeakReporting)] =
    for ((metric, reporting) <- Seq(("capacity", IncrementBytes), ("used", IncrementBytes)))
      yield (bean.getName() + ":" + metric, reporting)

  def values(dest: Array[Long], offset: Int): Unit = {
    dest(offset) = bean.getTotalCapacity()
    dest(offset + 1) = bean.getMemoryUsed()
  }
}
69 changes: 0 additions & 69 deletions core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -327,75 +327,6 @@ object MemoryMonitor {
}
}


// Policy deciding whether an increase from `orig` to `update` is worth reporting as a new peak.
sealed trait PeakReporting {
def report(orig: Long, update: Long): Boolean
}
// Never report, regardless of the change.
case object Never extends PeakReporting {
override def report(orig: Long, update: Long): Boolean = false
}
// Report any strict increase (unchanged or decreased values are not reported).
case object Always extends PeakReporting {
override def report(orig: Long, update: Long): Boolean = update > orig
}
// Report byte counters only on jumps of more than 1e7 bytes (~10MB) that are also
// more than a 5% relative increase. When orig == 0 the ratio is Infinity, so any
// jump over the absolute threshold is reported.
case object IncrementBytes extends PeakReporting {
override def report(orig: Long, update: Long): Boolean = {
val delta = update - orig
delta > 1e7.toInt && (update.toDouble / orig) > 1.05
}
}

// Report event counters only on jumps of more than 100 that are also more than
// a 5% relative increase.
case object IncrementCounts extends PeakReporting {
override def report(orig: Long, update: Long): Boolean = {
val delta = update - orig
delta > 100 && (update.toDouble / orig) > 1.05
}
}

// A source of memory metrics polled into a flat array: `namesAndReporting` declares one
// (metricName, PeakReporting) pair per produced value, in the same order that `values`
// writes consecutive slots into `dest` starting at `offset`.
trait MemoryGetter {
def namesAndReporting: Seq[(String, PeakReporting)]
def values(dest: Array[Long], offset: Int): Unit
}

// Exposes JVM-wide heap and off-heap usage as four metrics, in order:
// heap:used, heap:committed, offheap:used, offheap:committed.
class MemoryMxBeanGetter(bean: MemoryMXBean) extends MemoryGetter {
val namesAndReporting: Seq[(String, PeakReporting)] = for {
source <- Seq("heap", "offheap")
usage <- Seq(("used", IncrementBytes), ("committed", Always))
} yield {
(source + ":" + usage._1, usage._2)
}
def values(dest: Array[Long], offset:Int): Unit = {
val heap = bean.getHeapMemoryUsage()
dest(offset) = heap.getUsed()
dest(offset + 1) = heap.getCommitted()
val offheap = bean.getNonHeapMemoryUsage()
dest(offset + 2) = offheap.getUsed()
dest(offset + 3) = offheap.getCommitted()
}
}

// Exposes used and committed bytes for one JVM memory pool.
// NOTE(review): no separator between the pool name and the metric suffix here
// (e.g. "PS Old Genused"), unlike the ":" used by MemoryMxBeanGetter and
// BufferPoolGetter — looks unintentional; confirm.
class PoolGetter(bean: MemoryPoolMXBean) extends MemoryGetter {
val namesAndReporting: Seq[(String, PeakReporting)] =
Seq(("used", IncrementBytes), ("committed", Always)).map { case (n, r) =>
(bean.getName() + n, r)
}
def values(dest: Array[Long], offset: Int): Unit = {
// there are actually a bunch more things here I *could* get ...
val usage = bean.getUsage()
dest(offset) = usage.getUsed()
dest(offset + 1) = usage.getCommitted()
}
}

// Exposes total capacity and used bytes for one NIO buffer pool as
// "<poolName>:capacity" and "<poolName>:used".
class BufferPoolGetter(bean: BufferPoolMXBean) extends MemoryGetter {
val namesAndReporting = Seq(("capacity", IncrementBytes), ("used", IncrementBytes)).map{ case (n, r) =>
(bean.getName() + ":" + n, r)
}
def values(dest: Array[Long], offset: Int): Unit = {
dest(offset) = bean.getTotalCapacity()
dest(offset + 1) = bean.getMemoryUsed()
}
}

class MemoryMonitorExecutorExtension extends ExecutorPlugin {
// the "extension class" api just lets you invoke a constructor. We really just want to
// call this static method, so that's good enough.
Expand Down
25 changes: 25 additions & 0 deletions core/src/main/scala/com/cloudera/spark/PeakReporting.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// (c) Copyright 2018 Cloudera, Inc. All rights reserved.
package com.cloudera.spark

/**
 * Policy deciding whether a change from a previously-seen value `orig` to a new
 * reading `update` is significant enough to report as a new peak.
 */
sealed trait PeakReporting {
def report(orig: Long, update: Long): Boolean
}
/** Never report, regardless of the change. */
case object Never extends PeakReporting {
override def report(orig: Long, update: Long): Boolean = false
}
/** Report any strict increase; unchanged or decreased values are not reported. */
case object Always extends PeakReporting {
override def report(orig: Long, update: Long): Boolean = update > orig
}
/**
 * For byte counters: report only jumps of more than 1e7 bytes (~10MB) that are also
 * more than a 5% relative increase. When `orig` is 0 the ratio is Infinity, so any
 * jump over the absolute threshold is reported.
 */
case object IncrementBytes extends PeakReporting {
  override def report(orig: Long, update: Long): Boolean = {
    val grownEnough = (update - orig) > 1e7.toInt
    grownEnough && (update.toDouble / orig) > 1.05
  }
}

/**
 * For event counters: report only jumps of more than 100 that are also more than
 * a 5% relative increase.
 */
case object IncrementCounts extends PeakReporting {
  override def report(orig: Long, update: Long): Boolean = {
    if (update - orig <= 100) {
      false
    } else {
      (update.toDouble / orig) > 1.05
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class SparkNettyMemoryHandle(

object SparkNettyMemoryHandle {

def get(displayError: Boolean = false): Option[SparkNettyMemoryHandle] = try {
def get(displayError: Boolean = false): Option[SparkNettyMemoryHandle] = {
Option(SparkEnv.get).map { env =>
new SparkNettyMemoryHandle(
getRpcClientPooledAllocator(env).metric,
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// (c) Copyright 2018 Cloudera, Inc. All rights reserved.
package com.cloudera.spark

import org.scalatest.FunSuite

/** Unit tests for the PeakReporting threshold policies. */
class PeakReportingSuite extends FunSuite {

test("increment bytes") {
// both conditions met: delta over 1e7 bytes AND more than a 5% relative increase
assert(IncrementBytes.report(1e9.toLong, 1.051e9.toLong))
// delta over 1e7, but under the 5% relative threshold
assert(!IncrementBytes.report(1e9.toLong, 1.049e9.toLong))

// relative increase present (or large), but absolute delta not over the 1e7 threshold
assert(!IncrementBytes.report(1e7.toLong, 1.05e7.toLong))
assert(!IncrementBytes.report(1e7.toLong, 1.9e7.toLong))
assert(!IncrementBytes.report(1e6.toLong, 1e7.toLong))

// increase from small starting point OK (ratio vs 0 is Infinity)
assert(IncrementBytes.report(0, 1.001e7.toLong))

}
}

0 comments on commit 8efac4b

Please sign in to comment.