@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}

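/**
 * Base trait for script transformation operators: it pipes the rows of the child plan into an
 * external process and turns the process output back into [[InternalRow]]s. Concrete
 * implementations supply `processIterator`, which launches the script and wires up the I/O.
 */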
trait BaseScriptTransformationExec extends UnaryExecNode {

  override def producedAttributes: AttributeSet = outputSet -- inputSet

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def doExecute(): RDD[InternalRow] = {
    val broadcastedHadoopConf =
      new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())

    child.execute().mapPartitions { iter =>
      if (iter.hasNext) {
        val proj = UnsafeProjection.create(schema)
        processIterator(iter, broadcastedHadoopConf.value).map(proj)
      } else {
        // If the input iterator has no rows then do not launch the external script.
        Iterator.empty
      }
    }
  }

  def processIterator(
      inputIterator: Iterator[InternalRow],
      hadoopConf: Configuration): Iterator[InternalRow]

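  /**
   * Propagates any exception recorded by the writer thread, then waits (bounded by
   * `SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT`) for the external process to finish and fails
   * the task if it exited with a non-zero status, including the captured stderr in the error.
   */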
  protected def checkFailureAndPropagate(
      writerThread: BaseScriptTransformationWriterThread,
      cause: Throwable = null,
      proc: Process,
      stderrBuffer: CircularBuffer): Unit = {
    if (writerThread.exception.isDefined) {
      throw writerThread.exception.get
    }

    // There can be a lag between the reader hitting EOF and the process terminating.
    // If the script fails during startup, that kind of error may be missed,
    // so we explicitly wait for the process to terminate.
    val timeout = conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT)
    val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS)
    if (!exitRes) {
      log.warn(s"Transformation script process did not exit within $timeout seconds")
    }

    if (!proc.isAlive) {
      val exitCode = proc.exitValue()
      if (exitCode != 0) {
        logError(stderrBuffer.toString) // log the stderr circular buffer
        throw new SparkException(s"Subprocess exited with status $exitCode. " +
          s"Error: ${stderrBuffer.toString}", cause)
      }
    }
  }
}

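/**
 * Daemon thread that feeds the rows of the parent iterator to the external process's stdin.
 * Any failure while writing is recorded in `_exception` (and the process is destroyed) rather
 * than re-thrown, so that the reading side can surface it via `checkFailureAndPropagate`.
 */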
abstract class BaseScriptTransformationWriterThread(
    iter: Iterator[InternalRow],
    inputSchema: Seq[DataType],
    ioSchema: BaseScriptTransformIOSchema,
    outputStream: OutputStream,
    proc: Process,
    stderrBuffer: CircularBuffer,
    taskContext: TaskContext,
    conf: Configuration) extends Thread with Logging {

  setDaemon(true)

  @volatile protected var _exception: Throwable = null

  /** Contains the exception thrown while writing the parent iterator to the external process. */
  def exception: Option[Throwable] = Option(_exception)

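  /** Serializes and writes every row of the parent iterator to the external process. */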
  protected def processRows(): Unit

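  /**
   * Writes rows without a SerDe: fields are separated by the configured
   * TOK_TABLEROWFORMATFIELD delimiter and each row is terminated by the
   * TOK_TABLEROWFORMATLINES delimiter, encoded as UTF-8.
   */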
  protected def processRowsWithoutSerde(): Unit = {
    val len = inputSchema.length
    iter.foreach { row =>
      val data = if (len == 0) {
        ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
      } else {
        val sb = new StringBuilder
        sb.append(row.get(0, inputSchema(0)))
        var i = 1
        while (i < len) {
          sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
          sb.append(row.get(i, inputSchema(i)))
          i += 1
        }
        sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
        sb.toString()
      }
      outputStream.write(data.getBytes(StandardCharsets.UTF_8))
    }
  }

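  /**
   * Attaches the task context, writes all rows via `processRows()`, and on failure records the
   * exception and destroys the child process instead of re-throwing it (see SPARK-25158).
   */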
  override def run(): Unit = Utils.logUncaughtExceptions {
    TaskContext.setTaskContext(taskContext)

    // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
    // let's use a variable to record whether the `finally` block was hit due to an exception.
    var threwException: Boolean = true
    try {
      processRows()
      threwException = false
    } catch {
      // SPARK-25158: the exception should not be re-thrown here, otherwise it will be captured
      // by SparkUncaughtExceptionHandler and the Executor will exit because of this uncaught
      // exception, so it is enough to pass the exception on to `ScriptTransformationExec`.
      case t: Throwable =>
        // An error occurred while writing input, so kill the child process. According to the
        // Javadoc this call will not throw an exception:
        _exception = t
        proc.destroy()
        logError("Thread-ScriptTransformation-Feed exited, caused by: ", t)
    } finally {
      try {
        Utils.tryLogNonFatalError(outputStream.close())
        if (proc.waitFor() != 0) {
          logError(stderrBuffer.toString) // log the stderr circular buffer
        }
      } catch {
        case NonFatal(exceptionFromFinallyBlock) =>
          if (!threwException) {
            throw exceptionFromFinallyBlock
          } else {
            log.error("Exception in finally block", exceptionFromFinallyBlock)
          }
      }
    }
  }
}

/**
* The wrapper class of input and output schema properties
*/
abstract class BaseScriptTransformIOSchema extends Serializable {
  import ScriptIOSchema._

  def inputRowFormat: Seq[(String, String)]

  def outputRowFormat: Seq[(String, String)]

  def inputSerdeClass: Option[String]

  def outputSerdeClass: Option[String]

  def inputSerdeProps: Seq[(String, String)]

  def outputSerdeProps: Seq[(String, String)]

  def recordReaderClass: Option[String]

  def recordWriterClass: Option[String]

  def schemaLess: Boolean

  val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k))
  val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k))
}

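/** Default field and line delimiters used when a row format property is not specified. */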
object ScriptIOSchema {
  val defaultFormat = Map(
    ("TOK_TABLEROWFORMATFIELD", "\t"),
    ("TOK_TABLEROWFORMATLINES", "\n")
  )
}
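// Illustrative sketch only (not part of this patch): a concrete IO schema can be as simple as a
// case class that materializes the abstract members above. The name SimpleScriptIOSchema and its
// default values are assumptions for demonstration; with no SerDe configured, the row format maps
// fall back to the tab/newline delimiters defined in ScriptIOSchema.defaultFormat.
case class SimpleScriptIOSchema(
    inputRowFormat: Seq[(String, String)] = Nil,
    outputRowFormat: Seq[(String, String)] = Nil,
    inputSerdeClass: Option[String] = None,
    outputSerdeClass: Option[String] = None,
    inputSerdeProps: Seq[(String, String)] = Nil,
    outputSerdeProps: Seq[(String, String)] = Nil,
    recordReaderClass: Option[String] = None,
    recordWriterClass: Option[String] = None,
    schemaLess: Boolean = false) extends BaseScriptTransformIOSchema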
@@ -109,7 +109,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       override val sparkSession: SparkSession = session
 
       override def extraPlanningStrategies: Seq[Strategy] =
-        super.extraPlanningStrategies ++ customPlanningStrategies ++ Seq(HiveTableScans, Scripts)
+        super.extraPlanningStrategies ++ customPlanningStrategies ++
+          Seq(HiveTableScans, HiveScripts)
     }
   }

@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.hive.execution.{HiveScriptIOSchema, HiveScriptTransformationExec}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


@@ -237,11 +238,11 @@ private[hive] trait HiveStrategies {

   val sparkSession: SparkSession
 
-  object Scripts extends Strategy {
+  object HiveScripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
+        HiveScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
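For reference, the renamed HiveScripts strategy plans TRANSFORM queries such as
SELECT TRANSFORM(key, value) USING 'cat' AS (k, v) FROM src (the script 'cat' and the table name
src are illustrative here) into a HiveScriptTransformationExec node that pipes the input rows
through the external command.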