From 8efac4befd50eca182dbbe596497df3050d93053 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 22 Aug 2018 21:48:25 -0500 Subject: [PATCH] refactoring and first test --- .../com/cloudera/spark/MemoryGetter.scala | 49 +++++++++++++ .../com/cloudera/spark/MemoryMonitor.scala | 69 ------------------- .../com/cloudera/spark/PeakReporting.scala | 25 +++++++ .../spark/SparkNettyMemoryHandle.scala | 2 +- .../cloudera/spark/PeakReportingSuite.scala | 23 +++++++ 5 files changed, 98 insertions(+), 70 deletions(-) create mode 100644 core/src/main/scala/com/cloudera/spark/MemoryGetter.scala create mode 100644 core/src/main/scala/com/cloudera/spark/PeakReporting.scala create mode 100644 core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala diff --git a/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala b/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala new file mode 100644 index 0000000..62e78af --- /dev/null +++ b/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala @@ -0,0 +1,49 @@ +// (c) Copyright 2018 Cloudera, Inc. All rights reserved. +package com.cloudera.spark + +import java.lang.management.{BufferPoolMXBean, MemoryMXBean, MemoryPoolMXBean} + +trait MemoryGetter { + def namesAndReporting: Seq[(String, PeakReporting)] + def values(dest: Array[Long], offset: Int): Unit +} + +class MemoryMxBeanGetter(bean: MemoryMXBean) extends MemoryGetter { + val namesAndReporting: Seq[(String, PeakReporting)] = for { + source <- Seq("heap", "offheap") + usage <- Seq(("used", IncrementBytes), ("committed", Always)) + } yield { + (source + ":" + usage._1, usage._2) + } + def values(dest: Array[Long], offset:Int): Unit = { + val heap = bean.getHeapMemoryUsage() + dest(offset) = heap.getUsed() + dest(offset + 1) = heap.getCommitted() + val offheap = bean.getNonHeapMemoryUsage() + dest(offset + 2) = offheap.getUsed() + dest(offset + 3) = offheap.getCommitted() + } +} + +class PoolGetter(bean: MemoryPoolMXBean) extends MemoryGetter { + val namesAndReporting: Seq[(String, PeakReporting)] = + Seq(("used", IncrementBytes), ("committed", Always)).map { case (n, r) => + (bean.getName() + n, r) + } + def values(dest: Array[Long], offset: Int): Unit = { + // there are actually a bunch more things here I *could* get ... + val usage = bean.getUsage() + dest(offset) = usage.getUsed() + dest(offset + 1) = usage.getCommitted() + } +} + +class BufferPoolGetter(bean: BufferPoolMXBean) extends MemoryGetter { + val namesAndReporting = Seq(("capacity", IncrementBytes), ("used", IncrementBytes)).map{ case (n, r) => + (bean.getName() + ":" + n, r) + } + def values(dest: Array[Long], offset: Int): Unit = { + dest(offset) = bean.getTotalCapacity() + dest(offset + 1) = bean.getMemoryUsed() + } +} diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala index 5e5c21f..9a85c11 100644 --- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala +++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala @@ -327,75 +327,6 @@ object MemoryMonitor { } } - -sealed trait PeakReporting { - def report(orig: Long, update: Long): Boolean -} -case object Never extends PeakReporting { - override def report(orig: Long, update: Long): Boolean = false -} -case object Always extends PeakReporting { - override def report(orig: Long, update: Long): Boolean = update > orig -} -case object IncrementBytes extends PeakReporting { - override def report(orig: Long, update: Long): Boolean = { - val delta = update - orig - delta > 1e7.toInt && (update.toDouble / orig) > 1.05 - } -} - -case object IncrementCounts extends PeakReporting { - override def report(orig: Long, update: Long): Boolean = { - val delta = update - orig - delta > 100 && (update.toDouble / orig) > 1.05 - } -} - -trait MemoryGetter { - def namesAndReporting: Seq[(String, PeakReporting)] - def values(dest: Array[Long], offset: Int): Unit -} - -class MemoryMxBeanGetter(bean: MemoryMXBean) extends MemoryGetter { - val namesAndReporting: Seq[(String, PeakReporting)] = for { - source <- Seq("heap", "offheap") - usage <- Seq(("used", IncrementBytes), ("committed", Always)) - } yield { - (source + ":" + usage._1, usage._2) - } - def values(dest: Array[Long], offset:Int): Unit = { - val heap = bean.getHeapMemoryUsage() - dest(offset) = heap.getUsed() - dest(offset + 1) = heap.getCommitted() - val offheap = bean.getNonHeapMemoryUsage() - dest(offset + 2) = offheap.getUsed() - dest(offset + 3) = offheap.getCommitted() - } -} - -class PoolGetter(bean: MemoryPoolMXBean) extends MemoryGetter { - val namesAndReporting: Seq[(String, PeakReporting)] = - Seq(("used", IncrementBytes), ("committed", Always)).map { case (n, r) => - (bean.getName() + n, r) - } - def values(dest: Array[Long], offset: Int): Unit = { - // there are actually a bunch more things here I *could* get ... - val usage = bean.getUsage() - dest(offset) = usage.getUsed() - dest(offset + 1) = usage.getCommitted() - } -} - -class BufferPoolGetter(bean: BufferPoolMXBean) extends MemoryGetter { - val namesAndReporting = Seq(("capacity", IncrementBytes), ("used", IncrementBytes)).map{ case (n, r) => - (bean.getName() + ":" + n, r) - } - def values(dest: Array[Long], offset: Int): Unit = { - dest(offset) = bean.getTotalCapacity() - dest(offset + 1) = bean.getMemoryUsed() - } -} - class MemoryMonitorExecutorExtension extends ExecutorPlugin { // the "extension class" api just lets you invoke a constructor. We really just want to // call this static method, so that's good enough. diff --git a/core/src/main/scala/com/cloudera/spark/PeakReporting.scala b/core/src/main/scala/com/cloudera/spark/PeakReporting.scala new file mode 100644 index 0000000..394c42f --- /dev/null +++ b/core/src/main/scala/com/cloudera/spark/PeakReporting.scala @@ -0,0 +1,25 @@ +// (c) Copyright 2018 Cloudera, Inc. All rights reserved. +package com.cloudera.spark + +sealed trait PeakReporting { + def report(orig: Long, update: Long): Boolean +} +case object Never extends PeakReporting { + override def report(orig: Long, update: Long): Boolean = false +} +case object Always extends PeakReporting { + override def report(orig: Long, update: Long): Boolean = update > orig +} +case object IncrementBytes extends PeakReporting { + override def report(orig: Long, update: Long): Boolean = { + val delta = update - orig + delta > 1e7.toInt && (update.toDouble / orig) > 1.05 + } +} + +case object IncrementCounts extends PeakReporting { + override def report(orig: Long, update: Long): Boolean = { + val delta = update - orig + delta > 100 && (update.toDouble / orig) > 1.05 + } +} diff --git a/core/src/main/scala/com/cloudera/spark/SparkNettyMemoryHandle.scala b/core/src/main/scala/com/cloudera/spark/SparkNettyMemoryHandle.scala index ca90dc6..bb84c76 100644 --- a/core/src/main/scala/com/cloudera/spark/SparkNettyMemoryHandle.scala +++ b/core/src/main/scala/com/cloudera/spark/SparkNettyMemoryHandle.scala @@ -81,7 +81,7 @@ class SparkNettyMemoryHandle( object SparkNettyMemoryHandle { - def get(displayError: Boolean = false): Option[SparkNettyMemoryHandle] = try { + def get(displayError: Boolean = false): Option[SparkNettyMemoryHandle] = { Option(SparkEnv.get).map { env => new SparkNettyMemoryHandle( getRpcClientPooledAllocator(env).metric, diff --git a/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala b/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala new file mode 100644 index 0000000..053da98 --- /dev/null +++ b/core/src/test/scala/com/cloudera/spark/PeakReportingSuite.scala @@ -0,0 +1,23 @@ +// (c) Copyright 2018 Cloudera, Inc. All rights reserved. +package com.cloudera.spark + +import org.scalatest.FunSuite + +class PeakReportingSuite extends FunSuite { + + test("increment bytes") { + // delta over 1e7, and 5% increase + assert(IncrementBytes.report(1e9.toLong, 1.051e9.toLong)) + // delta over 1e7, but less than 5% increase + assert(!IncrementBytes.report(1e9.toLong, 1.049e9.toLong)) + + //5% increase, but below overall threshold + assert(!IncrementBytes.report(1e7.toLong, 1.05e7.toLong)) + assert(!IncrementBytes.report(1e7.toLong, 1.9e7.toLong)) + assert(!IncrementBytes.report(1e6.toLong, 1e7.toLong)) + + // increase from small starting point OK + assert(IncrementBytes.report(0, 1.001e7.toLong)) + + } +}