Commits (21)
6b02b3c  [SPARK-16980][SQL] Load only catalog table partition metadata required (Oct 14, 2016)
e816919  Add a new catalyst optimizer rule to SQL core for pruning unnecessary (Sep 13, 2016)
8cca6dc  Include the type of file catalog in the FileSourceScanExec metadata (Oct 8, 2016)
7acc3f1  try out parquet case insensitive fallback (ericl, Oct 11, 2016)
cf7d1f1  Refactor the FileSourceScanExec.metadata val to make it prettier (Oct 11, 2016)
c75855c  fix and add test for input files (ericl, Oct 11, 2016)
821372f  rename test (ericl, Oct 11, 2016)
d0b893b  Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once (Oct 11, 2016)
c47a2a3  feature flag (ericl, Oct 12, 2016)
ed7dd37  add comments (ericl, Oct 12, 2016)
bdff488  fix it (ericl, Oct 13, 2016)
5ad4b25  more test cases (ericl, Oct 13, 2016)
fa19224  also fix a bug with zero partitions selected (ericl, Oct 13, 2016)
00bf912  extend and fix flakiness in test (ericl, Oct 13, 2016)
b5f7691  Enhance `ParquetMetastoreSuite` with mixed-case partition columns (Oct 13, 2016)
77932a1  Tidy up a little by removing some unused imports, an unused method and (Oct 13, 2016)
97cd27d  Put partition count in `FileSourceScanExec.metadata` for partitioned (Oct 13, 2016)
851d7f9  Fix some errors in my revision of `ParquetSourceSuite` (Oct 13, 2016)
26e0d34  Add metrics and cost tests for partition pruning effectiveness (#5) (ericl, Oct 14, 2016)
83a168c  Actually register the hive catalog metrics, also revert broken tests … (ericl, Oct 14, 2016)
014c998  Also support mixed case field resolution for converted ORC tables (#7) (ericl, Oct 14, 2016)
Changes from all commits
@@ -26,7 +26,7 @@ private[spark] object StaticSources {
* The set of all static sources. These sources may be reported to from any class, including
* static classes, without requiring reference to a SparkEnv.
*/
val allSources = Seq(CodegenMetrics)
val allSources = Seq(CodegenMetrics, HiveCatalogMetrics)
}

/**
@@ -60,3 +60,35 @@ object CodegenMetrics extends Source {
val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
}

/**
* :: Experimental ::
* Metrics for access to the hive external catalog.
*/
@Experimental
object HiveCatalogMetrics extends Source {
Review comment (Contributor): should we move it to sql module?

Review comment (Contributor): nvm, codegen is here too

Review comment (Contributor): should we update StaticSources.allSources?

Review comment (Contributor): Ah, the reason you can't is that to register this source it needs to be in the list above. This made me realize I actually forgot to add it to the list: VideoAmp#6

override val sourceName: String = "HiveExternalCatalog"
override val metricRegistry: MetricRegistry = new MetricRegistry()

/**
* Tracks the total number of partition metadata entries fetched via the client api.
*/
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))

/**
* Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
*/
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))

/**
* Resets the values of all metrics to zero. This is useful in tests.
*/
def reset(): Unit = {
Review comment (Contributor): should we mention that this is for testing only?

Review comment (Contributor): Done in VideoAmp#6

METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
}

// clients can use these to avoid classloader issues with the codahale classes
Review comment (Contributor): I don't quite understand this comment. What issue do these two methods address?

Review comment (Contributor): I don't quite understand the issue, but if you reference the Counter object directly from the call sites then you get:

[info] Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.HiveDataFrameSuite *** ABORTED *** (12 seconds, 51 milliseconds)
[info]   java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/spark/sql/hive/client/IsolatedClientLoader$$anon$1) previously initiated loading for a different type with name "com/codahale/metrics/Counter"
[info]   at java.lang.ClassLoader.defineClass1(Native Method)
[info]   at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
[info]   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[info]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)

Review comment (Contributor): Hm, maybe this is a load order issue

def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
}
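
To make the classloader discussion above concrete, here is a minimal sketch (not part of the patch) of how a caller could bump a counter through the Int-taking wrapper instead of referencing the codahale `Counter` type across the isolated Hive client classloader, and how a test could use `reset()`. The helper names are hypothetical, and the import assumes `HiveCatalogMetrics` lives alongside `StaticSources` in `org.apache.spark.metrics.source`.

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

object HiveCatalogMetricsUsageSketch {
  // Hypothetical helper: record how many leaf files a listing discovered.
  // Only the Int-taking wrapper crosses the classloader boundary, so the
  // caller never mentions com.codahale.metrics.Counter in its signature,
  // which is what the LinkageError quoted above was about.
  def recordListing(leafFiles: Seq[String]): Unit =
    HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)

  // Hypothetical test helper: tests run outside the isolated client loader,
  // so reading the Counter directly there is fine.
  def partitionsFetchedBy(runQuery: () => Unit): Long = {
    HiveCatalogMetrics.reset()
    runQuery()
    HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()
  }
}
```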
@@ -198,11 +198,12 @@ abstract class ExternalCatalog {
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]

/**
* List the metadata of selected partitions according to the given partition predicates.
* List the metadata of partitions that belong to the specified table, assuming it exists, that
* satisfy the given partition-pruning predicate expressions.
*
* @param db database name
* @param table table name
* @param predicates partition predicated
* @param predicates partition-pruning predicates
*/
def listPartitionsByFilter(
db: String,
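As an illustration of the contract documented above, a hedged sketch of a call site; the catalog instance, table name, and `ds` partition column are hypothetical, not taken from the patch.

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, ExternalCatalog}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

def selectedPartitions(externalCatalog: ExternalCatalog): Seq[CatalogTablePartition] = {
  // Partition-pruning predicate over the partition column ds.
  val ds = AttributeReference("ds", StringType)()
  externalCatalog.listPartitionsByFilter(
    db = "default",
    table = "partitioned_tbl",
    predicates = Seq(EqualTo(ds, Literal("2016-10-14"))))
}
```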
@@ -482,7 +482,9 @@ class InMemoryCatalog(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.")
// TODO: Provide an implementation
throw new UnsupportedOperationException(
"listPartitionsByFilter is not implemented for InMemoryCatalog")
}

// --------------------------------------------------------------------------
@@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.catalog
import java.util.Date

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}


/**
@@ -97,6 +97,15 @@ case class CatalogTablePartition(

output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
}

/**
* Given the partition schema, returns a row with that schema holding the partition values.
*/
def toRow(partitionSchema: StructType): InternalRow = {
InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
Cast(Literal(spec(name)), dataType).eval()
})
}
}


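A hedged usage sketch for the new `toRow` helper; the partition spec, schema, and use of `CatalogStorageFormat.empty` are illustrative assumptions, not taken from the patch.

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A partition like ds=2016-10-14/hr=10, as the metastore would describe it.
val part = CatalogTablePartition(
  spec = Map("ds" -> "2016-10-14", "hr" -> "10"),
  storage = CatalogStorageFormat.empty)

val partitionSchema = StructType(Seq(
  StructField("ds", StringType),
  StructField("hr", IntegerType)))

// Yields an InternalRow holding the partition values cast to the schema's
// types (a UTF8String for ds, an Int for hr), which partition-pruning
// predicates bound to the partition columns can be evaluated against.
val partitionValues = part.toRow(partitionSchema)
```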
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (4 changes: 2 additions & 2 deletions)
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -2602,7 +2602,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def inputFiles: Array[String] = {
val files: Seq[String] = logicalPlan.collect {
val files: Seq[String] = queryExecution.optimizedPlan.collect {
Review comment (Contributor): why this change?

Review comment (Contributor): We only determine the partitions read after optimization, so it's necessary to read it from that instead of the logical plan.

case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
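To see why the author points at the optimized plan: with this patch, partition pruning for file-source tables happens during optimization, so only the optimized plan reflects which partitions (and hence files) survive. A hedged spark-shell sketch with a hypothetical partitioned table:

```scala
// Hypothetical partitioned table and filter; the names are illustrative.
val df = spark.table("partitioned_tbl").where("ds = '2016-10-14'")

// Collecting FileRelations from the *logical* plan would report the files of
// every partition; the optimized plan has already been through
// PruneFileSourcePartitions, so inputFiles only lists files under the
// selected ds=2016-10-14 partition.
df.inputFiles.foreach(println)
```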
@@ -185,7 +185,7 @@ class CacheManager extends Logging {
plan match {
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
val invalidate = hr.location.paths
val invalidate = hr.location.rootPaths
.map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
.contains(qualifiedPath)
if (invalidate) hr.location.refresh()
@@ -225,13 +225,27 @@ case class FileSourceScanExec(
}

// These metadata values make scan plans uniquely identifiable for equality checking.
override val metadata: Map[String, String] = Map(
"Format" -> relation.fileFormat.toString,
"ReadSchema" -> outputSchema.catalogString,
"Batched" -> supportsBatch.toString,
"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
"InputPaths" -> relation.location.paths.mkString(", "))
override val metadata: Map[String, String] = {
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val location = relation.location
val locationDesc =
location.getClass.getSimpleName + seqToString(location.rootPaths)
Review comment (Contributor): Should they be separated by a space?

Review comment (Contributor, author): This style emulates the way relations are shown in a query plan, e.g. `Relation[a#1, b#2]`. I'm in favor of keeping this as-is.

val metadata =
Map(
"Format" -> relation.fileFormat.toString,
"ReadSchema" -> outputSchema.catalogString,
"Batched" -> supportsBatch.toString,
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
val withOptPartitionCount =
relation.partitionSchemaOption.map { _ =>
metadata + ("PartitionCount" -> selectedPartitions.size.toString)
Review comment (Contributor): +1

} getOrElse {
metadata
}
withOptPartitionCount
}

private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
import org.apache.spark.sql.internal.SQLConf

@@ -32,5 +33,6 @@ class SparkOptimizer(
override def batches: Seq[Batch] = super.batches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
}
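For orientation, a minimal and deliberately no-op sketch of the `Rule[LogicalPlan]` shape that a batch entry like this registers. The real `PruneFileSourcePartitions` added by this patch does considerably more (it splits filter predicates into partition and data predicates and rebuilds the relation around a pruned file catalog); this sketch only shows the pattern it would match.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

object PruneFileSourcePartitionsSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case filter @ Filter(_, LogicalRelation(relation: HadoopFsRelation, _, _))
        if relation.partitionSchemaOption.isDefined =>
      // The real rule would extract the partition-column predicates from
      // filter.condition, ask the relation's file catalog to list only the
      // matching partitions, and rebuild the LogicalRelation around the
      // pruned catalog. This sketch leaves the plan unchanged.
      filter
  }
}
```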
@@ -67,7 +67,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo

dataSource match {
case fs: HadoopFsRelation =>
if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
throw new AnalysisException(
"Cannot create a file-based external data source table without path")
}
@@ -471,9 +471,7 @@ case class DataSource(
val existingPartitionColumns = Try {
resolveRelation()
.asInstanceOf[HadoopFsRelation]
.location
.partitionSpec()
.partitionColumns
.partitionSchema
.fieldNames
.toSeq
}.getOrElse(Seq.empty[String])
@@ -163,14 +163,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if query.resolved && t.schema.asNullable == query.schema.asNullable =>

// Sanity checks
if (t.location.paths.size != 1) {
if (t.location.rootPaths.size != 1) {
throw new AnalysisException(
"Can only write data to relations with a single path.")
}

val outputPath = t.location.paths.head
val outputPath = t.location.rootPaths.head
val inputPaths = query.collect {
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.paths
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
}.flatten

val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -184,7 +184,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
t.bucketSpec,
t.fileFormat,
() => t.refresh(),
() => t.location.refresh(),
t.options,
query,
mode)
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType


/**
* Used to read and write data stored in files to/from the [[InternalRow]] format.
*/
@@ -182,16 +183,17 @@ abstract class TextBasedFileFormat extends FileFormat {
case class Partition(values: InternalRow, files: Seq[FileStatus])

/**
* An interface for objects capable of enumerating the files that comprise a relation as well
* as the partitioning characteristics of those files.
* An interface for objects capable of enumerating the root paths of a relation as well as the
* partitions of a relation subject to some pruning expressions.
*/
trait FileCatalog {

/** Returns the list of input paths from which the catalog will get files. */
def paths: Seq[Path]
trait BasicFileCatalog {

/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec
/**
* Returns the list of root input paths from which the catalog will get files. There may be a
* single root path from which partitions are discovered, or individual partitions may be
* specified by each path.
*/
def rootPaths: Seq[Path]

/**
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
@@ -204,9 +206,33 @@ trait FileCatalog {
*/
def listFiles(filters: Seq[Expression]): Seq[Partition]

/** Returns the list of files that will be read when scanning this relation. */
def inputFiles: Array[String]

/** Refresh any cached file listings */
def refresh(): Unit

/** Sum of table file sizes, in bytes */
def sizeInBytes: Long
}

/**
* A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
* those, infer the relation's partition specification.
*/
// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for
// which it is safe to list all of its files?
trait FileCatalog extends BasicFileCatalog {

/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec

/** Returns all the valid files. */
def allFiles(): Seq[FileStatus]

/** Refresh the file listing */
def refresh(): Unit
/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
allFiles().map(_.getPath.toUri.toString).toArray

override def sizeInBytes: Long = allFiles().map(_.getLen).sum
}
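To show what the trait split buys, here is a hedged sketch of a catalog that can answer pruned `listFiles` calls (say, by consulting a metastore) and therefore only needs `BasicFileCatalog`, never promising the full `allFiles()` enumeration that `FileCatalog` adds. The class name and constructor arguments are illustrative, not from the patch.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{BasicFileCatalog, Partition}

class PrunedListingCatalogSketch(
    override val rootPaths: Seq[Path],
    listMatchingPartitions: Seq[Expression] => Seq[Partition])
  extends BasicFileCatalog {

  // Only the partitions selected by the pruning predicates are ever listed.
  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
    listMatchingPartitions(filters)

  // With no predicates this degenerates to a full listing; a metastore-backed
  // implementation (like the TableFileCatalog this patch refactors) could
  // answer these from table-level metadata instead.
  override def inputFiles: Array[String] =
    listFiles(Nil).flatMap(_.files.map(_.getPath.toUri.toString)).toArray

  override def sizeInBytes: Long =
    listFiles(Nil).flatMap(_.files).map(_.getLen).sum

  override def refresh(): Unit = ()
}
```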
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
* @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
* this relation.
* @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
* present in the actual data files as well, they are preserved.
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
location: FileCatalog,
location: BasicFileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
@@ -58,20 +58,14 @@ case class HadoopFsRelation(
def partitionSchemaOption: Option[StructType] =
if (partitionSchema.isEmpty) None else Some(partitionSchema)

def partitionSpec: PartitionSpec = location.partitionSpec()

def refresh(): Unit = location.refresh()

override def toString: String = {
fileFormat match {
case source: DataSourceRegister => source.shortName()
case _ => "HadoopFiles"
}
}

/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
location.allFiles().map(_.getPath.toUri.toString).toArray
override def sizeInBytes: Long = location.sizeInBytes

override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
override def inputFiles: Array[String] = location.inputFiles
}