Merged

Commits (25)
c8e3a1e
[SPARK-16980][SQL] Load only catalog table partition metadata required
Aug 10, 2016
ac89aef
Add a new catalyst optimizer rule to SQL core for pruning unneeded
Sep 13, 2016
f657256
Include the type of file catalog in the FileSourceScanExec metadata
Oct 8, 2016
65298f0
TODO: Consider renaming FileCatalog to better differentiate it from
Oct 8, 2016
4d257e1
Refactor the FileSourceScanExec.metadata val to make it prettier
Oct 11, 2016
d3b9f3c
try out parquet case insensitive fallback
ericl Oct 11, 2016
b1847ad
fix and add test for input files
ericl Oct 11, 2016
84f3741
rename test
ericl Oct 11, 2016
026951c
Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once
Oct 11, 2016
fb664d6
fix it
ericl Oct 13, 2016
25e880f
more test cases
ericl Oct 13, 2016
869d090
also fix a bug with zero partitions selected
ericl Oct 13, 2016
225d0fe
feature flag
ericl Oct 12, 2016
8aa1ed1
add comments
ericl Oct 12, 2016
5f3061b
extend and fix flakiness in test
ericl Oct 13, 2016
bf6f46f
Enhance `ParquetMetastoreSuite` with mixed-case partition columns
Oct 13, 2016
d48ff10
Tidy up a little by removing some unused imports, an unused method and
Oct 13, 2016
3a072bd
Put partition count in `FileSourceScanExec.metadata` for partitioned
Oct 13, 2016
dc9e613
Fix some errors in my revision of `ParquetSourceSuite`
Oct 13, 2016
989f3b3
Thu Oct 13 17:18:14 PDT 2016
ericl Oct 14, 2016
49112e6
Merge commit '765f93c' into more-testing
ericl Oct 14, 2016
6a46fea
more generic
ericl Oct 14, 2016
3f192cd
Thu Oct 13 18:09:42 PDT 2016
ericl Oct 14, 2016
a7c0d35
Thu Oct 13 18:09:55 PDT 2016
ericl Oct 14, 2016
39513b7
Thu Oct 13 18:22:31 PDT 2016
ericl Oct 14, 2016
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
  val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
    metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
}

/**
 * :: Experimental ::
 * Metrics for access to the hive external catalog.
 */
@Experimental
object HiveCatalogMetrics extends Source {
  override val sourceName: String = "HiveExternalCatalog"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  /**
   * Tracks the total number of partition metadata entries fetched via the client api.
   */
  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))

  /**
   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
   */
  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))

  def reset(): Unit = {
    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
  }

  // clients can use these to avoid classloader issues with the codahale classes
  def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
  def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
}
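(Aside, not part of the diff: a minimal usage sketch of the counters above. The sequence below is illustrative; `reset()` decrements by the current count, presumably because the underlying codahale `Counter` exposes no reset method.)

  import org.apache.spark.metrics.source.HiveCatalogMetrics

  HiveCatalogMetrics.reset()                        // zero both counters via dec(getCount())
  HiveCatalogMetrics.incrementFetchedPartitions(3)  // e.g. a metastore call returned 3 partitions
  HiveCatalogMetrics.incrementFilesDiscovered(5)    // e.g. a file listing found 5 leaf files
  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)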
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

@@ -56,6 +57,7 @@ abstract class SessionFileCatalog(sparkSession: SparkSession)
        SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
      }

    HiveCatalogMetrics.incrementFilesDiscovered(files.size)
    mutable.LinkedHashSet(files: _*)
  }
}
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
@@ -528,17 +529,21 @@ private[hive] class HiveClientImpl(
      table: CatalogTable,
      spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
    val hiveTable = toHiveTable(table)
    spec match {
    val parts = spec match {
      case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
    }
    HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
    parts
  }

  override def getPartitionsByFilter(
      table: CatalogTable,
      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
    val hiveTable = toHiveTable(table)
    shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
    val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
    HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
    parts
  }

  override def listTables(dbName: String): Seq[String] = withHiveState {
@@ -17,6 +17,9 @@

package org.apache.spark.sql.hive

import java.io.File

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.QueryTest
@@ -36,21 +39,25 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
    assert(hiveClient.getConf("hive.in.test", "") == "true")
  }

  private def setupPartitionedTable(tableName: String, dir: File): Unit = {
    spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
      .partitionBy("partCol1", "partCol2")
      .mode("overwrite")
      .parquet(dir.getAbsolutePath)

    spark.sql(s"""
      |create external table $tableName (id long)
      |partitioned by (partCol1 int, partCol2 int)
      |stored as parquet
      |location "${dir.getAbsolutePath}"""".stripMargin)
    spark.sql(s"msck repair table $tableName")
  }

  test("partitioned pruned table reports only selected files") {
    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
    withTable("test") {
      withTempDir { dir =>
        spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
          .partitionBy("partCol1", "partCol2")
          .mode("overwrite")
          .parquet(dir.getAbsolutePath)

        spark.sql(s"""
          |create external table test (id long)
          |partitioned by (partCol1 int, partCol2 int)
          |stored as parquet
          |location "${dir.getAbsolutePath}"""".stripMargin)
        spark.sql("msck repair table test")

        setupPartitionedTable("test", dir)
        val df = spark.sql("select * from test")
        assert(df.count() == 5)
        assert(df.inputFiles.length == 5) // unpruned
@@ -69,4 +76,69 @@ }
      }
    }
  }

test("lazy partition pruning reads only necessary partition data") {
withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedTable("test", dir)
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)

HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 < 2").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)

HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 < 3").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)

// should read all
HiveCatalogMetrics.reset()
spark.sql("select * from test").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)

// read all should be cached
HiveCatalogMetrics.reset()
spark.sql("select * from test").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
}
}
}
}

test("all partitions read and cached when filesource partition pruning is off") {
withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
withTable("test") {
withTempDir { dir =>
setupPartitionedTable("test", dir)

// We actually query the partitions from hive each time the table is resolved in this
// mode. This is kind of terrible, but is needed to preserve the legacy behavior
// of doing plan cache validation based on the entire partition set.
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
// 5 from table resolution, another 5 from ListingFileCatalog
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
Reviewer:
I would expect this to be 5 because this table has 5 partitions. Why does the test expect 10?

Author:
The first 5 are from resolving the table, and the latter 5 are from ListingFileCatalog. It is possible to optimize this to only have 5, but it didn't seem worth the cost since this is (1) legacy mode and (2) not a regression.

Author:
Hm, maybe I can break it up into analysis and execution to make it more clear.

Author:
Not easy, so just added a comment here.

Reviewer:
Thanks for the clarification. I think that adding the comment is good enough.

          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)

          HiveCatalogMetrics.reset()
          spark.sql("select * from test where partCol1 < 2").count()
          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)

          HiveCatalogMetrics.reset()
          spark.sql("select * from test").count()
          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
        }
      }
    }
  }
}
@@ -175,7 +175,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
    (1 to 10).map(i => Tuple1(Seq(new Integer(i), null))).toDF("a")
      .createOrReplaceTempView("jt_array")

    setConf(HiveUtils.CONVERT_METASTORE_PARQUET, true)
    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
Reviewer:
Can you explain why you made this change?

Author:
This should no longer be needed since the flag value is true by default. I changed it to an assert to validate this.

This lets us get rid of the setConf(..., false) in the afterAll(), which was causing the conf value to be leaked to other suites.

  }

  override def afterAll(): Unit = {
@@ -187,7 +187,6 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
      "jt",
      "jt_array",
      "test_parquet")
    setConf(HiveUtils.CONVERT_METASTORE_PARQUET, false)
  }

  test(s"conversion is working") {
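(Aside on the review thread above, a minimal sketch contrasting the old and new beforeAll/afterAll pattern. The suite skeleton is illustrative; only setConf, HiveUtils.CONVERT_METASTORE_PARQUET, and the assert come from the diff.)

  // Before (illustrative): force the conf on, then flip it off in afterAll().
  // Because the SparkSession is shared across suites, the "false" written here
  // leaks into whatever suite runs next.
  override def beforeAll(): Unit = {
    setConf(HiveUtils.CONVERT_METASTORE_PARQUET, true)
  }
  override def afterAll(): Unit = {
    setConf(HiveUtils.CONVERT_METASTORE_PARQUET, false)
  }

  // After (as in this PR): the flag is true by default, so beforeAll() only
  // asserts it and afterAll() no longer needs to touch the conf at all.
  override def beforeAll(): Unit = {
    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
  }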