From 16144931eb03b7ba93cf4df47b2a1fa04ffc9dd5 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Thu, 25 Sep 2014 10:12:12 +0800 Subject: [PATCH 1/5] add missing row api --- .../sql/catalyst/expressions/Projection.scala | 15 +++++++++++ .../spark/sql/catalyst/expressions/Row.scala | 27 ++++++++++++------- .../catalyst/expressions/SpecificRow.scala | 10 +++++-- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index ef1d12531f109..204904ecf04db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -137,6 +137,9 @@ class JoinedRow extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -226,6 +229,9 @@ class JoinedRow2 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -309,6 +315,9 @@ class JoinedRow3 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -392,6 +401,9 @@ class JoinedRow4 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -475,6 +487,9 @@ class JoinedRow5 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala index d68a4fabeac77..8d97a2ed988db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala @@ -64,6 +64,7 @@ trait Row extends Seq[Any] with Serializable { def getShort(i: Int): Short def getByte(i: Int): Byte def getString(i: Int): String + def getAs[T](i: Int): T = apply(i).asInstanceOf[T] override def toString() = s"[${this.mkString(",")}]" @@ -98,6 +99,7 @@ trait MutableRow extends Row { def setByte(ordinal: Int, value: Byte) def setFloat(ordinal: Int, value: Float) def setString(ordinal: Int, value: String) + def setAs[T](ordinal: Int, value: T) = update(ordinal, value) } /** @@ -118,6 +120,7 @@ object EmptyRow extends Row { def getShort(i: Int): Short = throw new UnsupportedOperationException def getByte(i: Int): Byte = throw new UnsupportedOperationException def getString(i: Int): String = throw new UnsupportedOperationException + override def getAs[T](i: Int): T = throw new UnsupportedOperationException def copy() = this } @@ -181,6 +184,11 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { values(i).asInstanceOf[String] } + override def getAs[T](i: Int): T = { + if (values(i) == null) sys.error("Failed to check null bit for generic value.") + values(i).asInstanceOf[T] + } + // Custom hashCode function that matches the efficient code generated version. override def hashCode(): Int = { var result: Int = 37 @@ -217,19 +225,20 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow { /** No-arg constructor for serialization. */ def this() = this(0) - override def setBoolean(ordinal: Int,value: Boolean): Unit = { values(ordinal) = value } - override def setByte(ordinal: Int,value: Byte): Unit = { values(ordinal) = value } - override def setDouble(ordinal: Int,value: Double): Unit = { values(ordinal) = value } - override def setFloat(ordinal: Int,value: Float): Unit = { values(ordinal) = value } - override def setInt(ordinal: Int,value: Int): Unit = { values(ordinal) = value } - override def setLong(ordinal: Int,value: Long): Unit = { values(ordinal) = value } - override def setString(ordinal: Int,value: String): Unit = { values(ordinal) = value } + override def setBoolean(ordinal: Int, value: Boolean): Unit = { values(ordinal) = value } + override def setByte(ordinal: Int, value: Byte): Unit = { values(ordinal) = value } + override def setDouble(ordinal: Int, value: Double): Unit = { values(ordinal) = value } + override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value } + override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value } + override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value } + override def setString(ordinal: Int, value: String): Unit = { values(ordinal) = value } + override def setAs[T](ordinal: Int, value: T): Unit = { values(ordinal) = value } override def setNullAt(i: Int): Unit = { values(i) = null } - override def setShort(ordinal: Int,value: Short): Unit = { values(ordinal) = value } + override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value } - override def update(ordinal: Int,value: Any): Unit = { values(ordinal) = value } + override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value } override def copy() = new GenericRow(values.clone()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala index 9cbab3d5d0d0d..e1d781458a9ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala @@ -233,9 +233,9 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR override def iterator: Iterator[Any] = values.map(_.boxed).iterator - def setString(ordinal: Int, value: String) = update(ordinal, value) + override def setString(ordinal: Int, value: String) = update(ordinal, value) - def getString(ordinal: Int) = apply(ordinal).asInstanceOf[String] + override def getString(ordinal: Int) = apply(ordinal).asInstanceOf[String] override def setInt(ordinal: Int, value: Int): Unit = { val currentValue = values(ordinal).asInstanceOf[MutableInt] @@ -306,4 +306,10 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR override def getByte(i: Int): Byte = { values(i).asInstanceOf[MutableByte].value } + + override def setAs[T](ordinal: Int, value: T): Unit = update(ordinal, value) + + override def getAs[T](i: Int): T = { + values(i).asInstanceOf[MutableAny].value.asInstanceOf[T] + } } From 4c18c29faedd52ca6a3d925ea039841b860862f7 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Sun, 28 Sep 2014 02:00:39 -0700 Subject: [PATCH 2/5] remove setAs[T] and null judge --- .../org/apache/spark/sql/catalyst/expressions/Row.scala | 7 ------- .../spark/sql/catalyst/expressions/SpecificRow.scala | 2 -- 2 files changed, 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala index 8d97a2ed988db..d00ec39774c35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala @@ -99,7 +99,6 @@ trait MutableRow extends Row { def setByte(ordinal: Int, value: Byte) def setFloat(ordinal: Int, value: Float) def setString(ordinal: Int, value: String) - def setAs[T](ordinal: Int, value: T) = update(ordinal, value) } /** @@ -184,11 +183,6 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { values(i).asInstanceOf[String] } - override def getAs[T](i: Int): T = { - if (values(i) == null) sys.error("Failed to check null bit for generic value.") - values(i).asInstanceOf[T] - } - // Custom hashCode function that matches the efficient code generated version. override def hashCode(): Int = { var result: Int = 37 @@ -232,7 +226,6 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow { override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value } override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value } override def setString(ordinal: Int, value: String): Unit = { values(ordinal) = value } - override def setAs[T](ordinal: Int, value: T): Unit = { values(ordinal) = value } override def setNullAt(i: Int): Unit = { values(i) = null } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala index e1d781458a9ad..a57e531ad2b65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala @@ -307,8 +307,6 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR values(i).asInstanceOf[MutableByte].value } - override def setAs[T](ordinal: Int, value: T): Unit = update(ordinal, value) - override def getAs[T](i: Int): T = { values(i).asInstanceOf[MutableAny].value.asInstanceOf[T] } From 7a39456ea85292dd66939ad355e1ff4f86b0fbe5 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Tue, 7 Oct 2014 23:04:36 -0700 Subject: [PATCH 3/5] rename file and refresh getAs[T] --- .../{SpecificRow.scala => SpecificMutableRow.scala} | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{SpecificRow.scala => SpecificMutableRow.scala} (94%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala similarity index 94% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index a57e531ad2b65..0636efc2265ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -307,7 +307,14 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR values(i).asInstanceOf[MutableByte].value } - override def getAs[T](i: Int): T = { - values(i).asInstanceOf[MutableAny].value.asInstanceOf[T] + override def getAs[T](i: Int): T = values(i) match { + case r: MutableInt => r.value.asInstanceOf[T] + case r: MutableByte => r.value.asInstanceOf[T] + case r: MutableFloat => r.value.asInstanceOf[T] + case r: MutableShort => r.value.asInstanceOf[T] + case r: MutableDouble => r.value.asInstanceOf[T] + case r: MutableBoolean => r.value.asInstanceOf[T] + case r: MutableLong => r.value.asInstanceOf[T] + case r => r.asInstanceOf[MutableAny].value.asInstanceOf[T] } } From 7b7e6e327aaa2942b30f2de1ee64417ca091269e Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Tue, 7 Oct 2014 23:18:18 -0700 Subject: [PATCH 4/5] update pattern match --- .../spark/sql/catalyst/expressions/SpecificMutableRow.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index 0636efc2265ff..5e744ac95bb18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -315,6 +315,6 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR case r: MutableDouble => r.value.asInstanceOf[T] case r: MutableBoolean => r.value.asInstanceOf[T] case r: MutableLong => r.value.asInstanceOf[T] - case r => r.asInstanceOf[MutableAny].value.asInstanceOf[T] + case r: MutableAny => r.value.asInstanceOf[T] } } From c6594b2757967e1645452fd21167b47ca339fca6 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Wed, 8 Oct 2014 18:55:48 -0700 Subject: [PATCH 5/5] using boxed --- .../sql/catalyst/expressions/SpecificMutableRow.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index 5e744ac95bb18..570379c533e1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -307,14 +307,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR values(i).asInstanceOf[MutableByte].value } - override def getAs[T](i: Int): T = values(i) match { - case r: MutableInt => r.value.asInstanceOf[T] - case r: MutableByte => r.value.asInstanceOf[T] - case r: MutableFloat => r.value.asInstanceOf[T] - case r: MutableShort => r.value.asInstanceOf[T] - case r: MutableDouble => r.value.asInstanceOf[T] - case r: MutableBoolean => r.value.asInstanceOf[T] - case r: MutableLong => r.value.asInstanceOf[T] - case r: MutableAny => r.value.asInstanceOf[T] + override def getAs[T](i: Int): T = { + values(i).boxed.asInstanceOf[T] } }