diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
new file mode 100644
index 0000000000000..25420f74f2f2f
--- /dev/null
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Delegate for SymlinkTextInputFormat, created to address SPARK-40815.
+ * Fixes an issue where SymlinkTextInputFormat returns splits that report a zero length, which
+ * can cause a correctness issue when "spark.hadoopRDD.ignoreEmptySplits" is enabled.
+ *
+ * This class updates the split start and length to match the target file split, thus fixing
+ * the issue.
+ */
+public class DelegateSymlinkTextInputFormat extends SymlinkTextInputFormat {
+
+ public static class DelegateSymlinkTextInputSplit extends FileSplit {
+ private Path targetPath; // Path to the actual data file, not the symlink file.
+
+ // No-arg constructor, required for deserialization.
+ public DelegateSymlinkTextInputSplit() {
+ super((Path) null, 0, 0, (String[]) null);
+ targetPath = null;
+ }
+
+ public DelegateSymlinkTextInputSplit(SymlinkTextInputSplit split) throws IOException {
+ // It is fine to set start and length to the target file split because
+ // SymlinkTextInputFormat maintains 1-1 mapping between SymlinkTextInputSplit and FileSplit.
+ super(split.getPath(),
+ split.getTargetSplit().getStart(),
+ split.getTargetSplit().getLength(),
+ split.getTargetSplit().getLocations());
+ this.targetPath = split.getTargetSplit().getPath();
+ }
+
+ /**
+ * Returns target path.
+ * Visible for testing.
+ */
+ public Path getTargetPath() {
+ return targetPath;
+ }
+
+ /**
+ * Reconstructs the original SymlinkTextInputSplit from this delegate split.
+ */
+ private SymlinkTextInputSplit getSplit() throws IOException {
+ return new SymlinkTextInputSplit(
+ getPath(),
+ new FileSplit(targetPath, getStart(), getLength(), getLocations())
+ );
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
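+ // Serialize the target path after the parent FileSplit fields; an empty string represents null.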
+ Text.writeString(out, (this.targetPath != null) ? this.targetPath.toString() : "");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
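+ // Restore the target path written by write(); an empty string maps back to null.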
+ String target = Text.readString(in);
+ this.targetPath = (!target.isEmpty()) ? new Path(target) : null;
+ }
+ }
+
+ @Override
+ public RecordReader<LongWritable, Text> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
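+ // Convert back to the original SymlinkTextInputSplit so the parent class can create the
+ // record reader for the target file.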
+ InputSplit targetSplit = ((DelegateSymlinkTextInputSplit) split).getSplit();
+ return super.getRecordReader(targetSplit, job, reporter);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ InputSplit[] splits = super.getSplits(job, numSplits);
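+ // Wrap each split so the reported start, length, and locations come from the target file
+ // split rather than the zero-length symlink split.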
+ for (int i = 0; i < splits.length; i++) {
+ SymlinkTextInputSplit split = (SymlinkTextInputSplit) splits[i];
+ splits[i] = new DelegateSymlinkTextInputSplit(split);
+ }
+ return splits;
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ super.configure(job);
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path p, JobConf job) throws IOException {
+ return super.getContentSummary(p, job);
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 143bcff95f88f..e00b22abc68d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -198,6 +198,15 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(true)
+ val USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT =
+ buildConf("spark.sql.hive.useDelegateForSymlinkTextInputFormat")
+ .internal()
+ .doc("When true, SymlinkTextInputFormat is replaced with a similar delegate class during " +
+ "table scan in order to fix the issue of empty splits")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
/**
* The version of the hive client that will be used to communicate with the metastore. Note that
* this does not necessarily need to be the same version of Hive that is used internally by
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 05dd3ba6f5567..63e7d28c42ad9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.ql.io.{DelegateSymlinkTextInputFormat, SymlinkTextInputFormat}
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.mapred.InputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@@ -89,7 +91,7 @@ case class HiveTableScanExec(
@transient private lazy val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta)
@transient private lazy val tableDesc = new TableDesc(
- hiveQlTable.getInputFormatClass,
+ getInputFormat(hiveQlTable.getInputFormatClass, conf),
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata)
@@ -231,6 +233,20 @@ case class HiveTableScanExec(
predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
}
+ // Returns a delegate input format class for the provided input format class when applicable.
+ // This is currently used to replace SymlinkTextInputFormat with DelegateSymlinkTextInputFormat
+ // in order to fix SPARK-40815.
+ private def getInputFormat(
+ inputFormatClass: Class[_ <: InputFormat[_, _]],
+ conf: SQLConf): Class[_ <: InputFormat[_, _]] = {
+ if (inputFormatClass == classOf[SymlinkTextInputFormat] &&
+ conf != null && conf.getConf(HiveUtils.USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT)) {
+ classOf[DelegateSymlinkTextInputFormat]
+ } else {
+ inputFormatClass
+ }
+ }
+
override def doCanonicalize(): HiveTableScanExec = {
val input: AttributeSeq = relation.output
HiveTableScanExec(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index 3de2489f8deff..6c509297c1ab9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -17,12 +17,19 @@
package org.apache.spark.sql.hive.execution
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File}
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
import java.sql.{Date, Timestamp}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.io.{DelegateSymlinkTextInputFormat, SymlinkTextInputFormat}
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET, USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.tags.SlowHiveTest
@@ -218,4 +225,86 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
checkAnswer(spark.table("t1"), Seq(Row(Array("SPARK-34512", "HIVE-24797"))))
}
}
+
+ test("SPARK-40815: DelegateSymlinkTextInputFormat serialization") {
+ def assertSerDe(split: DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit): Unit = {
+ val buf = new ByteArrayOutputStream()
+ val out = new DataOutputStream(buf)
+ try {
+ split.write(out)
+ } finally {
+ out.close()
+ }
+
+ val res = new DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit()
+ val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))
+ try {
+ res.readFields(in)
+ } finally {
+ in.close()
+ }
+
+ assert(split.getPath == res.getPath)
+ assert(split.getStart == res.getStart)
+ assert(split.getLength == res.getLength)
+ assert(split.getLocations.toSeq == res.getLocations.toSeq)
+ assert(split.getTargetPath == res.getTargetPath)
+ }
+
+ assertSerDe(
+ new DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit(
+ new SymlinkTextInputFormat.SymlinkTextInputSplit(
+ new Path("file:/tmp/symlink"),
+ new FileSplit(new Path("file:/tmp/file"), 1L, 2L, Array[String]())
+ )
+ )
+ )
+ }
+
+ test("SPARK-40815: Read SymlinkTextInputFormat") {
+ withTable("t") {
+ withTempDir { root =>
+ val dataPath = new File(root, "data")
+ val symlinkPath = new File(root, "symlink")
+
+ spark.range(10).selectExpr("cast(id as string) as value")
+ .repartition(4).write.text(dataPath.getAbsolutePath)
+
+ // Generate symlink manifest file.
+ val files = dataPath.listFiles().filter(_.getName.endsWith(".txt"))
+ assert(files.length > 0)
+
+ symlinkPath.mkdir()
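+ // The manifest is a plain text file listing the absolute paths of the data files, one per line.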
+ Files.write(
+ new File(symlinkPath, "symlink.txt").toPath,
+ files.mkString("\n").getBytes(StandardCharsets.UTF_8)
+ )
+
+ sql(s"""
+ CREATE TABLE t (id bigint)
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ LOCATION '${symlinkPath.getAbsolutePath}';
+ """)
+
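+ // With the delegate format enabled (the default), the scan returns all rows from the target files.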
+ checkAnswer(
+ sql("SELECT id FROM t ORDER BY id ASC"),
+ (0 until 10).map(Row(_))
+ )
+
+ // Verify that with the flag disabled, the original SymlinkTextInputFormat is used.
+ // It has the empty-splits issue, so the scan returns no rows when empty splits are ignored.
+ withSQLConf(
+ HADOOP_RDD_IGNORE_EMPTY_SPLITS.key -> "true",
+ USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT.key -> "false") {
+
+ checkAnswer(
+ sql("SELECT id FROM t ORDER BY id ASC"),
+ Seq.empty[Row]
+ )
+ }
+ }
+ }
+ }
}