From 56a8e6ee1fcf8c54dc5c941eb17e3aaa866fba1a Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 16:32:27 -0700 Subject: [PATCH 01/27] Prototype impl of estimations for Catalyst logical plans. - Also add simple size-getters for ParquetRelation and MetastoreRelation. - Add a rule to auto-convert equi-joins to BroadcastHashJoin, if a table has smaller size, based on the above getter (for MetastoreRelation). --- .../catalyst/plans/logical/LogicalPlan.scala | 12 ++++++ .../spark/sql/execution/SparkStrategies.scala | 8 ++-- .../spark/sql/parquet/ParquetRelation.scala | 13 ++++++ .../org/apache/spark/sql/JoinSuite.scala | 9 ++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 42 +++++++++++++------ .../sql/hive/execution/HiveQuerySuite.scala | 27 +++++++++++- 6 files changed, 95 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index edc37e3877c0..0e52ec9f3c81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -26,6 +26,18 @@ import org.apache.spark.sql.catalyst.trees abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => + protected class Estimates { + lazy val childrenEstimations = children.map(_.estimates) + lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum + lazy val numTuples: Long = childrenEstimations.map(_.size).sum + lazy val size: Long = childrenEstimations.map(_.numTuples).sum + } + + /** + * Estimates of various statistics. + */ + lazy val estimates: Estimates = new Estimates + /** * Returns the set of attributes that are referenced by this node * during evaluation. 
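(Illustration, not part of the patch.) The `Estimates` class added above is the extension point the rest of this change builds on: inner operators inherit the child-summing defaults, while concrete relations override `estimates` with a real size. Below is a minimal sketch of the intended override pattern, using a hypothetical leaf relation and a made-up constant size; the ParquetRelation and MetastoreRelation getters later in this patch do the same thing against the file system and the metastore.

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode

    // Hypothetical leaf relation: only the size estimate is overridden; everything else
    // falls back to the defaults defined by LogicalPlan's Estimates.
    case class SomeRelation(output: Seq[Attribute]) extends LeafNode {
      self: Product =>

      @transient override lazy val estimates = new Estimates {
        // A real relation would ask its storage layer for this number; the constant is
        // only here to show the override pattern.
        override lazy val size: Long = 1024L
      }
    }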
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c078e71fe029..809ccf5b5401 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -71,8 +71,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition, left, right @ PhysicalOperation(_, _, b: BaseRelation)) - if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + if broadcastTables.contains(b.tableName) + || (right.estimates.size <= sqlContext.autoConvertJoinSize) => + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case ExtractEquiJoinKeys( Inner, @@ -81,7 +82,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition, left @ PhysicalOperation(_, _, b: BaseRelation), right) - if broadcastTables.contains(b.tableName) => + if broadcastTables.contains(b.tableName) + || (left.estimates.size <= sqlContext.autoConvertJoinSize) => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 9c4771d1a984..91610ea95e74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -22,11 +22,14 @@ import java.io.IOException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.mapreduce.Job import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.metadata.CompressionCodecName +import parquet.hadoop.util.ContextUtil import parquet.schema.MessageType +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} @@ -43,12 +46,22 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} * * @param path The path to the Parquet file. */ +// TODO: make me a BaseRelation? For HashJoin strategy. private[sql] case class ParquetRelation( path: String, @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation { self: Product => + @transient override lazy val estimates = new Estimates { + // TODO: investigate getting encoded column statistics in the parquet file? + override lazy val size: Long = { + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job()))) + fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? 
+ } + } + /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index e17ecc87fd52..e7e3fa9e7a61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner} import org.apache.spark.sql.execution._ +import org.apache.spark.sql.parquet.ParquetRelation import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext._ @@ -28,6 +29,14 @@ class JoinSuite extends QueryTest { // Ensures tables are loaded. TestData + test("parquet") { + val data = parquetFile("../../points.parquet") // local file! + val sizes = data.logicalPlan.collect { case j: ParquetRelation => + j.newInstance.estimates.size // also works without .newInstance + }.toSeq + assert(sizes.size === 1 && sizes(0) > 0) + } + test("equi-join is hash-join") { val x = testData2.as('x) val y = testData2.as('y) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 156b090712df..7b6a48cf269d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.hive import scala.util.parsing.combinator.RegexParsers +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo} import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.annotation.DeveloperApi @@ -64,9 +65,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Since HiveQL is case insensitive for table names we make them all lowercase. MetastoreRelation( - databaseName, - tblName, - alias)(table.getTTable, partitions.map(part => part.getTPartition)) + databaseName, tblName, alias)( + table.getTTable, partitions.map(part => part.getTPartition))( + hive.hiveconf, table.getPath) } def createTable( @@ -251,7 +252,11 @@ object HiveMetastoreTypes extends RegexParsers { private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) + (@transient hiveConf: HiveConf, @transient path: Path) extends BaseRelation { + + self: Product => + // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions. // Right now, using org.apache.hadoop.hive.ql.metadata.Table and @@ -264,6 +269,19 @@ private[hive] case class MetastoreRelation new Partition(hiveQlTable, p) } + // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use? 
+ @transient override lazy val estimates = new Estimates { + // Size getters adapted from + // https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java + override lazy val size: Long = + maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path) + + private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = { + val res = try { Some(size.toLong) } catch { case _: Exception => None } + res.getOrElse { path.getFileSystem(conf).getContentSummary(path).getLength } + } + } + val tableDesc = new TableDesc( Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]], hiveQlTable.getInputFormatClass, @@ -275,14 +293,14 @@ private[hive] case class MetastoreRelation hiveQlTable.getMetadata ) - implicit class SchemaAttribute(f: FieldSchema) { - def toAttribute = AttributeReference( - f.getName, - HiveMetastoreTypes.toDataType(f.getType), - // Since data can be dumped in randomly with no validation, everything is nullable. - nullable = true - )(qualifiers = tableName +: alias.toSeq) - } + implicit class SchemaAttribute(f: FieldSchema) { + def toAttribute = AttributeReference( + f.getName, + HiveMetastoreTypes.toDataType(f.getType), + // Since data can be dumped in randomly with no validation, everything is nullable. + nullable = true + )(qualifiers = tableName +: alias.toSeq) + } // Must be a stable value since new attributes are born here. val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index a022a1e2dc70..eba4f6fdda3f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.execution.{BuildRight, BroadcastHashJoin} + import scala.util.Try +import org.apache.spark.sql.{SchemaRDD, Row} +import org.apache.spark.sql.hive.MetastoreRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.{SchemaRDD, Row} case class TestData(a: Int, b: String) @@ -48,6 +51,28 @@ class HiveQuerySuite extends HiveComparisonTest { "Incorrect number of rows in created table") } + // TODO: put me in a separate EstimateSuite? + test("BHJ by size") { + hql("""SET spark.sql.join.broadcastTables=""") // reset broadcast tables + // TODO: use two different tables? + // assume src has small size + val rdd = hql("""SELECT * FROM src a JOIN src b ON a.key = b.key""") + val physical = rdd.queryExecution.sparkPlan + val bhj = physical.collect { case j: BroadcastHashJoin => j } + println(s"${rdd.queryExecution}") + assert(bhj.size === 1) + } + + // TODO: put me in a separate EstimateSuite? + test("estimates the size of a MetastoreRelation") { + val rdd = hql("""SELECT * FROM src""") + println(s"${rdd.queryExecution}") + val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => + mr.estimates.size + }.toSeq + assert(sizes.size === 1 && sizes(0) > 0) + } + createQueryTest("between", "SELECT * FROM src WHERE key Between 1 and 2") From 5bf5586597374ea2063883362c9b09a050620122 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 26 Jun 2014 16:15:35 -0700 Subject: [PATCH 02/27] Typo. 
--- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0e52ec9f3c81..f302b0f20cb4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -29,8 +29,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { protected class Estimates { lazy val childrenEstimations = children.map(_.estimates) lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum - lazy val numTuples: Long = childrenEstimations.map(_.size).sum - lazy val size: Long = childrenEstimations.map(_.numTuples).sum + lazy val numTuples: Long = childrenEstimations.map(_.numTuples).sum + lazy val size: Long = childrenEstimations.map(_.size).sum } /** From 84301a420ba8e12dbb63a48a37574fdf0f5b7bcb Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Fri, 27 Jun 2014 14:20:44 -0700 Subject: [PATCH 03/27] Refactors. - Remove BaseRelation from Catalyst and clean up related code (e.g. unmake SparkLogicalPlan a BaseRelation). - Remove broadcastTables from SQLConf and clean up related code. - Add EstimatesSuite. - Address some review comments. --- .../sql/catalyst/analysis/unresolved.scala | 4 +- .../catalyst/plans/logical/BaseRelation.scala | 24 ---- .../catalyst/plans/logical/LogicalPlan.scala | 3 +- .../scala/org/apache/spark/sql/SQLConf.scala | 4 - .../org/apache/spark/sql/SQLContext.scala | 6 +- .../spark/sql/execution/SparkPlan.scala | 9 +- .../spark/sql/execution/SparkStrategies.scala | 18 ++- .../spark/sql/parquet/ParquetRelation.scala | 6 +- .../org/apache/spark/sql/JoinSuite.scala | 11 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +- .../spark/sql/hive/EstimatesSuite.scala | 113 ++++++++++++++++++ .../hive/execution/HiveComparisonTest.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 22 ---- .../spark/sql/parquet/HiveParquetSuite.scala | 2 +- 14 files changed, 136 insertions(+), 95 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 7abeb032964e..a0e25775da6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.{errors, trees} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.BaseRelation +import org.apache.spark.sql.catalyst.plans.logical.LeafNode import org.apache.spark.sql.catalyst.trees.TreeNode /** @@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str case class UnresolvedRelation( databaseName: Option[String], tableName: String, - alias: Option[String] = None) extends BaseRelation { + alias: Option[String] = None) extends LeafNode { override def output 
= Nil override lazy val resolved = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala deleted file mode 100644 index 582334aa4259..000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.plans.logical - -abstract class BaseRelation extends LeafNode { - self: Product => - - def tableName: String -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index f302b0f20cb4..738231cc13eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -29,8 +29,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { protected class Estimates { lazy val childrenEstimations = children.map(_.estimates) lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum - lazy val numTuples: Long = childrenEstimations.map(_.numTuples).sum - lazy val size: Long = childrenEstimations.map(_.size).sum + lazy val sizeInBytes: Long = childrenEstimations.map(_.sizeInBytes).sum } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 41920c00b5a2..df7619572307 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -46,9 +46,6 @@ trait SQLConf { */ private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt - /** A comma-separated list of table names marked to be broadcasted during joins. 
*/ - private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "") - /** ********************** SQLConf functionality methods ************ */ @transient @@ -94,7 +91,6 @@ trait SQLConf { object SQLConf { val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size" val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" - val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables" object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4abd89955bd2..568a64951def 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = { - val name = tableName - val newPlan = rdd.logicalPlan transform { - case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name) - } - catalog.registerTable(None, tableName, newPlan) + catalog.registerTable(None, tableName, rdd.logicalPlan) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 27dc091b8581..b07f0df7de4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.BaseRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ /** @@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { * linking. 
*/ @DeveloperApi -case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan") - extends BaseRelation with MultiInstanceRelation { +case class SparkLogicalPlan(alreadyPlanned: SparkPlan) + extends LogicalPlan with MultiInstanceRelation { def output = alreadyPlanned.output override def references = Set.empty @@ -78,8 +78,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar alreadyPlanned match { case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd) case _ => sys.error("Multiple instance of the same relation detected.") - }, tableName) - .asInstanceOf[this.type] + }).asInstanceOf[this.type] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 809ccf5b5401..58cc3f92cf85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} import org.apache.spark.sql.parquet._ @@ -61,8 +61,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil } - def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys( Inner, @@ -70,20 +68,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { rightKeys, condition, left, - right @ PhysicalOperation(_, _, b: BaseRelation)) - if broadcastTables.contains(b.tableName) - || (right.estimates.size <= sqlContext.autoConvertJoinSize) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + right) + if right.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize => + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case ExtractEquiJoinKeys( Inner, leftKeys, rightKeys, condition, - left @ PhysicalOperation(_, _, b: BaseRelation), + left, right) - if broadcastTables.contains(b.tableName) - || (left.estimates.size <= sqlContext.autoConvertJoinSize) => + if left.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => @@ -285,7 +281,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.ExistingRdd(Nil, singleRowRdd) :: Nil case logical.Repartition(expressions, child) => execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil - case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil + case SparkLogicalPlan(existingPlan) => existingPlan :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 
91610ea95e74..6e24bc951d61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -46,16 +46,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} * * @param path The path to the Parquet file. */ -// TODO: make me a BaseRelation? For HashJoin strategy. private[sql] case class ParquetRelation( path: String, - @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation { + @transient conf: Option[Configuration] = None) + extends LeafNode with MultiInstanceRelation { self: Product => @transient override lazy val estimates = new Estimates { // TODO: investigate getting encoded column statistics in the parquet file? - override lazy val size: Long = { + override lazy val sizeInBytes: Long = { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job()))) fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index e7e3fa9e7a61..025c396ef062 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -19,9 +19,6 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext._ class JoinSuite extends QueryTest { @@ -29,14 +26,6 @@ class JoinSuite extends QueryTest { // Ensures tables are loaded. TestData - test("parquet") { - val data = parquetFile("../../points.parquet") // local file! - val sizes = data.logicalPlan.collect { case j: ParquetRelation => - j.newInstance.estimates.size // also works without .newInstance - }.toSeq - assert(sizes.size === 1 && sizes(0) > 0) - } - test("equi-join is hash-join") { val x = testData2.as('x) val y = testData2.as('y) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 7b6a48cf269d..88e65b8838bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -253,7 +253,7 @@ private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) (@transient hiveConf: HiveConf, @transient path: Path) - extends BaseRelation { + extends LeafNode { self: Product => @@ -271,9 +271,8 @@ private[hive] case class MetastoreRelation // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use? @transient override lazy val estimates = new Estimates { - // Size getters adapted from - // https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java - override lazy val size: Long = + // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13). 
+ override lazy val sizeInBytes: Long = maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path) private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala new file mode 100644 index 000000000000..f67646e81dc7 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.reflect.ClassTag + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.BroadcastHashJoin +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData} +import org.apache.spark.util.Utils + +class EstimatesSuite extends QueryTest { + + test("estimates the size of a test ParquetRelation") { + ParquetTestData.writeFile() + val testRDD = parquetFile(ParquetTestData.testDir.toString) + + val sizes = testRDD.logicalPlan.collect { case j: ParquetRelation => + (j.estimates.sizeInBytes, j.newInstance.estimates.sizeInBytes) + } + assert(sizes.size === 1) + assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before") + assert(sizes(0)._1 > 0) + + Utils.deleteRecursively(ParquetTestData.testDir) + } + + test("estimates the size of a test MetastoreRelation") { + val rdd = hql("""SELECT * FROM src""") + val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => + mr.estimates.sizeInBytes + } + assert(sizes.size === 1 && sizes(0) > 0) + } + + test("auto converts to broadcast hash join, by size estimate of a relation") { + def mkTest( + before: () => Unit, + after: () => Unit, + query: String, + expectedAnswer: Seq[Any], + ct: ClassTag[_]) = { + before() + + var rdd = hql(query) + + // Assert src has a size smaller than the threshold. + val sizes = rdd.queryExecution.analyzed.collect { + case r if ct.runtimeClass.isAssignableFrom(r.getClass) => + r.estimates.sizeInBytes + } + assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize, + s"query should contain two relations, each of which has size smaller than autoConvertSize") + + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. + var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + assert(bhj.size === 1, + s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + + checkAnswer(rdd, expectedAnswer) + + // TODO(zongheng): synchronize on TestHive.settings, or use Sequential/Stepwise. 
+ val tmp = autoConvertJoinSize + hql("""SET spark.sql.auto.convert.join.size=0""") + rdd = hql(query) + bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + assert(bhj.isEmpty) + + hql(s"""SET spark.sql.auto.convert.join.size=$tmp""") + + after() + } + + /** Tests for ParquetRelation */ + val parquetQuery = + """SELECT a.mystring, b.myint + |FROM psrc a + |JOIN psrc b + |ON a.mylong = 0 AND a.mylong = b.mylong""".stripMargin + val parquetAnswer = Seq(("abc", 5)) + def parquetBefore(): Unit = { + ParquetTestData.writeFile() + val testRDD = parquetFile(ParquetTestData.testDir.toString) + testRDD.registerAsTable("psrc") + } + mkTest( + parquetBefore, reset, parquetQuery, parquetAnswer, implicitly[ClassTag[ParquetRelation]]) + + /** Tests for MetastoreRelation */ + val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key""" + val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238")) + mkTest( + () => (), () => (), metastoreQuery, metastoreAnswer, implicitly[ClassTag[MetastoreRelation]]) + } + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index b4dbf2b11579..6c8fe4b196de 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -132,7 +132,7 @@ abstract class HiveComparisonTest answer: Seq[String]): Seq[String] = { def isSorted(plan: LogicalPlan): Boolean = plan match { - case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false + case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false case PhysicalOperation(_, _, Sort(_, _)) => true case _ => plan.children.iterator.exists(isSorted) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index eba4f6fdda3f..b493972ef67f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -51,28 +51,6 @@ class HiveQuerySuite extends HiveComparisonTest { "Incorrect number of rows in created table") } - // TODO: put me in a separate EstimateSuite? - test("BHJ by size") { - hql("""SET spark.sql.join.broadcastTables=""") // reset broadcast tables - // TODO: use two different tables? - // assume src has small size - val rdd = hql("""SELECT * FROM src a JOIN src b ON a.key = b.key""") - val physical = rdd.queryExecution.sparkPlan - val bhj = physical.collect { case j: BroadcastHashJoin => j } - println(s"${rdd.queryExecution}") - assert(bhj.size === 1) - } - - // TODO: put me in a separate EstimateSuite? 
- test("estimates the size of a MetastoreRelation") { - val rdd = hql("""SELECT * FROM src""") - println(s"${rdd.queryExecution}") - val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => - mr.estimates.size - }.toSeq - assert(sizes.size === 1 && sizes(0) > 0) - } - createQueryTest("between", "SELECT * FROM src WHERE key Between 1 and 2") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 91ad59d7f82c..3bfe49a760be 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -35,7 +35,7 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft override def beforeAll() { // write test data - ParquetTestData.writeFile + ParquetTestData.writeFile() testRDD = parquetFile(ParquetTestData.testDir.toString) testRDD.registerAsTable("testsource") } From dcff9bd0c0acf04a190f8dbde7205e36dc24577f Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 3 Jul 2014 14:26:41 -0700 Subject: [PATCH 04/27] Cleanups. --- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 1 + .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 88e65b8838bf..246f804bb5d3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -271,6 +271,7 @@ private[hive] case class MetastoreRelation // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use? @transient override lazy val estimates = new Estimates { + // TODO: check if this estimate is valid for tables after partition pruning. // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13). override lazy val sizeInBytes: Long = maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index b493972ef67f..50f85289fdad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,14 +17,11 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.execution.{BuildRight, BroadcastHashJoin} - import scala.util.Try -import org.apache.spark.sql.{SchemaRDD, Row} -import org.apache.spark.sql.hive.MetastoreRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.{Row, SchemaRDD} case class TestData(a: Int, b: String) From de3ae13ecde5de91c3b4c81c6d86cbba12b4192c Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 3 Jul 2014 15:55:23 -0700 Subject: [PATCH 05/27] Add parquetAfter() properly in test. 
--- .../apache/spark/sql/hive/EstimatesSuite.scala | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala index f67646e81dc7..7e86aba6f3ce 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala @@ -100,14 +100,28 @@ class EstimatesSuite extends QueryTest { val testRDD = parquetFile(ParquetTestData.testDir.toString) testRDD.registerAsTable("psrc") } + def parquetAfter() = { + Utils.deleteRecursively(ParquetTestData.testDir) + reset() + } mkTest( - parquetBefore, reset, parquetQuery, parquetAnswer, implicitly[ClassTag[ParquetRelation]]) + parquetBefore, + parquetAfter, + parquetQuery, + parquetAnswer, + implicitly[ClassTag[ParquetRelation]] + ) /** Tests for MetastoreRelation */ val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key""" val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238")) mkTest( - () => (), () => (), metastoreQuery, metastoreAnswer, implicitly[ClassTag[MetastoreRelation]]) + () => (), + () => (), + metastoreQuery, + metastoreAnswer, + implicitly[ClassTag[MetastoreRelation]] + ) } } From 7a60ab7cf621c325c5a28b59e5e1d162a26fcb24 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 8 Jul 2014 20:53:19 -0700 Subject: [PATCH 06/27] s/Estimates/Statistics, s/cardinality/numTuples. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 10 +++++----- .../apache/spark/sql/execution/SparkStrategies.scala | 8 ++++---- .../org/apache/spark/sql/parquet/ParquetRelation.scala | 2 +- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../{EstimatesSuite.scala => StatisticsSuite.scala} | 8 ++++---- 5 files changed, 15 insertions(+), 15 deletions(-) rename sql/hive/src/test/scala/org/apache/spark/sql/hive/{EstimatesSuite.scala => StatisticsSuite.scala} (95%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 738231cc13eb..3ac9b8d8e42e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -26,16 +26,16 @@ import org.apache.spark.sql.catalyst.trees abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => - protected class Estimates { - lazy val childrenEstimations = children.map(_.estimates) - lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum - lazy val sizeInBytes: Long = childrenEstimations.map(_.sizeInBytes).sum + protected class Statistics { + lazy val childrenStats = children.map(_.statistics) + lazy val numTuples: Long = childrenStats.map(_.numTuples).sum + lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum } /** * Estimates of various statistics. 
*/ - lazy val estimates: Estimates = new Estimates + lazy val statistics: Statistics = new Statistics /** * Returns the set of attributes that are referenced by this node diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 58cc3f92cf85..bafc5aaff3f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -69,7 +69,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition, left, right) - if right.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize => + if right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case ExtractEquiJoinKeys( @@ -79,7 +79,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition, left, right) - if left.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize => + if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => @@ -271,8 +271,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil - case logical.Except(left,right) => - execution.Except(planLater(left),planLater(right)) :: Nil + case logical.Except(left,right) => + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Intersect(left, right) => execution.Intersect(planLater(left), planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 6e24bc951d61..f70aff4fef69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -53,7 +53,7 @@ private[sql] case class ParquetRelation( self: Product => - @transient override lazy val estimates = new Estimates { + @transient override lazy val statistics = new Statistics { // TODO: investigate getting encoded column statistics in the parquet file? override lazy val sizeInBytes: Long = { val hdfsPath = new Path(path) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 246f804bb5d3..a504405701dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -270,7 +270,7 @@ private[hive] case class MetastoreRelation } // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use? - @transient override lazy val estimates = new Estimates { + @transient override lazy val statistics = new Statistics { // TODO: check if this estimate is valid for tables after partition pruning. // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13). 
override lazy val sizeInBytes: Long = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala similarity index 95% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 7e86aba6f3ce..c004eb7a588f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -25,14 +25,14 @@ import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData} import org.apache.spark.util.Utils -class EstimatesSuite extends QueryTest { +class StatisticsSuite extends QueryTest { test("estimates the size of a test ParquetRelation") { ParquetTestData.writeFile() val testRDD = parquetFile(ParquetTestData.testDir.toString) val sizes = testRDD.logicalPlan.collect { case j: ParquetRelation => - (j.estimates.sizeInBytes, j.newInstance.estimates.sizeInBytes) + (j.statistics.sizeInBytes, j.newInstance.statistics.sizeInBytes) } assert(sizes.size === 1) assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before") @@ -44,7 +44,7 @@ class EstimatesSuite extends QueryTest { test("estimates the size of a test MetastoreRelation") { val rdd = hql("""SELECT * FROM src""") val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => - mr.estimates.sizeInBytes + mr.statistics.sizeInBytes } assert(sizes.size === 1 && sizes(0) > 0) } @@ -63,7 +63,7 @@ class EstimatesSuite extends QueryTest { // Assert src has a size smaller than the threshold. val sizes = rdd.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => - r.estimates.sizeInBytes + r.statistics.sizeInBytes } assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize, s"query should contain two relations, each of which has size smaller than autoConvertSize") From 73412be75a89a6fe54f9233663697466f585dc25 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 29 Jul 2014 11:48:22 -0700 Subject: [PATCH 07/27] Move SQLConf to Catalyst & add default val for sizeInBytes. Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala --- .../sql/catalyst/planning}/SQLConf.scala | 63 ++++++++++++------- .../catalyst/plans/logical/LogicalPlan.scala | 11 +++- .../org/apache/spark/sql/SQLContext.scala | 8 ++- .../spark/sql/execution/SparkStrategies.scala | 1 + 4 files changed, 56 insertions(+), 27 deletions(-) rename sql/{core/src/main/scala/org/apache/spark/sql => catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning}/SQLConf.scala (53%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala similarity index 53% rename from sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala index df7619572307..c78c2439c253 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala @@ -15,23 +15,34 @@ * limitations under the License. 
*/ -package org.apache.spark.sql +package org.apache.spark.sql.catalyst.planning import java.util.Properties import scala.collection.JavaConverters._ +private object SQLConf { + @transient protected[spark] val confSettings = java.util.Collections.synchronizedMap( + new java.util.HashMap[String, String]()) +} + /** - * SQLConf holds mutable config parameters and hints. These can be set and - * queried either by passing SET commands into Spark SQL's DSL - * functions (sql(), hql(), etc.), or by programmatically using setters and - * getters of this class. + * A trait that enables the setting and getting of mutable config parameters/hints. The central + * location for storing them is uniquely located in the same-name private companion object. + * Therefore, all classes that mix in this trait share all the hints. + * + * In the presence of a SQLContext, these can be set and queried either by passing SET commands + * into Spark SQL's DSL functions (sql(), hql(), etc.). Otherwise, users of this trait can + * modify the hints by programmatically calling the setters and getters of this trait. * * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads). */ trait SQLConf { import SQLConf._ + import SQLConf._ + protected[spark] val settings = confSettings + /** ************************ Spark SQL Params/Hints ******************* */ // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext? @@ -40,50 +51,58 @@ trait SQLConf { /** * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to - * a broadcast value during the physical executions of join operations. Setting this to 0 + * a broadcast value during the physical executions of join operations. Setting this to -1 * effectively disables auto conversion. - * Hive setting: hive.auto.convert.join.noconditionaltask.size. + * + * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000. */ private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt - /** ********************** SQLConf functionality methods ************ */ + /** + * The default size in bytes to assign to a logical operator's estimation statistics. By default, + * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a + * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins. 
+ */ + private[spark] def statsDefaultSizeInBytes: Long = + getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong) + .getOrElse(autoConvertJoinSize + 1) - @transient - private val settings = java.util.Collections.synchronizedMap( - new java.util.HashMap[String, String]()) + /** ********************** SQLConf functionality methods ************ */ def set(props: Properties): Unit = { - props.asScala.foreach { case (k, v) => this.settings.put(k, v) } + confSettings.synchronized { + props.asScala.foreach { case (k, v) => confSettings.put(k, v) } + } } def set(key: String, value: String): Unit = { require(key != null, "key cannot be null") - require(value != null, s"value cannot be null for $key") - settings.put(key, value) + require(value != null, s"value cannot be null for key: $key") + confSettings.put(key, value) } def get(key: String): String = { - Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key)) + Option(confSettings.get(key)).getOrElse(throw new NoSuchElementException(key)) } def get(key: String, defaultValue: String): String = { - Option(settings.get(key)).getOrElse(defaultValue) + Option(confSettings.get(key)).getOrElse(defaultValue) } - def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray } + def getAll: Array[(String, String)] = confSettings.synchronized { confSettings.asScala.toArray } - def getOption(key: String): Option[String] = Option(settings.get(key)) + def getOption(key: String): Option[String] = Option(confSettings.get(key)) - def contains(key: String): Boolean = settings.containsKey(key) + def contains(key: String): Boolean = confSettings.containsKey(key) def toDebugString: String = { - settings.synchronized { - settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n") + confSettings.synchronized { + confSettings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n") } } private[spark] def clear() { - settings.clear() + confSettings.clear() } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 3ac9b8d8e42e..45e8830b9ba7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -19,17 +19,24 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.SQLConf import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.catalyst.trees -abstract class LogicalPlan extends QueryPlan[LogicalPlan] { +abstract class LogicalPlan extends QueryPlan[LogicalPlan] with SQLConf { self: Product => + // TODO: make a case class? 
protected class Statistics { lazy val childrenStats = children.map(_.statistics) + lazy val numTuples: Long = childrenStats.map(_.numTuples).sum - lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum + + lazy val sizeInBytes: Long = { + val sum = childrenStats.map(_.sizeInBytes).sum + if (sum == 0) statsDefaultSizeInBytes else sum + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 568a64951def..d3718c55543f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql + import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -24,14 +25,15 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl.ExpressionConversions -import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.planning.SQLConf import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.SparkStrategies diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index bafc5aaff3f0..dffa112fb7ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -82,6 +82,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) + // TODO: use optimization here as well case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = execution.ShuffledHashJoin( From 73cde01cff8344874550bc8949bfafafc59ac8d4 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 12:38:14 -0700 Subject: [PATCH 08/27] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan. 
--- .../catalyst/plans/logical/LogicalPlan.scala | 19 +++++++------------ .../scala/org/apache/spark/sql}/SQLConf.scala | 6 +++--- .../org/apache/spark/sql/SQLContext.scala | 2 -- .../spark/sql/execution/SparkPlan.scala | 17 +++++++++++++++-- 4 files changed, 25 insertions(+), 19 deletions(-) rename sql/{catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning => core/src/main/scala/org/apache/spark/sql}/SQLConf.scala (97%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 45e8830b9ba7..5689816d022a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -19,29 +19,24 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.SQLConf import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.catalyst.trees -abstract class LogicalPlan extends QueryPlan[LogicalPlan] with SQLConf { +abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => // TODO: make a case class? + /** + * Estimates of various statistics. The default estimation logic simply sums up the corresponding + * statistic produced by the children. To override this behavior, override `statistics` and + * assign it a overriden version of `Statistics`. + */ protected class Statistics { lazy val childrenStats = children.map(_.statistics) - lazy val numTuples: Long = childrenStats.map(_.numTuples).sum - - lazy val sizeInBytes: Long = { - val sum = childrenStats.map(_.sizeInBytes).sum - if (sum == 0) statsDefaultSizeInBytes else sum - } + lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum } - - /** - * Estimates of various statistics. - */ lazy val statistics: Statistics = new Statistics /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala similarity index 97% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala rename to sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index c78c2439c253..234cb2a445b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.catalyst.planning +package org.apache.spark.sql import java.util.Properties @@ -40,8 +40,8 @@ private object SQLConf { trait SQLConf { import SQLConf._ - import SQLConf._ - protected[spark] val settings = confSettings + import org.apache.spark.sql.SQLConf._ + @transient protected[spark] val settings = confSettings /** ************************ Spark SQL Params/Hints ******************* */ // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext? 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d3718c55543f..1a8357f31f5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql - import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -30,7 +29,6 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl.ExpressionConversions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.planning.SQLConf import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index b07f0df7de4b..7e6454d9c87f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Logging, Row} +import org.apache.spark.sql.{Logging, Row, SQLConf} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow @@ -67,7 +67,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { */ @DeveloperApi case class SparkLogicalPlan(alreadyPlanned: SparkPlan) - extends LogicalPlan with MultiInstanceRelation { + extends LogicalPlan with MultiInstanceRelation with SQLConf { def output = alreadyPlanned.output override def references = Set.empty @@ -80,6 +80,19 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) case _ => sys.error("Multiple instance of the same relation detected.") }).asInstanceOf[this.type] } + + override lazy val statistics = new Statistics { + // If this is wrapping around ExistingRdd and no reasonable estimation logic is implemented, + // return a default value. + override lazy val sizeInBytes: Long = { + val defaultSum = childrenStats.map(_.sizeInBytes).sum + alreadyPlanned match { + case e: ExistingRdd if defaultSum == 0 => statsDefaultSizeInBytes + case _ => defaultSum + } + } + } + } private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] { From 7d9216a8c7e385669205ce33bba77967d24800b8 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 13:22:01 -0700 Subject: [PATCH 09/27] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin. 
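
Taken together with the condition fix in the patch after this one, the choice the planner makes can be summarized as: broadcast a join side whose estimated size fits under `spark.sql.auto.convert.join.size`, otherwise shuffle and hash the smaller side. A standalone sketch of that rule (the `BuildSide` objects mirror the planner's markers; the sizes and threshold are plain numbers here, not real estimates):

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  object JoinPlanningSketch {
    // Returns (useBroadcastHashJoin, buildSide) given the two size estimates and
    // the user-settable threshold.
    def chooseJoin(leftSize: Long, rightSize: Long, threshold: Long): (Boolean, BuildSide) =
      if (rightSize <= threshold) (true, BuildRight)       // broadcast the small right side
      else if (leftSize <= threshold) (true, BuildLeft)    // broadcast the small left side
      else if (rightSize <= leftSize) (false, BuildRight)  // shuffle; hash the smaller side
      else (false, BuildLeft)

    def main(args: Array[String]): Unit = {
      val tenMb = 10L * 1024 * 1024
      println(chooseJoin(1L << 30, 4096L, tenMb))     // (true,BuildRight)
      println(chooseJoin(1L << 30, 1L << 29, tenMb))  // (false,BuildRight)
    }
  }
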
--- .../spark/sql/execution/SparkStrategies.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index dffa112fb7ee..4da2cc97a8fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -44,6 +44,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + // TODO: add comments to explain optimization /** * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be * evaluated by matching hash keys. @@ -82,11 +83,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) - // TODO: use optimization here as well case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => + val buildSide = + if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) BuildRight + else BuildLeft val hashJoin = execution.ShuffledHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) + leftKeys, rightKeys, buildSide, planLater(left), planLater(right)) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil case _ => Nil @@ -147,11 +150,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + // TODO: add comments to explain optimization object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, joinType, condition) => + val (streamed, broadcast) = + if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) + (planLater(left), planLater(right)) + else (planLater(right), planLater(left)) execution.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil + streamed, broadcast, joinType, condition)(sqlContext) :: Nil case _ => Nil } } From e5bcf5bcf9c650e8a33987c738c89a380ec022c9 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 14:33:33 -0700 Subject: [PATCH 10/27] Fix optimization conditions & update scala docs to explain. --- .../spark/sql/execution/SparkStrategies.scala | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4da2cc97a8fc..cf689a80a6e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -44,10 +44,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - // TODO: add comments to explain optimization /** * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be * evaluated by matching hash keys. + * + * This strategy applies a simple optimization based on the estimates of the physical sizes of + * the two join sides. 
When planning a [[execution.BroadcastHashJoin]], if one side has an + * estimated physical size smaller than the user-settable threshold + * `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and + * mark the other relation as the ''stream'' side. If both estimates exceed the threshold, + * they will instead be used to decide the build side in a [[execution.ShuffledHashJoin]]. */ object HashJoin extends Strategy with PredicateHelper { private[this] def broadcastHashJoin( @@ -63,29 +69,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ExtractEquiJoinKeys( - Inner, - leftKeys, - rightKeys, - condition, - left, - right) + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - case ExtractEquiJoinKeys( - Inner, - leftKeys, - rightKeys, - condition, - left, - right) + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val buildSide = - if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) BuildRight + if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight else BuildLeft val hashJoin = execution.ShuffledHashJoin( @@ -150,16 +144,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - // TODO: add comments to explain optimization + /** + * This strategy applies a simple optimization based on the estimates of the physical sizes of + * the two join sides: the planner would mark the relation with the smaller estimated physical + * size as the ''build'' (broadcast) relation and mark the other as the ''stream'' relation. + */ object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, joinType, condition) => val (streamed, broadcast) = - if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) - (planLater(left), planLater(right)) - else (planLater(right), planLater(left)) + if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) (left, right) + else (right, left) execution.BroadcastNestedLoopJoin( - streamed, broadcast, joinType, condition)(sqlContext) :: Nil + planLater(streamed), planLater(broadcast), joinType, condition)(sqlContext) :: Nil case _ => Nil } } From 3ba8f3e2355f4716a4aa1fc329054c937199634d Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 14:39:33 -0700 Subject: [PATCH 11/27] Add comment. 
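
The threshold described in the scaladoc above is an ordinary SQL option, so it can be flipped from a query; the statistics test suite later in this series does exactly that with `SET` statements. A hypothetical driver program showing the intended usage (the table names `a` and `b` and the local master URL are assumptions):

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.hive.HiveContext

  object BroadcastThresholdExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext("local", "BroadcastThresholdExample")
      val hiveContext = new HiveContext(sc)
      import hiveContext._

      // Broadcast any join side estimated below ~10 MB.
      hql("SET spark.sql.auto.convert.join.size=10485760")
      hql("SELECT * FROM a JOIN b ON a.key = b.key").collect().foreach(println)

      // The test suite switches the conversion off by lowering the threshold
      // (it eventually uses -1), forcing a ShuffledHashJoin instead.
      hql("SET spark.sql.auto.convert.join.size=-1")
    }
  }
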
--- .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 7e6454d9c87f..be573724b761 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -87,6 +87,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) override lazy val sizeInBytes: Long = { val defaultSum = childrenStats.map(_.sizeInBytes).sum alreadyPlanned match { + // TODO: Instead of returning a default value here, find a way to return a meaningful + // estimate for RDDs. See PR 1238 for more discussions. case e: ExistingRdd if defaultSum == 0 => statsDefaultSizeInBytes case _ => defaultSum } From 4ef0d262612781a711520e82cd81d7db9ee87078 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 15:08:49 -0700 Subject: [PATCH 12/27] Make Statistics a case class. --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 11 +++++------ .../org/apache/spark/sql/execution/SparkPlan.scala | 8 ++++---- .../apache/spark/sql/parquet/ParquetRelation.scala | 6 +++--- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 5689816d022a..36080ec8b719 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -26,18 +26,17 @@ import org.apache.spark.sql.catalyst.trees abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => - // TODO: make a case class? /** * Estimates of various statistics. The default estimation logic simply sums up the corresponding * statistic produced by the children. To override this behavior, override `statistics` and * assign it a overriden version of `Statistics`. */ - protected class Statistics { - lazy val childrenStats = children.map(_.statistics) - lazy val numTuples: Long = childrenStats.map(_.numTuples).sum - lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum - } + case class Statistics( + numTuples: Long = childrenStats.map(_.numTuples).sum, + sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum + ) lazy val statistics: Statistics = new Statistics + lazy val childrenStats = children.map(_.statistics) /** * Returns the set of attributes that are referenced by this node diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index be573724b761..55b3c99fb399 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -81,19 +81,19 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) }).asInstanceOf[this.type] } - override lazy val statistics = new Statistics { + @transient override lazy val statistics = Statistics( // If this is wrapping around ExistingRdd and no reasonable estimation logic is implemented, // return a default value. 
- override lazy val sizeInBytes: Long = { + sizeInBytes = { val defaultSum = childrenStats.map(_.sizeInBytes).sum alreadyPlanned match { // TODO: Instead of returning a default value here, find a way to return a meaningful - // estimate for RDDs. See PR 1238 for more discussions. + // size estimate for RDDs. See PR 1238 for more discussions. case e: ExistingRdd if defaultSum == 0 => statsDefaultSizeInBytes case _ => defaultSum } } - } + ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index f70aff4fef69..8962f3a07b9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -53,14 +53,14 @@ private[sql] case class ParquetRelation( self: Product => - @transient override lazy val statistics = new Statistics { + @transient override lazy val statistics = Statistics( // TODO: investigate getting encoded column statistics in the parquet file? - override lazy val sizeInBytes: Long = { + sizeInBytes = { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job()))) fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? } - } + ) /** Schema derived from ParquetFile */ def parquetSchema: MessageType = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index a504405701dd..909fb4de4a42 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -273,7 +273,7 @@ private[hive] case class MetastoreRelation @transient override lazy val statistics = new Statistics { // TODO: check if this estimate is valid for tables after partition pruning. // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13). - override lazy val sizeInBytes: Long = + override val sizeInBytes: Long = maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path) private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = { From 0ef9e5b06f776a2029761edf474f162d7a83268e Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 23:37:00 -0700 Subject: [PATCH 13/27] Use multiplication instead of sum for default estimates. --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 16 ++++++++++++++-- .../apache/spark/sql/execution/SparkPlan.scala | 6 +++--- .../spark/sql/parquet/ParquetRelation.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 36080ec8b719..75b9b626d7a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -26,14 +26,24 @@ import org.apache.spark.sql.catalyst.trees abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => + // TODO: handle overflow? /** * Estimates of various statistics. The default estimation logic simply sums up the corresponding * statistic produced by the children. 
To override this behavior, override `statistics` and * assign it a overriden version of `Statistics`. */ case class Statistics( - numTuples: Long = childrenStats.map(_.numTuples).sum, - sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum + /** + * Number of output tuples. For leaf operators this defaults to 1, otherwise it is set to the + * product of children's `numTuples`. + */ + numTuples: Long = childrenStats.map(_.numTuples).product, + + /** + * Physical size in bytes. For leaf operators this defaults to 1, otherwise it is set to the + * product of children's `sizeInBytes`. + */ + sizeInBytes: Long = childrenStats.map(_.sizeInBytes).product ) lazy val statistics: Statistics = new Statistics lazy val childrenStats = children.map(_.statistics) @@ -104,6 +114,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { self: Product => + override lazy val statistics = Statistics(numTuples = 1L, sizeInBytes = 1L) + // Leaf nodes by definition cannot reference any input attributes. override def references = Set.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 55b3c99fb399..5ed5a89b1e2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -85,12 +85,12 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) // If this is wrapping around ExistingRdd and no reasonable estimation logic is implemented, // return a default value. sizeInBytes = { - val defaultSum = childrenStats.map(_.sizeInBytes).sum + val naiveVal = childrenStats.map(_.sizeInBytes).product alreadyPlanned match { // TODO: Instead of returning a default value here, find a way to return a meaningful // size estimate for RDDs. See PR 1238 for more discussions. - case e: ExistingRdd if defaultSum == 0 => statsDefaultSizeInBytes - case _ => defaultSum + case e: ExistingRdd if naiveVal == 1L => statsDefaultSizeInBytes + case _ => naiveVal } } ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 8962f3a07b9e..ad3753982715 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -58,7 +58,7 @@ private[sql] case class ParquetRelation( sizeInBytes = { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job()))) - fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? + math.max(fs.getContentSummary(hdfsPath).getLength, 1L) // TODO: in bytes or system-dependent? } ) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 909fb4de4a42..49b1301b55fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -274,7 +274,7 @@ private[hive] case class MetastoreRelation // TODO: check if this estimate is valid for tables after partition pruning. // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13). 
override val sizeInBytes: Long = - maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path) + math.max(maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path), 1L) private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = { val res = try { Some(size.toLong) } catch { case _: Exception => None } From 43d38a6ebb6d46b09e327e493f5a218ac1d1093a Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 23:39:16 -0700 Subject: [PATCH 14/27] Revert optimization for BroadcastNestedLoopJoin (this fixes tests). --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cf689a80a6e6..0b78432ab201 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -152,11 +152,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, joinType, condition) => - val (streamed, broadcast) = - if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) (left, right) - else (right, left) execution.BroadcastNestedLoopJoin( - planLater(streamed), planLater(broadcast), joinType, condition)(sqlContext) :: Nil + planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil case _ => Nil } } From ca5b8258ff99a68f8ee0a7b3d1d4bca2c1193aa4 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 10 Jul 2014 09:55:20 -0700 Subject: [PATCH 15/27] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it. --- .../main/scala/org/apache/spark/sql/SQLContext.scala | 8 ++++---- .../main/scala/org/apache/spark/sql/SchemaRDD.scala | 3 ++- .../scala/org/apache/spark/sql/SchemaRDDLike.scala | 2 +- .../apache/spark/sql/api/java/JavaSQLContext.scala | 4 ++-- .../org/apache/spark/sql/execution/SparkPlan.scala | 10 +++++----- .../scala/org/apache/spark/sql/json/JsonRDD.scala | 11 +++++++---- 6 files changed, 21 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1a8357f31f5d..3f3952bf91d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = - new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))) + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self)) /** * Loads a Parquet file, returning the result as a [[SchemaRDD]]. 
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext) */ @Experimental def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = - new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio)) + new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio)) /** * :: Experimental :: @@ -208,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext) case inMem @ InMemoryRelation(_, _, e: ExistingRdd) => inMem.cachedColumnBuffers.unpersist() catalog.unregisterTable(None, tableName) - catalog.registerTable(None, tableName, SparkLogicalPlan(e)) + catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self)) case inMem: InMemoryRelation => inMem.cachedColumnBuffers.unpersist() catalog.unregisterTable(None, tableName) @@ -367,7 +367,7 @@ class SQLContext(@transient val sparkContext: SparkContext) new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row } } - new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 31d27bb4f057..a5b9383e4825 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -430,7 +430,8 @@ class SchemaRDD( * @group schema */ private def applySchema(rdd: RDD[Row]): SchemaRDD = { - new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))) + new SchemaRDD(sqlContext, + SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext)) } // ======================================================================= diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index fe8172194320..fd751031b26e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike { // happen right away to let these side effects take place eagerly. case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile => queryExecution.toRdd - SparkLogicalPlan(queryExecution.executedPlan) + SparkLogicalPlan(queryExecution.executedPlan)(sqlContext) case _ => baseLogicalPlan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 790d9ef22cf1..806097c917b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) { new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow } } - new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext)) } /** @@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) { * @group userf */ def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = - new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0)) + new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0)) /** * Registers the given RDD as a temporary table in the catalog. 
Temporary tables exist only diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 5ed5a89b1e2a..f398234bb7bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Logging, Row, SQLConf} +import org.apache.spark.sql.{Logging, Row, SQLContext} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow @@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { * linking. */ @DeveloperApi -case class SparkLogicalPlan(alreadyPlanned: SparkPlan) - extends LogicalPlan with MultiInstanceRelation with SQLConf { +case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext) + extends LogicalPlan with MultiInstanceRelation { def output = alreadyPlanned.output override def references = Set.empty @@ -78,7 +78,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) alreadyPlanned match { case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd) case _ => sys.error("Multiple instance of the same relation detected.") - }).asInstanceOf[this.type] + })(sqlContext).asInstanceOf[this.type] } @transient override lazy val statistics = Statistics( @@ -89,7 +89,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) alreadyPlanned match { // TODO: Instead of returning a default value here, find a way to return a meaningful // size estimate for RDDs. See PR 1238 for more discussions. 
- case e: ExistingRdd if naiveVal == 1L => statsDefaultSizeInBytes + case e: ExistingRdd if naiveVal == 1L => sqlContext.statsDefaultSizeInBytes case _ => naiveVal } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index b48c70ee73a2..6c2b553bb908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -28,11 +28,12 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} -import org.apache.spark.sql.Logging +import org.apache.spark.sql.{SQLContext, Logging} private[sql] object JsonRDD extends Logging { private[sql] def inferSchema( + sqlContext: SQLContext, json: RDD[String], samplingRatio: Double = 1.0): LogicalPlan = { require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0") @@ -40,15 +41,17 @@ private[sql] object JsonRDD extends Logging { val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _) val baseSchema = createSchema(allKeys) - createLogicalPlan(json, baseSchema) + createLogicalPlan(json, baseSchema, sqlContext) } private def createLogicalPlan( json: RDD[String], - baseSchema: StructType): LogicalPlan = { + baseSchema: StructType, + sqlContext: SQLContext): LogicalPlan = { val schema = nullTypeToStringType(baseSchema) - SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema)))) + SparkLogicalPlan( + ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext) } private def createSchema(allKeys: Set[(String, DataType)]): StructType = { From 2d99eb5479bded7d76cf8c88d28f9a1c85390a50 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 10 Jul 2014 10:03:20 -0700 Subject: [PATCH 16/27] {Cleanup, use synchronized in, enrich} StatisticsSuite. 
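
The SparkLogicalPlan change above leans on a standard Scala technique: giving a case class a second parameter list. Members of the second list do not take part in the generated equals/hashCode or unapply, so injecting the context this way leaves plan equality and pattern matching untouched. A small sketch with stand-in types (`Context` and `Wrapper` are illustrative only):

  class Context(val defaultSizeInBytes: Long)

  // Only the first parameter list feeds the generated equals/hashCode/unapply,
  // so two wrappers around equal payloads compare equal even when they carry
  // different contexts.
  case class Wrapper(payload: String)(@transient val context: Context)

  object InjectionSketch {
    def main(args: Array[String]): Unit = {
      val a = Wrapper("plan")(new Context(10L * 1024 * 1024))
      val b = Wrapper("plan")(new Context(1L))
      println(a == b)                        // true
      println(a.context.defaultSizeInBytes)  // 10485760
    }
  }
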
--- .../spark/sql/hive/StatisticsSuite.scala | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index c004eb7a588f..67c157f228c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.hive import scala.reflect.ClassTag import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.execution.BroadcastHashJoin +import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData} import org.apache.spark.util.Utils @@ -36,7 +37,7 @@ class StatisticsSuite extends QueryTest { } assert(sizes.size === 1) assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before") - assert(sizes(0)._1 > 0) + assert(sizes(0)._1 > 1, "1 is the default, indicating the absence of a meaningful estimate") Utils.deleteRecursively(ParquetTestData.testDir) } @@ -46,7 +47,8 @@ class StatisticsSuite extends QueryTest { val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => mr.statistics.sizeInBytes } - assert(sizes.size === 1 && sizes(0) > 0) + assert(sizes.size === 1) + assert(sizes(0) > 1, "1 is the default, indicating the absence of a meaningful estimate") } test("auto converts to broadcast hash join, by size estimate of a relation") { @@ -62,8 +64,7 @@ class StatisticsSuite extends QueryTest { // Assert src has a size smaller than the threshold. val sizes = rdd.queryExecution.analyzed.collect { - case r if ct.runtimeClass.isAssignableFrom(r.getClass) => - r.statistics.sizeInBytes + case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize, s"query should contain two relations, each of which has size smaller than autoConvertSize") @@ -74,16 +75,22 @@ class StatisticsSuite extends QueryTest { assert(bhj.size === 1, s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") - checkAnswer(rdd, expectedAnswer) + checkAnswer(rdd, expectedAnswer) // check correctness of output + + TestHive.settings.synchronized { + val tmp = autoConvertJoinSize - // TODO(zongheng): synchronize on TestHive.settings, or use Sequential/Stepwise. 
- val tmp = autoConvertJoinSize - hql("""SET spark.sql.auto.convert.join.size=0""") - rdd = hql(query) - bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } - assert(bhj.isEmpty) + hql("""SET spark.sql.auto.convert.join.size=-1""") + rdd = hql(query) + bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - hql(s"""SET spark.sql.auto.convert.join.size=$tmp""") + val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j } + assert(shj.size === 1, + "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off") + + hql(s"""SET spark.sql.auto.convert.join.size=$tmp""") + } after() } From 573e644888df883defbb65fa0440a134dd253213 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 14 Jul 2014 11:41:02 -0700 Subject: [PATCH 17/27] Remove singleton SQLConf and move back `settings` to the trait. --- .../scala/org/apache/spark/sql/SQLConf.scala | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 234cb2a445b4..c880e9f84208 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -21,11 +21,6 @@ import java.util.Properties import scala.collection.JavaConverters._ -private object SQLConf { - @transient protected[spark] val confSettings = java.util.Collections.synchronizedMap( - new java.util.HashMap[String, String]()) -} - /** * A trait that enables the setting and getting of mutable config parameters/hints. The central * location for storing them is uniquely located in the same-name private companion object. @@ -40,8 +35,8 @@ private object SQLConf { trait SQLConf { import SQLConf._ - import org.apache.spark.sql.SQLConf._ - @transient protected[spark] val settings = confSettings + @transient protected[spark] val settings = java.util.Collections.synchronizedMap( + new java.util.HashMap[String, String]()) /** ************************ Spark SQL Params/Hints ******************* */ // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext? 
@@ -70,39 +65,39 @@ trait SQLConf { /** ********************** SQLConf functionality methods ************ */ def set(props: Properties): Unit = { - confSettings.synchronized { - props.asScala.foreach { case (k, v) => confSettings.put(k, v) } + settings.synchronized { + props.asScala.foreach { case (k, v) => settings.put(k, v) } } } def set(key: String, value: String): Unit = { require(key != null, "key cannot be null") require(value != null, s"value cannot be null for key: $key") - confSettings.put(key, value) + settings.put(key, value) } def get(key: String): String = { - Option(confSettings.get(key)).getOrElse(throw new NoSuchElementException(key)) + Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key)) } def get(key: String, defaultValue: String): String = { - Option(confSettings.get(key)).getOrElse(defaultValue) + Option(settings.get(key)).getOrElse(defaultValue) } - def getAll: Array[(String, String)] = confSettings.synchronized { confSettings.asScala.toArray } + def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray } - def getOption(key: String): Option[String] = Option(confSettings.get(key)) + def getOption(key: String): Option[String] = Option(settings.get(key)) - def contains(key: String): Boolean = confSettings.containsKey(key) + def contains(key: String): Boolean = settings.containsKey(key) def toDebugString: String = { - confSettings.synchronized { - confSettings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n") + settings.synchronized { + settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n") } } private[spark] def clear() { - confSettings.clear() + settings.clear() } } From 729a8e25a01ead3815ab36c11b81382f4413247e Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 14 Jul 2014 11:49:29 -0700 Subject: [PATCH 18/27] Update docs to be more explicit. --- .../apache/spark/sql/execution/SparkStrategies.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0b78432ab201..64d246346dd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -52,8 +52,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an * estimated physical size smaller than the user-settable threshold * `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and - * mark the other relation as the ''stream'' side. If both estimates exceed the threshold, - * they will instead be used to decide the build side in a [[execution.ShuffledHashJoin]]. + * mark the other relation as the ''stream'' side. The build table will be ''broadcasted'' to + * all of the executors involved in the join, as a [[org.apache.spark.broadcast.Broadcast]] + * object. If both estimates exceed the threshold, they will instead be used to decide the build + * side in a [[execution.ShuffledHashJoin]]. 
*/ object HashJoin extends Strategy with PredicateHelper { private[this] def broadcastHashJoin( @@ -144,11 +146,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - /** - * This strategy applies a simple optimization based on the estimates of the physical sizes of - * the two join sides: the planner would mark the relation with the smaller estimated physical - * size as the ''build'' (broadcast) relation and mark the other as the ''stream'' relation. - */ object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, joinType, condition) => From 549061ce8f1da1f9627dab428ddcbdd41a1def63 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 14 Jul 2014 14:19:03 -0700 Subject: [PATCH 19/27] Remove numTuples in Statistics for now. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 75b9b626d7a2..76a0ec5ac99a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -33,12 +33,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { * assign it a overriden version of `Statistics`. */ case class Statistics( - /** - * Number of output tuples. For leaf operators this defaults to 1, otherwise it is set to the - * product of children's `numTuples`. - */ - numTuples: Long = childrenStats.map(_.numTuples).product, - /** * Physical size in bytes. For leaf operators this defaults to 1, otherwise it is set to the * product of children's `sizeInBytes`. From 01b7a3e2689b904ab30e3fa6a6238443d99faeb2 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 14 Jul 2014 14:21:07 -0700 Subject: [PATCH 20/27] Update scaladoc for a field and move it to @param section. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 76a0ec5ac99a..e45717527cf0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -31,12 +31,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { * Estimates of various statistics. The default estimation logic simply sums up the corresponding * statistic produced by the children. To override this behavior, override `statistics` and * assign it a overriden version of `Statistics`. + * + * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it + * defaults to the product of children's `sizeInBytes`. */ case class Statistics( - /** - * Physical size in bytes. For leaf operators this defaults to 1, otherwise it is set to the - * product of children's `sizeInBytes`. 
- */ sizeInBytes: Long = childrenStats.map(_.sizeInBytes).product ) lazy val statistics: Statistics = new Statistics @@ -108,7 +107,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { self: Product => - override lazy val statistics = Statistics(numTuples = 1L, sizeInBytes = 1L) + override lazy val statistics = Statistics(sizeInBytes = 1L) // Leaf nodes by definition cannot reference any input attributes. override def references = Set.empty From 6e594b845029ec060cbb7ccfde482a1071bf5b1a Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 16 Jul 2014 11:15:26 -0700 Subject: [PATCH 21/27] Get size info from metastore for MetastoreRelation. Additionally, remove size estimate from ParquetRelation since the Hadoop FileSystem API calls can be expensive (e.g. S3FileSystem has a lot of RPCs). --- .../spark/sql/parquet/ParquetRelation.scala | 11 ----- .../spark/sql/hive/HiveMetastoreCatalog.scala | 20 ++++----- .../spark/sql/hive/StatisticsSuite.scala | 42 +------------------ 3 files changed, 11 insertions(+), 62 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index ad3753982715..8c7dbd5eb4a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -22,11 +22,9 @@ import java.io.IOException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.mapreduce.Job import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.metadata.CompressionCodecName -import parquet.hadoop.util.ContextUtil import parquet.schema.MessageType import org.apache.spark.sql.SQLContext @@ -53,15 +51,6 @@ private[sql] case class ParquetRelation( self: Product => - @transient override lazy val statistics = Statistics( - // TODO: investigate getting encoded column statistics in the parquet file? - sizeInBytes = { - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job()))) - math.max(fs.getContentSummary(hdfsPath).getLength, 1L) // TODO: in bytes or system-dependent? - } - ) - /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 49b1301b55fc..04a639209e9c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -269,18 +269,18 @@ private[hive] case class MetastoreRelation new Partition(hiveQlTable, p) } - // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use? - @transient override lazy val statistics = new Statistics { + @transient override lazy val statistics = Statistics( // TODO: check if this estimate is valid for tables after partition pruning. - // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13). 
- override val sizeInBytes: Long = - math.max(maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path), 1L) - - private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = { - val res = try { Some(size.toLong) } catch { case _: Exception => None } - res.getOrElse { path.getFileSystem(conf).getContentSummary(path).getLength } + sizeInBytes = { + // NOTE: kind of hacky, but this should be relatively cheap if parameters for the table are + // populated into the metastore. An alternative would be going through Hadoop's FileSystem + // API, which can be expensive if a lot of RPCs are involved. Besides `totalSize`, there are + // also `numFiles`, `numRows`, `rawDataSize` keys we can look at in the future. + val sizeMaybeFromMetastore = + Option(hiveQlTable.getParameters.get("totalSize")).map(_.toLong).getOrElse(-1L) + math.max(sizeMaybeFromMetastore, 1L) } - } + ) val tableDesc = new TableDesc( Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]], diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 67c157f228c2..2be8573620e0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -23,32 +23,16 @@ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData} -import org.apache.spark.util.Utils class StatisticsSuite extends QueryTest { - test("estimates the size of a test ParquetRelation") { - ParquetTestData.writeFile() - val testRDD = parquetFile(ParquetTestData.testDir.toString) - - val sizes = testRDD.logicalPlan.collect { case j: ParquetRelation => - (j.statistics.sizeInBytes, j.newInstance.statistics.sizeInBytes) - } - assert(sizes.size === 1) - assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before") - assert(sizes(0)._1 > 1, "1 is the default, indicating the absence of a meaningful estimate") - - Utils.deleteRecursively(ParquetTestData.testDir) - } - test("estimates the size of a test MetastoreRelation") { val rdd = hql("""SELECT * FROM src""") val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => mr.statistics.sizeInBytes } assert(sizes.size === 1) - assert(sizes(0) > 1, "1 is the default, indicating the absence of a meaningful estimate") + assert(sizes(0) == 5812, s"expected exact size 5812 for test table 'src', got ${sizes(0)}") } test("auto converts to broadcast hash join, by size estimate of a relation") { @@ -95,30 +79,6 @@ class StatisticsSuite extends QueryTest { after() } - /** Tests for ParquetRelation */ - val parquetQuery = - """SELECT a.mystring, b.myint - |FROM psrc a - |JOIN psrc b - |ON a.mylong = 0 AND a.mylong = b.mylong""".stripMargin - val parquetAnswer = Seq(("abc", 5)) - def parquetBefore(): Unit = { - ParquetTestData.writeFile() - val testRDD = parquetFile(ParquetTestData.testDir.toString) - testRDD.registerAsTable("psrc") - } - def parquetAfter() = { - Utils.deleteRecursively(ParquetTestData.testDir) - reset() - } - mkTest( - parquetBefore, - parquetAfter, - parquetQuery, - parquetAnswer, - implicitly[ClassTag[ParquetRelation]] - ) - /** Tests for MetastoreRelation */ val metastoreQuery = """SELECT * FROM src a 
JOIN src b ON a.key = 238 AND a.key = b.key""" val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238")) From 8bd2816a90d5eb942f126dd6b77aef3e4d260bcf Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 16 Jul 2014 11:25:27 -0700 Subject: [PATCH 22/27] Add a note on performance of statistics. --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index e45717527cf0..da7d8e68e212 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -32,6 +32,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { * statistic produced by the children. To override this behavior, override `statistics` and * assign it a overriden version of `Statistics`. * + * '''NOTE''': concrete and/or overriden versions of statistics fields should pay attention to the + * performance of the implementations. The reason is that estimations might get triggered in + * performance-critical processes, such as query plan planning. + * * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it * defaults to the product of children's `sizeInBytes`. */ From 16fc60a2bb876acdadfc21099b5a7044ded924a7 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 16 Jul 2014 11:28:44 -0700 Subject: [PATCH 23/27] Avoid calling statistics on plans if auto join conversion is disabled. --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 6 +++--- .../apache/spark/sql/execution/SparkStrategies.scala | 12 +++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index da7d8e68e212..9939041b6f8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -28,9 +28,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { // TODO: handle overflow? /** - * Estimates of various statistics. The default estimation logic simply sums up the corresponding - * statistic produced by the children. To override this behavior, override `statistics` and - * assign it a overriden version of `Statistics`. + * Estimates of various statistics. The default estimation logic simply lazily multiplies the + * corresponding statistic produced by the children. To override this behavior, override + * `statistics` and assign it an overriden version of `Statistics`. * * '''NOTE''': concrete and/or overriden versions of statistics fields should pay attention to the * performance of the implementations. 
The reason is that estimations might get triggered in diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 64d246346dd8..b766dd1465c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -58,7 +58,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * side in a [[execution.ShuffledHashJoin]]. */ object HashJoin extends Strategy with PredicateHelper { - private[this] def broadcastHashJoin( + private[this] def makeBroadcastHashJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], left: LogicalPlan, @@ -72,12 +72,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + if sqlContext.autoConvertJoinSize > 0 && + right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => + makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) + if sqlContext.autoConvertJoinSize > 0 && + left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => + makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val buildSide = From 9951305ea5cb6d0bf611aa9ab6a437428e47caad Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 16 Jul 2014 13:03:56 -0700 Subject: [PATCH 24/27] Remove childrenStats. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 9939041b6f8b..fc06fc1f96d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -40,10 +40,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { * defaults to the product of children's `sizeInBytes`. */ case class Statistics( - sizeInBytes: Long = childrenStats.map(_.sizeInBytes).product + sizeInBytes: Long + ) + lazy val statistics: Statistics = Statistics( + sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product ) - lazy val statistics: Statistics = new Statistics - lazy val childrenStats = children.map(_.statistics) /** * Returns the set of attributes that are referenced by this node From 2f2fb89b1420898ea00a5727599380e764b5fef5 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 16 Jul 2014 13:30:57 -0700 Subject: [PATCH 25/27] Fix statistics for SparkLogicalPlan. 
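
With `childrenStats` removed, the default estimate is just the product of the children's `sizeInBytes`, and (at this point in the series) leaf operators default to 1 so they behave as a multiplicative identity rather than zeroing the product out. A toy model of that behaviour (everything below is illustrative; 5812 is the size the statistics suite expects for Hive's `src` test table):

  object ProductEstimateSketch {
    sealed trait Plan {
      def children: Seq[Plan]
      // Default: multiply the children's estimates together.
      lazy val sizeInBytes: BigInt = children.map(_.sizeInBytes).product
    }
    case class Leaf(size: BigInt = BigInt(1)) extends Plan {
      def children: Seq[Plan] = Nil
      override lazy val sizeInBytes: BigInt = size
    }
    case class Unary(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
    case class Join(left: Plan, right: Plan) extends Plan { def children: Seq[Plan] = Seq(left, right) }

    def main(args: Array[String]): Unit = {
      val table = Leaf(BigInt(5812))            // a size-aware leaf, e.g. a metastore table
      println(Unary(table).sizeInBytes)         // 5812: unary operators pass the estimate through
      println(Join(table, Leaf()).sizeInBytes)  // 5812: a default leaf of 1 is neutral in the product
    }
  }
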
--- .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index f398234bb7bd..e0ea0a3e4dc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -82,15 +82,12 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ } @transient override lazy val statistics = Statistics( - // If this is wrapping around ExistingRdd and no reasonable estimation logic is implemented, - // return a default value. sizeInBytes = { - val naiveVal = childrenStats.map(_.sizeInBytes).product alreadyPlanned match { // TODO: Instead of returning a default value here, find a way to return a meaningful // size estimate for RDDs. See PR 1238 for more discussions. - case e: ExistingRdd if naiveVal == 1L => sqlContext.statsDefaultSizeInBytes - case _ => naiveVal + case e: ExistingRdd => sqlContext.statsDefaultSizeInBytes + case _ => 1L // TODO: consider adding statistics to physical plans as well. } } ) From 8663e84b89509afc72f047c6c2f21dd73d44e245 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 22 Jul 2014 11:51:09 -0700 Subject: [PATCH 26/27] Use BigInt for stat; for logical leaves, by default throw an exception. Also cleanups & scaladoc fixes per review comments. --- .../catalyst/plans/logical/LogicalPlan.scala | 6 ++--- .../scala/org/apache/spark/sql/SQLConf.scala | 10 +++----- .../spark/sql/execution/SparkPlan.scala | 11 +++----- .../spark/sql/execution/SparkStrategies.scala | 19 ++++++++------ .../spark/sql/hive/HiveMetastoreCatalog.scala | 25 ++++++++++--------- .../spark/sql/hive/StatisticsSuite.scala | 3 ++- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index fc06fc1f96d3..ac85f95b52a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.trees abstract class LogicalPlan extends QueryPlan[LogicalPlan] { self: Product => - // TODO: handle overflow? /** * Estimates of various statistics. The default estimation logic simply lazily multiplies the * corresponding statistic produced by the children. To override this behavior, override @@ -40,7 +39,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { * defaults to the product of children's `sizeInBytes`. */ case class Statistics( - sizeInBytes: Long + sizeInBytes: BigInt ) lazy val statistics: Statistics = Statistics( sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product @@ -112,7 +111,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { self: Product => - override lazy val statistics = Statistics(sizeInBytes = 1L) + override lazy val statistics: Statistics = + throw new UnsupportedOperationException("default leaf nodes don't have meaningful Statistics") // Leaf nodes by definition cannot reference any input attributes. 
override def references = Set.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index c880e9f84208..825525de7804 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -22,15 +22,13 @@ import java.util.Properties import scala.collection.JavaConverters._ /** - * A trait that enables the setting and getting of mutable config parameters/hints. The central - * location for storing them is uniquely located in the same-name private companion object. - * Therefore, all classes that mix in this trait share all the hints. + * A trait that enables the setting and getting of mutable config parameters/hints. * - * In the presence of a SQLContext, these can be set and queried either by passing SET commands - * into Spark SQL's DSL functions (sql(), hql(), etc.). Otherwise, users of this trait can + * In the presence of a SQLContext, these can be set and queried by passing SET commands + * into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can * modify the hints by programmatically calling the setters and getters of this trait. * - * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads). + * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ trait SQLConf { import SQLConf._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index e0ea0a3e4dc5..61dc1b54bb62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -82,14 +82,9 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ } @transient override lazy val statistics = Statistics( - sizeInBytes = { - alreadyPlanned match { - // TODO: Instead of returning a default value here, find a way to return a meaningful - // size estimate for RDDs. See PR 1238 for more discussions. - case e: ExistingRdd => sqlContext.statsDefaultSizeInBytes - case _ => 1L // TODO: consider adding statistics to physical plans as well. - } - } + // TODO: Instead of returning a default value here, find a way to return a meaningful size + // estimate for RDDs. See PR 1238 for more discussions. 
+ sizeInBytes = BigInt(sqlContext.statsDefaultSizeInBytes) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b766dd1465c4..649ff82383de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import scala.util.Try + import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -72,19 +74,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if sqlContext.autoConvertJoinSize > 0 && - right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => - makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + if Try(sqlContext.autoConvertJoinSize > 0 && + right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) => + makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if sqlContext.autoConvertJoinSize > 0 && - left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => + if Try(sqlContext.autoConvertJoinSize > 0 && + left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) => makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val buildSide = - if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight - else BuildLeft + if (Try(right.statistics.sizeInBytes <= left.statistics.sizeInBytes).getOrElse(false)) { + BuildRight + } else { + BuildLeft + } val hashJoin = execution.ShuffledHashJoin( leftKeys, rightKeys, buildSide, planLater(left), planLater(right)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 04a639209e9c..5508c2da34de 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.Logging +import org.apache.spark.sql.{SQLContext, Logging} import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical @@ -66,8 +66,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Since HiveQL is case insensitive for table names we make them all lowercase. 
MetastoreRelation( databaseName, tblName, alias)( - table.getTTable, partitions.map(part => part.getTPartition))( - hive.hiveconf, table.getPath) + table.getTTable, partitions.map(part => part.getTPartition))(hive) } def createTable( @@ -252,7 +251,7 @@ object HiveMetastoreTypes extends RegexParsers { private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) - (@transient hiveConf: HiveConf, @transient path: Path) + (@transient sqlContext: SQLContext) extends LeafNode { self: Product => @@ -270,15 +269,17 @@ private[hive] case class MetastoreRelation } @transient override lazy val statistics = Statistics( - // TODO: check if this estimate is valid for tables after partition pruning. sizeInBytes = { - // NOTE: kind of hacky, but this should be relatively cheap if parameters for the table are - // populated into the metastore. An alternative would be going through Hadoop's FileSystem - // API, which can be expensive if a lot of RPCs are involved. Besides `totalSize`, there are - // also `numFiles`, `numRows`, `rawDataSize` keys we can look at in the future. - val sizeMaybeFromMetastore = - Option(hiveQlTable.getParameters.get("totalSize")).map(_.toLong).getOrElse(-1L) - math.max(sizeMaybeFromMetastore, 1L) + // TODO: check if this estimate is valid for tables after partition pruning. + // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be + // relatively cheap if parameters for the table are populated into the metastore. An + // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot + // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`, + // `rawDataSize` keys that we can look at in the future. + BigInt( + Option(hiveQlTable.getParameters.get("totalSize")) + .map(_.toLong) + .getOrElse(sqlContext.statsDefaultSizeInBytes)) } ) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 2be8573620e0..21eb2d95bed3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -32,7 +32,8 @@ class StatisticsSuite extends QueryTest { mr.statistics.sizeInBytes } assert(sizes.size === 1) - assert(sizes(0) == 5812, s"expected exact size 5812 for test table 'src', got ${sizes(0)}") + assert(sizes(0).equals(BigInt(5812)), + s"expected exact size 5812 for test table 'src', got: ${sizes(0)}") } test("auto converts to broadcast hash join, by size estimate of a relation") { From 329071d196342ee67482b87a5745bf0a135d8296 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 29 Jul 2014 11:58:16 -0700 Subject: [PATCH 27/27] Address review comments; turn config name from string to field in SQLConf. 
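The key idea is to expose each configuration key as a constant on the SQLConf companion object, so tests and callers reference the field instead of re-typing the raw string. A simplified sketch of the pattern (hypothetical SQLConfSketch, not the real SQLConf):

    import java.util.concurrent.ConcurrentHashMap

    // Simplified sketch: the key lives in one place; the typed getter and any
    // test that issues a SET both refer to the same constant.
    object SQLConfSketch {
      val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"

      private val settings = new ConcurrentHashMap[String, String]()

      def set(key: String, value: String): Unit = settings.put(key, value)

      def autoBroadcastJoinThreshold: Int =
        Option(settings.get(AUTO_BROADCASTJOIN_THRESHOLD)).getOrElse("10000").toInt

      def main(args: Array[String]): Unit = {
        println(autoBroadcastJoinThreshold)      // 10000: the default
        set(AUTO_BROADCASTJOIN_THRESHOLD, "-1")  // what the updated test does via SET
        println(autoBroadcastJoinThreshold)      // -1: auto-broadcast disabled
      }
    }

In the actual change below, the constant is SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ("spark.sql.autoBroadcastJoinThreshold") and the getter is renamed from autoConvertJoinSize to autoBroadcastJoinThreshold.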
--- .../scala/org/apache/spark/sql/SQLConf.scala | 27 ++++++++++--------- .../spark/sql/execution/SparkPlan.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 18 ++++++------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/StatisticsSuite.scala | 10 +++---- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 825525de7804..be8d4e15ec4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -21,6 +21,16 @@ import java.util.Properties import scala.collection.JavaConverters._ +object SQLConf { + val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold" + val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" + val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes" + + object Deprecated { + val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" + } +} + /** * A trait that enables the setting and getting of mutable config parameters/hints. * @@ -49,16 +59,16 @@ trait SQLConf { * * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000. */ - private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt + private[spark] def autoBroadcastJoinThreshold: Int = + get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt /** * The default size in bytes to assign to a logical operator's estimation statistics. By default, * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins. */ - private[spark] def statsDefaultSizeInBytes: Long = - getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong) - .getOrElse(autoConvertJoinSize + 1) + private[spark] def defaultSizeInBytes: Long = + getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1) /** ********************** SQLConf functionality methods ************ */ @@ -99,12 +109,3 @@ trait SQLConf { } } - -object SQLConf { - val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size" - val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" - - object Deprecated { - val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 61dc1b54bb62..77c874d0315e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -84,7 +84,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ @transient override lazy val statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. 
- sizeInBytes = BigInt(sqlContext.statsDefaultSizeInBytes) + sizeInBytes = BigInt(sqlContext.defaultSizeInBytes) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 649ff82383de..404d48ae05b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -53,11 +53,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * This strategy applies a simple optimization based on the estimates of the physical sizes of * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an * estimated physical size smaller than the user-settable threshold - * `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and - * mark the other relation as the ''stream'' side. The build table will be ''broadcasted'' to - * all of the executors involved in the join, as a [[org.apache.spark.broadcast.Broadcast]] - * object. If both estimates exceed the threshold, they will instead be used to decide the build - * side in a [[execution.ShuffledHashJoin]]. + * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the + * ''build'' relation and mark the other relation as the ''stream'' side. The build table will be + * ''broadcasted'' to all of the executors involved in the join, as a + * [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they + * will instead be used to decide the build side in a [[execution.ShuffledHashJoin]]. */ object HashJoin extends Strategy with PredicateHelper { private[this] def makeBroadcastHashJoin( @@ -74,13 +74,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if Try(sqlContext.autoConvertJoinSize > 0 && - right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) => + if Try(sqlContext.autoBroadcastJoinThreshold > 0 && + right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) => makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if Try(sqlContext.autoConvertJoinSize > 0 && - left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) => + if Try(sqlContext.autoBroadcastJoinThreshold > 0 && + left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) => makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 5508c2da34de..dff1d6a4b93b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -279,7 +279,7 @@ private[hive] case class MetastoreRelation BigInt( Option(hiveQlTable.getParameters.get("totalSize")) .map(_.toLong) - .getOrElse(sqlContext.statsDefaultSizeInBytes)) + .getOrElse(sqlContext.defaultSizeInBytes)) } ) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 21eb2d95bed3..a61fd9df95c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive import scala.reflect.ClassTag -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{SQLConf, QueryTest} import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -51,7 +51,7 @@ class StatisticsSuite extends QueryTest { val sizes = rdd.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize, + assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -63,9 +63,9 @@ class StatisticsSuite extends QueryTest { checkAnswer(rdd, expectedAnswer) // check correctness of output TestHive.settings.synchronized { - val tmp = autoConvertJoinSize + val tmp = autoBroadcastJoinThreshold - hql("""SET spark.sql.auto.convert.join.size=-1""") + hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") rdd = hql(query) bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") @@ -74,7 +74,7 @@ class StatisticsSuite extends QueryTest { assert(shj.size === 1, "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off") - hql(s"""SET spark.sql.auto.convert.join.size=$tmp""") + hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") } after()
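Taken together, the broadcast decision in the HashJoin strategy reduces to a small predicate: broadcast a side only if the threshold is positive, the side's estimated size fits under it, and evaluating the estimate does not throw (leaf plans without statistics now throw by default). A standalone sketch of that guard (hypothetical names, not Spark API):

    import scala.util.Try

    object BroadcastDecisionSketch {
      // Broadcast only when the threshold is enabled, the estimate is within it,
      // and computing the estimate succeeds; any failure falls back to false.
      def canBroadcast(estimatedSizeInBytes: => BigInt, thresholdInBytes: Int): Boolean =
        Try(thresholdInBytes > 0 && estimatedSizeInBytes <= thresholdInBytes).getOrElse(false)

      def main(args: Array[String]): Unit = {
        println(canBroadcast(BigInt(5812), 10000))  // true: small enough to broadcast
        println(canBroadcast(BigInt(5812), -1))     // false: the feature is switched off
        println(canBroadcast(sys.error("no statistics for this leaf"), 10000))  // false
      }
    }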