
Commit ac2d1fe

Refactor codes for comments.
1 parent a137933 commit ac2d1fe

6 files changed: +43, -63 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala

Lines changed: 3 additions & 4 deletions
@@ -32,11 +32,10 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: LogicalPlan,
-    ioschema: Option[ScriptInputOutputSchema]) extends UnaryNode
+    ioschema: ScriptInputOutputSchema) extends UnaryNode
 
 /**
- * The wrapper class of input and output schema properties for transforming with script.
- *
+ * A placeholder for implementation specific input and output properties when passing data
+ * to a script. For example, in Hive this would specify which SerDes to use.
  */
 trait ScriptInputOutputSchema
-
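
The parameter change above drops the Option wrapper: the schema field is always present, and a backend-neutral marker trait carries whatever properties the engine needs. A minimal sketch of that placeholder-trait pattern (HiveIOSchema, NoIOSchema, and Transform are hypothetical names; only ScriptInputOutputSchema comes from the diff):

// Sketch only: hypothetical stand-ins illustrating the marker-trait pattern.
trait ScriptInputOutputSchema

// Each backend supplies its own concrete properties...
case class HiveIOSchema(inputSerdeClass: String, outputSerdeClass: String)
  extends ScriptInputOutputSchema

// ...and a neutral default replaces the old None.
case object NoIOSchema extends ScriptInputOutputSchema

case class Transform(script: String, ioschema: ScriptInputOutputSchema)

object PlaceholderSketch extends App {
  val t = Transform("/bin/cat", HiveIOSchema("LazySimpleSerDe", "LazySimpleSerDe"))
  // The field is always present, so no .map/.get unwrapping is needed downstream.
  println(t.ioschema)
}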

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 23 additions & 31 deletions
@@ -33,8 +33,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable,
-  HiveScriptIOSchema}
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
 import org.apache.spark.sql.types._
 
 /* Implicit conversions */
@@ -648,44 +647,37 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             AttributeReference("value", StringType)()), true)
         }
 
-        val (inputRowFormat, inputSerdeClass, inputSerdeProps) = inputSerdeClause match {
-          case Token("TOK_SERDEPROPS", props) :: Nil =>
-            (props.map { case Token(name, Token(value, Nil) :: Nil) => (name, value) },
-              "", Nil)
-          case Token("TOK_SERDENAME", Token(serde, Nil) :: Nil) :: Nil => (Nil, serde, Nil)
-          case Token("TOK_SERDENAME", Token(serde, Nil) ::
-            Token("TOK_TABLEPROPERTIES",
-              Token("TOK_TABLEPROPLIST", props) :: Nil) :: Nil) :: Nil =>
-            val tableprops = props.map {
-              case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) =>
-                (name, value)
-            }
-            (Nil, serde, tableprops)
-          case Nil => (Nil, "", Nil)
-        }
+        def matchSerDe(clause: Seq[ASTNode]) = clause match {
+          case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
+            val rowFormat = propsClause.map {
+              case Token(name, Token(value, Nil) :: Nil) => (name, value)
+            }
+            (rowFormat, "", Nil)
+
+          case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil =>
+            (Nil, serdeClass, Nil)
 
-        val (outputRowFormat, outputSerdeClass, outputSerdeProps) = outputSerdeClause match {
-          case Token("TOK_SERDEPROPS", props) :: Nil =>
-            (props.map { case Token(name, Token(value, Nil) :: Nil) => (name, value) },
-              "", Nil)
-          case Token("TOK_SERDENAME", Token(serde, Nil) :: Nil) :: Nil => (Nil, serde, Nil)
-          case Token("TOK_SERDENAME", Token(serde, Nil) ::
-            Token("TOK_TABLEPROPERTIES",
-              Token("TOK_TABLEPROPLIST", props) :: Nil) :: Nil) :: Nil =>
-            val tableprops = props.map {
+          case Token("TOK_SERDENAME", Token(serdeClass, Nil) ::
+            Token("TOK_TABLEPROPERTIES",
+              Token("TOK_TABLEPROPLIST", propsClause) :: Nil) :: Nil) :: Nil =>
+            val serdeProps = propsClause.map {
               case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) =>
                 (name, value)
             }
-            (Nil, serde, tableprops)
+            (Nil, serdeClass, serdeProps)
+
           case Nil => (Nil, "", Nil)
         }
 
+        val (inRowFormat, inSerdeClass, inSerdeProps) = matchSerDe(inputSerdeClause)
+        val (outRowFormat, outSerdeClass, outSerdeProps) = matchSerDe(outputSerdeClause)
+
         val unescapedScript = BaseSemanticAnalyzer.unescapeSQLString(script)
 
-        val schema = Some(HiveScriptIOSchema(
-          inputRowFormat, outputRowFormat,
-          inputSerdeClass, outputSerdeClass,
-          inputSerdeProps, outputSerdeProps, schemaLess))
+        val schema = HiveScriptIOSchema(
+          inRowFormat, outRowFormat,
+          inSerdeClass, outSerdeClass,
+          inSerdeProps, outSerdeProps, schemaLess)
 
         Some(
           logical.ScriptTransformation(
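
The rewrite above replaces two byte-for-byte identical match blocks (one for the input SERDE clause, one for the output) with a single local matchSerDe definition. A runnable sketch of the same move on a simplified AST; Token here is a hypothetical miniature, not Hive's ASTNode:

object MatchSerDeSketch extends App {
  // Hypothetical miniature AST standing in for Hive's ASTNode.
  case class Token(name: String, children: List[Token])

  // One definition replaces the two identical inline match blocks.
  def matchSerDe(clause: List[Token]): (List[(String, String)], String) =
    clause match {
      case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil =>
        (Nil, serdeClass)
      case Token("TOK_SERDEPROPS", props) :: Nil =>
        val rowFormat = props.collect { case Token(n, Token(v, Nil) :: Nil) => (n, v) }
        (rowFormat, "")
      case Nil => (Nil, "")
    }

  val inputClause  = List(Token("TOK_SERDENAME", List(Token("MySerDe", Nil))))
  val outputClause = List.empty[Token]

  // Two call sites, one implementation: the duplication the commit removes.
  val (inRowFormat, inSerde)   = matchSerDe(inputClause)
  val (outRowFormat, outSerde) = matchSerDe(outputClause)
  println(s"in=$inSerde out=$outSerde")
}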

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 2 additions & 4 deletions
@@ -166,10 +166,8 @@ private[hive] trait HiveStrategies {
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child, schema) =>
-        ScriptTransformation(input, script, output,
-          planLater(child), schema.map { case s: HiveScriptIOSchema => s }.get
-        )(hiveContext) :: Nil
+      case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
+        ScriptTransformation(input, script, output, planLater(child), schema)(hiveContext) :: Nil
       case _ => Nil
     }
   }
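
The old pattern bound any schema and then downcast with schema.map { case s: HiveScriptIOSchema => s }.get, which throws a MatchError or NoSuchElementException whenever the schema is absent or not Hive's. The typed pattern does the type test and the binding in one step, so other schemas fall through to case _. A small sketch with hypothetical stand-in types:

object TypedPatternSketch extends App {
  // Hypothetical stand-ins for ScriptInputOutputSchema and its Hive subtype.
  trait Schema
  case class HiveSchema(serde: String) extends Schema
  case object OtherSchema extends Schema

  def plan(schema: Schema): Seq[String] = schema match {
    // Type test and binding in one pattern; a non-Hive schema falls
    // through to the default case instead of throwing at runtime.
    case s: HiveSchema => Seq(s"script transform via ${s.serde}")
    case _             => Nil
  }

  println(plan(HiveSchema("LazySimpleSerDe")))  // List(script transform via LazySimpleSerDe)
  println(plan(OtherSchema))                    // List()
}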

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

Lines changed: 3 additions & 6 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveShim, ShimWritable}
+import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.util.Utils
 
@@ -162,9 +162,8 @@ case class ScriptTransformation(
 
           outputStream.write(data)
         } else {
-          val writable = new ShimWritable(
-            inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi))
-          writable.write(dataOutputStream)
+          val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+          prepareWritable(writable).write(dataOutputStream)
         }
       }
       outputStream.close()
@@ -175,7 +174,6 @@ case class ScriptTransformation(
 
 /**
  * The wrapper class of Hive input and output schema properties
- *
  */
 case class HiveScriptIOSchema (
   inputRowFormat: Seq[(String, String)],
@@ -270,4 +268,3 @@ case class HiveScriptIOSchema (
     }
   }
 }
-
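
With the ShimWritable wrapper gone, the execution code serializes the row and routes the resulting Writable through a prepareWritable hook imported from whichever HiveShim is on the classpath, keeping the call site version-agnostic. A sketch of that shape; the Writable and prepareWritable below are hypothetical stand-ins, not Hadoop's or the shim's actual definitions:

object PrepareWritableSketch extends App {
  // Hypothetical stand-in for org.apache.hadoop.io.Writable.
  trait Writable
  case class Text(s: String) extends Writable

  // Imagine this imported from the version-specific shim (HiveShim._).
  def prepareWritable(w: Writable): Writable = w

  def write(w: Writable): Unit = println(s"writing $w")

  val serialized: Writable = Text("row data")
  // The caller always routes through the hook; the shim decides whether
  // anything needs fixing before the bytes go out.
  write(prepareWritable(serialized))
}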

sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala

Lines changed: 2 additions & 4 deletions
@@ -242,13 +242,11 @@ private[hive] object HiveShim {
     }
   }
 
-  implicit def prepareWritable(shimW: ShimWritable): Writable = {
-    shimW.writable
+  def prepareWritable(w: Writable): Writable = {
+    w
   }
 }
 
-case class ShimWritable(writable: Writable)
-
 class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
   extends FileSinkDesc(dir, tableInfo, compressed) {
 }
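
On hive-0.12 there is nothing to patch, so the shim's prepareWritable is simply the identity; keeping the hook anyway lets the execution code compile unchanged against either shim. A minimal sketch of that half of the contract, again with a hypothetical Writable:

// Sketch of the 0.12 shim's half of the contract (hypothetical Writable).
trait Writable

object Shim12Sketch {
  // No hive-0.12 quirks to work around: hand the value straight back.
  def prepareWritable(w: Writable): Writable = w
}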

sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala

Lines changed: 10 additions & 14 deletions
@@ -397,27 +397,23 @@ private[hive] object HiveShim {
       Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
     }
   }
-
-  implicit def prepareWritable(shimW: ShimWritable): Writable = {
-    shimW.writable match {
+
+  /*
+   * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
+   * is needed to initialize before serialization.
+   */
+  def prepareWritable(w: Writable): Writable = {
+    w match {
       case w: AvroGenericRecordWritable =>
         w.setRecordReaderID(new UID())
       case _ =>
-    }
-    shimW.writable
+    }
+    w
   }
 }
 
 /*
- * Bug introdiced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
- * is needed to initialize before serialization.
- * Fix it through wrapper.
- */
-case class ShimWritable(writable: Writable)
-
-
-/*
- * Bug introdiced in hive-0.13. FileSinkDesc is serilizable, but its member path is not.
+ * Bug introduced in hive-0.13. FileSinkDesc is serilizable, but its member path is not.
  * Fix it through wrapper.
  */
 class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
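
The hive-0.13 shim keeps the same signature but patches Avro records in place before returning them, since AvroGenericRecordWritable in hive-0.13 must have its recordReaderID initialized before serialization. A sketch with a hypothetical stand-in for the Hive Avro class (java.rmi.server.UID is the real JDK type the shim uses):

object Shim13Sketch extends App {
  import java.rmi.server.UID

  // Hypothetical stand-ins for Writable and AvroGenericRecordWritable.
  trait Writable
  class AvroRecordWritable extends Writable {
    var recordReaderID: Option[UID] = None
    def setRecordReaderID(uid: UID): Unit = recordReaderID = Some(uid)
  }

  // Patch only the problematic type; everything else passes through untouched.
  def prepareWritable(w: Writable): Writable = {
    w match {
      case a: AvroRecordWritable => a.setRecordReaderID(new UID())
      case _                     =>
    }
    w
  }

  val avro = new AvroRecordWritable
  prepareWritable(avro)
  println(avro.recordReaderID.isDefined)  // true
}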
