Commit f2d3542

Michael Allman (mallman) and Adam Jacques (ajacques) authored; committed by Xiao Li
[SPARK-4502][SQL] Parquet nested column pruning - foundation
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

_N.B. This is a restart of PR #16578 which includes a subset of that code. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR._

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet. This PR modifies Spark SQL to support a finer grain of schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

### Implementation

There are two main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug. We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.

Closes #21320 from mallman/spark-4502-parquet_column_pruning-foundation.

Lead-authored-by: Michael Allman <msa@allman.ms>
Co-authored-by: Adam Jacques <adam@technowizardry.net>
Co-authored-by: Michael Allman <michael@videoamp.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
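As a quick illustration of the behavior described above, a minimal sketch of how one might observe the pruning, assuming a `SparkSession` named `spark` and a parquet-backed `contacts` table with the schema from the summary (the config key comes from this patch; the exact `explain()` output shape is indicative, not verbatim):

```scala
// The flag is internal and defaults to false in this patch, so opt in first.
spark.conf.set("spark.sql.nestedSchemaPruning.enabled", "true")

// With pruning enabled, the scan's read schema should mention only
// name.first rather than the whole name struct, e.g. something like
//   ReadSchema: struct<name:struct<first:string>>
spark.sql("select name.first from contacts").explain()
```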
1 parent cd6dff7 commit f2d3542

File tree

10 files changed: +1313 −5 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 1 deletion
```diff
@@ -1419,8 +1419,18 @@ object SQLConf {
       "issues. Turn on this config to insert a local sort before actually doing repartition " +
       "to generate consistent repartition results. The performance of repartition() may go " +
       "down since we insert extra local sort before it.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+    buildConf("spark.sql.nestedSchemaPruning.enabled")
+      .internal()
+      .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
+        "satisfying a query. This optimization allows columnar file format readers to avoid " +
+        "reading unnecessary nested column data. Currently Parquet is the only data source that " +
+        "implements this optimization.")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val TOP_K_SORT_FALLBACK_THRESHOLD =
     buildConf("spark.sql.execution.topKSortFallbackThreshold")
@@ -1895,6 +1905,8 @@ class SQLConf extends Serializable with Logging {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
+  def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
+
   def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
 
   def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL)
```
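For reference, a minimal sketch of how the new entry behaves through the `SQLConf` API (names taken from the diff above; the standalone `SQLConf` instance is just for illustration — in a session the value would come from the session's conf):

```scala
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
// The entry is created with default false, so pruning is opt-in for now.
assert(!conf.nestedSchemaPruningEnabled)
conf.setConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED, true)
assert(conf.nestedSchemaPruningEnabled)
```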
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala

Lines changed: 45 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.internal.SQLConf.NESTED_SCHEMA_PRUNING_ENABLED

/**
 * A PlanTest that ensures that all tests in this suite are run with nested schema pruning enabled.
 * Remove this trait once the default value of SQLConf.NESTED_SCHEMA_PRUNING_ENABLED is set to
 * true.
 */
private[sql] trait SchemaPruningTest extends PlanTest with BeforeAndAfterAll {
  private var originalConfSchemaPruningEnabled = false

  override protected def beforeAll(): Unit = {
    originalConfSchemaPruningEnabled = conf.nestedSchemaPruningEnabled
    conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, true)
    super.beforeAll()
  }

  override protected def afterAll(): Unit = {
    try {
      super.afterAll()
    } finally {
      conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, originalConfSchemaPruningEnabled)
    }
  }
}
```
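A hypothetical suite mixing in the trait would look like the sketch below; every test in it runs with pruning forced on, and the original setting is restored afterwards. The suite name is made up, and since the trait is `private[sql]`, the suite has to live under the `org.apache.spark.sql` package:

```scala
package org.apache.spark.sql.catalyst

// Hypothetical suite; SchemaPruningTest brings in PlanTest's test harness
// and the before/after hooks that toggle the config.
class MyNestedPruningSuite extends SchemaPruningTest {
  test("pruning is forced on for every test in this suite") {
    assert(conf.nestedSchemaPruningEnabled)
  }
}
```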
sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala

Lines changed: 33 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.StructField

/**
 * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
 * This is in contrast to the [[GetStructField]] case class extractor which returns the field
 * ordinal instead of the field itself.
 */
private[execution] object GetStructFieldObject {
  def unapply(getStructField: GetStructField): Option[(Expression, StructField)] =
    Some((
      getStructField.child,
      getStructField.childSchema(getStructField.ordinal)))
}
```
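A sketch of the difference in practice. The helper object is hypothetical, and lives in the same `org.apache.spark.sql.execution` package because the extractor is `private[execution]`:

```scala
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructField

// Hypothetical helper for illustration only.
object FieldAccessDescriber {
  // GetStructField's own case-class extractor binds the child and the field
  // *ordinal*; GetStructFieldObject resolves that ordinal to the StructField
  // itself, so the field's name and data type are directly available.
  def describe(expr: Expression): Option[String] = expr match {
    case GetStructFieldObject(child, StructField(name, dataType, _, _)) =>
      Some(s"selects field '$name' of type $dataType from $child")
    case _ => None
  }
}
```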
sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala

Lines changed: 59 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
 * A Scala extractor that projects an expression over a given schema. Data types,
 * field indexes and field counts of complex type extractors and attributes
 * are adjusted to fit the schema. All other expressions are left as-is. This
 * class is motivated by columnar nested schema pruning.
 */
private[execution] case class ProjectionOverSchema(schema: StructType) {
  private val fieldNames = schema.fieldNames.toSet

  def unapply(expr: Expression): Option[Expression] = getProjection(expr)

  private def getProjection(expr: Expression): Option[Expression] =
    expr match {
      case a: AttributeReference if fieldNames.contains(a.name) =>
        Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
      case GetArrayItem(child, arrayItemOrdinal) =>
        getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) }
      case a: GetArrayStructFields =>
        getProjection(a.child).map(p => (p, p.dataType)).map {
          case (projection, ArrayType(projSchema @ StructType(_), _)) =>
            GetArrayStructFields(projection,
              projSchema(a.field.name),
              projSchema.fieldIndex(a.field.name),
              projSchema.size,
              a.containsNull)
        }
      case GetMapValue(child, key) =>
        getProjection(child).map { projection => GetMapValue(projection, key) }
      case GetStructFieldObject(child, field: StructField) =>
        getProjection(child).map(p => (p, p.dataType)).map {
          case (projection, projSchema: StructType) =>
            GetStructField(projection, projSchema.fieldIndex(field.name))
        }
      case _ =>
        None
    }
}
```
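To illustrate the intended use, here is a sketch loosely mirroring how the `ParquetSchemaPruning` rule from the summary could apply the extractor. `prunedSchema` (the schema pushed down to parquet) and `projects` (the projection expressions of the `PhysicalOperation`) are assumed inputs, not names defined in this file:

```scala
// Assumed inputs: prunedSchema: StructType, projects: Seq[Expression].
val projection = ProjectionOverSchema(prunedSchema)

// A stable identifier (lowercase val) can serve as an extractor in a
// pattern, so each matching sub-expression of every projection is rewritten
// in terms of the pruned schema's data types and field ordinals.
val rewrittenProjects = projects.map(_.transformDown {
  case projection(rewritten) => rewritten
})
```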
sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala

Lines changed: 134 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
 * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst
 * complex type extractor. For example, consider a relation with the following schema:
 *
 * {{{
 * root
 *  |-- name: struct (nullable = true)
 *  |    |-- first: string (nullable = true)
 *  |    |-- last: string (nullable = true)
 * }}}
 *
 * Further, suppose we take the select expression `name.first`. This will parse into an
 * `Alias(child, "first")`. Ignoring the alias, `child` matches the following pattern:
 *
 * {{{
 * GetStructFieldObject(
 *   AttributeReference("name", StructType(_), _, _),
 *   StructField("first", StringType, _, _))
 * }}}
 *
 * [[SelectedField]] converts that expression into
 *
 * {{{
 * StructField("name", StructType(Array(StructField("first", StringType))))
 * }}}
 *
 * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the
 * same name as its child (or "parent" going right to left in the select expression) and a data
 * type appropriate to the complex type extractor. In our example, the name of the child expression
 * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string
 * field named "first".
 *
 * @param expr the top-level complex type extractor
 */
private[execution] object SelectedField {
  def unapply(expr: Expression): Option[StructField] = {
    // If this expression is an alias, work on its child instead
    val unaliased = expr match {
      case Alias(child, _) => child
      case expr => expr
    }
    selectField(unaliased, None)
  }

  private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = {
    expr match {
      // No children. Returns a StructField with the attribute name or None if fieldOpt is None.
      case AttributeReference(name, dataType, nullable, metadata) =>
        fieldOpt.map(field =>
          StructField(name, wrapStructType(dataType, field), nullable, metadata))
      // Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of
      // array type.
      case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name,
          dataType, nullable, metadata)), _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field), nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      // Handles case "expr0.field[n]", where "expr0.field" is of array type.
      case GetArrayItem(child, _) =>
        selectField(child, fieldOpt)
      // Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type.
      case GetArrayStructFields(child: GetArrayStructFields,
          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      // Handles case "expr0.field", where "expr0" is of array type.
      case GetArrayStructFields(child,
          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
        val childField =
          fieldOpt.map(field => StructField(name,
            wrapStructType(dataType, field),
            nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      // Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of
      // map type.
      case GetMapValue(x @ GetStructFieldObject(child, field @ StructField(name,
          dataType,
          nullable, metadata)), _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      // Handles case "expr0.field[key]", where "expr0.field" is of map type.
      case GetMapValue(child, _) =>
        selectField(child, fieldOpt)
      // Handles case "expr0.field", where expr0 is of struct type.
      case GetStructFieldObject(child,
          field @ StructField(name, dataType, nullable, metadata)) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      case _ =>
        None
    }
  }

  // Constructs a composition of complex types with a StructType(Array(field)) at its core. Returns
  // a StructType for a StructType, an ArrayType for an ArrayType and a MapType for a MapType.
  private def wrapStructType(dataType: DataType, field: StructField): DataType = {
    dataType match {
      case _: StructType =>
        StructType(Array(field))
      case ArrayType(elementType, containsNull) =>
        ArrayType(wrapStructType(elementType, field), containsNull)
      case MapType(keyType, valueType, valueContainsNull) =>
        MapType(keyType, wrapStructType(valueType, field), valueContainsNull)
    }
  }
}
```
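Continuing the scaladoc's example, a sketch of the extractor in action. The helper is hypothetical and must sit in the same package as the `private[execution]` object; `expr` is assumed to be the analyzed form of `name.first`:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructField

// Returns the root-level field a complex type extractor expression selects,
// if any. For the analyzed `name.first` from the scaladoc, this yields
// Some(StructField("name", StructType(Array(StructField("first", StringType))))),
// i.e. the minimal slice of the root schema the expression needs.
def rootFieldOf(expr: Expression): Option[StructField] = expr match {
  case SelectedField(field) => Some(field)
  case _ => None
}
```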

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 3 additions & 1 deletion

```diff
@@ -21,6 +21,7 @@ import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 
 class SparkOptimizer(
@@ -31,7 +32,8 @@ class SparkOptimizer(
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
-    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
+    Batch("Parquet Schema Pruning", Once, ParquetSchemaPruning)) ++
     postHocOptimizationBatches :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 
```
