@@ -25,9 +25,17 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
  * @param input the set of expressions that should be passed to the script.
  * @param script the command that should be executed.
  * @param output the attributes that are produced by the script.
+ * @param ioschema the input and output schema applied in the execution of the script.
  */
 case class ScriptTransformation(
     input: Seq[Expression],
     script: String,
     output: Seq[Attribute],
-    child: LogicalPlan) extends UnaryNode
+    child: LogicalPlan,
+    ioschema: ScriptInputOutputSchema) extends UnaryNode
+
+/**
+ * A placeholder for implementation specific input and output properties when passing data
+ * to a script. For example, in Hive this would specify which SerDes to use.
+ */
+trait ScriptInputOutputSchema
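
For orientation, here is a minimal sketch (not part of the patch) of how a concrete implementation slots into the new field. The TabDelimitedIOSchema object and the transformPlan helper are hypothetical names; the real Hive implementation is the HiveScriptIOSchema case class introduced further down.

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ScriptInputOutputSchema, ScriptTransformation}
    import org.apache.spark.sql.types.StringType

    // Hypothetical marker schema: carries no SerDe information at all.
    case object TabDelimitedIOSchema extends ScriptInputOutputSchema

    // Hypothetical helper: pipe `input` through /bin/cat and read back two string columns.
    def transformPlan(child: LogicalPlan, input: Seq[Expression]): ScriptTransformation = {
      val output = Seq(
        AttributeReference("key", StringType)(),
        AttributeReference("value", StringType)())
      ScriptTransformation(input, "/bin/cat", output, child, TabDelimitedIOSchema)
    }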
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala (59 changes: 47 additions, 12 deletions)
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
 import org.apache.spark.sql.types._

 /* Implicit conversions */
@@ -627,29 +627,64 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       case Token("TOK_SELEXPR",
              Token("TOK_TRANSFORM",
                Token("TOK_EXPLIST", inputExprs) ::
-               Token("TOK_SERDE", Nil) ::
+               Token("TOK_SERDE", inputSerdeClause) ::
                Token("TOK_RECORDWRITER", writerClause) ::
                // TODO: Need to support other types of (in/out)put
                Token(script, Nil) ::
-               Token("TOK_SERDE", serdeClause) ::
+               Token("TOK_SERDE", outputSerdeClause) ::
                Token("TOK_RECORDREADER", readerClause) ::
-               outputClause :: Nil) :: Nil) =>
-
-        val output = outputClause match {
-          case Token("TOK_ALIASLIST", aliases) =>
-            aliases.map { case Token(name, Nil) => AttributeReference(name, StringType)() }
-          case Token("TOK_TABCOLLIST", attributes) =>
-            attributes.map { case Token("TOK_TABCOL", Token(name, Nil) :: dataType :: Nil) =>
-              AttributeReference(name, nodeToDataType(dataType))() }
+               outputClause) :: Nil) =>
+
+        val (output, schemaLess) = outputClause match {
+          case Token("TOK_ALIASLIST", aliases) :: Nil =>
+            (aliases.map { case Token(name, Nil) => AttributeReference(name, StringType)() },
+              false)
+          case Token("TOK_TABCOLLIST", attributes) :: Nil =>
+            (attributes.map { case Token("TOK_TABCOL", Token(name, Nil) :: dataType :: Nil) =>
+              AttributeReference(name, nodeToDataType(dataType))() }, false)
+          case Nil =>
+            (List(AttributeReference("key", StringType)(),
+              AttributeReference("value", StringType)()), true)
         }

+        def matchSerDe(clause: Seq[ASTNode]) = clause match {
+          case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
+            val rowFormat = propsClause.map {
+              case Token(name, Token(value, Nil) :: Nil) => (name, value)
+            }
+            (rowFormat, "", Nil)
+
+          case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil =>
+            (Nil, serdeClass, Nil)
+
+          case Token("TOK_SERDENAME", Token(serdeClass, Nil) ::
+            Token("TOK_TABLEPROPERTIES",
+              Token("TOK_TABLEPROPLIST", propsClause) :: Nil) :: Nil) :: Nil =>
+            val serdeProps = propsClause.map {
+              case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) =>
+                (name, value)
+            }
+            (Nil, serdeClass, serdeProps)
+
+          case Nil => (Nil, "", Nil)
+        }
+
+        val (inRowFormat, inSerdeClass, inSerdeProps) = matchSerDe(inputSerdeClause)
+        val (outRowFormat, outSerdeClass, outSerdeProps) = matchSerDe(outputSerdeClause)
+
         val unescapedScript = BaseSemanticAnalyzer.unescapeSQLString(script)

+        val schema = HiveScriptIOSchema(
+          inRowFormat, outRowFormat,
+          inSerdeClass, outSerdeClass,
+          inSerdeProps, outSerdeProps, schemaLess)
+
         Some(
           logical.ScriptTransformation(
             inputExprs.map(nodeToExpr),
             unescapedScript,
             output,
-            withWhere))
+            withWhere, schema))
       case _ => None
     }
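To make the parser cases concrete, the queries below exercise each matchSerDe branch. This is a sketch: hiveContext is assumed to be a HiveContext and src an existing two-column table.

    // No SerDe clause and no AS list: both matchSerDe calls hit `case Nil`, and the
    // missing output clause yields the schema-less ("key", "value") string pair.
    hiveContext.sql("SELECT TRANSFORM (key, value) USING 'cat' FROM src")

    // TOK_SERDEPROPS: delimited row formats become (name, value) pairs such as
    // ("TOK_TABLEROWFORMATFIELD", ...).
    hiveContext.sql(
      """SELECT TRANSFORM (key, value)
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |USING 'cat' AS (k, v)
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |FROM src""".stripMargin)

    // TOK_SERDENAME plus TOK_TABLEPROPERTIES: an explicit SerDe class with properties.
    // The same form without WITH SERDEPROPERTIES hits the middle TOK_SERDENAME case.
    hiveContext.sql(
      """SELECT TRANSFORM (key, value)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        |WITH SERDEPROPERTIES ('field.delim' = '\t')
        |USING 'cat' AS (k, v)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        |WITH SERDEPROPERTIES ('field.delim' = '\t')
        |FROM src""".stripMargin)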
@@ -166,8 +166,8 @@ private[hive] trait HiveStrategies {

   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child) =>
-        ScriptTransformation(input, script, output, planLater(child))(hiveContext) :: Nil
+      case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
+        ScriptTransformation(input, script, output, planLater(child), schema)(hiveContext) :: Nil
       case _ => Nil
     }
   }
@@ -18,11 +18,26 @@
 package org.apache.spark.sql.hive.execution

 import java.io.{BufferedReader, InputStreamReader}
+import java.io.{DataInputStream, DataOutputStream, EOFException}
+import java.util.Properties
+
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.AbstractSerDe
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
+import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.util.Utils
+

 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -40,7 +55,8 @@ case class ScriptTransformation(
     input: Seq[Expression],
     script: String,
     output: Seq[Attribute],
-    child: SparkPlan)(@transient sc: HiveContext)
+    child: SparkPlan,
+    ioschema: HiveScriptIOSchema)(@transient sc: HiveContext)
[Inline review thread on this line]

Contributor: I can see why the reader thread was removed, but keeping it (or at least the TODO) would help if we later support streaming-style output, which would save a lot of memory. Would you mind leaving it unchanged, or at least keeping the TODO?

Member (author): If I am wrong please let me know, but since the output is now exposed through an iterator, isn't this already the streaming-style output that the TODO asked for?

Contributor: Oh, got it, that makes sense.
   extends UnaryNode {

   override def otherCopyArgs = sc :: Nil
@@ -53,28 +69,202 @@ case class ScriptTransformation(
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
       val reader = new BufferedReader(new InputStreamReader(inputStream))

+      val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
+
+      val iterator: Iterator[Row] = new Iterator[Row] with HiveInspectors {
+        var cacheRow: Row = null
+        var curLine: String = null
+        var eof: Boolean = false
+
+        override def hasNext: Boolean = {
+          if (outputSerde == null) {
+            if (curLine == null) {
+              curLine = reader.readLine()
+              curLine != null
+            } else {
+              true
+            }
+          } else {
+            !eof
+          }
+        }
+
+        def deserialize(): Row = {
+          if (cacheRow != null) return cacheRow
+
+          val mutableRow = new SpecificMutableRow(output.map(_.dataType))
+          try {
+            val dataInputStream = new DataInputStream(inputStream)
+            val writable = outputSerde.getSerializedClass().newInstance
+            writable.readFields(dataInputStream)
+
+            val raw = outputSerde.deserialize(writable)
+            val dataList = outputSoi.getStructFieldsDataAsList(raw)
+            val fieldList = outputSoi.getAllStructFieldRefs()
+
+            var i = 0
+            dataList.foreach( element => {
+              if (element == null) {
+                mutableRow.setNullAt(i)
+              } else {
+                mutableRow(i) = unwrap(element, fieldList(i).getFieldObjectInspector)
+              }
+              i += 1
+            })
+            return mutableRow
+          } catch {
+            case e: EOFException =>
+              eof = true
+              return null
+          }
+        }
+
-      // TODO: This should be exposed as an iterator instead of reading in all the data at once.
-      val outputLines = collection.mutable.ArrayBuffer[Row]()
-      val readerThread = new Thread("Transform OutputReader") {
-        override def run() {
-          var curLine = reader.readLine()
-          while (curLine != null) {
-            // TODO: Use SerDe
-            outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]])
-            curLine = reader.readLine()
-          }
-        }
-      }
-      readerThread.start()
+        override def next(): Row = {
+          if (!hasNext) {
+            throw new NoSuchElementException
+          }
+
+          if (outputSerde == null) {
+            val prevLine = curLine
+            curLine = reader.readLine()
+
+            if (!ioschema.schemaLess) {
+              new GenericRow(
+                prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
+                  .asInstanceOf[Array[Any]])
+            } else {
+              new GenericRow(
+                prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
+                  .asInstanceOf[Array[Any]])
+            }
+          } else {
+            val ret = deserialize()
+            if (!eof) {
+              cacheRow = null
+              cacheRow = deserialize()
+            }
+            ret
+          }
+        }
+      }
+
+      val (inputSerde, inputSoi) = ioschema.initInputSerDe(input)
+      val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)

       iter
         .map(outputProjection)
-        // TODO: Use SerDe
-        .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write)
+        .foreach { row =>
+          if (inputSerde == null) {
+            val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+              ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+            outputStream.write(data)
+          } else {
+            val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+            prepareWritable(writable).write(dataOutputStream)
+          }
+        }
       outputStream.close()
-      readerThread.join()
-      outputLines.toIterator
+      iterator
     }
   }
 }
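
A note on the shape of this operator: the SerDe read path cannot use readLine() to detect end-of-stream, so the iterator above prefetches one row into cacheRow and treats the EOFException thrown by Writable.readFields as the end-of-data signal. Below is a self-contained sketch of that read-ahead pattern under simplified assumptions (ReadAheadIterator and decode are illustrative names, not part of the patch):

    import java.io.EOFException

    // decode() stands in for SerDe deserialization; it throws EOFException
    // once the script's output stream is exhausted.
    class ReadAheadIterator[T](decode: () => T) extends Iterator[T] {
      private var cached: Option[T] = None
      private var eof = false

      private def prefetch(): Unit = {
        if (cached.isEmpty && !eof) {
          try {
            cached = Some(decode())
          } catch {
            case _: EOFException => eof = true // EOF doubles as "no more rows"
          }
        }
      }

      override def hasNext: Boolean = { prefetch(); cached.isDefined }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException
        val row = cached.get
        cached = None // force a fresh read on the next call
        row
      }
    }

Because mapPartitions consumes the returned iterator lazily, rows stream from the child process one at a time instead of being buffered in an ArrayBuffer, which is why the old reader thread and its TODO could be dropped, as the review thread above concluded. The HiveScriptIOSchema class appended below supplies the SerDe plumbing that the operator calls into.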

+/**
+ * A wrapper class for the Hive input and output schema properties.
+ */
+case class HiveScriptIOSchema(
+    inputRowFormat: Seq[(String, String)],
+    outputRowFormat: Seq[(String, String)],
+    inputSerdeClass: String,
+    outputSerdeClass: String,
+    inputSerdeProps: Seq[(String, String)],
+    outputSerdeProps: Seq[(String, String)],
+    schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+
+  val defaultFormat = Map(("TOK_TABLEROWFORMATFIELD", "\t"),
+    ("TOK_TABLEROWFORMATLINES", "\n"))
+
+  val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k))
+  val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k))
+
+  def initInputSerDe(input: Seq[Expression]): (AbstractSerDe, ObjectInspector) = {
+    val (columns, columnTypes) = parseAttrs(input)
+    val serde = initSerDe(inputSerdeClass, columns, columnTypes, inputSerdeProps)
+    (serde, initInputSoi(serde, columns, columnTypes))
+  }
+
+  def initOutputSerDe(output: Seq[Attribute]): (AbstractSerDe, StructObjectInspector) = {
+    val (columns, columnTypes) = parseAttrs(output)
+    val serde = initSerDe(outputSerdeClass, columns, columnTypes, outputSerdeProps)
+    (serde, initOutputSoi(serde))
+  }
+
+  def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
+    val columns = attrs.map {
+      case aref: AttributeReference => aref.name
+      case e: NamedExpression => e.name
+      case _ => null
+    }
+
+    val columnTypes = attrs.map {
+      case aref: AttributeReference => aref.dataType
+      case e: NamedExpression => e.dataType
+      case _ => null
+    }
+
+    (columns, columnTypes)
+  }
+
+  def initSerDe(serdeClassName: String, columns: Seq[String],
+    columnTypes: Seq[DataType], serdeProps: Seq[(String, String)]): AbstractSerDe = {
+
+    val serde: AbstractSerDe = if (serdeClassName != "") {
+      val trimmedClass = serdeClassName.split("'")(1)
+      Utils.classForName(trimmedClass)
+        .newInstance.asInstanceOf[AbstractSerDe]
+    } else {
+      null
+    }
+
+    if (serde != null) {
+      val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
+
+      var propsMap = serdeProps.map(kv => {
+        (kv._1.split("'")(1), kv._2.split("'")(1))
+      }).toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
+      propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
+
+      val properties = new Properties()
+      properties.putAll(propsMap)
+      serde.initialize(null, properties)
+    }
+
+    serde
+  }
+
+  def initInputSoi(inputSerde: AbstractSerDe, columns: Seq[String], columnTypes: Seq[DataType])
+    : ObjectInspector = {
+
+    if (inputSerde != null) {
+      val fieldObjectInspectors = columnTypes.map(toInspector(_))
+      ObjectInspectorFactory
+        .getStandardStructObjectInspector(columns, fieldObjectInspectors)
+        .asInstanceOf[ObjectInspector]
+    } else {
+      null
+    }
+  }
+
+  def initOutputSoi(outputSerde: AbstractSerDe): StructObjectInspector = {
+    if (outputSerde != null) {
+      outputSerde.getObjectInspector().asInstanceOf[StructObjectInspector]
+    } else {
+      null
+    }
+  }
+}
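
Two details of this wrapper are easy to miss. First, initSerDe receives class names and property values still wrapped in the parser's single quotes (for example "'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"), which is why it strips them with split("'")(1). Second, the row-format maps fall back to tab and newline delimiters for anything the query did not specify. A small illustrative sketch of that fallback, constructing the schema directly rather than through HiveQl:

    // Illustrative values: no SerDe classes, one explicit input field delimiter.
    val ioschema = HiveScriptIOSchema(
      inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> ","),
      outputRowFormat = Nil,
      inputSerdeClass = "",
      outputSerdeClass = "",
      inputSerdeProps = Nil,
      outputSerdeProps = Nil,
      schemaLess = false)

    ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD")  // "," (explicitly set)
    ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD") // "\t" (defaultFormat fallback)
    ioschema.outputRowFormatMap("TOK_TABLEROWFORMATLINES") // "\n" (defaultFormat fallback)

With empty SerDe class names, initInputSerDe and initOutputSerDe return null SerDes, and ScriptTransformation falls back to the delimiter-based text path shown above.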