diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ByteBufferColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ByteBufferColumnVector.java new file mode 100644 index 000000000000..531d4fad905f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ByteBufferColumnVector.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow; +import org.apache.spark.sql.catalyst.expressions.MutableRow; +import org.apache.spark.sql.execution.columnar.BasicColumnAccessor; +import org.apache.spark.sql.execution.columnar.ByteBufferHelper; +import org.apache.spark.sql.execution.columnar.NativeColumnAccessor; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.Platform; + +/** + * A column backed by an in memory JVM array. This stores the NULLs as a byte per value + * and a java array for the values. + */ +public final class ByteBufferColumnVector extends ColumnVector { + // The data stored in these arrays need to maintain binary compatible. We can + // directly pass this buffer to external components. + + // This is faster than a boolean array and we optimize this over memory footprint. + private byte[] nulls; + + // Array for each type. Only 1 is populated for any type. + private byte[] data; + private long offset; + + protected ByteBufferColumnVector(int capacity, DataType type, + boolean isConstant, ByteBuffer buffer, ByteBuffer nullsBuffer) { + super(capacity, type, MemoryMode.ON_HEAP); + if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { + throw new NotImplementedException(); + } else if (type instanceof BooleanType || type instanceof ByteType || + type instanceof ShortType || + type instanceof IntegerType || type instanceof DateType || + DecimalType.is32BitDecimalType(type) || + type instanceof LongType || DecimalType.is64BitDecimalType(type) || + (type instanceof FloatType) || (type instanceof DoubleType)) { + data = buffer.array(); + offset = Platform.BYTE_ARRAY_OFFSET + buffer.position(); + } else if (resultStruct != null) { + // Nothing to store. 
+ } else { + throw new RuntimeException("Unhandled " + type); + } + nulls = new byte[capacity]; + reset(); + + int numNulls = ByteBufferHelper.getInt(nullsBuffer); + for (int i = 0; i < numNulls; i++) { + int cordinal = ByteBufferHelper.getInt(nullsBuffer); + putNull(cordinal); + } + if (isConstant) { + setIsConstant(); + } + } + + @Override + public final long valuesNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public final long nullsNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public final void close() { + } + + // + // APIs dealing with nulls + // + + @Override + public final void putNotNull(int rowId) { + nulls[rowId] = (byte)0; + } + + @Override + public final void putNull(int rowId) { + nulls[rowId] = (byte)1; + ++numNulls; + anyNullsSet = true; + } + + @Override + public final void putNulls(int rowId, int count) { + for (int i = 0; i < count; ++i) { + nulls[rowId + i] = (byte)1; + } + anyNullsSet = true; + numNulls += count; + } + + @Override + public final void putNotNulls(int rowId, int count) { + if (!anyNullsSet) return; + for (int i = 0; i < count; ++i) { + nulls[rowId + i] = (byte)0; + } + } + + @Override + public final boolean isNullAt(int rowId) { + return nulls[rowId] == 1; + } + + // + // APIs dealing with Booleans + // + + @Override + public final void putBoolean(int rowId, boolean value) { + throw new NotImplementedException(); + } + + @Override + public final void putBooleans(int rowId, int count, boolean value) { + throw new NotImplementedException(); + } + + @Override + public final boolean getBoolean(int rowId) { + return Platform.getByte(data, offset + rowId) == 1; + } + + // + + // + // APIs dealing with Bytes + // + + @Override + public final void putByte(int rowId, byte value) { + throw new NotImplementedException(); + } + + @Override + public final void putBytes(int rowId, int count, byte value) { + throw new NotImplementedException(); + } + + @Override + public final void putBytes(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final byte getByte(int rowId) { + assert(dictionary == null); + return Platform.getByte(data, offset + rowId); + } + + // + // APIs dealing with Shorts + // + + @Override + public final void putShort(int rowId, short value) { + throw new NotImplementedException(); + } + + @Override + public final void putShorts(int rowId, int count, short value) { + throw new NotImplementedException(); + } + + @Override + public final void putShorts(int rowId, int count, short[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final short getShort(int rowId) { + assert(dictionary == null); + return Platform.getShort(data, offset + rowId * 2); + } + + + // + // APIs dealing with Ints + // + + @Override + public final void putInt(int rowId, int value) { + throw new NotImplementedException(); + } + + @Override + public final void putInts(int rowId, int count, int value) { + throw new NotImplementedException(); + } + + @Override + public final void putInts(int rowId, int count, int[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final int getInt(int rowId) { + assert(dictionary == null); + return Platform.getInt(data, offset + rowId 
* 4); + } + + // + // APIs dealing with Longs + // + + @Override + public final void putLong(int rowId, long value) { + throw new NotImplementedException(); + } + + @Override + public final void putLongs(int rowId, int count, long value) { + throw new NotImplementedException(); + } + + @Override + public final void putLongs(int rowId, int count, long[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final long getLong(int rowId) { + assert(dictionary == null); + return Platform.getLong(data, offset + rowId * 8); + } + + // + // APIs dealing with floats + // + + @Override + public final void putFloat(int rowId, float value) { + throw new NotImplementedException(); + } + + @Override + public final void putFloats(int rowId, int count, float value) { + throw new NotImplementedException(); + } + + @Override + public final void putFloats(int rowId, int count, float[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final void putFloats(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final float getFloat(int rowId) { + assert(dictionary == null); + return Platform.getFloat(data, offset + rowId * 4); + } + + // + // APIs dealing with doubles + // + + @Override + public final void putDouble(int rowId, double value) { + throw new NotImplementedException(); + } + + @Override + public final void putDoubles(int rowId, int count, double value) { + throw new NotImplementedException(); + } + + @Override + public final void putDoubles(int rowId, int count, double[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public final double getDouble(int rowId) { + assert(dictionary == null); + return Platform.getDouble(data, offset + rowId * 8); + } + + // + // APIs dealing with Arrays + // + + @Override + public final int getArrayLength(int rowId) { throw new NotImplementedException(); } + @Override + public final int getArrayOffset(int rowId) { throw new NotImplementedException(); } + + @Override + public final void putArray(int rowId, int offset, int length) { + throw new NotImplementedException(); + } + + @Override + public final void loadBytes(ColumnVector.Array array) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Byte Arrays + // + + @Override + public final int putByteArray(int rowId, byte[] value, int offset, int length) { + throw new NotImplementedException(); + } + + @Override + public final void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); + } + + // Spilt this function out since it is the slow path. 
+ private final void reserveInternal(int newCapacity) { + throw new NotImplementedException(); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala index f9d606e37ea8..23ec157ac531 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala @@ -43,6 +43,12 @@ import org.apache.spark.unsafe.types.UTF8String * WARNING: This only works with HeapByteBuffer */ private[columnar] object ByteBufferHelper { + def getShort(buffer: ByteBuffer): Short = { + val pos = buffer.position() + buffer.position(pos + 2) + Platform.getShort(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) + } + def getInt(buffer: ByteBuffer): Int = { val pos = buffer.position() buffer.position(pos + 4) @@ -66,6 +72,33 @@ private[columnar] object ByteBufferHelper { buffer.position(pos + 8) Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) } + + def putShort(buffer: ByteBuffer, value: Short): Unit = { + val pos = buffer.position() + buffer.position(pos + 2) + Platform.putShort(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos, value) + } + + def putInt(buffer: ByteBuffer, value: Int): Unit = { + val pos = buffer.position() + buffer.position(pos + 4) + Platform.putInt(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos, value) + } + + def putLong(buffer: ByteBuffer, value: Long): Unit = { + val pos = buffer.position() + buffer.position(pos + 8) + Platform.putLong(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos, value) + } + + def copyMemory(src: ByteBuffer, dst: ByteBuffer, len: Int): Unit = { + val srcPos = src.position() + val dstPos = dst.position() + src.position(srcPos + len) + dst.position(dstPos + len) + Platform.copyMemory(src.array(), Platform.BYTE_ARRAY_OFFSET + srcPos, + dst.array(), Platform.BYTE_ARRAY_OFFSET + dstPos, len) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 7a14879b8b9d..7ff231084e5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.columnar +import scala.collection.Iterator + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodeGenerator, UnsafeRowWriter} +import org.apache.spark.sql.execution.vectorized.ColumnVector import org.apache.spark.sql.types._ /** @@ -28,7 +31,12 @@ import org.apache.spark.sql.types._ */ abstract class ColumnarIterator extends Iterator[InternalRow] { def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType], - columnIndexes: Array[Int]): Unit + columnIndexes: Array[Int], inMemoryTableScanExec: InMemoryTableScanExec): Unit + def getColumnIndexes(index: Int) : Int + def getColumnTypes(index: Int): DataType + def isSupportColumnarCodeGen: Boolean + def initForColumnar: Int + def getColumn(index: Int): ColumnVector } /** @@ -68,6 +76,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera protected def 
create(columnTypes: Seq[DataType]): ColumnarIterator = { val ctx = newCodeGenContext() val numFields = columnTypes.size + var _isSupportColumnarCodeGen = true val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) => val accessorName = ctx.freshName("accessor") val accessorCls = dt match { @@ -94,8 +103,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera case t if ctx.isPrimitiveType(dt) => s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" case NullType | StringType | BinaryType => + _isSupportColumnarCodeGen = false s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" case other => + _isSupportColumnarCodeGen = false s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder), (${dt.getClass.getName}) columnTypes[$index]);""" } @@ -157,7 +168,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; + import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec; + import org.apache.spark.sql.execution.columnar.CachedBatch; import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; + import org.apache.spark.sql.execution.vectorized.ColumnVector; public SpecificColumnarIterator generate(Object[] references) { return new SpecificColumnarIterator(); @@ -170,7 +184,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera private UnsafeRow unsafeRow = new UnsafeRow($numFields); private BufferHolder bufferHolder = new BufferHolder(unsafeRow); private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, $numFields); + private InMemoryTableScanExec inMemoryTableScanExec = null; private MutableUnsafeRow mutableRow = null; + private boolean readPartitionIncremented = false; + private CachedBatch cachedBatch = null; private int currentRow = 0; private int numRowsInBatch = 0; @@ -187,10 +204,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera this.mutableRow = new MutableUnsafeRow(rowWriter); } - public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { + public void initialize(Iterator input, DataType[] columnTypes, + int[] columnIndexes, InMemoryTableScanExec inMemoryTableScanExec) { this.input = input; this.columnTypes = columnTypes; this.columnIndexes = columnIndexes; + this.inMemoryTableScanExec = inMemoryTableScanExec; } ${ctx.declareAddedFunctions()} @@ -203,7 +222,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera return false; } - ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next(); + CachedBatch batch = (CachedBatch) input.next(); currentRow = 0; numRowsInBatch = batch.numRows(); for (int i = 0; i < columnIndexes.length; i ++) { @@ -222,6 +241,30 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera unsafeRow.setTotalSize(bufferHolder.totalSize()); return unsafeRow; } + + public int getColumnIndexes(int index) { return columnIndexes[index]; } + + public DataType getColumnTypes(int index) { return columnTypes[index]; } + + public boolean isSupportColumnarCodeGen() { + return ${_isSupportColumnarCodeGen}; + } + + public int initForColumnar() { + if (!input.hasNext()) { + return -1; + } + if 
((inMemoryTableScanExec != null) && !readPartitionIncremented) { + inMemoryTableScanExec.incrementReadPartitionAccumulator(); + readPartitionIncremented = true; + } + cachedBatch = (CachedBatch) input.next(); + return cachedBatch.numRows(); + } + + public ColumnVector getColumn(int i) { + return cachedBatch.column(this, i); + } }""" val code = CodeFormatter.stripOverlappingComments( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index a1c2f0a8fbcf..dcb84af01666 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -17,7 +17,11 @@ package org.apache.spark.sql.execution.columnar +import java.nio.{ByteBuffer, ByteOrder} +import java.nio.ByteOrder.nativeOrder + import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import org.apache.commons.lang.StringUtils @@ -33,7 +37,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.UserDefinedType +import org.apache.spark.sql.execution.vectorized.ColumnVector +import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{AccumulatorContext, ListAccumulator, LongAccumulator} @@ -56,7 +61,35 @@ private[sql] object InMemoryRelation { * @param stats The stat of columns */ private[columnar] -case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) +case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) { + def column(columnarIterator: ColumnarIterator, index: Int): ColumnVector = { + val ordinal = columnarIterator.getColumnIndexes(index) + val dataType = columnarIterator.getColumnTypes(index) + val buffer = ByteBuffer.wrap(buffers(ordinal)).order(nativeOrder) + val accessor: BasicColumnAccessor[_] = dataType match { + case BooleanType => new BooleanColumnAccessor(buffer) + case ByteType => new ByteColumnAccessor(buffer) + case ShortType => new ShortColumnAccessor(buffer) + case IntegerType | DateType => new IntColumnAccessor(buffer) + case LongType | TimestampType => new LongColumnAccessor(buffer) + case FloatType => new FloatColumnAccessor(buffer) + case DoubleType => new DoubleColumnAccessor(buffer) + } + + val (out, nullsBuffer) = if (accessor.isInstanceOf[NativeColumnAccessor[_]]) { + val nativeAccessor = accessor.asInstanceOf[NativeColumnAccessor[_]] + nativeAccessor.decompress(numRows); + } else { + val buffer = accessor.getByteBuffer + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + (buffer, nullsBuffer) + } + + org.apache.spark.sql.execution.vectorized.ColumnVector.allocate( + numRows, dataType, true, out, nullsBuffer) + } +} private[sql] case class InMemoryRelation( output: Seq[Attribute], @@ -296,6 +329,10 @@ private[sql] case class InMemoryTableScanExec( lazy val readPartitions = sparkContext.longAccumulator lazy val readBatches = sparkContext.longAccumulator + def incrementReadPartitionAccumulator(): Unit = { + readPartitions.add(1) + } + private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning protected 
override def doExecute(): RDD[InternalRow] = { @@ -358,9 +395,11 @@ private[sql] case class InMemoryTableScanExec( case other => other }.toArray val columnarIterator = GenerateColumnAccessor.generate(columnTypes) - columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray) - if (enableAccumulators && columnarIterator.hasNext) { - readPartitions.add(1) + columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray, + if (!enableAccumulators) null else this) + if (enableAccumulators && !columnarIterator.isSupportColumnarCodeGen && + columnarIterator.hasNext) { + incrementReadPartitionAccumulator } columnarIterator } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala index 941f03b745a0..556a36512f50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.ByteBuffer +import java.nio.ByteOrder import scala.collection.mutable @@ -61,6 +62,51 @@ private[columnar] case object PassThrough extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + if (nullCount == 0) { + nullsBuffer.rewind() + (buffer.duplicate().order(ByteOrder.nativeOrder()), nullsBuffer) + } else { + val unitSize = columnType.dataType match { + case _: BooleanType => 1 + case _: ByteType => 1 + case _: ShortType => 2 + case _: IntegerType => 4 + case _: LongType => 8 + case _: FloatType => 4 + case _: DoubleType => 8 + case _ => throw new IllegalStateException("Not supported type in PassThru.") + } + var nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + var pos = 0 + var seenNulls = 0 + val out = ByteBuffer.allocate(capacity * unitSize).order(ByteOrder.nativeOrder()) + while (buffer.hasRemaining) { + if (pos != nextNullIndex) { + val len = nextNullIndex - pos + assert(len * unitSize < Int.MaxValue) + ByteBufferHelper.copyMemory(buffer, out, len * unitSize) + pos += len + } else { + seenNulls += 1 + nextNullIndex = if (seenNulls < nullCount) { + ByteBufferHelper.getInt(nullsBuffer) + } else { + capacity + } + out.position(out.position + unitSize) + pos += 1 + } + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + } + } } } @@ -169,6 +215,145 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme { } override def hasNext: Boolean = valueCount < run || buffer.hasRemaining + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 + var pos = 0 + var seenNulls = 0 + var runLocal = 0 + var valueCountLocal = 0 + columnType.dataType match { + case _: BooleanType => + val out = ByteBuffer.allocate(capacity).order(ByteOrder.nativeOrder()) + var currentValueLocal: Boolean = false + while (valueCountLocal < runLocal || buffer.hasRemaining) { + if (pos 
!= nextNullIndex) { + if (valueCountLocal == runLocal) { + currentValueLocal = buffer.get() == 1 + runLocal = ByteBufferHelper.getInt(buffer) + valueCountLocal = 1 + } else { + valueCountLocal += 1 + } + out.put(if (currentValueLocal) 1: Byte else 0: Byte) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 1) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _: ByteType => + val out = ByteBuffer.allocate(capacity).order(ByteOrder.nativeOrder()) + var currentValueLocal: Byte = 0 + while (valueCountLocal < runLocal || buffer.hasRemaining) { + if (pos != nextNullIndex) { + if (valueCountLocal == runLocal) { + currentValueLocal = buffer.get() + runLocal = ByteBufferHelper.getInt(buffer) + valueCountLocal = 1 + } else { + valueCountLocal += 1 + } + out.put(currentValueLocal) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 1) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _: ShortType => + val out = ByteBuffer.allocate(capacity * 2).order(ByteOrder.nativeOrder()) + var currentValueLocal: Short = 0 + while (valueCountLocal < runLocal || buffer.hasRemaining) { + if (pos != nextNullIndex) { + if (valueCountLocal == runLocal) { + currentValueLocal = buffer.getShort() + runLocal = ByteBufferHelper.getInt(buffer) + valueCountLocal = 1 + } else { + valueCountLocal += 1 + } + ByteBufferHelper.putShort(out, currentValueLocal) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 2) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _: IntegerType => + val out = ByteBuffer.allocate(capacity * 4).order(ByteOrder.nativeOrder()) + var currentValueLocal: Int = 0 + while (valueCountLocal < runLocal || buffer.hasRemaining) { + if (pos != nextNullIndex) { + if (valueCountLocal == runLocal) { + currentValueLocal = buffer.getInt() + runLocal = ByteBufferHelper.getInt(buffer) + valueCountLocal = 1 + } else { + valueCountLocal += 1 + } + ByteBufferHelper.putInt(out, currentValueLocal) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 4) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _: LongType => + val out = ByteBuffer.allocate(capacity * 8).order(ByteOrder.nativeOrder()) + var currentValueLocal: Long = 0 + while (valueCountLocal < runLocal || buffer.hasRemaining) { + if (pos != nextNullIndex) { + if (valueCountLocal == runLocal) { + currentValueLocal = buffer.getLong() + runLocal = ByteBufferHelper.getInt(buffer) + valueCountLocal = 1 + } else { + valueCountLocal += 1 + } + ByteBufferHelper.putLong(out, currentValueLocal) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 8) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _ => throw new IllegalStateException("Not supported type in RunLengthEncoding.") + } + } } } @@ -278,6 +463,54 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(capacity:
Int): (ByteBuffer, ByteBuffer) = { + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 + var pos = 0 + var seenNulls = 0 + columnType.dataType match { + case _: IntegerType => + val out = ByteBuffer.allocate(capacity * 4).order(ByteOrder.nativeOrder()) + while (buffer.hasRemaining) { + if (pos != nextNullIndex) { + val value = dictionary(buffer.getShort()).asInstanceOf[Int] + ByteBufferHelper.putInt(out, value) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 4) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _: LongType => + val out = ByteBuffer.allocate(capacity * 8).order(ByteOrder.nativeOrder()) + while (buffer.hasRemaining) { + if (pos != nextNullIndex) { + val value = dictionary(buffer.getShort()).asInstanceOf[Long] + ByteBufferHelper.putLong(out, value) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 8) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + case _ => throw new IllegalStateException("Not supported type in DictionaryEncoding.") + } + } } } @@ -368,6 +601,42 @@ private[columnar] case object BooleanBitSet extends CompressionScheme { } override def hasNext: Boolean = visited < count + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + val countLocal = count + var currentWordLocal: Long = 0 + var visitedLocal: Int = 0 + val out = ByteBuffer.allocate(capacity).order(ByteOrder.nativeOrder()) + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 + var pos = 0 + var seenNulls = 0 + + while (visitedLocal < countLocal) { + if (pos != nextNullIndex) { + val bit = visitedLocal % BITS_PER_LONG + + visitedLocal += 1 + if (bit == 0) { + currentWordLocal = ByteBufferHelper.getLong(buffer) + } + + out.put(if (((currentWordLocal >> bit) & 1) != 0) 1: Byte else 0: Byte) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 1) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + } } } @@ -448,6 +717,37 @@ private[columnar] case object IntDelta extends CompressionScheme { prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer) row.setInt(ordinal, prev) } + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + var prevLocal: Int = 0 + val out = ByteBuffer.allocate(capacity * 4).order(ByteOrder.nativeOrder()) + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 + var pos = 0 + var seenNulls = 0 + + while (buffer.hasRemaining) { + if (pos != nextNullIndex) { + val delta = buffer.get + prevLocal = if (delta > Byte.MinValue) { prevLocal + delta } else + { ByteBufferHelper.getInt(buffer) } + val p = out.position + ByteBufferHelper.putInt(out, prevLocal) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + 
nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 4) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + } } } @@ -528,5 +828,35 @@ private[columnar] case object LongDelta extends CompressionScheme { prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer) row.setLong(ordinal, prev) } + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + var prevLocal: Long = 0 + val out = ByteBuffer.allocate(capacity * 8).order(ByteOrder.nativeOrder()) + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 + var pos = 0 + var seenNulls = 0 + + while (buffer.hasRemaining) { + if (pos != nextNullIndex) { + val delta = buffer.get() + prevLocal = if (delta > Byte.MinValue) { prevLocal + delta } else + { ByteBufferHelper.getLong(buffer) } + ByteBufferHelper.putLong(out, prevLocal) + } else { + seenNulls += 1 + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + out.position(out.position + 8) + } + pos += 1 + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCacheBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCacheBenchmark.scala new file mode 100644 index 000000000000..fd1ee8618e8d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCacheBenchmark.scala @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import java.util.Random + +import scala.util.Try + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Benchmark + +/** + * Benchmark to measure performance of columnar storage for dataframe cache. + * To run this: + * spark-submit --class + */ +object DataFrameCacheBenchmark { + val conf = new SparkConf() + val sc = new SparkContext("local[1]", "test-sql-context", conf) + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + // Set default configs. Individual cases will change them if necessary. 
+ sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + val (keys, values) = pairs.unzip + val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption) + (keys, values).zipped.foreach(sqlContext.conf.setConfString) + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => sqlContext.conf.setConfString(key, value) + case (key, None) => sqlContext.conf.unsetConf(key) + } + } + } + + def intSumBenchmark(values: Int, iters: Int = 5): Unit = { + val suites = Seq(("InternalRow", "false"), ("ColumnVector", "true")) + + val benchmarkPT = new Benchmark("Int Sum with PassThrough cache", values, iters) + val rand1 = new Random(511) + val dfPassThrough = sc.parallelize(0 to values - 1, 1).map(i => rand1.nextInt()).toDF.cache() + dfPassThrough.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkPT.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfPassThrough.agg(sum("value")).collect + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Int Sum with PassThrough cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 462 / 466 68.1 14.7 1.0X + ColumnVector codegen 94 / 100 336.3 3.0 4.9X + */ + + benchmarkPT.run() + dfPassThrough.unpersist(true) + System.gc() + + val benchmarkRL = new Benchmark("Int Sum with RunLength cache", values, iters) + val dfRunLength = sc.parallelize(0 to values - 1, 1) + .map(i => (i, (i / 1024).toInt)).toDF("k", "v").cache() + dfRunLength.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkRL.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfRunLength.agg(sum("v")).collect() + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Int Sum with RunLength cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 492 / 553 63.9 15.7 1.0X + ColumnVector codegen 175 / 180 179.5 5.6 2.8X + */ + + benchmarkRL.run() + dfRunLength.unpersist(true) + System.gc() + + val rand2 = new Random(42) + val benchmarkDIC = new Benchmark("Int Sum with Dictionary cache", values, iters) + val dfDictionary = sc.parallelize(0 to values - 1, 1) + .map(i => (i, (rand2.nextInt() % 1023))).toDF("k", "v").cache() + dfDictionary.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkDIC.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. 
COLUMN_VECTOR_CODEGEN.key -> value) { + dfDictionary.agg(sum("v")).collect() + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Int Sum with Dictionary cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 599 / 610 52.5 19.0 1.0X + ColumnVector codegen 320 / 327 98.3 10.2 1.9X + */ + + benchmarkDIC.run() + dfDictionary.unpersist(true) + System.gc() + + val benchmarkIDelta = new Benchmark("Int Sum with Delta cache", values, iters) + val dfIntDelta = sc.parallelize(0 to values - 1, 1) + .map(i => (i, i)).toDF("k", "v").cache() + dfIntDelta.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkIDelta.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfIntDelta.agg(sum("v")).collect() + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Int Sum with Delta cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 467 / 512 67.4 14.8 1.0X + ColumnVector codegen 247 / 250 127.4 7.8 1.9X + */ + + benchmarkIDelta.run() + dfIntDelta.unpersist(true) + System.gc() + } + + def longSumBenchmark(values: Int, iters: Int = 5): Unit = { + val suites = Seq(("InternalRow", "false"), ("ColumnVector", "true")) + + val benchmarkPT = new Benchmark("Long Sum with PassThrough cache", values, iters) + val rand1 = new Random(511) + val dfPassThrough = sc.parallelize(0 to values - 1, 1).map(i => rand1.nextLong()).toDF().cache() + dfPassThrough.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkPT.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfPassThrough.agg(sum("value")).collect + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Long Sum with PassThrough cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 382 / 420 41.2 24.3 1.0X + ColumnVector codegen 89 / 101 176.2 5.7 4.3X + */ + + benchmarkPT.run() + dfPassThrough.unpersist(true) + System.gc() + } + + def floatSumBenchmark(values: Int, iters: Int = 5): Unit = { + val suites = Seq(("InternalRow", "false"), ("ColumnVector", "true")) + + val benchmarkPT = new Benchmark("Float Sum with PassThrough cache", values, iters) + val rand1 = new Random(511) + val dfPassThrough = sc.parallelize(0 to values - 1, 1) + .map(i => rand1.nextFloat()).toDF().cache() + dfPassThrough.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkPT.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. 
COLUMN_VECTOR_CODEGEN.key -> value) { + dfPassThrough.agg(sum("value")).collect + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Float Sum with PassThrough cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 476 / 483 66.1 15.1 1.0X + ColumnVector codegen 91 / 103 343.8 2.9 5.2X + */ + + benchmarkPT.run() + dfPassThrough.unpersist(true) + System.gc() + } + + def doubleSumBenchmark(values: Int, iters: Int = 5): Unit = { + val suites = Seq(("InternalRow", "false"), ("ColumnVector", "true")) + + val benchmarkPT = new Benchmark("Double Sum with PassThrough cache", values, iters) + val rand1 = new Random(511) + val dfPassThrough = sc.parallelize(0 to values - 1, 1) + .map(i => rand1.nextDouble()).toDF().cache() + dfPassThrough.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkPT.addCase(s"$str codegen") { iter => + withSQLConf(SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfPassThrough.agg(sum("value")).collect + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Double Sum with PassThrough cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 290 / 306 54.3 18.4 1.0X + ColumnVector codegen 95 / 101 165.7 6.0 3.1X + */ + + benchmarkPT.run() + dfPassThrough.unpersist(true) + System.gc() + } + + def main(args: Array[String]): Unit = { + longSumBenchmark(1024 * 1024 * 15) + doubleSumBenchmark(1024 * 1024 * 15) + floatSumBenchmark(1024 * 1024 * 30) + intSumBenchmark(1024 * 1024 * 30) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala index f67e9c7dae27..08390d6cdb15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala @@ -85,6 +85,37 @@ class BooleanBitSetSuite extends SparkFunSuite { assert(!decoder.hasNext) } + def skeletonForDecompress(count: Int) { + + val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet) + val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN)) + val values = rows.map(_.getBoolean(0)) + + rows.foreach(builder.appendFrom(_, 0)) + val buffer = builder.build() + + // ---------------- + // Tests decompress + // ---------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + val headerSize = CompressionScheme.columnHeaderSize(buffer) + buffer.position(headerSize) + assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val decoder = BooleanBitSet.decoder(buffer, BOOLEAN) + val (decodeBuffer, nullsBuffer) = decoder.decompress(values.length) + + if (values.nonEmpty) { + values.zipWithIndex.foreach { case (b: Boolean, index: Int) => + assertResult(b, s"Wrong ${index}-th decoded boolean value") { + if (decodeBuffer.get() == 1) true else false + } + } + } + assert(!decodeBuffer.hasRemaining) + } + test(s"$BooleanBitSet: empty") { skeleton(0) } @@ -104,4 +135,24 @@ class 
BooleanBitSetSuite extends SparkFunSuite { test(s"$BooleanBitSet: multiple words and 1 more bit") { skeleton(BITS_PER_LONG * 2 + 1) } + + test(s"$BooleanBitSet: empty for decompression()") { + skeletonForDecompress(0) + } + + test(s"$BooleanBitSet: less than 1 word for decompression()") { + skeletonForDecompress(BITS_PER_LONG - 1) + } + + test(s"$BooleanBitSet: exactly 1 word for decompression()") { + skeletonForDecompress(BITS_PER_LONG) + } + + test(s"$BooleanBitSet: multiple whole words for decompression()") { + skeletonForDecompress(BITS_PER_LONG * 2) + } + + test(s"$BooleanBitSet: multiple words and 1 more bit for decompression()") { + skeletonForDecompress(BITS_PER_LONG * 2 + 1) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala index 830ca0294e1b..84a138f8586e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala @@ -26,13 +26,15 @@ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.AtomicType class DictionaryEncodingSuite extends SparkFunSuite { + val nullValue = -1 testDictionaryEncoding(new IntColumnStats, INT) testDictionaryEncoding(new LongColumnStats, LONG) - testDictionaryEncoding(new StringColumnStats, STRING) + testDictionaryEncoding(new StringColumnStats, STRING, false) def testDictionaryEncoding[T <: AtomicType]( columnStats: ColumnStats, - columnType: NativeColumnType[T]) { + columnType: NativeColumnType[T], + testDecompress: Boolean = true) { val typeName = columnType.getClass.getSimpleName.stripSuffix("$") @@ -113,6 +115,66 @@ class DictionaryEncodingSuite extends SparkFunSuite { } } + def skeletonForDecompress(uniqueValueCount: Int, inputSeq: Seq[Int]) { + if (!testDecompress) return + val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding) + val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) + val dictValues = stableDistinct(inputSeq) + + val nullRow = new GenericMutableRow(1) + nullRow.setNullAt(0) + inputSeq.foreach { i => + if (i == nullValue) { + builder.appendFrom(nullRow, 0) + } else { + builder.appendFrom(rows(i), 0) + } + } + val buffer = builder.build() + + // ---------------- + // Tests decompress + // ---------------- + // Rewinds, skips column header and 4 more bytes for compression scheme ID + val headerSize = CompressionScheme.columnHeaderSize(buffer) + buffer.position(headerSize) + assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val decoder = DictionaryEncoding.decoder(buffer, columnType) + val (decodeBuffer, nullsBuffer) = decoder.decompress(inputSeq.length) + + if (inputSeq.nonEmpty) { + val numNulls = ByteBufferHelper.getInt(nullsBuffer) + var cntNulls = 0 + var nullPos = if (numNulls == 0) -1 else ByteBufferHelper.getInt(nullsBuffer) + inputSeq.zipWithIndex.foreach { case (i: Any, index: Int) => + if (i == nullValue) { + assertResult(index, "Wrong null position") { + nullPos + } + decodeBuffer.position(decodeBuffer.position + columnType.defaultSize) + cntNulls += 1 + if (cntNulls < numNulls) { + nullPos = ByteBufferHelper.getInt(nullsBuffer) + } + } else { + columnType match { + case INT => + assertResult(values(i), s"Wrong ${index}-th 
decoded int value") { + ByteBufferHelper.getInt(decodeBuffer) + } + case LONG => + assertResult(values(i), s"Wrong ${index}-th decoded long value") { + ByteBufferHelper.getLong(decodeBuffer) + } + case _ => fail("Unsupported type") + } + } + } + } + assert(!decodeBuffer.hasRemaining) + } + test(s"$DictionaryEncoding with $typeName: empty") { skeleton(0, Seq.empty) } @@ -124,5 +186,19 @@ class DictionaryEncodingSuite extends SparkFunSuite { test(s"$DictionaryEncoding with $typeName: dictionary overflow") { skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE) } + + test(s"$DictionaryEncoding with $typeName: empty for decompress()") { + skeletonForDecompress(0, Seq.empty) + } + + test(s"$DictionaryEncoding with $typeName: simple case for decompress()") { + skeletonForDecompress(2, Seq(0, nullValue, 0, nullValue)) + } + + test(s"$DictionaryEncoding with $typeName: dictionary overflow for decompress()") { + skeletonForDecompress(DictionaryEncoding.MAX_DICT_SIZE + 2, + Seq(nullValue) ++ (0 to DictionaryEncoding.MAX_DICT_SIZE - 1) ++ Seq(nullValue)) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala index 988a577a7b4d..405f2e0463da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.IntegralType class IntegralDeltaSuite extends SparkFunSuite { + val nullValue = -1 testIntegralDelta(new IntColumnStats, INT, IntDelta) testIntegralDelta(new LongColumnStats, LONG, LongDelta) @@ -109,6 +110,61 @@ class IntegralDeltaSuite extends SparkFunSuite { assert(!decoder.hasNext) } + def skeletonForDecompress(input: Seq[I#InternalType]) { + val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme) + val row = new GenericMutableRow(1) + val nullRow = new GenericMutableRow(1) + nullRow.setNullAt(0) + input.map { value => + if (value == nullValue) { + builder.appendFrom(nullRow, 0) + } else { + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + } + val buffer = builder.build() + + // ---------------- + // Tests decompress + // ---------------- + // Rewinds, skips column header and 4 more bytes for compression scheme ID + val headerSize = CompressionScheme.columnHeaderSize(buffer) + buffer.position(headerSize) + assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val decoder = scheme.decoder(buffer, columnType) + val (decodeBuffer, nullsBuffer) = decoder.decompress(input.length) + + if (input.nonEmpty) { + val numNulls = ByteBufferHelper.getInt(nullsBuffer) + var cntNulls = 0 + var nullPos = if (numNulls == 0) -1 else ByteBufferHelper.getInt(nullsBuffer) + input.zipWithIndex.foreach { + case (expected: Any, index: Int) if expected == nullValue => + assertResult(index, "Wrong null position") { + nullPos + } + decodeBuffer.position(decodeBuffer.position + columnType.defaultSize) + cntNulls += 1 + if (cntNulls < numNulls) { + nullPos = ByteBufferHelper.getInt(nullsBuffer) + } + case (expected: Int, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded int value") { + ByteBufferHelper.getInt(decodeBuffer) + } + case (expected: Long, index: Int) => + 
assertResult(expected, s"Wrong ${index}-th decoded long value") { + ByteBufferHelper.getLong(decodeBuffer) + } + case _ => + fail("Unsupported type") + } + } + assert(!decodeBuffer.hasRemaining) + } + test(s"$scheme: empty column") { skeleton(Seq.empty) } @@ -127,5 +183,27 @@ class IntegralDeltaSuite extends SparkFunSuite { val input = Array.fill[Any](10000)(makeRandomValue(columnType)) skeleton(input.map(_.asInstanceOf[I#InternalType])) } + + test(s"$scheme: empty column for decompress()") { + skeletonForDecompress(Seq.empty) + } + + test(s"$scheme: simple case for decompress()") { + val input = columnType match { + case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int) + case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long) + } + + skeletonForDecompress(input.map(_.asInstanceOf[I#InternalType])) + } + + test(s"$scheme: simple case with null for decompress()") { + val input = columnType match { + case INT => Seq(2: Int, 1: Int, 2: Int, nullValue: Int, 5: Int) + case LONG => Seq(2: Long, 1: Long, 2: Long, nullValue: Long, 5: Long) + } + + skeletonForDecompress(input.map(_.asInstanceOf[I#InternalType])) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughSuite.scala new file mode 100644 index 000000000000..c9a9da9b132b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughSuite.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.types.{AtomicType, IntegralType} + +class PassThroughSuite extends SparkFunSuite { + val nullValue = -1 + testPassThrough(new ByteColumnStats, BYTE) + testPassThrough(new ShortColumnStats, SHORT) + testPassThrough(new IntColumnStats, INT) + testPassThrough(new LongColumnStats, LONG) + testPassThrough(new FloatColumnStats, FLOAT) + testPassThrough(new DoubleColumnStats, DOUBLE) + + def testPassThrough[T <: AtomicType]( + columnStats: ColumnStats, + columnType: NativeColumnType[T]) { + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + def skeleton(input: Seq[T#InternalType]) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough) + + input.map { value => + val row = new GenericMutableRow(1) + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + + val buffer = builder.build() + // Column type ID + null count + null positions + val headerSize = CompressionScheme.columnHeaderSize(buffer) + + // Compression scheme ID + compressed contents + val compressedSize = 4 + input.size * columnType.defaultSize + + // 4 extra bytes for compression scheme type ID + assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + buffer.position(headerSize) + assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + if (input.nonEmpty) { + input.foreach { value => + assertResult(value, "Wrong value")(columnType.extract(buffer)) + } + } + + // ------------- + // Tests decoder + // ------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + buffer.rewind().position(headerSize + 4) + + val decoder = PassThrough.decoder(buffer, columnType) + val mutableRow = new GenericMutableRow(1) + + if (input.nonEmpty) { + input.foreach{ + assert(decoder.hasNext) + assertResult(_, "Wrong decoded value") { + decoder.next(mutableRow, 0) + columnType.getField(mutableRow, 0) + } + } + } + assert(!decoder.hasNext) + } + + def skeletonForDecompress(input: Seq[T#InternalType]) { + val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough) + val row = new GenericMutableRow(1) + val nullRow = new GenericMutableRow(1) + nullRow.setNullAt(0) + input.map { value => + if (value == nullValue) { + builder.appendFrom(nullRow, 0) + } else { + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + } + val buffer = builder.build() + + // ---------------- + // Tests decompress + // ---------------- + // Rewinds, skips column header and 4 more bytes for compression scheme ID + val headerSize = CompressionScheme.columnHeaderSize(buffer) + buffer.position(headerSize) + assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val decoder = PassThrough.decoder(buffer, columnType) + val (decodeBuffer, nullsBuffer) = decoder.decompress(input.length) + + if (input.nonEmpty) { + val numNulls = ByteBufferHelper.getInt(nullsBuffer) + var cntNulls = 0 + var nullPos = if (numNulls == 0) -1 else ByteBufferHelper.getInt(nullsBuffer) + input.zipWithIndex.foreach { + case (expected: Any, index: Int) if expected == nullValue => + assertResult(index, "Wrong null 
position") { + nullPos + } + decodeBuffer.position(decodeBuffer.position + columnType.defaultSize) + cntNulls += 1 + if (cntNulls < numNulls) { + nullPos = ByteBufferHelper.getInt(nullsBuffer) + } + case (expected: Byte, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded byte value") { + decodeBuffer.get() + } + case (expected: Short, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded short value") { + ByteBufferHelper.getShort(decodeBuffer) + } + case (expected: Int, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded int value") { + ByteBufferHelper.getInt(decodeBuffer) + } + case (expected: Long, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded long value") { + ByteBufferHelper.getLong(decodeBuffer) + } + case (expected: Float, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded float value") { + ByteBufferHelper.getFloat(decodeBuffer) + } + case (expected: Double, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded double value") { + ByteBufferHelper.getDouble(decodeBuffer) + } + case _ => fail("Unsupported type") + } + } + assert(!decodeBuffer.hasRemaining) + } + + test(s"$PassThrough with $typeName: empty column") { + skeleton(Seq.empty) + } + + test(s"$PassThrough with $typeName: long random series") { + val input = Array.fill[Any](10000)(makeRandomValue(columnType)) + skeleton(input.map(_.asInstanceOf[T#InternalType])) + } + + test(s"$PassThrough with $typeName: empty column for decompress()") { + skeletonForDecompress(Seq.empty) + } + + test(s"$PassThrough with $typeName: long random series for decompress()") { + val input = Array.fill[Any](10000)(makeRandomValue(columnType)) + skeletonForDecompress(input.map(_.asInstanceOf[T#InternalType])) + } + + test(s"$PassThrough with $typeName: simple case with null for decompress()") { + val input = columnType match { + case BYTE => Seq(2: Byte, 1: Byte, 2: Byte, nullValue.toByte: Byte, 5: Byte) + case SHORT => Seq(2: Short, 1: Short, 2: Short, nullValue.toShort: Short, 5: Short) + case INT => Seq(2: Int, 1: Int, 2: Int, nullValue: Int, 5: Int) + case LONG => Seq(2: Long, 1: Long, 2: Long, nullValue: Long, 5: Long) + case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue: Float, 5: Float) + case DOUBLE => Seq(2: Double, 1: Double, 2: Double, nullValue: Double, 5: Double) + } + + skeletonForDecompress(input.map(_.asInstanceOf[T#InternalType])) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala index 95642e93ae9f..86ef34f3cd83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala @@ -24,16 +24,18 @@ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.AtomicType class RunLengthEncodingSuite extends SparkFunSuite { + val nullValue = -1 testRunLengthEncoding(new NoopColumnStats, BOOLEAN) testRunLengthEncoding(new ByteColumnStats, BYTE) testRunLengthEncoding(new ShortColumnStats, SHORT) testRunLengthEncoding(new IntColumnStats, INT) testRunLengthEncoding(new LongColumnStats, LONG) - testRunLengthEncoding(new StringColumnStats, STRING) + testRunLengthEncoding(new StringColumnStats, STRING, false) def testRunLengthEncoding[T <: AtomicType]( 
columnStats: ColumnStats, - columnType: NativeColumnType[T]) { + columnType: NativeColumnType[T], + testDecompress: Boolean = true) { val typeName = columnType.getClass.getSimpleName.stripSuffix("$") @@ -95,6 +97,80 @@ class RunLengthEncodingSuite extends SparkFunSuite { assert(!decoder.hasNext) } + def skeletonForDecompress(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) { + if (!testDecompress) return + val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding) + val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) + val inputSeq = inputRuns.flatMap { case (index, run) => + Seq.fill(run)(index) + } + + val nullRow = new GenericMutableRow(1) + nullRow.setNullAt(0) + inputSeq.foreach { i => + if (i == nullValue) { + builder.appendFrom(nullRow, 0) + } else { + builder.appendFrom(rows(i), 0) + } + } + val buffer = builder.build() + + // ---------------- + // Tests decompress + // ---------------- + // Rewinds, skips column header and 4 more bytes for compression scheme ID + val headerSize = CompressionScheme.columnHeaderSize(buffer) + buffer.position(headerSize) + assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val decoder = RunLengthEncoding.decoder(buffer, columnType) + val (decodeBuffer, nullsBuffer) = decoder.decompress(inputSeq.length) + + if (inputSeq.nonEmpty) { + val numNulls = ByteBufferHelper.getInt(nullsBuffer) + var cntNulls = 0 + var nullPos = if (numNulls == 0) -1 else ByteBufferHelper.getInt(nullsBuffer) + inputSeq.zipWithIndex.foreach { + case (expected: Any, index: Int) if expected == nullValue => + assertResult(index, "Wrong null position") { + nullPos + } + decodeBuffer.position(decodeBuffer.position + columnType.defaultSize) + cntNulls += 1 + if (cntNulls < numNulls) { + nullPos = ByteBufferHelper.getInt(nullsBuffer) + } + case (i: Int, index: Int) => + columnType match { + case BOOLEAN => + assertResult(values(i), s"Wrong ${index}-th decoded boolean value") { + if (decodeBuffer.get() == 1) true else false + } + case BYTE => + assertResult(values(i), s"Wrong ${index}-th decoded byte value") { + decodeBuffer.get() + } + case SHORT => + assertResult(values(i), s"Wrong ${index}-th decoded short value") { + ByteBufferHelper.getShort(decodeBuffer) + } + case INT => + assertResult(values(i), s"Wrong ${index}-th decoded int value") { + ByteBufferHelper.getInt(decodeBuffer) + } + case LONG => + assertResult(values(i), s"Wrong ${index}-th decoded long value") { + ByteBufferHelper.getLong(decodeBuffer) + } + case _ => fail("Unsupported type") + } + case _ => fail("Unsupported type") + } + } + assert(!decodeBuffer.hasRemaining) + } + test(s"$RunLengthEncoding with $typeName: empty column") { skeleton(0, Seq.empty) } @@ -110,5 +186,21 @@ class RunLengthEncodingSuite extends SparkFunSuite { test(s"$RunLengthEncoding with $typeName: single long run") { skeleton(1, Seq(0 -> 1000)) } + + test(s"$RunLengthEncoding with $typeName: empty column for decompress()") { + skeletonForDecompress(0, Seq.empty) + } + + test(s"$RunLengthEncoding with $typeName: simple case for decompress()") { + skeletonForDecompress(2, Seq(0 -> 2, 1 -> 2)) + } + + test(s"$RunLengthEncoding with $typeName: single long run for decompress()") { + skeletonForDecompress(1, Seq(0 -> 1000)) + } + + test(s"$RunLengthEncoding with $typeName: single case with null for decompress()") { + skeletonForDecompress(2, Seq(0 -> 2, nullValue -> 2)) + } } }
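
Note on the null handling shared by the decompress() implementations above: each decoder duplicates its column buffer, rewinds it, reads an int null count followed by the int positions of the null rows, and then writes the decoded values into a flat output buffer sized capacity * elementSize, leaving a hole of one element at every null position so that row ids map directly to fixed offsets. The following standalone Scala sketch illustrates that pattern with a simplified layout (an int null count, the null positions, then the non-null int values back to back); decodeWithNulls and the layout itself are illustrative only, not the patch's or Spark's actual column format.

import java.nio.{ByteBuffer, ByteOrder}

object NullLayoutSketch {
  // Simplified layout assumed here: [nullCount: Int][null positions: Int...][non-null values: Int...]
  def decodeWithNulls(buffer: ByteBuffer, numRows: Int): ByteBuffer = {
    val nulls = buffer.duplicate().order(ByteOrder.nativeOrder())
    nulls.rewind()
    val nullCount = nulls.getInt()
    var nextNullIndex = if (nullCount > 0) nulls.getInt() else -1

    val values = buffer.duplicate().order(ByteOrder.nativeOrder())
    values.position(4 + 4 * nullCount)   // skip the null section

    val out = ByteBuffer.allocate(numRows * 4).order(ByteOrder.nativeOrder())
    var pos = 0
    var seenNulls = 0
    while (pos < numRows) {
      if (pos != nextNullIndex) {
        out.putInt(values.getInt())        // copy the next non-null value
      } else {
        seenNulls += 1
        nextNullIndex = if (seenNulls < nullCount) nulls.getInt() else numRows
        out.position(out.position() + 4)   // leave a 4-byte hole for the null row
      }
      pos += 1
    }
    out.rewind()
    out
  }

  def main(args: Array[String]): Unit = {
    // 5 rows with rows 1 and 3 null: [2, null, 7, null, 9]
    val buf = ByteBuffer.allocate(4 + 2 * 4 + 3 * 4).order(ByteOrder.nativeOrder())
    buf.putInt(2).putInt(1).putInt(3).putInt(2).putInt(7).putInt(9)
    val out = decodeWithNulls(buf, 5)
    // Null slots read back as 0 here; the real code pairs this buffer with the nulls buffer
    // so that ByteBufferColumnVector can answer isNullAt(rowId).
    (0 until 5).foreach(i => println(s"row $i -> ${out.getInt(i * 4)}"))
  }
}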
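
The RunLengthEncoding.decompress path additionally expands (value, run length) pairs into one entry per row before the generated code can address the resulting ColumnVector by rowId. A minimal standalone sketch of that expansion for ints, ignoring nulls for brevity (decodeRunLength is an illustrative name, and the [value: Int][runLength: Int] layout is a simplification of the scheme's actual encoding):

import java.nio.{ByteBuffer, ByteOrder}

object RunLengthSketch {
  def decodeRunLength(runs: ByteBuffer, numRows: Int): ByteBuffer = {
    val out = ByteBuffer.allocate(numRows * 4).order(ByteOrder.nativeOrder())
    var written = 0
    while (written < numRows && runs.hasRemaining) {
      val value = runs.getInt()   // the repeated value
      val run = runs.getInt()     // how many consecutive rows hold it
      var i = 0
      while (i < run && written < numRows) {
        out.putInt(value)
        i += 1
        written += 1
      }
    }
    out.rewind()
    out
  }

  def main(args: Array[String]): Unit = {
    // [7, 7, 7, 42, 42] encoded as (7, 3), (42, 2)
    val runs = ByteBuffer.allocate(4 * 4).order(ByteOrder.nativeOrder())
    runs.putInt(7).putInt(3).putInt(42).putInt(2)
    runs.rewind()
    val out = decodeRunLength(runs, 5)
    (0 until 5).foreach(i => println(out.getInt(i * 4)))
  }
}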