Skip to content

Commit 2dd7f93

Browse files
zhichao-limarmbrus
authored andcommitted
[SPARK-7862] [SQL] Fix the deadlock in script transformation for stderr
[Related PR SPARK-7044] (#5671) Author: zhichao.li <zhichao.li@intel.com> Closes #6404 from zhichao-li/transform and squashes the following commits: 8418c97 [zhichao.li] add comments and remove useless failAfter logic d9677e1 [zhichao.li] redirect the error desitination to be the same as the current process
1 parent b9d177c commit 2dd7f93

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.hive.execution
1919

2020
import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader}
21+
import java.lang.ProcessBuilder.Redirect
2122
import java.util.Properties
2223

2324
import scala.collection.JavaConversions._
@@ -58,6 +59,12 @@ case class ScriptTransformation(
5859
child.execute().mapPartitions { iter =>
5960
val cmd = List("/bin/bash", "-c", script)
6061
val builder = new ProcessBuilder(cmd)
62+
// redirectError(Redirect.INHERIT) would consume the error output from buffer and
63+
// then print it to stderr (inherit the target from the current Scala process).
64+
// If without this there would be 2 issues:
65+
// 1) The error msg generated by the script process would be hidden.
66+
// 2) If the error msg is too big to chock up the buffer, the input logic would be hung
67+
builder.redirectError(Redirect.INHERIT)
6168
val proc = builder.start()
6269
val inputStream = proc.getInputStream
6370
val outputStream = proc.getOutputStream

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -645,12 +645,20 @@ class SQLQuerySuite extends QueryTest {
645645
.queryExecution.analyzed
646646
}
647647

648-
test("test script transform") {
648+
test("test script transform for stdout") {
649649
val data = (1 to 100000).map { i => (i, i, i) }
650650
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
651651
assert(100000 ===
652652
sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
653-
.queryExecution.toRdd.count())
653+
.queryExecution.toRdd.count())
654+
}
655+
656+
test("test script transform for stderr") {
657+
val data = (1 to 100000).map { i => (i, i, i) }
658+
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
659+
assert(0 ===
660+
sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans")
661+
.queryExecution.toRdd.count())
654662
}
655663

656664
test("window function: udaf with aggregate expressin") {

0 commit comments

Comments
 (0)