[SPARK-25313][SQL]Fix regression in FileFormatWriter output names #22320
Conversation
Test build #95606 has finished for PR 22320 at commit

retest this please.
    * instead of `data.output`.
    * @param outputColumnNames The original output column names of the input query plan. The
    * optimizer may not preserve the output column's names' case, so we need
    * this parameter instead of `data.output`.
nit:
* @param outputColumnNames The original output column names of the input query plan. The
* optimizer may not preserve the output column's names' case, so we need
* this parameter instead of `data.output`.
    element.withName(names(index))
    }
    }
If #22311 is merged, we don't need this function anymore, right? If so, IMHO it'd be better to fix this issue on the FileFormatWriter side as a workaround.
+1
or make it a util function
It seems overkill to add a function here, but in FileFormatWriter we can't access the LogicalPlan to get the attributes.
Another way is to put this method in a util.
Do you have a good suggestion?
I was thinking...

    object FileFormatWriter {
      ...
      // workaround: a helper function...
      def outputWithNames(outputAttributes: Seq[Attribute], names: Seq[String]): Seq[Attribute] = {
        assert(outputAttributes.length == names.length,
          "The length of provided names doesn't match the length of output attributes.")
        outputAttributes.zipWithIndex.map { case (element, index) =>
          element.withName(names(index))
        }
      }
    }

Then, in each call site, just say FileFormatWriter.outputWithNames(logicalPlan.output, names)?
@maropu Thanks! I have created object DataWritingCommand for this.
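For reference, a rough sketch of what that companion object could look like, pieced together from the snippets quoted below (the exact method names and signatures here are assumptions and may differ from the final patch):

```
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType

object DataWritingCommand {
  // Re-apply the original (case-preserved) names onto the query's output attributes.
  def logicalPlanOutputWithNames(
      query: LogicalPlan,
      names: Seq[String]): Seq[Attribute] = {
    // Save the output attributes to a variable to avoid duplicated function calls.
    val outputAttributes = query.output
    assert(outputAttributes.length == names.length,
      "The length of provided names doesn't match the length of output attributes.")
    outputAttributes.zip(names).map { case (attr, name) => attr.withName(name) }
  }

  // Same idea, but returning a schema, e.g. when creating the table metadata.
  def logicalPlanSchemaWithNames(
      query: LogicalPlan,
      names: Seq[String]): StructType = {
    StructType.fromAttributes(logicalPlanOutputWithNames(query, names))
  }
}
```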
    val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
    val resolved = cmd.copy(
      partitionColumns = resolvedPartCols,
      outputColumnNames = outputColumns.map(_.name))
why can't we use outputColumnNames directly here?
    assert(outputAttributes.length == names.length,
      "The length of provided names doesn't match the length of output attributes.")
    outputAttributes.zipWithIndex.map { case (element, index) =>
      element.withName(names(index))
`outputAttributes.zip(names).map { case (attr, outputName) => attr.withName(outputName) }`?
@gengliangwang In what situations would outputAttributes.length != names.length? Could you give me an example?
    query: LogicalPlan,
    names: Seq[String]): Seq[Attribute] = {
    // Save the output attributes to a variable to avoid duplicated function calls.
    val outputAttributes = query.output
query: LogicalPlan -> outputAttributes: Seq[Attribute] in the function argument, then drop the line above?
I think both are OK. The current way makes it easier to call this util function, and it is easier to understand what the parameter should be, while the way you suggest makes the argument carry minimal information.
Test build #95609 has finished for PR 22320 at commit

Test build #95610 has finished for PR 22320 at commit
    }
    }
    }
better to move these tests into DataFrameReaderWriterSuite?
    overwrite: Boolean,
    ifPartitionNotExists: Boolean,
    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
    outputColumnNames: Seq[String]) extends SaveAsHiveFile {
For better test coverage, can you add tests for Hive tables?
No problem 👍
thanks!
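A rough sketch of what such a Hive-table test could look like, assembled from the snippets quoted below (it assumes the suite's `withTable`/`withView`/`checkAnswer` helpers; `STORED AS PARQUET` is an assumption here so the written files' schema can be checked directly, and the exact assertions in the final patch may differ):

```
test("Insert overwrite Hive table should output correct schema") {
  withTable("tbl", "tbl2") {
    withView("view1") {
      spark.sql("CREATE TABLE tbl(id long)")
      spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
      spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
      spark.sql("CREATE TABLE tbl2(ID long) STORED AS PARQUET")
      spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
      // The files written for tbl2 should keep the user-facing, upper-case name "ID",
      // not the lower-cased "id" coming from the optimized plan.
      val identifier = TableIdentifier("tbl2", Some("default"))
      val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
      assert(spark.read.parquet(location).schema.fieldNames.toSeq == Seq("ID"))
      checkAnswer(spark.table("tbl2"), Seq(Row(4)))
    }
  }
}
```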
    import org.apache.spark.{AccumulatorSuite, SparkException}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
    import org.apache.spark.sql.catalyst.TableIdentifier
unnecessary change
    spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " +
      "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS")
    spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 " +
      "FROM view1 CLUSTER BY COL3")
is it legal to put CLUSTER BY in the INSERT statement?
    test("Insert overwrite Hive table should output correct schema") {
      withTable("tbl", "tbl2") {
        withView("view1") {
          spark.sql("CREATE TABLE tbl(id long)")
please run this test within `withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET -> false)`
I am not familiar with Hive, but looking at the debug message of this logical plan, the top level is `InsertIntoHiveTable default.tbl2, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [ID]`. It should not be related to this configuration, right?
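For reference, wrapping the test in that config would look roughly like this (a sketch; note that `withSQLConf` takes string key/value pairs, so the config's `.key` and a string value are used):

```
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
  withTable("tbl", "tbl2") {
    withView("view1") {
      spark.sql("CREATE TABLE tbl(id long)")
      // ... rest of the test body unchanged ...
    }
  }
}
```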
LGTM except some minor comments

Test build #95619 has finished for PR 22320 at commit

Test build #95620 has finished for PR 22320 at commit

Test build #95627 has finished for PR 22320 at commit
    test("Insert overwrite table command should output correct schema: basic") {
      withTable("tbl", "tbl2") {
        withView("view1") {
          val df = spark.range(10).toDF("id")
Why is toDF("id") required? Why not spark.range(10) alone?
This is trivial... As the column name id is case sensitive and used below, I would like to show it explicitly.
"case sensitive"? How so, since Spark SQL is case-insensitive by default?
I think @gengliangwang meant case preserving, which is the behavior we are testing against.
spark.range(10).toDF("id") is the same as spark.range(10); it's just clearer to people who don't know spark.range outputs a single column named "id".
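For context, `spark.range(10)` already produces a single `id` column:

```
scala> spark.range(10).printSchema()
root
 |-- id: long (nullable = false)
```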
    spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
    spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
    spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
    val identifier = TableIdentifier("tbl2", Some("default"))
default is the default database name, isn't it? I'd remove it from the test or use spark.catalog.currentDatabase.
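e.g., instead of hard-coding the database name, something like:

```
val identifier = TableIdentifier("tbl2", Some(spark.catalog.currentDatabase))
```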
    spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " +
      "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS")
    spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM view1")
    val identifier = TableIdentifier("tbl2", Some("default"))
Same as above.
    val identifier = TableIdentifier("tbl2", Some("default"))
    val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
    val expectedSchema = StructType(Seq(
      StructField("COL1", LongType, true),
nullable is true by default.
Keeping it should be OK.
    val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
    val expectedSchema = StructType(Seq(
      StructField("COL1", LongType, true),
      StructField("COL3", IntegerType, true),
You could use a little magic here: `$"COL1".int`

    scala> $"COL1".int
    res1: org.apache.spark.sql.types.StructField = StructField(COL1,IntegerType,true)
    overwrite = false,
    ifPartitionNotExists = false,
    outputColumns = outputColumns).run(sparkSession, child)
    outputColumnNames = outputColumnNames).run(sparkSession, child)
Can you remove one outputColumnNames?
    withView("view1") {
      withTempPath { path =>
        spark.sql("CREATE TABLE tbl(id long)")
        spark.sql("INSERT OVERWRITE TABLE tbl SELECT 4")
s/SELECT/VALUES as it could be a bit more Spark-idiomatic?
Test build #95633 has finished for PR 22320 at commit
    spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
    spark.sql("CREATE TABLE tbl2(ID long)")
    spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
    checkAnswer(spark.table("tbl2"), Seq(Row(4)))
Add a schema assert please. We can read the data since SPARK-25132.
Good point. I found that CreateHiveTableAsSelectCommand outputs a wrong schema after adding a new test case.
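e.g., on top of the existing `checkAnswer`, something along these lines in the tests that already compute `location` and `expectedSchema` (a sketch; the exact expected schema depends on the test):

```
// Reading the written files directly should show the case-preserved column names.
assert(spark.read.parquet(location).schema == expectedSchema)
```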
Test build #95649 has finished for PR 22320 at commit

retest this please
    assert(tableDesc.schema.isEmpty)
    catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)
    val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
    catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
The schema naming needs to be consistent with outputColumnNames here.
Test build #95657 has finished for PR 22320 at commit

Test build #95663 has finished for PR 22320 at commit
    outputColumnNames: Seq[String])
    extends DataWritingCommand {
    import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
Line 66: query.schema should be DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames).
Oh, then we can use this method instead.

    def checkColumnNameDuplication(
        columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit
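That helper lives in `SchemaUtils`; a hedged sketch of how it might be called here (the call site, and the `tableDesc`/`sparkSession`/`outputColumnNames` names taken from the surrounding snippets, are illustrative only):

```
import org.apache.spark.sql.util.SchemaUtils

// Reject duplicate output column names before creating the table.
SchemaUtils.checkColumnNameDuplication(
  outputColumnNames,
  s"when inserting into ${tableDesc.identifier}",
  sparkSession.sessionState.conf.caseSensitiveAnalysis)
```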
Test build #95692 has finished for PR 22320 at commit

retest this please
    overwrite = true,
    ifPartitionNotExists = false,
    outputColumns = outputColumns).run(sparkSession, child)
    outputColumnNames = outputColumnNames).run(sparkSession, child)
Why is this duplication needed here?
what's the duplication?
outputColumnNames themselves. Specifying outputColumnNames as the name of the property to set using outputColumnNames does nothing but introduce a duplication. If you removed one outputColumnNames, comprehension would not be lowered whatsoever, would it?
I feel it's better to specify parameters by name if the previous parameter is already specified by name, e.g. ifPartitionNotExists = false
    withTable("tbl", "tbl2") {
      withView("view1") {
        spark.sql("CREATE TABLE tbl(id long)")
        spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
I might be missing something, but why does this test use SQL statements not DataFrameWriter API, e.g. Seq(4).toDF("id").write.mode(SaveMode.Overwrite).saveAsTable("tbl")?
We can, but it's important to keep the code style consistent with the existing code in the same file. In this test suite, it seems SQL statements are preferred.
Test build #95702 has finished for PR 22320 at commit

retest this please

Test build #95711 has finished for PR 22320 at commit

thanks, merging to master!

@gengliangwang We need to backport this PR to branch-2.3.
## What changes were proposed in this pull request?
Let's see the following example:
```
val location = "/tmp/t"
val df = spark.range(10).toDF("id")
df.write.format("parquet").saveAsTable("tbl")
spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
spark.sql(s"CREATE TABLE tbl2(ID long) USING parquet location $location")
spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
println(spark.read.parquet(location).schema)
spark.table("tbl2").show()
```
The output column name in the schema will be `id` instead of `ID`, thus the last query shows nothing from `tbl2`.
By enabling the debug message we can see that the output naming is changed from `ID` to `id`, and then the `outputColumns` in `InsertIntoHadoopFsRelationCommand` is changed in `RemoveRedundantAliases`.


**To guarantee correctness**, we should change the output columns from `Seq[Attribute]` to `Seq[String]` to avoid their names being replaced by the optimizer.
I will fix project elimination related rules in apache#22311 after this one.
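Concretely, the shape of the change on the write commands is roughly the following (a simplified sketch with stand-in class names; the real commands are `InsertIntoHadoopFsRelationCommand`, `InsertIntoHiveTable`, etc., shown in the diffs above):

```
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Before (simplified): the command stores resolved attributes, whose names the
// optimizer (e.g. RemoveRedundantAliases) may rewrite to a different case.
case class WriteCommandBefore(query: LogicalPlan, outputColumns: Seq[Attribute])

// After (simplified): the command stores only the user-facing names; the attributes
// are re-derived from query.output and renamed right before writing.
case class WriteCommandAfter(query: LogicalPlan, outputColumnNames: Seq[String]) {
  def outputColumns: Seq[Attribute] =
    query.output.zip(outputColumnNames).map { case (attr, name) => attr.withName(name) }
}
```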
## How was this patch tested?
Unit test.
Closes apache#22320 from gengliangwang/fixOutputSchema.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…put names

Port #22320 to branch-2.3.

Closes #22346 from gengliangwang/portSchemaOutputName2.3.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>