From 4589e6668679b5bc642ea297cde403a18a690ced Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 22 Oct 2015 11:59:39 +0800 Subject: [PATCH 1/6] reset all accumulators in physical operators before execute an action --- .../org/apache/spark/sql/DataFrame.scala | 3 +++ .../sql/execution/metric/SQLMetrics.scala | 7 ++++++- .../sql/util/DataFrameCallbackSuite.scala | 20 +++++++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 2f10aa9f3c44..7e9ff78d90b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1963,6 +1963,9 @@ class DataFrame private[sql]( */ private def withCallback[T](name: String, df: DataFrame)(action: DataFrame => T) = { try { + df.queryExecution.executedPlan.foreach { plan => + plan.metrics.valuesIterator.foreach(_.reset) + } val start = System.nanoTime() val result = action(df) val end = System.nanoTime() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 075b7ad88111..c331db089a7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -28,7 +28,12 @@ import org.apache.spark.{Accumulable, AccumulableParam, SparkContext} */ private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T]( name: String, val param: SQLMetricParam[R, T]) - extends Accumulable[R, T](param.zero, param, Some(name), true) + extends Accumulable[R, T](param.zero, param, Some(name), true) { + + def reset: Unit = { + this.value = param.zero + } +} /** * Create a layer for specialized metric. We cannot add `@specialized` to diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index eb056cd51971..7092e2177f38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -80,4 +80,24 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(0)._2.analyzed.isInstanceOf[Project]) assert(metrics(0)._3.getMessage == e.getMessage) } + + test("get metrics by callback") { + val metrics = ArrayBuffer.empty[Long] + val listener = new QueryExecutionListener { + // Only test successful case here, so no need to implement `onFailure` + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + metrics += qe.executedPlan.longMetric("numInputRows").value.value + } + } + sqlContext.listenerManager.register(listener) + + Seq(1 -> "a").toDF("i", "j").groupBy("i").count().collect() + Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect() + + assert(metrics.length == 2) + assert(metrics(0) == 1) + assert(metrics(1) == 2) + } } From 6397cf57e80b6de6f039a6954e264bc214b6a2c3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 22 Oct 2015 12:50:30 +0800 Subject: [PATCH 2/6] address comments --- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 7e9ff78d90b3..470896a033c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1964,7 +1964,7 @@ class DataFrame private[sql]( private def withCallback[T](name: String, df: DataFrame)(action: DataFrame => T) = { try { df.queryExecution.executedPlan.foreach { plan => - plan.metrics.valuesIterator.foreach(_.reset) + plan.metrics.valuesIterator.foreach(_.reset()) } val start = System.nanoTime() val result = action(df) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index c331db089a7e..1c253e3942e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -30,7 +30,7 @@ private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T]( name: String, val param: SQLMetricParam[R, T]) extends Accumulable[R, T](param.zero, param, Some(name), true) { - def reset: Unit = { + def reset(): Unit = { this.value = param.zero } } From 778992e23d309f00869b80fec2a08413bb624e84 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 23 Oct 2015 08:09:57 +0800 Subject: [PATCH 3/6] improve test --- .../apache/spark/sql/util/DataFrameCallbackSuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 7092e2177f38..7d9bc7b37da2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -93,11 +93,14 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { } sqlContext.listenerManager.register(listener) - Seq(1 -> "a").toDF("i", "j").groupBy("i").count().collect() + val df = Seq(1 -> "a").toDF("i", "j").groupBy("i").count() + df.collect() + df.collect() Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect() - assert(metrics.length == 2) + assert(metrics.length == 3) assert(metrics(0) == 1) - assert(metrics(1) == 2) + assert(metrics(1) == 1) + assert(metrics(2) == 2) } } From 4ff891205979a06abbf229f115bbbf99bda3ba1f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 24 Oct 2015 01:30:42 +0800 Subject: [PATCH 4/6] handle -1 values --- .../sql/execution/metric/SQLMetrics.scala | 14 +++++- .../sql/util/DataFrameCallbackSuite.scala | 47 +++++++++++++++++-- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 1c253e3942e9..550b828dc66a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -67,7 +67,19 @@ private[sql] trait SQLMetricValue[T] extends Serializable { private[sql] class LongSQLMetricValue(private var _value : Long) extends SQLMetricValue[Long] { def add(incr: Long): LongSQLMetricValue = { - _value += incr + // Some LongSQLMetric will use -1 as initial value, so if the accumulator is never updated, + // we can filter it out later. However, when `add` is called, the accumulator is valid, we + // should turn -1 to 0. + if (_value < 0) { + _value = 0 + } + + // Some LongSQLMetric will use -1 as initial value, when we merge accumulator updates at driver + // side, we should ignore these -1 values. + if (incr > 0) { + _value += incr + } + this } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 7d9bc7b37da2..bf97c7e65c69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.util -import org.apache.spark.SparkException +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark._ import org.apache.spark.sql.{functions, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.test.SharedSQLContext -import scala.collection.mutable.ArrayBuffer - class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { import testImplicits._ import functions._ @@ -81,7 +81,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(0)._3.getMessage == e.getMessage) } - test("get metrics by callback") { + test("get numRows metrics by callback") { val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` @@ -103,4 +103,43 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(1) == 1) assert(metrics(2) == 2) } + + test("get size metrics by callback") { + val metrics = ArrayBuffer.empty[Long] + val listener = new QueryExecutionListener { + // Only test successful case here, so no need to implement `onFailure` + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + metrics += qe.executedPlan.longMetric("dataSize").value.value + val bottomAgg = qe.executedPlan.children(0).children(0) + metrics += bottomAgg.longMetric("dataSize").value.value + } + } + sqlContext.listenerManager.register(listener) + + val sparkListener = new SaveInfoListener + sqlContext.sparkContext.addSparkListener(sparkListener) + + val df = (1 to 100).map(i => i -> i.toString).toDF("i", "j") + df.groupBy("i").count().collect() + + def getPeakExecutionMemory(stageId: Int): Long = { + val peakMemoryAccumulator = sparkListener.getCompletedStageInfos(stageId).accumulables + .filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY) + + assert(peakMemoryAccumulator.size == 1) + peakMemoryAccumulator.head._2.value.toLong + } + + assert(sparkListener.getCompletedStageInfos.length == 2) + val bottomAggDataSize = getPeakExecutionMemory(0) + val topAggDataSize = getPeakExecutionMemory(1) + + // For this simple case, the peakExecutionMemory of a stage should be the data size of the + // aggregate operator, as we only have one memory consuming operator per stage. + assert(metrics.length == 2) + assert(metrics(0) == topAggDataSize) + assert(metrics(1) == bottomAggDataSize) + } } From b088c70f4bc2a1a5edd3388241803413bb23a2fc Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 25 Oct 2015 22:38:35 +0800 Subject: [PATCH 5/6] do not handle -1 values --- .../spark/sql/execution/metric/SQLMetrics.scala | 14 +------------- .../spark/sql/util/DataFrameCallbackSuite.scala | 7 ++++++- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 550b828dc66a..1c253e3942e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -67,19 +67,7 @@ private[sql] trait SQLMetricValue[T] extends Serializable { private[sql] class LongSQLMetricValue(private var _value : Long) extends SQLMetricValue[Long] { def add(incr: Long): LongSQLMetricValue = { - // Some LongSQLMetric will use -1 as initial value, so if the accumulator is never updated, - // we can filter it out later. However, when `add` is called, the accumulator is valid, we - // should turn -1 to 0. - if (_value < 0) { - _value = 0 - } - - // Some LongSQLMetric will use -1 as initial value, when we merge accumulator updates at driver - // side, we should ignore these -1 values. - if (incr > 0) { - _value += incr - } - + _value += incr this } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index bf97c7e65c69..43e2589286e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -104,7 +104,12 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(2) == 2) } - test("get size metrics by callback") { + // TODO: Currently some LongSQLMetric use -1 as initial value, so if the accumulator is never + // updated, we can filter it out later. However, when we aggregate(sum) accumulator values at + // driver side for SQL physical operators, these -1 values will make our result smaller. + // A easy fix is to create a new SQLMetric(including new MetricValue, MetricParam, etc.), but we + // can do it later because the impact is just too small (1048576 tasks for 1 MB). + ignore("get size metrics by callback") { val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` From 6923c4f9f9ca63a491d80596cbc6ccd71d8f5f25 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 26 Oct 2015 11:18:57 +0800 Subject: [PATCH 6/6] unregister listener at end of test --- .../apache/spark/sql/util/DataFrameCallbackSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 43e2589286e0..b46b0d2f6040 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -54,6 +54,8 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(1)._1 == "count") assert(metrics(1)._2.analyzed.isInstanceOf[Aggregate]) assert(metrics(1)._3 > 0) + + sqlContext.listenerManager.unregister(listener) } test("execute callback functions when a DataFrame action failed") { @@ -79,6 +81,8 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(0)._1 == "collect") assert(metrics(0)._2.analyzed.isInstanceOf[Project]) assert(metrics(0)._3.getMessage == e.getMessage) + + sqlContext.listenerManager.unregister(listener) } test("get numRows metrics by callback") { @@ -102,6 +106,8 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics(0) == 1) assert(metrics(1) == 1) assert(metrics(2) == 2) + + sqlContext.listenerManager.unregister(listener) } // TODO: Currently some LongSQLMetric use -1 as initial value, so if the accumulator is never @@ -146,5 +152,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics.length == 2) assert(metrics(0) == topAggDataSize) assert(metrics(1) == bottomAggDataSize) + + sqlContext.listenerManager.unregister(listener) } }