[SPARK-31684][SQL] Overwrite partition failed with 'WRONG FS' when the target partition does not belong to the same filesystem as the table #28511
Conversation
cc @cloud-fan @maropu @dongjoon-hyun @HyukjinKwon, thanks. Marked this ticket as a blocker for 3.0; you may lower the priority if necessary.

Test build #122536 has finished for PR 28511 at commit

retest this please

also cc: @viirya
tblAuthority.equalsIgnoreCase(partAuthority)
} else {
  val defaultUri = FileSystem.getDefaultUri(hadoopConf)
  if (tblAuthority != null) {
nit: for readability, could we replace the current nested ifs with flattened pattern matching?
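A minimal sketch of what that flattened match could look like; the helper name `sameAuthority` and the fallback-to-default-filesystem behavior are assumptions based on the surrounding diff, not the merged code:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hypothetical helper mirroring the variables in the diff above
// (tblAuthority, partAuthority, hadoopConf).
def sameAuthority(tblUri: URI, partUri: URI, hadoopConf: Configuration): Boolean = {
  // Authority of the default filesystem, used when one URI omits its authority.
  lazy val defaultAuthority = FileSystem.getDefaultUri(hadoopConf).getAuthority
  (tblUri.getAuthority, partUri.getAuthority) match {
    // Both sides carry an authority: compare them directly.
    case (tbl, part) if tbl != null && part != null => tbl.equalsIgnoreCase(part)
    // Neither side has an authority: both resolve to the default filesystem.
    case (null, null) => true
    // One side is missing: assume it resolves against the default filesystem.
    case (tbl, null) => tbl.equalsIgnoreCase(defaultAuthority)
    case (null, part) => part.equalsIgnoreCase(defaultAuthority)
  }
}
```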
.version
// https://issues.apache.org/jira/browse/SPARK-31684,
// For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940
// has been fixed, there is no performance issue anymore.
Just to check: have you confirmed that Spark (w/ this fix) + Hive 2.0.0+ has the same performance as the current Spark (w/o this fix)?
I haven't done the perf tests. Based on the discussion in #15726, I choose to trust that for the moment.
This code block has been there for a long time; it would be better to re-run the benchmark now to make sure we don't introduce a regression.
Is there an existing benchmark? Or I can try to add one.
yea please add one.
No problem.
I have just added a benchmark for InsertIntoHiveTable and updated the results.
It uses INSERT INTO as the control group and INSERT OVERWRITE as the experimental group.
With built-in Hive 2.3.7, the results of the two groups are close.
With built-in Hive 1.2.1.xxx, the results of the experimental group reveal a huge performance degradation when a dynamic partition column exists (the reason is that #15726 is not merged).
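For reference, a rough sketch of that control/experimental setup, assuming the usual shape of Spark's benchmark suites (`Benchmark` from `org.apache.spark.benchmark`; `numRows`, `output`, `spark`, and the table/view names are assumed to be in scope and are not the exact benchmark added in this PR):

```scala
import org.apache.spark.benchmark.Benchmark

// Sketch only: compares the plain insert path against the overwrite path under test.
val benchmark = new Benchmark("insert hive table benchmark", numRows, output = output)
// Control group: plain INSERT INTO.
benchmark.addCase("INSERT INTO DYNAMIC") { _ =>
  spark.sql(s"INSERT INTO TABLE $insertIntoTable SELECT * FROM $tempView")
}
// Experimental group: INSERT OVERWRITE, which exercises the overwrite logic.
benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
  spark.sql(s"INSERT OVERWRITE TABLE $overwriteTable SELECT * FROM $tempView")
}
benchmark.run()
```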
viirya left a comment:

I'm a bit confused by the description; maybe you can explain more.

- For Hive versions before 2.0.0, we need to leave the replace work to Hive when the table and partition locations do not belong to the same FileSystem.
- NOTE THAT for Hive 2.2.0 and earlier, if the table and partition locations do not belong to the same FileSystem, we will still get the same error thrown by the Hive encryption check due to HIVE-14380.

For 2, when the table and partition locations are not on the same FileSystem, we leave the replace to Hive to avoid an exception. But if the table and partition locations are not on the same FileSystem, you will get the same exception from HIVE-14380, so what is the difference between leaving it to Hive or not? You still get the same exception.
Thanks. I missed the ticket number in the description: https://issues.apache.org/jira/browse/SPARK-31675. They are different exceptions, although they look similar.

Test build #122537 has finished for PR 28511 at commit

how are you going to fix https://issues.apache.org/jira/browse/SPARK-31675 ?

It's a good question. Besides the encryption check issue in SPARK-31675, I find that Hive 1.x does this kind of improper filesystem handling in many places. To fix that, a hacky way would be re-implementing the relevant Hive methods on the Spark side. For now, as we have upgraded the built-in Hive to 2.3.7, it's not that urgent to fix those long-standing issues in the quite old Hive 1.x versions. So maybe warning messages are enough if users are trying to modify a table that contains data crossing clusters with Hive 1.x?
viirya left a comment:

Even if you show a warning message, the users still get an exception later. If we are certain that in this case the exception will be thrown on the Hive side, we may directly throw an exception on the Spark side with a clearer message telling what is going on and how to work around it.
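A minimal sketch of what such an eager Spark-side check might look like; the function name, the same-filesystem test, the `hiveAtLeast2` flag, and the message wording are all illustrative assumptions, not code from this PR (SparkException is used here simply because it has a public string constructor):

```scala
import java.net.URI

import org.apache.spark.SparkException

// Hypothetical fail-fast check for the Hive 1.x cross-filesystem overwrite case.
def assertSameFileSystem(tableLocation: URI, partitionLocation: URI, hiveAtLeast2: Boolean): Unit = {
  // Two locations are "the same filesystem" if scheme and authority match.
  val sameFs = tableLocation.getScheme == partitionLocation.getScheme &&
    tableLocation.getAuthority == partitionLocation.getAuthority
  if (!sameFs && !hiveAtLeast2) {
    throw new SparkException(
      s"Cannot overwrite partition at '$partitionLocation': it is on a different " +
        s"file system than the table at '$tableLocation', and Hive 1.x cannot copy " +
        "files across file systems (HIVE-14380). Move the data to the table's file " +
        "system or upgrade the Hive metastore client to 2.0 or later.")
  }
}
```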
@viirya thanks, your suggestion SGTM. I will take that approach when fixing SPARK-31675 if there are no better solutions. Now let us get back to this PR: shall we just use the Hive version to determine whether to leave the overwrite to Hive or not? At least we can fix the problem for the default version and for the cases with Hive 2.x.
// scalastyle:off line.size.limit
/**
 * If the table location and partition location do not belong to the same [[FileSystem]], we
 * should not disable hive overwrite. Otherwise, hive will use the [[FileSystem]] instance belonging
can we treat this as a known limitation, with the faith that most users will use Hive 2.x in the long term? Then we just need to check the version.
Test build #122683 has finished for PR 28511 at commit

Test build #122682 has finished for PR 28511 at commit

Test build #122688 has finished for PR 28511 at commit

retest this please

Test build #122717 has finished for PR 28511 at commit

Test build #122777 has finished for PR 28511 at commit
  }
}

override def suffix: String = if (HiveUtils.isHive23) "" else "hive1.2"
can we put hive2.3 as suffix as well?
OK
                            Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
INSERT INTO STATIC                    344           367         48        0.0      33585.1     21.4X
INSERT OVERWRITE DYNAMIC             7656          7714         82        0.0     747622.7      1.0X
INSERT OVERWRITE HYBRID              1179          1183          6        0.0     115163.3      6.2X
INSERT OVERWRITE STATIC               400           408         10        0.0      39014.2     18.4X
do you have the numbers before your PR?
Let me run this benchmark on the master branch and update the result later in the PR description.
                            Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
- INSERT INTO DYNAMIC                7742          7918        248        0.0     756044.0      1.0X
- INSERT INTO HYBRID                 1289          1307         26        0.0     125866.3      6.0X
- INSERT INTO STATIC                  371           393         38        0.0      36219.4     20.9X
- INSERT OVERWRITE DYNAMIC           8456          8554        138        0.0     825790.3      0.9X
- INSERT OVERWRITE HYBRID            1303          1311         12        0.0     127198.4      5.9X
- INSERT OVERWRITE STATIC             434           447         13        0.0      42373.8     17.8X
+ INSERT INTO DYNAMIC                7382          7456        105        0.0     720904.8      1.0X
+ INSERT INTO HYBRID                 1128          1129          1        0.0     110169.4      6.5X
+ INSERT INTO STATIC                  349           370         39        0.0      34095.4     21.1X
+ INSERT OVERWRITE DYNAMIC           8149          8362        301        0.0     795821.8      0.9X
+ INSERT OVERWRITE HYBRID            1317          1318          2        0.0     128616.7      5.6X
+ INSERT OVERWRITE STATIC             387           408         37        0.0      37804.1     19.1X

(+ for master, - for this PR; both using hive 2.3.7)
Hmm, what's wrong with the dependency tests on Jenkins? They passed locally.

retest this please

retest this please
// check. See https://issues.apache.org/jira/browse/HIVE-14380.
// So we still disable Hive overwrite for Hive 1.x for better performance, because
// the partition and table are on the same cluster in most cases.
val hiveVersDoHiveOverwrite: Set[HiveVersion] = Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
Are we able to write hiveVersion < v2_0? Otherwise we need to update this list every time we support a new Hive version.
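A hedged sketch of an ordering-based check; it assumes the version object exposes something like a `fullVersion: String` (e.g. "2.3.7"), which may not match the real HiveVersion API:

```scala
// Illustrative only: compare the major.minor prefix of a Hive version string so the
// check reads "version >= 2.0" instead of membership in a hand-maintained Set.
def isAtLeast(fullVersion: String, major: Int, minor: Int): Boolean = {
  // Keep only the leading numeric parts, tolerating suffixes like "1.2.1.spark2".
  val nums = fullVersion.split("\\.").map(_.takeWhile(_.isDigit)).filter(_.nonEmpty).map(_.toInt)
  val ma = if (nums.length > 0) nums(0) else 0
  val mi = if (nums.length > 1) nums(1) else 0
  ma > major || (ma == major && mi >= minor)
}

// Would replace the Set membership test above (the fullVersion field is assumed):
// val doHiveOverwrite = isAtLeast(hiveVersion.fullVersion, 2, 0)
assert(isAtLeast("2.3.7", 2, 0) && !isAtLeast("1.2.1", 2, 0))
```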
hadoopConf.set("hive.exec.dynamic.partition.mode", "nonstrict")
hadoopConf.set("hive.exec.max.dynamic.partitions", numRows.toString)

def withTempTable(tableNames: String*)(f: => Unit): Unit = {
withTempViews
Actually we can inline it, as we only have one temp view.
Test build #122806 has finished for PR 28511 at commit

Test build #122807 has finished for PR 28511 at commit

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala (resolved)
override def getSparkSession: SparkSession = TestHive.sparkSession

val tempTable = "temp"
tempView
def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = {
  benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
    sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," +
      s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
how does DISTRIBUTE BY a help here?
It creates numRows files in total.
Test build #122803 has finished for PR 28511 at commit

Test build #122811 has finished for PR 28511 at commit
def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = {
  benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
    sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," +
      s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
Why does the number of files matter? DISTRIBUTE BY a will add a shuffle AFAIK, so the perf is not purely about table insertion and may be less accurate.
The cause of the Hive regression is copying files one by one, so I used DISTRIBUTE BY to create a certain number of files to feed it. But after rerunning the test w/o DISTRIBUTE BY, the result still seems to illustrate the problem, so I will remove that clause.
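To make the trade-off concrete, a small sketch of the two variants; the table name and row count are illustrative, assuming a Hive table `t` partitioned by `(b, c)` as in the benchmark above:

```scala
// Source view with an `id` column, as in the benchmark.
spark.range(100000).createOrReplaceTempView("temp")

// Variant kept in the final benchmark: no extra shuffle, so the measured time is
// dominated by the write and the overwrite logic itself.
spark.sql(
  """INSERT OVERWRITE TABLE t
    |SELECT CAST(id AS INT) AS a, CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c
    |FROM temp""".stripMargin)

// Variant with DISTRIBUTE BY: the added shuffle spreads rows so writer tasks emit
// more files, amplifying Hive's one-by-one file copy cost, but the shuffle time
// itself also ends up in the measurement.
spark.sql(
  """INSERT OVERWRITE TABLE t
    |SELECT CAST(id AS INT) AS a, CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c
    |FROM temp DISTRIBUTE BY a""".stripMargin)
```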
LGTM

LGTM

Test build #122830 has finished for PR 28511 at commit

retest this please

Test build #122839 has finished for PR 28511 at commit

retest this please

Test build #122836 has finished for PR 28511 at commit
This fixes a bug in a corner case, when the table and partition locations are on different file systems. I'm merging it to master only, to reduce risk. Thanks!

Test build #122847 has finished for PR 28511 at commit

thanks for reviewing and merging!
What changes were proposed in this pull request?
With SPARK-18107, we disable the underlying replace (overwrite) and instead do the delete on the Spark side and only the copy on the Hive side, to bypass the performance issue HIVE-11940.
Conditionally, if the table location and partition location do not belong to the same FileSystem, we should not disable Hive overwrite. Otherwise, Hive will use the FileSystem instance belonging to the table location to copy files, which will fail in FileSystem#checkPath: https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1657
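To illustrate the failure mode (the cluster URIs below are made up): qualifying a path against a FileSystem bound to a different scheme/authority trips the same check that Hive's copy path hits, so a minimal sketch looks like this:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// FileSystem instance bound to the table's cluster.
val tableFs = FileSystem.get(new URI("hdfs://clusterA:8020/"), conf)
// Qualifying a path on another cluster invokes FileSystem#checkPath, which throws
// IllegalArgumentException("Wrong FS: hdfs://clusterB:8020/..., expected: hdfs://clusterA:8020")
// because checkPath compares the scheme and authority of the two URIs.
tableFs.makeQualified(new Path("hdfs://clusterB:8020/warehouse/t/p=1"))
```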
In this PR, for Hive 2.0.0 and onwards, as HIVE-11940 has been fixed and there is no performance issue anymore, we leave the overwrite logic to Hive to avoid the failure in FileSystem#checkPath. NOTE THAT for Hive 2.2.0 and earlier, if the table and partition locations do not belong to the same FileSystem, we will still get the same error thrown by the Hive encryption check due to HIVE-14380, which needs to be fixed in another ticket, SPARK-31675.
Why are the changes needed?
Bugfix. A logical table can be decoupled from the storage layer and may contain data from remote storage systems.
Does this PR introduce any user-facing change?
no
How was this patch tested?
Verified manually, and added benchmark tests (both using Hive 2.3.7).