Skip to content

Commit 2237e81

Browse files
fix the deadlock in ScriptTransform
1 parent e76317b commit 2237e81

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -146,20 +146,29 @@ case class ScriptTransformation(
146146
val dataOutputStream = new DataOutputStream(outputStream)
147147
val outputProjection = new InterpretedProjection(input, child.output)
148148

149-
iter
150-
.map(outputProjection)
151-
.foreach { row =>
152-
if (inputSerde == null) {
153-
val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
154-
ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
155-
156-
outputStream.write(data)
157-
} else {
158-
val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
159-
prepareWritable(writable).write(dataOutputStream)
149+
// Put the write(output to the pipeline) into a single thread
150+
// and keep the collector as remain in the main thread.
151+
// otherwise it will causes deadlock if the data size greater than
152+
// the pipeline / buffer capacity.
153+
new Thread(new Runnable() {
154+
override def run(): Unit = {
155+
iter
156+
.map(outputProjection)
157+
.foreach { row =>
158+
if (inputSerde == null) {
159+
val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
160+
ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
161+
162+
outputStream.write(data)
163+
} else {
164+
val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
165+
prepareWritable(writable).write(dataOutputStream)
166+
}
160167
}
168+
outputStream.close()
161169
}
162-
outputStream.close()
170+
}).start()
171+
163172
iterator
164173
}
165174
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,4 +538,12 @@ class SQLQuerySuite extends QueryTest {
538538
sql(s"DROP TABLE $tableName")
539539
}
540540
}
541+
542+
test("test script transform") {
543+
val data = (1 to 100000).map { i => (i, i, i) }
544+
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
545+
assert(100000 ===
546+
sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
547+
.queryExecution.toRdd.count())
548+
}
541549
}

0 commit comments

Comments
 (0)