[SPARK-40815][SQL] Add DelegateSymlinkTextInputFormat to workaround SymlinkTextInputSplit bug
#38277
Conversation
@dongjoon-hyun Would you be able to review this PR? I have read the comments on the original PR and was ambivalent about disabling the flag again vs fixing it in the code. I suppose SymlinkTextInputFormat is one of the cases where this change is not fully safe, as it silently causes incorrect results instead of throwing an error. I also considered an alternative fix of substituting the input format, or fixing it in Hive, but I don't know how long that would take. Maybe it would be better to implement the shim input format in Spark instead of disabling the flag, so let me know. I am open to alternative approaches.

Thank you for pinging me, @sadikovi. Will take a look.

cc @sunchao too since he is the Apache Hive PMC member.

Thank you. I also thought about fixing it in Hive, but I don't know how long that would take and when the fix would become available in Spark, therefore I am open to alternative approaches, let me know 👍.

Can one of the admins verify this patch?
Thank you for the review. Sure, I can add the migration guide notes.

@sunchao Could you review and comment on the alternative solutions?

Given that this went into 3.2 already, and was a change in behavior, do we want to revert it?
Is it possible to treat SymlinkTextInputFormat as a special case when filtering out empty splits? For example:

```scala
val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
val rawSplits = if (ignoreEmptySplits &&
    inputFormat.getClass.getName != "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") {
  allRowSplits.filter(_.getLength > 0)
} else {
  allRowSplits
}
```

I think a fix in Spark itself will be a good short term solution. A fix in Hive appears to be more involved. For one, it's hard to give a reasonable start and length for SymlinkTextInputSplit.
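As an aside, until either fix lands, an affected application could presumably work around the problem by turning the filtering flag off itself. A sketch (the config key is the one discussed in this thread; the session setup is generic and not part of this PR):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch of an application-level workaround: start the session with empty-split
// filtering disabled so SymlinkTextInputFormat splits are not dropped.
// spark.hadoopRDD.ignoreEmptySplits is a core setting, so it must be set before
// the SparkContext is created.
val spark = SparkSession.builder()
  .config(new SparkConf().set("spark.hadoopRDD.ignoreEmptySplits", "false"))
  .enableHiveSupport()
  .getOrCreate()
```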
If it is very specific to this case, the approach you detailed sounds fine as a short term measure, @sunchao. Thoughts, @dongjoon-hyun?

@sunchao I think it is possible to give start and length to SymlinkTextInputSplit - it is a one-to-one mapping to the parent file split. Based on this, I decided to introduce DelegateSymlinkTextInputFormat instead.

@sunchao @mridulm @dongjoon-hyun Could you review the PR again?
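To make the idea concrete, here is a rough illustration of a delegating split. It is a sketch only, written in Scala for brevity (the class added by this PR is written in Java, and the names below are illustrative rather than the exact implementation):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit

// Illustrative sketch: keep the symlink path for bookkeeping, but report the
// start/length/locations of the target file split, so the split is no longer
// considered empty by the spark.hadoopRDD.ignoreEmptySplits filtering.
class DelegatingSymlinkSplit(symlinkPath: Path, target: FileSplit)
  extends FileSplit(symlinkPath, target.getStart, target.getLength, target.getLocations) {

  // The target split points at the actual data file that the record reader should consume.
  def getTargetSplit: FileSplit = target
}
```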
Yay!
```scala
.internal()
.doc("When true, SymlinkTextInputFormat is replaced with a similar delegate class during " +
  "table scan in order to fix the issue of empty splits")
.version("3.4.0")
```
Oh, got it. Initially, I thought this PR aims to be backported in order to fix the correctness issues of SymlinkTextInputFormat.
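For context, the config hunk reviewed above might expand to a full SQLConf entry along these lines. This is a sketch only: the constant name is an assumption, while the key, doc text, version, and default come from the hunk and the PR description; `buildConf` is the helper available inside org.apache.spark.sql.internal.SQLConf.

```scala
// Sketch of the full entry as it might appear inside object SQLConf
// (constant name assumed; not the exact code from the PR).
val HIVE_USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT =
  buildConf("spark.sql.hive.useDelegateForSymlinkTextInputFormat")
    .internal()
    .doc("When true, SymlinkTextInputFormat is replaced with a similar delegate class during " +
      "table scan in order to fix the issue of empty splits")
    .version("3.4.0")
    .booleanConf
    .createWithDefault(true)
```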
```java
  split = new SymlinkTextInputSplit();
}

public DelegateSymlinkTextInputSplit(Path symlinkPath, SymlinkTextInputSplit split) throws IOException {
```
nit: do we need symlinkPath? it can be replaced by split.getPath().
No, it cannot. These paths are different: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L162.
@sunchao @dongjoon-hyun Could you take another look? Thanks. I have addressed your comments.

@sadikovi sorry for the delay, will take a look.
sunchao left a comment
LGTM
```java
 * Delegate for SymlinkTextInputFormat, created to address SPARK-40815.
 * Fixes an issue where SymlinkTextInputFormat returns empty splits which could result in
 * the correctness issue when "spark.hadoopRDD.ignoreEmptySplits" is enabled.
 *
```
nit: add <p> for better formatting
Updated.
```java
 * In this class, we update the split start and length to match the target file input thus fixing
 * the issue.
 */
@SuppressWarnings("deprecation")
```
nit: unnecessary
Removed.
dongjoon-hyun left a comment
+1, LGTM.
DelegateSymlinkTextInputFormat to workaround SymlinkTextInputSplit bug
I revised the PR title to simplify it, @sadikovi. You can change it back if you want.

Thanks for updating the PR title 👍.

Thank you @dongjoon-hyun for merging 👍.
```scala
  )
}

test("SPARK-40815: Read SymlinkTextInputFormat") {
```
This test fails in JDK 11 and 17 😢
https://github.com/apache/spark/actions/runs/3379157338/jobs/5610899432
https://github.com/apache/spark/actions/runs/3381461270/jobs/5615405153
[info] - SPARK-40815: Read SymlinkTextInputFormat *** FAILED *** (587 milliseconds)
[info] Results do not match for query:
[info] Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info] Timezone Env:
[info]
[info] == Parsed Logical Plan ==
[info] 'Sort ['id ASC NULLS FIRST], true
[info] +- 'Project ['id]
[info] +- 'UnresolvedRelation [t], [], false
[info]
[info] == Analyzed Logical Plan ==
[info] id: bigint
[info] Sort [id#175602L ASC NULLS FIRST], true
[info] +- Project [id#175602L]
[info] +- SubqueryAlias spark_catalog.default.t
[info] +- HiveTableRelation [`spark_catalog`.`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#175602L], Partition Cols: []]
[info]
[info] == Optimized Logical Plan ==
[info] Sort [id#175602L ASC NULLS FIRST], true
[info] +- HiveTableRelation [`spark_catalog`.`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#175602L], Partition Cols: []]
[info]
[info] == Physical Plan ==
[info] AdaptiveSparkPlan isFinalPlan=true
[info] +- == Final Plan ==
[info] LocalTableScan <empty>, [id#175602L]
[info] +- == Initial Plan ==
[info] Sort [id#175602L ASC NULLS FIRST], true, 0
[info] +- Exchange rangepartitioning(id#175602L ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=176717]
[info] +- Scan hive spark_catalog.default.t [id#175602L], HiveTableRelation [`spark_catalog`.`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#175602L], Partition Cols: []]
[info]
[info] == Results ==
[info]
[info] == Results ==
[info] !== Correct Answer - 10 == == Spark Answer - 0 ==
[info] struct<> struct<>
[info] ![0]
[info] ![1]
[info] ![2]
[info] ![3]
[info] ![4]
[info] ![5]
[info] ![6]
[info] ![7]
[info] ![8]
[info] ![9] (QueryTest.scala:243)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at org.apache.spark.sql.QueryTest$.newAssertionFailedException(QueryTest.scala:233)
[info] at org.scalatest.Assertions.fail(Assertions.scala:933)
[info] at org.scalatest.Assertions.fail$(Assertions.scala:929)
[info] at org.apache.spark.sql.QueryTest$.fail(QueryTest.scala:233)
[info] at org.apache.spark.sql.QueryTest$.checkAnswer(QueryTest.scala:243)
[info] at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:150)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.$anonfun$new$9(HiveSerDeReadWriteSuite.scala:293)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.$anonfun$new$9$adapted(HiveSerDeReadWriteSuite.scala:266)
[info] at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
[info] at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
[info] at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:225)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(HiveSerDeReadWriteSuite.scala:37)
[info] at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
[info] at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.withTempDir(HiveSerDeReadWriteSuite.scala:37)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.$anonfun$new$8(HiveSerDeReadWriteSuite.scala:266)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
[info] at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:306)
[info] at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:304)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.withTable(HiveSerDeReadWriteSuite.scala:37)
[info] at org.apache.spark.sql.hive.execution.HiveSerDeReadWriteSuite.$anonfun$new$7(HiveSerDeReadWriteSuite.scala:266)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:207)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:66)
@HyukjinKwon Strange, there is nothing JDK specific in the test. Can you share how I can reproduce the issue locally with JDK 11 or JDK 17? I can open a follow-up PR to fix the test tomorrow.

I haven't tested it locally actually. It's the GitHub Actions build: just installing JDK 11 and setting JAVA_HOME. My gut feeling is that it's about Hive behaviour depending on the JDK version.

Actually, it does look like it is consistently failing (I retriggered: https://github.com/apache/spark/actions/runs/3379157338/jobs/5620261594). Let me see if it fails locally.

Yes, it's locally reproduced with JDK 11 (just set JAVA_HOME).

Thank you for spotting that, @HyukjinKwon.

I also confirmed it fails on Java 11 although the symlink is read by Spark.

Let me open a PR to disable the test and I will open a fix as a follow-up.
…tests for JDK 9+

### What changes were proposed in this pull request?
This PR is a follow-up for #38277. This change is required due to test failures in JDK 11 and JDK 17. The patch disables the unit test for JDK 9+. This is a temporary measure while I am debugging and working on the fix for higher versions of JDK.

### Why are the changes needed?
Fixes the test failure in JDK 11.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
N/A.

Closes #38504 from sadikovi/fix-symlink-test.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@dongjoon-hyun I am trying to repro with JDK 11 (11.0.16) and the test passes just fine. Did you have to do any special setup to trigger the problem?

Actually, I can repro it by running the entire test suite; it does not reproduce when running the test individually.

The issue is reproducible even with SymlinkTextInputFormat, so it is not related to the delegate class. Also, the test runs just fine when "SPARK-34512: Disable validate default values when parsing Avro schemas" is not run prior or is swapped to run after my test. I will continue to debug, but it does not appear to be related to the patch itself.
What changes were proposed in this pull request?

This PR is a follow-up for #31909. In the original PR, `spark.hadoopRDD.ignoreEmptySplits` was enabled due to seemingly no side-effects; however, this change breaks `SymlinkTextInputFormat`, so any table that uses the input format would return empty results. This is due to a combination of problems:

1. Incorrect implementation of [SymlinkTextInputSplit](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L73). The input format does not set `start` and `length` fields from the target split. `SymlinkTextInputSplit` is an abstraction over FileSplit and all downstream systems treat it as such - those fields should be extracted and passed from the target split.
2. `spark.hadoopRDD.ignoreEmptySplits` being enabled causes HadoopRDD to filter out all of the empty splits, which does not work in the case of SymlinkTextInputFormat. This is due to 1: because we don't set any length (and start), those splits are considered to be empty and are removed from the final list of partitions even though the target splits themselves are non-empty.

Technically, this needs to be addressed in Hive but I figured it would be much faster to fix this in Spark.

The PR introduces `DelegateSymlinkTextInputFormat`, which wraps SymlinkTextInputFormat and provides splits with the correct start and length attributes. This is controlled by `spark.sql.hive.useDelegateForSymlinkTextInputFormat`, which is enabled by default. When disabled, the user-provided SymlinkTextInputFormat will be used.
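For example, if the delegate ever needs to be bypassed, the flag named above can presumably be turned off at runtime to fall back to the user-provided input format. A sketch, assuming an active SparkSession `spark` and a hypothetical symlink-backed table `t`:

```scala
// Sketch: disable the delegate and read through the original SymlinkTextInputFormat.
spark.conf.set("spark.sql.hive.useDelegateForSymlinkTextInputFormat", "false")
spark.sql("SELECT id FROM t ORDER BY id").show()
```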
Why are the changes needed?

Fixes a correctness issue when using `SymlinkTextInputSplit` in Spark.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

I added a unit test that reproduces the issue and verified that it passes with the fix.
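For reference, such a test might look roughly like the sketch below. It is a rough reconstruction, not the exact test from the PR: the file layout, row counts, and DDL are assumptions, and `sql`, `checkAnswer`, `withTable`, and `withTempDir` are helpers from the surrounding Spark test suite (HiveSerDeReadWriteSuite in the stack trace above).

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.Row

// Sketch only: create a plain text data file, point a symlink manifest at it,
// and read it back through a table stored as SymlinkTextInputFormat.
test("SPARK-40815: Read SymlinkTextInputFormat") {
  withTempDir { dir =>
    withTable("t") {
      // Data file with ten rows, one value per line.
      val dataDir = new File(dir, "data")
      assert(dataDir.mkdirs())
      val dataFile = new File(dataDir, "part-00000")
      Files.write(dataFile.toPath,
        (0 until 10).mkString("\n").getBytes(StandardCharsets.UTF_8))

      // Symlink manifest: each line is the URI of a target data file.
      val symlinkDir = new File(dir, "symlink")
      assert(symlinkDir.mkdirs())
      Files.write(new File(symlinkDir, "manifest").toPath,
        dataFile.toURI.toString.getBytes(StandardCharsets.UTF_8))

      sql(
        s"""
           |CREATE TABLE t (id BIGINT)
           |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
           |STORED AS
           |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
           |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
           |LOCATION '${symlinkDir.toURI}'
         """.stripMargin)

      // Without the fix, the symlink split is filtered out as empty and the scan returns no rows.
      checkAnswer(sql("SELECT id FROM t ORDER BY id"), (0 until 10).map(i => Row(i.toLong)))
    }
  }
}
```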