[SPARK-29869][SQL] improve error message in HiveMetastoreCatalog#convertToLogicalRelation #26499
Conversation
…oreCatalog#convertToLogicalRelation throws AssertionError
cc @wangyum, more input?
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
Test build #113688 has finished for PR 26499 at commit
Test build #113750 has finished for PR 26499 at commit
This issue is caused by `PartitioningAwareFileIndex#inferPartitioning()`.
Test build #113758 has finished for PR 26499 at commit
| withTable("test") { | ||
| withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true", | ||
| SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") { | ||
| val partitionLikeDir = s"${dir.getCanonicalFile.toURI.toString}/test/dt=20191113" |
`val partitionLikeDir = s"${dir.getCanonicalFile.toURI.toString}/test/dt=20191113"` -> `val partitionLikeDir = s"$baseDir/dt=20191113"`?
| test("SPARK-29869: HiveMetastoreCatalog#convertToLogicalRelation throws AssertionError") { | ||
| test("SPARK-29869: fix HiveMetastoreCatalog#convertToLogicalRelation throws AssertionError") { | ||
| withTempPath(dir => { | ||
| val baseDir = s"${dir.getCanonicalFile.toURI.toString}/test" |
`test` -> `non_partition_table`?
The location can be set to anywhere, so `test` here is fine. Using `non_partition_table` would push the line over 100 chars :)
Soft ping @cloud-fan @HyukjinKwon @wangyum @viirya
```scala
val e = intercept[AssertionError](spark.sql("select * from test")).getMessage
assert(e.contains("assertion failed"))
assert(spark.sql("select * from non_partition_table").collect() ===
  Array(Row(0), Row(1), Row(2)))
```
`assert(spark.sql("select * from non_partition_table").collect() === Array(Row(0), Row(1), Row(2)))` -> `checkAnswer(spark.table("non_partition_table"), Row(0, 1, 2))`?
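For reference, a rough sketch of the helper-based form (a sketch assuming the suite extends QueryTest so that checkAnswer is in scope; the expected rows here follow the original assertion):

```scala
// checkAnswer collects the DataFrame and compares it against the expected rows.
checkAnswer(spark.table("non_partition_table"), Seq(Row(0), Row(1), Row(2)))
```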
OK. I want to wait for more comments and then update everything in one pass.
```scala
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
  spark.sql(
    s"""
       |CREATE TABLE non_partition_table (id bigint)
```
Isn't it a malformed table? Does Hive ignore the directories for non-partitioned tables?
It seems Hive (1.2.1) returns nothing when querying this table:
```
hive> select * from xxxxx.xxxx;
OK
Time taken: 25.301 seconds
```
But there is no assertion error.
Maybe you are right. Actually, the table LOCATION is /path/tablename/dt=yyyymmdd, but its data file paths are /path/tablename/dt=yyyymmdd/dt=yyyymmdd/xxx.parquet. I guess Hive does not recursively look up the data, so it returns an empty result rather than an error.
And I found that when recursive lookup is enabled via `.option("recursiveFileLookup", true)`, inferPartitioning is disabled, so dt=yyyymmdd won't be treated as a partitionSpec.
So should I revert the code changes and only keep the detailed assertion message? Or throw an exception instead of the assertion, catch it, and then fall back to not using the built-in Parquet reader?
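To illustrate the behaviour described above, a minimal sketch (assuming an active SparkSession named `spark`; the path is a hypothetical placeholder, and `recursiveFileLookup` is the DataFrameReader option available since Spark 3.0):

```scala
// The table LOCATION points at a partition-like directory, while the data files
// sit one level deeper, e.g. /path/tablename/dt=yyyymmdd/dt=yyyymmdd/xxx.parquet.
val location = "/path/tablename/dt=yyyymmdd"

// Default file-source read: PartitioningAwareFileIndex#inferPartitioning() treats
// the inner dt=yyyymmdd directory as a partition column.
val withInference = spark.read.parquet(location)

// With recursive file lookup enabled, partition inference is skipped, so
// dt=yyyymmdd is no longer turned into a partitionSpec.
val withoutInference = spark.read
  .option("recursiveFileLookup", "true")
  .parquet(location)
```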
Test build #113773 has finished for PR 26499 at commit
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
The table in this issue has a nested data path (`/path/tablename/dt=yyyymmdd/dt=yyyymmdd/xxx.parquet`). But once recursive lookup is enabled, inferPartitioning is disabled in the current implementation, so this problem cannot be reproduced here. So should we skip inferring partition columns for Hive tables anyhow? Or just throw a …
Test build #113839 has finished for PR 26499 at commit
+1 for throwing a …
Test build #113866 has finished for PR 26499 at commit
```scala
assert(result.output.length == relation.output.length &&
  result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
if (result.output.length != relation.output.length) {
  throw new HiveTableConvertException(
```
shall we just throw AnalysisException? It's not in the runtime stage yet.
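For illustration, a sketch of what the check could look like with that suggestion (the message text is only indicative, pieced together from the hints visible in this diff, not the final wording):

```scala
// Illustrative only: replace the bare assertion with a descriptive analysis-time error.
if (result.output.length != relation.output.length) {
  throw new AnalysisException(
    s"Converted table has ${result.output.length} columns, " +
      s"but source Hive table has ${relation.output.length} columns. " +
      s"Table ${relation.tableMeta.identifier} may contain partition-like directories " +
      "under its location; consider recreating the table to work around this.")
}
```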
```scala
       |STORED AS PARQUET LOCATION '$baseDir'
       |""".stripMargin)
val e = intercept[HiveTableConvertException](
  spark.table("non_partition_table"), Seq()).getMessage
```
nit: what does Seq() mean here?
Ah, typo
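Presumably the intended line with the stray argument dropped (a sketch; the exception type may still change per the discussion above):

```scala
val e = intercept[HiveTableConvertException](spark.table("non_partition_table")).getMessage
```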
| s"or recreate table ${relation.tableMeta.identifier} to workaround.") | ||
| } | ||
| if (!result.output.zip(relation.output).forall { | ||
| case (a1, a2) => a1.dataType.sameType(a2.dataType) }) { |
This is a breaking change. We allow the inferred schema to have different field names, but sameType requires the same field names.
OK, I see. It restricts the field names via `l.name == r.name`. Sorry for that, I will revert this.
Test build #114048 has finished for PR 26499 at commit
Test build #114051 has finished for PR 26499 at commit
thanks, merging to master!
[SPARK-29869][SQL] improve error message in HiveMetastoreCatalog#convertToLogicalRelation
In our production environment, HiveMetastoreCatalog#convertToLogicalRelation occasionally throws an AssertionError:
```scala
scala> spark.table("hive_table").show
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.convertToLogicalRelation(HiveMetastoreCatalog.scala:261)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.convert(HiveMetastoreCatalog.scala:137)
at org.apache.spark.sql.hive.RelationConversions$$anonfun$apply$4.applyOrElse(HiveStrategies.scala:220)
at org.apache.spark.sql.hive.RelationConversions$$anonfun$apply$4.applyOrElse(HiveStrategies.scala:207)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$4(AnalysisHelper.scala:113)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:113)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
at org.apache.spark.sql.hive.RelationConversions.apply(HiveStrategies.scala:207)
at org.apache.spark.sql.hive.RelationConversions.apply(HiveStrategies.scala:191)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:130)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:127)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:119)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:119)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:168)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:162)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:122)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:98)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:98)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:146)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:145)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:66)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:55)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:86)
at org.apache.spark.sql.SparkSession.table(SparkSession.scala:585)
at org.apache.spark.sql.SparkSession.table(SparkSession.scala:581)
... 47 elided
```
Most cases occurred when reading a table that was created by an old Spark version; after recreating the table, the issue goes away.
After a deep dive, the root cause is that this external table is non-partitioned, but its `LOCATION` is set to a partition-like path `/tablename/dt=yyyymmdd`, so a partitionSpec is inferred.
The error message above is very confusing. We need more details in the assertion failure message.
This issue is caused by `PartitioningAwareFileIndex#inferPartitioning()`. For non-Hive-metastore tables it is useful, but for a Hive table it should not infer partitions when Hive tells us the table is not partitioned. (newly added)
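A minimal reproduction sketch, distilled from the test added in this PR (the path and the SparkSession setup are illustrative placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

val baseDir = "/tmp/spark-29869/non_partition_table"

// Write the data files one level below the table location, into a
// partition-like directory name.
spark.range(3).write.parquet(s"$baseDir/dt=20191113")

// The Hive table itself is NOT partitioned, but its LOCATION contains the
// partition-like sub-directory written above.
spark.sql(
  s"""
     |CREATE TABLE non_partition_table (id bigint)
     |STORED AS PARQUET LOCATION '$baseDir'
     |""".stripMargin)

// With spark.sql.hive.convertMetastoreParquet enabled (the default), the table is
// converted to the built-in Parquet reader, dt is inferred as a partition column,
// the converted relation gains an extra column, and the bare assertion fails.
spark.table("non_partition_table").show()
```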
Does this PR introduce any user-facing change? No.
How was this patch tested? Add UT.
Closes apache#26499 from LantaoJin/SPARK-29869.
Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
RB=2281940
BUG=LIHADOOP-55655
G=spark-reviewers
R=ekrogen,vsowrira
A=mmuralid,ekrogen,vsowrira
What changes were proposed in this pull request?
In our production environment, HiveMetastoreCatalog#convertToLogicalRelation occasionally throws an AssertionError (see the stack trace above). Most cases occurred when reading a table that was created by an old Spark version; after recreating the table, the issue goes away.
After a deep dive, the root cause is that this external table is non-partitioned, but its `LOCATION` is set to a partition-like path `/tablename/dt=yyyymmdd`, so a partitionSpec is inferred.
Why are the changes needed?
The error message above is very confusing. We need more details in the assertion failure message.
This issue is caused by `PartitioningAwareFileIndex#inferPartitioning()`. For non-Hive-metastore tables it is useful, but for a Hive table it should not infer partitions when Hive tells us the table is not partitioned. (newly added)
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add UT.