PruneFileSourcePartitionsSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution
package org.apache.spark.sql.execution.datasources

import org.scalatest.matchers.should.Matchers._

@@ -24,18 +24,19 @@ import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType

class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with SharedSparkSession {

override def format: String = "parquet"

@@ -45,35 +46,27 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase

test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
withTable("test") {
withTempDir { dir =>
sql(
s"""
|CREATE EXTERNAL TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS parquet
Contributor:

So it's not only moving the package, but also changing some tests to use data source tables instead of Hive tables?

Member Author:

Yes, since we're moving PruneFileSourcePartitionsSuite out of the hive package, we need to remove the Hive dependency here too.

As commented in the other thread, to me it's OK to switch to a data source table here. I also dug into the history of the change: at the time this test was added (in #15569), data source tables didn't use HMS to store table metadata by default (that was added later in #15515), but instead used ListingFileCatalog (?). Maybe the Hive table was created here just for testing purposes, and a LogicalRelation was then constructed from it to feed into the PruneFileSourcePartitions rule?

Let me know if you see any concern here @cloud-fan, since you are the main author of this test and the related code :)

|LOCATION '${dir.toURI}'""".stripMargin)
Comment on lines -50 to -54:

Member:

Do we know why it used an external table before? Is it related to the test coverage here?

Member Author:

I explained this in the other thread, and I don't think it is related to the test coverage here. Let me know if you think otherwise @viirya @cloud-fan.

Member:

Yeah, I saw the comment. It makes sense, and that's also what I read from the test. I was just wondering why it used an external table originally.

Member Author (@sunchao, Jul 19, 2021):

I'm not sure either. IMO the EXTERNAL keyword doesn't matter here: I've run the test with and without it and the outcome is the same.
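For context, a minimal sketch of the two table definitions being compared in this thread, assuming a local Hive-enabled SparkSession; the table names and location below are placeholders, not taken from the PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// External table (as in the original test): data lives at an explicit LOCATION
// and is not deleted when the table is dropped.
spark.sql(
  """CREATE EXTERNAL TABLE test_ext(i INT)
    |PARTITIONED BY (p INT)
    |STORED AS PARQUET
    |LOCATION '/tmp/test_ext'""".stripMargin)

// Managed variant without EXTERNAL: same schema, format, and partitioning,
// so the partition pruning exercised by the test should behave the same.
spark.sql(
  """CREATE TABLE test_managed(i INT)
    |PARTITIONED BY (p INT)
    |STORED AS PARQUET""".stripMargin)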


val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)

val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)

val logicalRelation = LogicalRelation(relation, tableMeta)
val query = Project(Seq(Symbol("i"), Symbol("p")),
Filter(Symbol("p") === 1, logicalRelation)).analyze

val optimized = Optimize.execute(query)
assert(optimized.missingInput.isEmpty)
}
spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("test")
Member (@dongjoon-hyun, Jul 16, 2021):

As you know, saveAsTable is different from STORED AS parquet. The original test coverage seems to be coupled with convertMetastoreParquet, but this one looks different. Are we losing the existing test coverage?

scala> spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("t1")

scala> sql("DESCRIBE TABLE EXTENDED t1").show()
...
|            Provider|             parquet|       |
...
scala> sql("CREATE TABLE t2(a int) STORED AS parquet").show()
scala> sql("DESCRIBE TABLE EXTENDED t2").show()
...
|            Provider|                hive|       |
...

Member:

This specific test coverage should remain in the hive module.

Member Author:

Hmm, what is convertMetastoreParquet? I couldn't find it anywhere.

Regarding the test, I think it is still covered (I've debugged the test and made sure it still goes through the related code paths in PruneFileSourcePartitions). Much has changed since 2016 though: the test (added in #15569) was originally designed to make sure that LogicalRelation.expectedOutputAttributes was correctly populated. expectedOutputAttributes was later replaced by passing output directly to LogicalRelation (in #17552), which I think further prevents the issue from happening.

Member:

I mean spark.sql.hive.convertMetastoreParquet. Here is the document.

Member:

FYI, CREATE TABLE ... USING PARQUET (Spark syntax) and CREATE TABLE ... STORED AS PARQUET (Hive syntax) generate different tables in Apache Spark.

Member (@dongjoon-hyun, Jul 16, 2021):

For Hive tables generated with the STORED AS syntax, Spark converts them to data source tables on the fly because spark.sql.hive.convertMetastoreParquet is true by default. It's the same for ORC; for ORC we have spark.sql.hive.convertMetastoreOrc.
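A rough sketch of the conversion described above, assuming a local Hive-enabled SparkSession; the table names are made up for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// Hive-syntax table: DESCRIBE reports Provider = hive.
spark.sql("CREATE TABLE hive_parquet(a INT) STORED AS PARQUET")

// Spark-syntax table: a native data source table, Provider = parquet.
spark.sql("CREATE TABLE ds_parquet(a INT) USING PARQUET")

// With the default spark.sql.hive.convertMetastoreParquet=true, reads of the
// Hive-syntax table are converted on the fly to the native parquet read path
// (FileSourceScanExec), just like the data source table.
spark.sql("SELECT * FROM hive_parquet").queryExecution.executedPlan

// Disabling the conversion keeps the Hive read path (HiveTableScanExec).
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM hive_parquet").queryExecution.executedPlan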

Member Author:

Thanks @dongjoon-hyun, I found it now. I'm not sure this matters for the test, though: what it does is just 1) register the table metadata in the catalog, 2) create a LogicalRelation wrapping a HadoopFsRelation that carries the data and partition schema from step 1, and 3) feed it into the PruneFileSourcePartitions rule and check that the LogicalRelation's expectedOutputAttributes is properly set. That seems irrelevant to which SerDe is used?

val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)

val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)

val logicalRelation = LogicalRelation(relation, tableMeta)
val query = Project(Seq(Symbol("id"), Symbol("p")),
Member:

Since this claims to simply move classes, shall we preserve i instead of introducing the new column name id?

Member Author:

Thanks. I'm not sure it's worth doing, because we changed how the test table is created: the DataFrame API spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("test") produces an id column by default. The id here is also consistent with the rest of the tests in this file, as well as with other tests that use the same API to create tables.

Member:

I meant we should change it back from id to i together~

Member:

Anyway, if the title and scope become broader, I'm okay with id, too.

Member Author:

Thanks! I'll keep it as is then :-)

Filter(Symbol("p") === 1, logicalRelation)).analyze

val optimized = Optimize.execute(query)
assert(optimized.missingInput.isEmpty)
}
}

@@ -142,6 +135,10 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
}
}

protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
case scan: FileSourceScanExec => scan.partitionFilters
}

override def getScanExecPartitionSize(plan: SparkPlan): Long = {
plan.collectFirst {
case p: FileSourceScanExec => p.selectedPartitions.length
PrunePartitionSuiteBase.scala
@@ -15,16 +15,15 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.StatisticsCollectionTestBase
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryOperator, Expression, IsNotNull, Literal}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED

abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with TestHiveSingleton {
abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {

protected def format: String

@@ -95,11 +94,11 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with
val plan = qe.sparkPlan
assert(getScanExecPartitionSize(plan) == expectedPartitionCount)

val pushedDownPartitionFilters = plan.collectFirst {
case scan: FileSourceScanExec => scan.partitionFilters
case scan: HiveTableScanExec => scan.partitionPruningPred
val collectFn: PartialFunction[SparkPlan, Seq[Expression]] = collectPartitionFiltersFn orElse {
case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters
}.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
}
val pushedDownPartitionFilters = plan.collectFirst(collectFn)
.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
val pushedFilters = pushedDownPartitionFilters.map(filters => {
filters.foldLeft("")((currentStr, exp) => {
if (currentStr == "") {
@@ -113,5 +112,7 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with
assert(pushedFilters == Some(expectedPushedDownFilters))
}

protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]]

protected def getScanExecPartitionSize(plan: SparkPlan): Long
}
PruneHiveTablePartitionsSuite.scala
@@ -18,13 +18,16 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.PrunePartitionSuiteBase
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.LongType

class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiveSingleton {

override def format(): String = "hive"

@@ -131,6 +134,10 @@ class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
}
}

protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
case scan: HiveTableScanExec => scan.partitionPruningPred
}

override def getScanExecPartitionSize(plan: SparkPlan): Long = {
plan.collectFirst {
case p: HiveTableScanExec => p