Closed
Changes from all commits (19 commits):
5d2b74d  [SPARK-4502][SQL] Parquet nested column pruning (Jun 24, 2016)
8129d29  Refactor SelectedFieldSuite to make its tests simpler and more (mallman, Jun 4, 2018)
4992a41  Remove test "select function over nested data" of unknown origin and (mallman, Jun 4, 2018)
dd4367a  Improve readability of ParquetSchemaPruning and (mallman, Jun 4, 2018)
f601ead  Don't handle non-data-field partition column names specially when (mallman, Jun 4, 2018)
3621356  Add test coverage for ParquetSchemaPruning for partitioned tables whose (mallman, Jun 4, 2018)
0a5146b  Remove the ColumnarFileFormat type to put it in another PR (mallman, Jun 12, 2018)
92e129e  Add test coverage for the enhancements to "is not null" constraint (mallman, Jun 12, 2018)
d407cc1  Revert changes to QueryPlanConstraints.scala and basicPhysicalOperato… (mallman, Jun 24, 2018)
63b1c6a  Revert a whitespace change in DataSourceScanExec.scala (mallman, Jun 24, 2018)
0312a51  Remove modifications to ParquetFileFormat.scala and (mallman, Jul 21, 2018)
6051b52  PR review: simplify some syntax and add a code doc (mallman, Jul 21, 2018)
b50ddb4  When creating a pruned schema by merging an array of root structs, sort (mallman, Jul 28, 2018)
cb35c0a  Re-enable ignored test in ParquetSchemaPruningSuite.scala that is (mallman, Aug 4, 2018)
a87b589  Enable schema pruning by default (mallman, Aug 4, 2018)
323875a  Revert "Enable schema pruning by default" (mallman, Aug 5, 2018)
30831a6  Add a method to not only check a query's scan schemata, but verify th… (mallman, Aug 5, 2018)
23d03fb  Refactor code slightly based on code review comments (ajacques, Aug 2, 2018)
8d822ee  Clean-up code based on review comments (ajacques, Aug 13, 2018)
SQLConf.scala
@@ -1381,6 +1381,16 @@ object SQLConf {
"issues. Turn on this config to insert a local sort before actually doing repartition " +
"to generate consistent repartition results. The performance of repartition() may go " +
"down since we insert extra local sort before it.")
.booleanConf
.createWithDefault(true)

val NESTED_SCHEMA_PRUNING_ENABLED =
buildConf("spark.sql.nestedSchemaPruning.enabled")
.internal()
.doc("Prune nested fields from a logical relation's output which are unnecessary in " +
"satisfying a query. This optimization allows columnar file format readers to avoid " +
"reading unnecessary nested column data. Currently Parquet is the only data source that " +
"implements this optimization.")
.booleanConf
.createWithDefault(true)

@@ -1854,6 +1864,8 @@ class SQLConf extends Serializable with Logging {
def partitionOverwriteMode: PartitionOverwriteMode.Value =
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)

def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL)
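
For context, a minimal sketch of how the new flag could be toggled from user code, assuming a local SparkSession; the key string matches the buildConf name added above:

import org.apache.spark.sql.SparkSession

object NestedSchemaPruningToggle {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nested-schema-pruning-toggle")
      .master("local[*]")
      // Set at session creation time using the key defined above.
      .config("spark.sql.nestedSchemaPruning.enabled", "true")
      .getOrCreate()

    // The entry is a regular (non-static) SQLConf, so it can also be flipped
    // at runtime for the current session.
    spark.conf.set("spark.sql.nestedSchemaPruning.enabled", "false")
    println(spark.conf.get("spark.sql.nestedSchemaPruning.enabled"))

    spark.stop()
  }
}
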
GetStructFieldObject.scala (new file)
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.StructField

/**
* A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
* This is in contrast to the [[GetStructField]] case class extractor which returns the field
* ordinal instead of the field itself.
*/
private[execution] object GetStructFieldObject {
def unapply(getStructField: GetStructField): Option[(Expression, StructField)] =
Some((
getStructField.child,
getStructField.childSchema(getStructField.ordinal)))
}
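
As an illustration, a REPL-style sketch of the extractor in use on a hand-built expression; since the object is private[execution], this assumes the snippet lives under that package (e.g. in a test):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val nameType = StructType(Seq(
  StructField("first", StringType),
  StructField("last", StringType)))
val name = AttributeReference("name", nameType)()

// `name.first` as a Catalyst extractor over the struct-typed attribute.
val firstOfName = GetStructField(name, 0, Some("first"))

firstOfName match {
  case GetStructFieldObject(child, field) =>
    // child is the `name` attribute; field is StructField("first", StringType, true),
    // rather than the bare ordinal 0 the case class extractor would yield.
    println(s"${child.sql} -> ${field.name}")
}
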
ProjectionOverSchema.scala (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
* A Scala extractor that projects an expression over a given schema. Data types,
* field indexes and field counts of complex type extractors and attributes
* are adjusted to fit the schema. All other expressions are left as-is. This
* class is motivated by columnar nested schema pruning.
*/
private[execution] case class ProjectionOverSchema(schema: StructType) {
private val fieldNames = schema.fieldNames.toSet

def unapply(expr: Expression): Option[Expression] = getProjection(expr)

private def getProjection(expr: Expression): Option[Expression] =
expr match {
case a: AttributeReference if fieldNames.contains(a.name) =>
Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
case GetArrayItem(child, arrayItemOrdinal) =>
getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) }
case a: GetArrayStructFields =>
getProjection(a.child).map(p => (p, p.dataType)).map {
case (projection, ArrayType(projSchema @ StructType(_), _)) =>
GetArrayStructFields(projection,
projSchema(a.field.name),
projSchema.fieldIndex(a.field.name),
projSchema.size,
a.containsNull)
}
case GetMapValue(child, key) =>
getProjection(child).map { projection => GetMapValue(projection, key) }
case GetStructFieldObject(child, field: StructField) =>
getProjection(child).map(p => (p, p.dataType)).map {
case (projection, projSchema: StructType) =>
GetStructField(projection, projSchema.fieldIndex(field.name))
}
case _ =>
None
}
}
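
A minimal sketch of how this extractor might be applied, assuming a pruned schema and an expression still bound to the unpruned struct type (hand-built here; same private[execution] caveat as above):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val fullNameType = StructType(Seq(
  StructField("first", StringType),
  StructField("last", StringType)))
val prunedSchema = StructType(Seq(
  StructField("name", StructType(Seq(StructField("first", StringType))))))

// `name.first`, where `name` still carries the unpruned two-field struct type.
val name = AttributeReference("name", fullNameType)()
val firstOfName = GetStructField(name, 0, Some("first"))

val projection = ProjectionOverSchema(prunedSchema)
val rewritten = firstOfName.transformDown {
  case projection(expr) => expr
}
// `rewritten` now references the single-field struct from `prunedSchema`, with
// the attribute's data type and the field ordinal adjusted to match it.
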
SelectedField.scala (new file)
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
* A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst
* complex type extractor. For example, consider a relation with the following schema:
*
* {{{
* root
* |-- name: struct (nullable = true)
* | |-- first: string (nullable = true)
* | |-- last: string (nullable = true)
* }}}
*
* Further, suppose we take the select expression `name.first`. This will parse into an
* `Alias(child, "first")`. Ignoring the alias, `child` matches the following pattern:
*
* {{{
* GetStructFieldObject(
* AttributeReference("name", StructType(_), _, _),
* StructField("first", StringType, _, _))
* }}}
*
* [[SelectedField]] converts that expression into
*
* {{{
* StructField("name", StructType(Array(StructField("first", StringType))))
* }}}
*
* by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the
* same name as its child (or "parent" going right to left in the select expression) and a data
* type appropriate to the complex type extractor. In our example, the name of the child expression
* is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string
* field named "first".
*
* @param expr the top-level complex type extractor
*/
private[execution] object SelectedField {
def unapply(expr: Expression): Option[StructField] = {
// If this expression is an alias, work on its child instead
val unaliased = expr match {
case Alias(child, _) => child
case expr => expr
}
selectField(unaliased, None)
}

private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = {
expr match {
// No children. Returns a StructField with the attribute name or None if fieldOpt is None.
case AttributeReference(name, dataType, nullable, metadata) =>
fieldOpt.map(field =>
StructField(name, wrapStructType(dataType, field), nullable, metadata))
// Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of
// array type.
case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name,
dataType, nullable, metadata)), _) =>
val childField = fieldOpt.map(field => StructField(name,
wrapStructType(dataType, field), nullable, metadata)).getOrElse(field)
selectField(child, Some(childField))
// Handles case "expr0.field[n]", where "expr0.field" is of array type.
case GetArrayItem(child, _) =>
selectField(child, fieldOpt)
// Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type.
case GetArrayStructFields(child: GetArrayStructFields,
field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
val childField = fieldOpt.map(field => StructField(name,
wrapStructType(dataType, field),
nullable, metadata)).orElse(Some(field))
selectField(child, childField)
// Handles case "expr0.field", where "expr0" is of array type.
case GetArrayStructFields(child,
field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
val childField =
fieldOpt.map(field => StructField(name,
wrapStructType(dataType, field),
nullable, metadata)).orElse(Some(field))
selectField(child, childField)
// Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of
// map type.
case GetMapValue(x @ GetStructFieldObject(child, field @ StructField(name,
dataType,
nullable, metadata)), _) =>
val childField = fieldOpt.map(field => StructField(name,
wrapStructType(dataType, field),
nullable, metadata)).orElse(Some(field))
selectField(child, childField)
// Handles case "expr0.field[key]", where "expr0.field" is of map type.
case GetMapValue(child, _) =>
selectField(child, fieldOpt)
// Handles case "expr0.field", where expr0 is of struct type.
case GetStructFieldObject(child,
field @ StructField(name, dataType, nullable, metadata)) =>
val childField = fieldOpt.map(field => StructField(name,
wrapStructType(dataType, field),
nullable, metadata)).orElse(Some(field))
selectField(child, childField)
case _ =>
None
}
}

// Constructs a composition of complex types with a StructType(Array(field)) at its core. Returns
// a StructType for a StructType, an ArrayType for an ArrayType and a MapType for a MapType.
private def wrapStructType(dataType: DataType, field: StructField): DataType = {
dataType match {
case _: StructType =>
StructType(Array(field))
case ArrayType(elementType, containsNull) =>
ArrayType(wrapStructType(elementType, field), containsNull)
case MapType(keyType, valueType, valueContainsNull) =>
MapType(keyType, wrapStructType(valueType, field), valueContainsNull)
}
}
}
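
And a REPL-style sketch of the extractor applied to the Scaladoc example above (hand-built expression; same private[execution] caveat as the other helpers):

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetStructField}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val nameType = StructType(Seq(
  StructField("first", StringType),
  StructField("last", StringType)))
val name = AttributeReference("name", nameType)()

// `SELECT name.first` parses (ignoring resolution details) into roughly this:
val selectExpr = Alias(GetStructField(name, 0, Some("first")), "first")()

selectExpr match {
  case SelectedField(field) =>
    // field is StructField("name", StructType(Array(StructField("first", StringType))))
    println(field)
}
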
SparkOptimizer.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning
import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate

class SparkOptimizer(
@@ -31,7 +32,8 @@ class SparkOptimizer(
override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Parquet Schema Pruning", Once, ParquetSchemaPruning)) ++
postHocOptimizationBatches :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)

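
For reference, user-supplied rules registered through ExperimentalMethods continue to run in the final "User Provided Optimizers" batch, i.e. after the new "Parquet Schema Pruning" batch. A sketch, with NoopRule as a made-up placeholder:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A placeholder rule that leaves the plan untouched.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Registered rules end up in experimentalMethods.extraOptimizations above.
spark.experimental.extraOptimizations = Seq(NoopRule)
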
ParquetRowConverter.scala
@@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter(
extends ParquetGroupConverter(updater) with Logging {

assert(
parquetType.getFieldCount == catalystType.length,
s"""Field counts of the Parquet schema and the Catalyst schema don't match:
parquetType.getFieldCount <= catalystType.length,
s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
|
|Parquet schema:
|$parquetType
@@ -182,18 +184,20 @@

// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
parquetType.getFields.asScala.map {
case parquetField =>
val fieldIndex = catalystType.fieldIndex(parquetField.getName)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
}.toArray
}

override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

override def end(): Unit = {
var i = 0
while (i < currentRow.numFields) {
while (i < fieldConverters.length) {
fieldConverters(i).updater.end()
i += 1
}
@@ -202,11 +204,15 @@

override def start(): Unit = {
var i = 0
while (i < currentRow.numFields) {
while (i < fieldConverters.length) {
fieldConverters(i).updater.start()
currentRow.setNullAt(i)
i += 1
}
while (i < currentRow.numFields) {
currentRow.setNullAt(i)
i += 1
}
}

/**
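
The net effect of the converter changes above is that Parquet fields are matched to Catalyst fields by name rather than by position, and Catalyst fields with no Parquet counterpart are left null. A standalone sketch of that matching step (not the converter itself), assuming a pruned Parquet file schema that is missing one requested column:

import org.apache.spark.sql.types._

// Catalyst schema requested by the query.
val catalystType = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("age", IntegerType)))

// Field names actually present in the (pruned) Parquet file schema.
val parquetFieldNames = Seq("name", "id")

// By-name routing, mirroring catalystType.fieldIndex(parquetField.getName) above:
// each Parquet field writes into the Catalyst ordinal with the same name.
val targetOrdinals = parquetFieldNames.map(catalystType.fieldIndex)
// targetOrdinals == Seq(1, 0); ordinal 2 ("age") has no converter, so start()
// null-fills it and it stays null for every row.
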