diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
index 25420f74f2f2f..e7c56edffe2ad 100644
--- a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 /**
  * Delegate for SymlinkTextInputFormat, created to address SPARK-40815.
@@ -95,8 +96,18 @@ public void readFields(DataInput in) throws IOException {
   @Override
   public RecordReader getRecordReader(
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    InputSplit targetSplit = ((DelegateSymlinkTextInputSplit) split).getSplit();
-    return super.getRecordReader(targetSplit, job, reporter);
+    DelegateSymlinkTextInputSplit delegateSplit = (DelegateSymlinkTextInputSplit) split;
+    InputSplit targetSplit = ((SymlinkTextInputSplit) delegateSplit.getSplit()).getTargetSplit();
+
+    // SPARK-40815: the code is derived from
+    // https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
+    // However, we use the TextInputFormat record reader directly without HiveRecordReader to avoid
+    // ExecMapper.getDone() checks.
+
+    // The target data is in TextInputFormat.
+    TextInputFormat inputFormat = new TextInputFormat();
+    inputFormat.configure(job);
+    return inputFormat.getRecordReader(targetSplit, job, reporter);
   }
 
   @Override
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index 05cd751d43483..aafc4764d2465 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.io.{DelegateSymlinkTextInputFormat, SymlinkTextInputFormat}
 import org.apache.hadoop.mapred.FileSplit;
@@ -228,9 +227,6 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
   }
 
   test("SPARK-40815: DelegateSymlinkTextInputFormat serialization") {
-    // Ignored due to JDK 11 failures reported in https://github.com/apache/spark/pull/38277.
-    assume(!SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
-
     def assertSerDe(split: DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit): Unit = {
       val buf = new ByteArrayOutputStream()
       val out = new DataOutputStream(buf)
@@ -266,9 +262,6 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
   }
 
   test("SPARK-40815: Read SymlinkTextInputFormat") {
-    // Ignored due to JDK 11 failures reported in https://github.com/apache/spark/pull/38277.
-    assume(!SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
-
     withTable("t") {
       withTempDir { root =>
         val dataPath = new File(root, "data")
@@ -300,6 +293,12 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
           (0 until 10).map(Row(_))
         )
 
+        // Verify limit since we bypass ExecMapper.getDone().
+        checkAnswer(
+          sql("SELECT id FROM t ORDER BY id ASC LIMIT 2"),
+          (0 until 2).map(Row(_))
+        )
+
         // Verify that with the flag disabled, we use the original SymlinkTextInputFormat
         // which has the empty splits issue and therefore the result should be empty.
         withSQLConf(