
Commit ed71bbd

Addressed all PR comments by @marmbrus
#285
1 parent d3a4fa9 commit ed71bbd


11 files changed, +156 −146 lines changed


sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala

Lines changed: 13 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 
-private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable{
+private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
    */
@@ -246,14 +246,25 @@ private[sql] class FloatColumnStats extends BasicColumnStats(FLOAT) {
     }
   }
 
-object IntColumnStats {
+private[sql] object IntColumnStats {
   val UNINITIALIZED = 0
   val INITIALIZED = 1
   val ASCENDING = 2
   val DESCENDING = 3
   val UNORDERED = 4
 }
 
+/**
+ * Statistical information for `Int` columns. More information is collected since `Int` is
+ * frequently used. Extra information includes:
+ *
+ * - Ordering state (ascending/descending/unordered), may be used to decide whether binary search
+ *   is applicable when searching elements.
+ * - Maximum delta between adjacent elements, may be used to guide the `IntDelta` compression
+ *   scheme.
+ *
+ * (These two kinds of information are not used anywhere yet and might be removed later.)
+ */
 private[sql] class IntColumnStats extends BasicColumnStats(INT) {
   import IntColumnStats._
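
The new Scaladoc above only documents the extra statistics (and notes they are not used anywhere yet). As a rough, self-contained sketch of what tracking an ordering state and a maximum adjacent delta could look like — IntStatsSketch, gatherStats, isOrdered, and maximumDelta are illustrative names, not the actual IntColumnStats implementation:

object IntStatsSketch {
  // Mirrors the constants defined in IntColumnStats above.
  val UNINITIALIZED = 0
  val INITIALIZED = 1
  val ASCENDING = 2
  val DESCENDING = 3
  val UNORDERED = 4
}

class IntStatsSketch {
  import IntStatsSketch._

  private var orderedState = UNINITIALIZED
  private var lastValue = 0
  private var maxDelta = 0

  // Called once per column value, in row order.
  def gatherStats(value: Int): Unit = {
    val previousState = orderedState
    orderedState = previousState match {
      case UNINITIALIZED => INITIALIZED
      case INITIALIZED => if (value >= lastValue) ASCENDING else DESCENDING
      case ASCENDING if value < lastValue => UNORDERED
      case DESCENDING if value > lastValue => UNORDERED
      case other => other
    }
    // Only measure deltas once a previous value exists; the largest gap is the
    // kind of figure that could guide the IntDelta compression scheme.
    if (previousState != UNINITIALIZED) {
      maxDelta = math.max(maxDelta, math.abs(value - lastValue))
    }
    lastValue = value
  }

  def isOrdered: Boolean = orderedState == ASCENDING || orderedState == DESCENDING
  def maximumDelta: Int = maxDelta
}

A consumer could then check isOrdered before attempting a binary search over the column, and feed maximumDelta into an applicability test for the IntDelta scheme.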

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala

Lines changed: 3 additions & 1 deletion
@@ -71,6 +71,8 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
    * Creates a duplicated copy of the value.
    */
   def clone(v: JvmType): JvmType = v
+
+  override def toString = getClass.getSimpleName.stripSuffix("$")
 }
 
 private[sql] abstract class NativeColumnType[T <: NativeType](
@@ -258,7 +260,7 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
 }
 
 private[sql] object ColumnType {
-  implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = {
+  def apply(dataType: DataType): ColumnType[_, _] = {
     dataType match {
       case IntegerType => INT
       case LongType => LONG
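
The net effect is that call sites now name the DataType-to-ColumnType conversion instead of relying on an implicit. A minimal usage sketch, assuming code that lives inside the org.apache.spark.sql package so the private[sql] members ColumnType and INT are visible:

// Hypothetical call site (package membership assumed as noted above).
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.columnar.{ColumnType, INT}

// Before: the implicit dataTypeToColumnType conversion kicked in silently.
// After: the DataType -> ColumnType lookup is an explicit factory call.
val intColumnType: ColumnType[_, _] = ColumnType(IntegerType)
assert(intColumnType == INT)

The InMemoryColumnarTableScan change below shows the same rewrite at a real call site.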

sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 1 addition & 4 deletions
@@ -21,9 +21,6 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row
 
-/* Implicit conversions */
-import org.apache.spark.sql.columnar.ColumnType._
-
 private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
   extends LeafNode {
 
@@ -33,7 +30,7 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
   val output = child.output
   val cached = child.execute().mapPartitions { iterator =>
     val columnBuilders = output.map { attribute =>
-      ColumnBuilder(attribute.dataType.typeId, 0, attribute.name)
+      ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
     }.toArray
 
     var row: Row = null

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala

Lines changed: 3 additions & 1 deletion
@@ -58,7 +58,9 @@ private[sql] trait WithCompressionSchemes {
 }
 
 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes = Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
+  override val schemes: Seq[CompressionScheme] = {
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
+  }
 }
 
 private[sql] object CompressionScheme {

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala

Lines changed: 4 additions & 3 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar._
 
-private[sql] object PassThrough extends CompressionScheme {
+private[sql] case object PassThrough extends CompressionScheme {
   override val typeId = 0
 
   override def supports(columnType: ColumnType[_, _]) = true
@@ -63,7 +63,7 @@ private[sql] object PassThrough extends CompressionScheme {
   }
 }
 
-private[sql] object RunLengthEncoding extends CompressionScheme {
+private[sql] case object RunLengthEncoding extends CompressionScheme {
   override def typeId = 1
 
   override def encoder = new this.Encoder
@@ -171,7 +171,7 @@ private[sql] object RunLengthEncoding extends CompressionScheme {
   }
 }
 
-private[sql] object DictionaryEncoding extends CompressionScheme {
+private[sql] case object DictionaryEncoding extends CompressionScheme {
   override def typeId: Int = 2
 
   // 32K unique values allowed
@@ -270,6 +270,7 @@ private[sql] object DictionaryEncoding extends CompressionScheme {
     extends compression.Decoder[T] {
 
     private val dictionary = {
+      // TODO Can we clean up this mess? Maybe move this to `DataType`?
       implicit val classTag = {
         val mirror = runtimeMirror(getClass.getClassLoader)
         ClassTag[T#JvmType](mirror.runtimeClass(columnType.scalaTag.tpe))
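
A side note on the object → case object change above: a case object gets a compiler-generated toString equal to its simple name, so the schemes print as PassThrough, RunLengthEncoding, and DictionaryEncoding in test titles and error messages without the stripSuffix("$") dance. A small illustration (PlainScheme and CaseScheme are made-up names):

object PlainScheme
case object CaseScheme

// A plain object's class name keeps a trailing '$', and its default toString
// is the class name plus a hash; a case object simply prints its own name.
println(PlainScheme.getClass.getSimpleName)   // PlainScheme$
println(CaseScheme.toString)                  // CaseScheme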

sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala

Lines changed: 5 additions & 3 deletions
@@ -39,7 +39,9 @@ class ColumnStatsSuite extends FunSuite {
 
     test(s"$columnStatsName: empty") {
       val columnStats = columnStatsClass.newInstance()
-      assert((columnStats.lowerBound, columnStats.upperBound) === columnStats.initialBounds)
+      expectResult(columnStats.initialBounds, "Wrong initial bounds") {
+        (columnStats.lowerBound, columnStats.upperBound)
+      }
     }
 
     test(s"$columnStatsName: non-empty") {
@@ -52,8 +54,8 @@ class ColumnStatsSuite extends FunSuite {
       val values = rows.map(_.head.asInstanceOf[T#JvmType])
       val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#JvmType]]
 
-      assert(columnStats.lowerBound === values.min(ordering))
-      assert(columnStats.upperBound === values.max(ordering))
+      expectResult(values.min(ordering), "Wrong lower bound")(columnStats.lowerBound)
+      expectResult(values.max(ordering), "Wrong upper bound")(columnStats.upperBound)
     }
   }
 }
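
These assertions moved from assert(a === b) to ScalaTest's expectResult(expected, clue)(actual) form (renamed assertResult in later ScalaTest releases), so a failure reports the expected value together with a clue instead of a bare inequality. A minimal sketch of the style, with made-up values:

import org.scalatest.FunSuite

class ExpectResultSketch extends FunSuite {
  test("lower bound") {
    val lowerBound = 1
    // Passes; if lowerBound were 3, the failure message would include both the
    // clue and the expectation, roughly "Wrong lower bound Expected 1, but got 3".
    expectResult(1, "Wrong lower bound")(lowerBound)
  }
}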

sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala

Lines changed: 67 additions & 67 deletions
@@ -28,35 +28,46 @@ import org.apache.spark.sql.execution.SparkSqlSerializer
 class ColumnTypeSuite extends FunSuite {
   val DEFAULT_BUFFER_SIZE = 512
 
-  val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)
-
   test("defaultSize") {
-    val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8, 16, 16)
+    val checks = Map(
+      INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
+      BOOLEAN -> 1, STRING -> 8, BINARY -> 16, GENERIC -> 16)
 
-    columnTypes.zip(defaultSize).foreach { case (columnType, size) =>
-      assert(columnType.defaultSize === size)
+    checks.foreach { case (columnType, expectedSize) =>
+      expectResult(expectedSize, s"Wrong defaultSize for $columnType") {
+        columnType.defaultSize
+      }
     }
   }
 
   test("actualSize") {
-    val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11)
-    val actualSizes = Seq(
-      INT.actualSize(Int.MaxValue),
-      SHORT.actualSize(Short.MaxValue),
-      LONG.actualSize(Long.MaxValue),
-      BYTE.actualSize(Byte.MaxValue),
-      DOUBLE.actualSize(Double.MaxValue),
-      FLOAT.actualSize(Float.MaxValue),
-      STRING.actualSize("hello"),
-      BINARY.actualSize(new Array[Byte](4)),
-      GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a"))))
-
-    expectedSizes.zip(actualSizes).foreach { case (expected, actual) =>
-      assert(expected === actual)
+    def checkActualSize[T <: DataType, JvmType](
+        columnType: ColumnType[T, JvmType],
+        value: JvmType,
+        expected: Int) {
+
+      expectResult(expected, s"Wrong actualSize for $columnType") {
+        columnType.actualSize(value)
+      }
     }
+
+    checkActualSize(INT, Int.MaxValue, 4)
+    checkActualSize(SHORT, Short.MaxValue, 2)
+    checkActualSize(LONG, Long.MaxValue, 8)
+    checkActualSize(BYTE, Byte.MaxValue, 1)
+    checkActualSize(DOUBLE, Double.MaxValue, 8)
+    checkActualSize(FLOAT, Float.MaxValue, 4)
+    checkActualSize(BOOLEAN, true, 1)
+    checkActualSize(STRING, "hello", 4 + 5)
+
+    val binary = Array.fill[Byte](4)(0: Byte)
+    checkActualSize(BINARY, binary, 4 + 4)
+
+    val generic = Map(1 -> "a")
+    checkActualSize(GENERIC, SparkSqlSerializer.serialize(generic), 4 + 11)
   }
 
-  testNativeColumnStats[BooleanType.type](
+  testNativeColumnType[BooleanType.type](
     BOOLEAN,
     (buffer: ByteBuffer, v: Boolean) => {
       buffer.put((if (v) 1 else 0).toByte)
@@ -65,37 +76,19 @@ class ColumnTypeSuite extends FunSuite {
       buffer.get() == 1
     })
 
-  testNativeColumnStats[IntegerType.type](
-    INT,
-    (_: ByteBuffer).putInt(_),
-    (_: ByteBuffer).getInt)
-
-  testNativeColumnStats[ShortType.type](
-    SHORT,
-    (_: ByteBuffer).putShort(_),
-    (_: ByteBuffer).getShort)
-
-  testNativeColumnStats[LongType.type](
-    LONG,
-    (_: ByteBuffer).putLong(_),
-    (_: ByteBuffer).getLong)
-
-  testNativeColumnStats[ByteType.type](
-    BYTE,
-    (_: ByteBuffer).put(_),
-    (_: ByteBuffer).get)
-
-  testNativeColumnStats[DoubleType.type](
-    DOUBLE,
-    (_: ByteBuffer).putDouble(_),
-    (_: ByteBuffer).getDouble)
-
-  testNativeColumnStats[FloatType.type](
-    FLOAT,
-    (_: ByteBuffer).putFloat(_),
-    (_: ByteBuffer).getFloat)
-
-  testNativeColumnStats[StringType.type](
+  testNativeColumnType[IntegerType.type](INT, _.putInt(_), _.getInt)
+
+  testNativeColumnType[ShortType.type](SHORT, _.putShort(_), _.getShort)
+
+  testNativeColumnType[LongType.type](LONG, _.putLong(_), _.getLong)
+
+  testNativeColumnType[ByteType.type](BYTE, _.put(_), _.get)
+
+  testNativeColumnType[DoubleType.type](DOUBLE, _.putDouble(_), _.getDouble)
+
+  testNativeColumnType[FloatType.type](FLOAT, _.putFloat(_), _.getFloat)
+
+  testNativeColumnType[StringType.type](
     STRING,
     (buffer: ByteBuffer, string: String) => {
       val bytes = string.getBytes()
@@ -108,7 +101,7 @@ class ColumnTypeSuite extends FunSuite {
       new String(bytes)
     })
 
-  testColumnStats[BinaryType.type, Array[Byte]](
+  testColumnType[BinaryType.type, Array[Byte]](
    BINARY,
    (buffer: ByteBuffer, bytes: Array[Byte]) => {
      buffer.putInt(bytes.length).put(bytes)
@@ -131,51 +124,58 @@ class ColumnTypeSuite extends FunSuite {
     val length = buffer.getInt()
     assert(length === serializedObj.length)
 
-    val bytes = new Array[Byte](length)
-    buffer.get(bytes, 0, length)
-    assert(obj === SparkSqlSerializer.deserialize(bytes))
+    expectResult(obj, "Deserialized object didn't equal to the original object") {
+      val bytes = new Array[Byte](length)
+      buffer.get(bytes, 0, length)
+      SparkSqlSerializer.deserialize(bytes)
+    }
 
     buffer.rewind()
     buffer.putInt(serializedObj.length).put(serializedObj)
 
-    buffer.rewind()
-    assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
+    expectResult(obj, "Deserialized object didn't equal to the original object") {
+      buffer.rewind()
+      SparkSqlSerializer.deserialize(GENERIC.extract(buffer))
+    }
   }
 
-  def testNativeColumnStats[T <: NativeType](
+  def testNativeColumnType[T <: NativeType](
       columnType: NativeColumnType[T],
       putter: (ByteBuffer, T#JvmType) => Unit,
       getter: (ByteBuffer) => T#JvmType) {
 
-    testColumnStats[T, T#JvmType](columnType, putter, getter)
+    testColumnType[T, T#JvmType](columnType, putter, getter)
   }
 
-  def testColumnStats[T <: DataType, JvmType](
+  def testColumnType[T <: DataType, JvmType](
      columnType: ColumnType[T, JvmType],
      putter: (ByteBuffer, JvmType) => Unit,
      getter: (ByteBuffer) => JvmType) {
 
    val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
-    val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$")
    val seq = (0 until 4).map(_ => makeRandomValue(columnType))
 
-    test(s"$columnTypeName.extract") {
+    test(s"$columnType.extract") {
      buffer.rewind()
      seq.foreach(putter(buffer, _))
 
      buffer.rewind()
-      seq.foreach { i =>
-        assert(i === columnType.extract(buffer))
+      seq.foreach { expected =>
+        assert(
+          expected === columnType.extract(buffer),
+          "Extracted value didn't equal to the original one")
      }
    }
 
-    test(s"$columnTypeName.append") {
+    test(s"$columnType.append") {
      buffer.rewind()
      seq.foreach(columnType.append(_, buffer))
 
      buffer.rewind()
-      seq.foreach { i =>
-        assert(i === getter(buffer))
+      seq.foreach { expected =>
+        assert(
+          expected === getter(buffer),
+          "Extracted value didn't equal to the original one")
      }
    }
  }

sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -49,13 +49,13 @@ class NullableColumnAccessorSuite extends FunSuite {
     val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
     val nullRow = makeNullRow(1)
 
-    test(s"Nullable $typeName accessor: empty column") {
+    test(s"Nullable $typeName column accessor: empty column") {
      val builder = TestNullableColumnBuilder(columnType)
      val accessor = TestNullableColumnAccessor(builder.build(), columnType)
      assert(!accessor.hasNext)
    }
 
-    test(s"Nullable $typeName accessor: access null values") {
+    test(s"Nullable $typeName column accessor: access null values") {
      val builder = TestNullableColumnBuilder(columnType)
      val randomRow = makeRandomRow(columnType)
 
@@ -72,7 +72,7 @@ class NullableColumnAccessorSuite extends FunSuite {
      assert(row(0) === randomRow(0))
 
      accessor.extractTo(row, 0)
-      assert(row(0) === null)
+      assert(row.isNullAt(0))
    }
  }
 }
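
The last assertion now asks the row whether the field is null instead of comparing the stored value with null. As a hedged illustration of why that is the more direct check — this SketchRow is a made-up stand-in, not Spark's Row:

// Hypothetical row that tracks nulls in a separate bitmap.
class SketchRow(size: Int) {
  private val values = new Array[Any](size)
  private val nulls = new Array[Boolean](size)

  def update(i: Int, v: Any): Unit = { values(i) = v; nulls(i) = false }
  def setNullAt(i: Int): Unit = nulls(i) = true
  def apply(i: Int): Any = if (nulls(i)) null else values(i)
  def isNullAt(i: Int): Boolean = nulls(i)
}

val row = new SketchRow(1)
row(0) = 42
row.setNullAt(0)
assert(row.isNullAt(0))   // states the intent directly: "this field is null"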
