Skip to content

Commit 4c1428f

Browse files
viiryadbtsai
authored andcommitted
[SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unnecessary root fields
## What changes were proposed in this pull request? Schema pruning doesn't work if nested column is used in where clause. For example, ``` sql("select name.first from contacts where name.first = 'David'") == Physical Plan == *(1) Project [name#19.first AS first#40] +- *(1) Filter (isnotnull(name#19) && (name#19.first = David)) +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>> ``` In above query plan, the scan node reads the entire schema of `name` column. This issue is reported by: #21320 (comment) The cause is that we infer a root field from expression `IsNotNull(name)`. However, for such expression, we don't really use the nested fields of this root field, so we can ignore the unnecessary nested fields. ## How was this patch tested? Unit tests. Closes #22357 from viirya/SPARK-25363. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: DB Tsai <d_tsai@apple.com> (cherry picked from commit 3030b82) Signed-off-by: DB Tsai <d_tsai@apple.com>
1 parent 071babb commit 4c1428f

File tree

2 files changed

+96
-15
lines changed

2 files changed

+96
-15
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources.parquet
1919

20-
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
20+
import org.apache.spark.sql.catalyst.expressions._
2121
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
2222
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
2323
import org.apache.spark.sql.catalyst.rules.Rule
@@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
110110
val projectionRootFields = projects.flatMap(getRootFields)
111111
val filterRootFields = filters.flatMap(getRootFields)
112112

113-
(projectionRootFields ++ filterRootFields).distinct
113+
// Kind of expressions don't need to access any fields of a root fields, e.g., `IsNotNull`.
114+
// For them, if there are any nested fields accessed in the query, we don't need to add root
115+
// field access of above expressions.
116+
// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
117+
// we don't need to read nested fields of `name` struct other than `first` field.
118+
val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
119+
.distinct.partition(_.contentAccessed)
120+
121+
optRootFields.filter { opt =>
122+
!rootFields.exists(_.field.name == opt.field.name)
123+
} ++ rootFields
114124
}
115125

116126
/**
@@ -156,7 +166,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
156166
// in the resulting schema may differ from their ordering in the logical relation's
157167
// original schema
158168
val mergedSchema = requestedRootFields
159-
.map { case RootField(field, _) => StructType(Array(field)) }
169+
.map { case root: RootField => StructType(Array(root.field)) }
160170
.reduceLeft(_ merge _)
161171
val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
162172
val mergedDataSchema =
@@ -199,6 +209,15 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
199209
case att: Attribute =>
200210
RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
201211
case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
212+
// Root field accesses by `IsNotNull` and `IsNull` are special cases as the expressions
213+
// don't actually use any nested fields. These root field accesses might be excluded later
214+
// if there are any nested fields accesses in the query plan.
215+
case IsNotNull(SelectedField(field)) =>
216+
RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
217+
case IsNull(SelectedField(field)) =>
218+
RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
219+
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
220+
expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
202221
case _ =>
203222
expr.children.flatMap(getRootFields)
204223
}
@@ -250,8 +269,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
250269
}
251270

252271
/**
253-
* A "root" schema field (aka top-level, no-parent) and whether it was derived from
254-
* an attribute or had a proper child.
272+
* This represents a "root" schema field (aka top-level, no-parent). `field` is the
273+
* `StructField` for field name and datatype. `derivedFromAtt` indicates whether it
274+
* was derived from an attribute or had a proper child. `contentAccessed` means whether
275+
* it was accessed with its content by the expressions refer it.
255276
*/
256-
private case class RootField(field: StructField, derivedFromAtt: Boolean)
277+
private case class RootField(field: StructField, derivedFromAtt: Boolean,
278+
contentAccessed: Boolean = true)
257279
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,29 @@ class ParquetSchemaPruningSuite
3535
with SchemaPruningTest
3636
with SharedSQLContext {
3737
case class FullName(first: String, middle: String, last: String)
38+
case class Company(name: String, address: String)
39+
case class Employer(id: Int, company: Company)
3840
case class Contact(
3941
id: Int,
4042
name: FullName,
4143
address: String,
4244
pets: Int,
4345
friends: Array[FullName] = Array.empty,
44-
relatives: Map[String, FullName] = Map.empty)
46+
relatives: Map[String, FullName] = Map.empty,
47+
employer: Employer = null)
4548

4649
val janeDoe = FullName("Jane", "X.", "Doe")
4750
val johnDoe = FullName("John", "Y.", "Doe")
4851
val susanSmith = FullName("Susan", "Z.", "Smith")
4952

53+
val employer = Employer(0, Company("abc", "123 Business Street"))
54+
val employerWithNullCompany = Employer(1, null)
55+
5056
private val contacts =
5157
Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
52-
relatives = Map("brother" -> johnDoe)) ::
53-
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
58+
relatives = Map("brother" -> johnDoe), employer = employer) ::
59+
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe),
60+
employer = employerWithNullCompany) :: Nil
5461

5562
case class Name(first: String, last: String)
5663
case class BriefContact(id: Int, name: Name, address: String)
@@ -66,13 +73,14 @@ class ParquetSchemaPruningSuite
6673
pets: Int,
6774
friends: Array[FullName] = Array(),
6875
relatives: Map[String, FullName] = Map(),
76+
employer: Employer = null,
6977
p: Int)
7078

7179
case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
7280

7381
private val contactsWithDataPartitionColumn =
74-
contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
75-
ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
82+
contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
83+
ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
7684
private val briefContactsWithDataPartitionColumn =
7785
briefContacts.map { case BriefContact(id, name, address) =>
7886
BriefContactWithDataPartitionColumn(id, name, address, 2) }
@@ -155,6 +163,60 @@ class ParquetSchemaPruningSuite
155163
Row(null) :: Row(null) :: Nil)
156164
}
157165

166+
testSchemaPruning("select a single complex field and in where clause") {
167+
val query1 = sql("select name.first from contacts where name.first = 'Jane'")
168+
checkScan(query1, "struct<name:struct<first:string>>")
169+
checkAnswer(query1, Row("Jane") :: Nil)
170+
171+
val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
172+
checkScan(query2, "struct<name:struct<first:string,last:string>>")
173+
checkAnswer(query2, Row("Jane", "Doe") :: Nil)
174+
175+
val query3 = sql("select name.first from contacts " +
176+
"where employer.company.name = 'abc' and p = 1")
177+
checkScan(query3, "struct<name:struct<first:string>," +
178+
"employer:struct<company:struct<name:string>>>")
179+
checkAnswer(query3, Row("Jane") :: Nil)
180+
181+
val query4 = sql("select name.first, employer.company.name from contacts " +
182+
"where employer.company is not null and p = 1")
183+
checkScan(query4, "struct<name:struct<first:string>," +
184+
"employer:struct<company:struct<name:string>>>")
185+
checkAnswer(query4, Row("Jane", "abc") :: Nil)
186+
}
187+
188+
testSchemaPruning("select nullable complex field and having is not null predicate") {
189+
val query = sql("select employer.company from contacts " +
190+
"where employer is not null and p = 1")
191+
checkScan(query, "struct<employer:struct<company:struct<name:string,address:string>>>")
192+
checkAnswer(query, Row(Row("abc", "123 Business Street")) :: Row(null) :: Nil)
193+
}
194+
195+
testSchemaPruning("select a single complex field and is null expression in project") {
196+
val query = sql("select name.first, address is not null from contacts")
197+
checkScan(query, "struct<name:struct<first:string>,address:string>")
198+
checkAnswer(query.orderBy("id"),
199+
Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) :: Nil)
200+
}
201+
202+
testSchemaPruning("select a single complex field array and in clause") {
203+
val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
204+
checkScan(query,
205+
"struct<friends:array<struct<first:string,middle:string>>>")
206+
checkAnswer(query.orderBy("id"),
207+
Row(Array("Z.")) :: Nil)
208+
}
209+
210+
testSchemaPruning("select a single complex field from a map entry and in clause") {
211+
val query =
212+
sql("select relatives[\"brother\"].middle from contacts " +
213+
"where relatives[\"brother\"].first = 'John'")
214+
checkScan(query,
215+
"struct<relatives:map<string,struct<first:string,middle:string>>>")
216+
checkAnswer(query.orderBy("id"),
217+
Row("Y.") :: Nil)
218+
}
219+
158220
private def testSchemaPruning(testName: String)(testThunk: => Unit) {
159221
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
160222
test(s"Spark vectorized reader - without partition data column - $testName") {
@@ -238,10 +300,7 @@ class ParquetSchemaPruningSuite
238300

239301
testMixedCasePruning("filter with different-case column names") {
240302
val query = sql("select id from mixedcase where Col2.b = 2")
241-
// Pruning with filters is currently unsupported. As-is, the file reader will read the id column
242-
// and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment
243-
// this line
244-
// checkScan(query, "struct<id:int,coL2:struct<B:int>>")
303+
checkScan(query, "struct<id:int,coL2:struct<B:int>>")
245304
checkAnswer(query.orderBy("id"), Row(1) :: Nil)
246305
}
247306

0 commit comments

Comments
 (0)