-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-38918][SQL] Nested column pruning should filter out attributes that do not belong to the current relation #36216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,11 +61,15 @@ abstract class SchemaPruningSuite | |
| override protected def sparkConf: SparkConf = | ||
| super.sparkConf.set(SQLConf.ANSI_STRICT_INDEX_OPERATOR.key, "false") | ||
|
|
||
| case class Employee(id: Int, name: FullName, employer: Company) | ||
|
|
||
| val janeDoe = FullName("Jane", "X.", "Doe") | ||
| val johnDoe = FullName("John", "Y.", "Doe") | ||
| val susanSmith = FullName("Susan", "Z.", "Smith") | ||
|
|
||
| val employer = Employer(0, Company("abc", "123 Business Street")) | ||
| val company = Company("abc", "123 Business Street") | ||
|
|
||
| val employer = Employer(0, company) | ||
| val employerWithNullCompany = Employer(1, null) | ||
| val employerWithNullCompany2 = Employer(2, null) | ||
|
|
||
|
|
@@ -81,6 +85,8 @@ abstract class SchemaPruningSuite | |
| Department(1, "Marketing", 1, employerWithNullCompany) :: | ||
| Department(2, "Operation", 4, employerWithNullCompany2) :: Nil | ||
|
|
||
| val employees = Employee(0, janeDoe, company) :: Employee(1, johnDoe, company) :: Nil | ||
|
|
||
| case class Name(first: String, last: String) | ||
| case class BriefContact(id: Int, name: Name, address: String) | ||
|
|
||
|
|
@@ -621,6 +627,26 @@ abstract class SchemaPruningSuite | |
| } | ||
| } | ||
|
|
||
| testSchemaPruning("SPARK-38918: nested schema pruning with correlated subqueries") { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without this PR, this test failed with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it looks like a separate issue with column pruning and subquery rewrite (data source v1 only). I will investigate more. |
||
| withContacts { | ||
| withEmployees { | ||
| val query = sql( | ||
| """ | ||
| |select count(*) | ||
| |from contacts c | ||
| |where not exists (select null from employees e where e.name.first = c.name.first | ||
| | and e.employer.name = c.employer.company.name) | ||
| |""".stripMargin) | ||
| checkScan(query, | ||
| "struct<name:struct<first:string,middle:string,last:string>," + | ||
| "employer:struct<id:int,company:struct<name:string,address:string>>>", | ||
| "struct<name:struct<first:string,middle:string,last:string>," + | ||
| "employer:struct<name:string,address:string>>") | ||
| checkAnswer(query, Row(3)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we check pruning schema too? |
||
| } | ||
| } | ||
| } | ||
|
|
||
| protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = { | ||
| test(s"Spark vectorized reader - without partition data column - $testName") { | ||
| withSQLConf(vectorizedReaderEnabledKey -> "true") { | ||
|
|
@@ -701,6 +727,23 @@ abstract class SchemaPruningSuite | |
| } | ||
| } | ||
|
|
||
| private def withEmployees(testThunk: => Unit): Unit = { | ||
| withTempPath { dir => | ||
| val path = dir.getCanonicalPath | ||
|
|
||
| makeDataSourceFile(employees, new File(path + "/employees")) | ||
|
|
||
| // Providing user specified schema. Inferred schema from different data sources might | ||
| // be different. | ||
| val schema = "`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " + | ||
| "`employer` STRUCT<`name`: STRING, `address`: STRING>" | ||
| spark.read.format(dataSourceName).schema(schema).load(path + "/employees") | ||
| .createOrReplaceTempView("employees") | ||
|
|
||
| testThunk | ||
| } | ||
| } | ||
|
|
||
| case class MixedCaseColumn(a: String, B: Int) | ||
| case class MixedCase(id: Int, CoL1: String, coL2: MixedCaseColumn) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I discovered that this Once batch is not idempotent.
ColumnPruningandCollapseProjectcan be applied multiple times after correlated IN/EXISTS subqueries are rewritten. Happy to discuss other ways to fix/improve this batch. cc @cloud-fanAttached the plan change log for the test case:
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't find that before because we don't have the test coverage?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping @allisonwang-db