Skip to content

Commit 0a7eda3

Browse files
bersprockets authored and dongjoon-hyun committed
[SPARK-38075][SQL][3.1] Fix hasNext in HiveScriptTransformationExec's process output iterator
Backport #35368 to 3.1.

### What changes were proposed in this pull request?

Fix hasNext in HiveScriptTransformationExec's process output iterator to always return false if it had previously returned false.

### Why are the changes needed?

When hasNext on the process output iterator returns false, it leaves the iterator in a state (i.e., scriptOutputWritable is not null) such that the next call returns true.

The Guava Ordering used in TakeOrderedAndProjectExec will call hasNext on the process output iterator even after an earlier call had returned false. This results in fake rows when script transform is used with `order by` and `limit`. For example:

```
create or replace temp view t as select * from values (1), (2), (3) as t(a);

select transform(a) USING 'cat' AS (a int) FROM t order by a limit 10;
```

This returns:

```
NULL
NULL
NULL
1
2
3
```

### Does this PR introduce _any_ user-facing change?

No, other than removing the correctness issue.

### How was this patch tested?

New unit test.

Closes #35375 from bersprockets/SPARK-38075_3.1.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 91db9a3 commit 0a7eda3

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala

+6-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private[hive] case class HiveScriptTransformationExec(
6464
outputSoi: StructObjectInspector,
6565
hadoopConf: Configuration): Iterator[InternalRow] = {
6666
new Iterator[InternalRow] with HiveInspectors {
67-
var curLine: String = null
67+
private var completed = false
6868
val scriptOutputStream = new DataInputStream(inputStream)
6969

7070
val scriptOutputReader =
@@ -78,13 +78,17 @@ private[hive] case class HiveScriptTransformationExec(
7878
lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor)
7979

8080
override def hasNext: Boolean = {
81+
if (completed) {
82+
return false
83+
}
8184
try {
8285
if (scriptOutputWritable == null) {
8386
scriptOutputWritable = reusedWritableObject
8487

8588
if (scriptOutputReader != null) {
8689
if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
8790
checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
91+
completed = true
8892
return false
8993
}
9094
} else {
@@ -97,6 +101,7 @@ private[hive] case class HiveScriptTransformationExec(
97101
// there can be a lag between EOF being written out and the process
98102
// being terminated. So explicitly waiting for the process to be done.
99103
checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
104+
completed = true
100105
return false
101106
}
102107
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
2323
import org.scalatest.exceptions.TestFailedException
2424

2525
import org.apache.spark.{SparkException, TestUtils}
26+
import org.apache.spark.sql.Row
2627
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
2728
import org.apache.spark.sql.execution._
2829
import org.apache.spark.sql.functions._
@@ -438,4 +439,21 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
438439
assert(e2.contains("array<double> cannot be converted to Hive TypeInfo"))
439440
}
440441
}
442+
443+
test("SPARK-38075: ORDER BY with LIMIT should not add fake rows") {
444+
withTempView("v") {
445+
val df = Seq((1), (2), (3)).toDF("a")
446+
df.createTempView("v")
447+
checkAnswer(sql(
448+
"""
449+
|SELECT TRANSFORM(a)
450+
| USING 'cat' AS (a)
451+
|FROM v
452+
|ORDER BY a
453+
|LIMIT 10
454+
|""".stripMargin),
455+
identity,
456+
Row("1") :: Row("2") :: Row("3") :: Nil)
457+
}
458+
}
441459
}

0 commit comments

Comments
 (0)