Changes from all commits
@@ -124,6 +124,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

def logicalPlan: Option[LogicalPlan] = {
getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
}

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
* preparations.

@@ -56,7 +56,7 @@ private[execution] object SparkPlanInfo {
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
- new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
+ new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType, metric.stats)
}

// dump the file scan metadata (e.g file path) to event log

@@ -46,8 +46,13 @@ case class BroadcastHashJoinExec(
right: SparkPlan)
extends BinaryExecNode with HashJoin with CodegenSupport {

- override lazy val metrics = Map(
-   "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+ override lazy val metrics = {
+   Map("numOutputRows" ->
+     SQLMetrics.createMetric(
+       sparkContext,
+       "number of output rows",
+       logicalPlan.map(_.stats.rowCount.map(_.toLong).getOrElse(-1L)).getOrElse(-1L)))
Member:
IIRC, for file sources there are usually only sizeInBytes stats at the logical plan level, so the estimated numOutputRows for the logical plan should be empty for file sources.
What is the scenario of this PR?

Contributor:
For file source tables, there will be row count stats if CBO is enabled.

}

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
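Following up on the exchange above about when a row-count estimate exists: a minimal sketch of how enabling CBO and collecting column statistics makes rowCount available on the optimized logical plan (the table name t1 and the session setup are illustrative, not part of this PR):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("cbo-rowcount-sketch").getOrCreate()

    // With CBO enabled and column stats collected via ANALYZE, the optimizer can
    // attach a rowCount estimate to the logical plan; without them, typically only
    // sizeInBytes is available.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS")

    // rowCount is an Option[BigInt]; None means no estimate, which this PR maps to -1.
    val rowCount = spark.sql("SELECT * FROM t1").queryExecution.optimizedPlan.stats.rowCount
    println(rowCount)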

@@ -27,4 +27,5 @@ import org.apache.spark.annotation.DeveloperApi
class SQLMetricInfo(
val name: String,
val accumulatorId: Long,
- val metricType: String)
+ val metricType: String,
+ val stats: Long = -1)
Contributor:
Not every metric has a corresponding statistic (e.g. peakMemory), and not all statistics are of Long type. We should think of a better place to carry the statistics.

Contributor:
Or we could put a val stats: Option[Statistics] = None.

Contributor (Author):
@cloud-fan Thanks for your comments.

The idea is that each SQL metric can have a statistic value (-1 means not available / not initialized). I set the statistic type to Long because the SQLMetric's value is always of Long type as well: class SQLMetric(val metricType: String, initValue: Long = 0L).

Putting Option[Statistics] in SQLMetricInfo doesn't sound quite right, though: it would mean every SQL metric carries an attribute that includes rowCount, size, and column stats.

Let me know your feedback, thanks in advance.
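For clarity, a sketch of the two shapes under discussion; the first mirrors what this diff does, while the Option[Statistics] variant is only the reviewer's suggestion and is not implemented here:

    import org.apache.spark.annotation.DeveloperApi

    // As in this PR: one Long statistic per metric, with -1 meaning "no estimate".
    @DeveloperApi
    class SQLMetricInfo(
        val name: String,
        val accumulatorId: Long,
        val metricType: String,
        val stats: Long = -1)

    // Reviewer's alternative (sketch only): carry the logical plan's Statistics,
    // which would also cover non-Long values such as sizeInBytes or column stats:
    //   class SQLMetricInfo(..., val stats: Option[Statistics] = None)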


@@ -33,7 +33,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
* the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
* on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
*/
- class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+ class SQLMetric(val metricType: String, initValue: Long = 0L, val stats: Long = -1L) extends
+     AccumulatorV2[Long, Long] {
// This is a workaround for SPARK-11013.
// We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
// update it at the end of task and the value will be at least 0. Then we can filter out the -1
@@ -42,7 +43,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
private var _zeroValue = initValue

override def copy(): SQLMetric = {
- val newAcc = new SQLMetric(metricType, _value)
+ val newAcc = new SQLMetric(metricType, _value, stats)
newAcc._zeroValue = initValue
newAcc
}
@@ -96,8 +97,8 @@ object SQLMetrics {
metric.set((v * baseForAvgMetric).toLong)
}

- def createMetric(sc: SparkContext, name: String): SQLMetric = {
-   val acc = new SQLMetric(SUM_METRIC)
+ def createMetric(sc: SparkContext, name: String, stats: Long = -1): SQLMetric = {
+   val acc = new SQLMetric(SUM_METRIC, stats = stats)
acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
@@ -193,6 +194,14 @@
}
}

def stringStats(value: Long): String = {
Contributor:
We should handle stats in stringValue; different metrics may need to look at different stats and display different things.

Contributor (cloud-fan, May 29, 2019):
One example: we could also display the difference between the real row count and the estimated row count, e.g. 10X, 0.01X, etc. Something like: row count: 4, est: 40 (10X)

if (value < 0) {
""
} else {
s" est: ${stringValue(SUM_METRIC, Seq(value))}"
}
}
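A rough sketch of the reviewer's suggestion above, i.e. handling the estimate inside stringValue so that each metric type decides how to render its statistic; the function name and formatting are illustrative, not part of this PR:

    // Render the actual value and, when a row-count estimate exists, append it together
    // with the estimated/actual ratio, e.g. "row count: 4, est: 40 (10.00X)".
    def stringValueWithEstimate(metricType: String, values: Seq[Long], estimate: Long): String = {
      val actual = values.sum  // "sum" metrics aggregate task updates by summing them
      if (metricType != "sum" || estimate < 0) {
        actual.toString
      } else {
        val ratio = estimate.toDouble / math.max(actual, 1L)
        f"row count: $actual, est: $estimate ($ratio%.2fX)"
      }
    }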

/**
* Updates metrics based on the driver side value. This is useful for certain metrics that
* are only updated on the driver, e.g. subquery execution time, or number of files.

@@ -180,17 +180,20 @@ class SQLAppStatusListener(
}

private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
- val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+ val metricMap = exec.metrics.map { m => (m.accumulatorId, m) }.toMap
val metrics = exec.stages.toSeq
.flatMap { stageId => Option(stageMetrics.get(stageId)) }
.flatMap(_.taskMetrics.values().asScala)
.flatMap { metrics => metrics.ids.zip(metrics.values) }

val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
- .filter { case (id, _) => metricTypes.contains(id) }
+ .filter { case (id, _) => metricMap.contains(id) }
.groupBy(_._1)
.map { case (id, values) =>
- id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
+ val metric = metricMap(id)
+ val value = SQLMetrics.stringValue(metric.metricType, values.map(_._2))
+ val stats = SQLMetrics.stringStats(metric.stats)
+ id -> (value + stats)
}

// Check the execution again for whether the aggregated metrics data has been calculated.
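For illustration, roughly what the concatenated value + stats string looks like after this change (the numbers are hypothetical; "sum" is the metric type used by numOutputRows):

    import org.apache.spark.sql.execution.metric.SQLMetrics

    // Three task updates summing to 4, with a logical-plan estimate of 40:
    // stringValue renders "4" and stringStats (added in this PR) appends " est: 40",
    // so the SQL UI would show the metric as "4 est: 40".
    val value = SQLMetrics.stringValue("sum", Seq(1L, 1L, 2L))
    val stats = SQLMetrics.stringStats(40L)
    println(value + stats)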

@@ -142,4 +142,5 @@ class SparkPlanGraphNodeWrapper(
case class SQLPlanMetric(
name: String,
accumulatorId: Long,
- metricType: String)
+ metricType: String,
+ stats: Long = -1)

@@ -80,7 +80,7 @@ object SparkPlanGraph {
planInfo.nodeName match {
case "WholeStageCodegen" =>
val metrics = planInfo.metrics.map { metric =>
- SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType)
+ SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType, metric.stats)
}

val cluster = new SparkPlanGraphCluster(
@@ -114,7 +114,7 @@
edges += SparkPlanGraphEdge(node.id, parent.id)
case name =>
val metrics = planInfo.metrics.map { metric =>
- SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType)
+ SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType, metric.stats)
}
val node = new SparkPlanGraphNode(
nodeIdGenerator.getAndIncrement(), planInfo.nodeName,

@@ -1430,4 +1430,25 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
assert(catalogStats.rowCount.isEmpty)
}
}

test("statistics for broadcastHashJoin numOutputRows statistic") {
withTempView("t1", "t2") {
withSQLConf(SQLConf.CBO_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
sql("CREATE TABLE t1 (key INT, a2 STRING, a3 DOUBLE)")
sql("INSERT INTO TABLE t1 SELECT 1, 'a', 10.0")
sql("INSERT INTO TABLE t1 SELECT 1, 'b', null")
sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS")

sql("CREATE TABLE t2 (key INT, b2 STRING, b3 DOUBLE)")
sql("INSERT INTO TABLE t2 SELECT 1, 'a', 10.0")
sql("ANALYZE TABLE t2 COMPUTE STATISTICS FOR ALL COLUMNS")

val df = sql("SELECT * FROM t1 JOIN t2 ON t1.key = t2.key")
assert(df.queryExecution.sparkPlan.isInstanceOf[BroadcastHashJoinExec])
assert(df.queryExecution.sparkPlan.metrics("numOutputRows").stats == 2)
}
}
}
}