Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,28 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
Token(script, Nil) ::
Token("TOK_SERDE", serdeClause) ::
Token("TOK_RECORDREADER", readerClause) ::
outputClause :: Nil) :: Nil) =>
outputClause) :: Nil) =>

// TODO the output should be bound to the output clause or RecordReader
val output = outputClause match {
case Token("TOK_ALIASLIST", aliases) =>
case Token("TOK_ALIASLIST", aliases) :: Nil =>
aliases.map { case Token(name, Nil) => AttributeReference(name, StringType)() }
case Token("TOK_TABCOLLIST", attributes) =>
case Token("TOK_TABCOLLIST", attributes) :: Nil =>
attributes.map { case Token("TOK_TABCOL", Token(name, Nil) :: dataType :: Nil) =>
AttributeReference(name, nodeToDataType(dataType))() }
case Nil => // Output field names were not specified; default them based on the input
(0 to inputExprs.length - 1).map { idx =>
// Keep the same behavior as Hive: the first field name is "key", and the second is
// "value"; however, Hive seems to give a null string for the rest of the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the Hive manual, there should be only two outputs, key and value, when no output schema is defined. So I am not sure if it is a bug, because this behavior is explicitly described in the manual. I suppose that it is a well-known and expected behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing that. I think this is probably a bug in Hive.
I did the queries in Hive CLI:

set hive.cli.print.header=true;
select transform(key + 1, key - 1, key) using '/bin/cat' from src limit 4;

create table test2 as select transform(key + 1, key - 1, key) using '/bin/cat' from src limit 4;


And print the result of the table test2:

You will see it's not the expected result of key and value; that's why I added default field names for more than 2 columns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these are the expected results, as the Hive manual describes in the 'Schema-less Map-reduce Scripts' section on transform:

If there is no AS clause after USING my_script, Hive assumes that the output of the script contains 2 parts: key which is before the first tab, and value which is the rest after the first tab.

So in your results, the value column gets all query outputs after the first tab. The result for table test2 is just an alignment problem caused by tabs. It should follow the same rule too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see. Thanks for the explanation. I will update that.

// field names, which is presumably a bug in Hive.
if (idx == 0) {
AttributeReference("key", StringType)()
} else if (idx == 1) {
AttributeReference("value", StringType)()
} else {
AttributeReference(s"_col${idx - 2}", StringType)()
}
}
}
val unescapedScript = BaseSemanticAnalyzer.unescapeSQLString(script)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
import java.io.{BufferedReader, InputStreamReader}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveContext
Expand Down Expand Up @@ -54,23 +55,44 @@ case class ScriptTransformation(
val outputStream = proc.getOutputStream
val reader = new BufferedReader(new InputStreamReader(inputStream))

// This projection outputs to the script, which runs in a single process
// TODO a Writer SerDe will be placed here.
val inputProjection = new InterpretedProjection(input, child.output)

// This projection casts the script's output into the user-specified data types
// TODO a Reader SerDe will be placed here for casting the output
// data types into the required ones
val outputProjection = new InterpretedProjection(output.zipWithIndex.map {
case (attr, idx) if (attr.dataType == StringType) => BoundReference(idx, StringType, true)
case (attr, idx) => Cast(BoundReference(idx, StringType, true), attr.dataType)
}, output)

// TODO: This should be exposed as an iterator instead of reading in all the data at once.
val outputLines = collection.mutable.ArrayBuffer[Row]()
val readerThread = new Thread("Transform OutputReader") {
val row = new GenericMutableRow(output.length)
override def run() {
var curLine = reader.readLine()
while (curLine != null) {
// TODO: Use SerDe
outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]])
// TODO: A Reader SerDe will be placed here.
val splits = curLine.split("\t")
var idx = 0
while (idx < output.length) {
row(idx) = if (idx < splits.length) splits(idx) else null
idx += 1
}

outputLines += outputProjection(row)
curLine = reader.readLine()
}
}
}

readerThread.start()
val outputProjection = new InterpretedProjection(input, child.output)

iter
.map(outputProjection)
// TODO: Use SerDe
.map(inputProjection)
// TODO: Use the Writer SerDe
.map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write)
outputStream.close()
readerThread.join()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 val_0
1 val_0
1 val_0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 val_0
1 val_0
1 val_0
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
239.0 val_238
239.0 val_238
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,26 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SHOW TABLES")
}
}


createQueryTest("TRANSFORM #1 (without serde specified)",
"""
|SELECT transform(key + 1, value) USING '/bin/cat' AS a, b
|FROM src ORDER BY a, b DESC LIMIT 3
""".stripMargin)

createQueryTest("TRANSFORM #2 (without output field names specified)",
"""
|SELECT transform(key + 1, value) USING '/bin/cat'
|FROM src ORDER BY key, value DESC LIMIT 3
""".stripMargin)

createQueryTest("TRANSFORM #3 (with data type specified)",
"""
| SELECT a, b FROM (SELECT transform(key + 1, value)
| USING '/bin/cat' AS (a FLOAT, b STRING)
| FROM src) t WHERE a = 239.0
""".stripMargin)

createQueryTest("! operator",
"""
|SELECT a FROM (
Expand Down