diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala index fe3000e4f..ff6432654 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala @@ -64,7 +64,6 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { case d: DecimalType => case d: TimestampType => case d: BinaryType => - case d: ArrayType => ConverterUtils.checkIfTypeSupported(d.elementType) case _ => throw new UnsupportedOperationException(s"${field.dataType} " + s"is not supported in ArrowRowToColumnarExec.") diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/execution/ArrowRowToColumnarExecSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/execution/ArrowRowToColumnarExecSuite.scala deleted file mode 100644 index 0fd898c50..000000000 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/execution/ArrowRowToColumnarExecSuite.scala +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.oap.execution - -import java.time.ZoneId - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer - -import com.intel.oap.expression.ConverterUtils -import com.intel.oap.vectorized.ArrowRowToColumnarJniWrapper -import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer -import org.apache.arrow.memory.ArrowBuf -import org.apache.arrow.vector.types.pojo.{Field, Schema} - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData} -import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils} -import org.apache.spark.sql.execution.vectorized.WritableColumnVector -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField, StructType, TimestampType} -import org.apache.spark.sql.util.ArrowUtils -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} -import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.types.UTF8String - -class ArrowRowToColumnarExecSuite extends SharedSparkSession { - - test("ArrowRowToColumnarExec: Boolean type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(BooleanType))) - val schema = StructType(Seq(StructField("boolean type with array", ArrayType(BooleanType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(true, false)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(true == array.getBoolean(0)) - assert(false == array.getBoolean(1)) - } - } - - test("ArrowRowToColumnarExec: Byte type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(ByteType))) - val schema = StructType(Seq(StructField("boolean type with array", ArrayType(ByteType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(1.toByte, 2.toByte)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(1.toByte == array.getByte(0)) - assert(2.toByte == array.getByte(1)) - } - } - - test("ArrowRowToColumnarExec: Short type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(ShortType))) - val schema = StructType(Seq(StructField("short type with array", ArrayType(ShortType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(1.toShort, 2.toShort)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(1.toShort == array.getShort(0)) - assert(2.toShort == array.getShort(1)) - } - } - - test("ArrowRowToColumnarExec: Int type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(IntegerType))) - val schema = StructType(Seq(StructField("Int type with array", ArrayType(IntegerType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(-10, -20)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(-10 == array.getInt(0)) - assert(-20 == array.getInt(1)) - } - } - - test("ArrowRowToColumnarExec: Long type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(LongType))) - val schema = StructType(Seq(StructField("Long type with array", ArrayType(LongType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(1.toLong, 2.toLong)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(1.toLong == array.getLong(0)) - assert(2.toLong == array.getLong(1)) - } - } - - test("ArrowRowToColumnarExec: Float type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(FloatType))) - val schema = StructType(Seq(StructField("Float type with array", ArrayType(FloatType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(1.toFloat, 2.toFloat)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(1.toFloat == array.getFloat(0)) - assert(2.toFloat == array.getFloat(1)) - } - } - - test("ArrowRowToColumnarExec: Double type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(DoubleType))) - val schema = StructType(Seq(StructField("Double type with array", ArrayType(DoubleType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData(Seq(1.toDouble, 2.toDouble)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(1.toDouble == array.getDouble(0)) - assert(2.toDouble == array.getDouble(1)) - } - } - - test("ArrowRowToColumnarExec: String type with array list") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(StringType))) - val schema = StructType(Seq(StructField("String type with array", ArrayType(StringType)))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData( - Seq(UTF8String.fromString("abc"), UTF8String.fromString("def"))))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(UTF8String.fromString("abc") == array.getUTF8String(0)) - assert(UTF8String.fromString("def") == array.getUTF8String(1)) - } - } - - test("ArrowRowToColumnarExec: Decimal type with array list precision <= 18") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(DecimalType(10, 4)))) - val schema = StructType( - Seq(StructField("Decimal type with array", ArrayType(DecimalType(10, 4))))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData( - Seq(new Decimal().set(BigDecimal("-1.5645")), new Decimal().set(BigDecimal("-1.8645")))))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(new Decimal().set(BigDecimal("-1.5645")) == array.getDecimal(0, 10, 4)) - assert(new Decimal().set(BigDecimal("-1.8645")) == array.getDecimal(1, 10, 4)) - } - } - - test("ArrowRowToColumnarExec: Decimal type with array list precision > 18") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(DecimalType(19, 4)))) - val schema = StructType( - Seq(StructField("Decimal type with array", ArrayType(DecimalType(19, 4))))) - val rowIterator = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData( - Seq(new Decimal().set(BigDecimal("1.2457")), new Decimal().set(BigDecimal("1.2457")))))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(new Decimal().set(BigDecimal("1.2457")) == array.getDecimal(0, 19, 4)) - assert(new Decimal().set(BigDecimal("1.2457")) == array.getDecimal(1, 19, 4)) - } - } - - test("ArrowRowToColumnarExec: Timestamp type with array list ") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(TimestampType))) - val defaultZoneId = ZoneId.systemDefault() - val schema = StructType( - Seq(StructField("Timestamp type with array", ArrayType(TimestampType)))) - val rowIterator: Iterator[InternalRow] = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData( - Seq(DateTimeUtils.stringToTimestamp( - UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get, - DateTimeUtils.stringToTimestamp( - UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get)))) - }.toIterator - - val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(DateTimeUtils.stringToTimestamp( - UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get == - array.get(0, TimestampType).asInstanceOf[Long]) - assert(DateTimeUtils.stringToTimestamp( - UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get == - array.get(1, TimestampType).asInstanceOf[Long]) - } - } - - test("ArrowRowToColumnarExec: Date32 type with array list ") { - val converter: UnsafeProjection = - UnsafeProjection.create(Array[DataType](ArrayType(DateType))) - val defaultZoneId = ZoneId.systemDefault() - val schema: StructType = StructType( - Seq(StructField("Date type with array", ArrayType(DateType)))) - val rowIterator: Iterator[InternalRow] = (0 until 2).map { i => - converter.apply(InternalRow(new GenericArrayData( - Seq(DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get, - DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get)))) - }.toIterator - - val cb: ColumnarBatch = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) - val convert_rowIterator = cb.rowIterator - - var rowId = 0 - while (rowId < cb.numRows()) { - val row = convert_rowIterator.next() - rowId += 1 - val array = row.getArray(0) - assert(DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get == - array.get(0, DateType).asInstanceOf[Int]) - assert(DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get == - array.get(1, DateType).asInstanceOf[Int]) - } - } -} - -object ArrowRowToColumnarExecSuite { - - def serializeSchema(fields: Seq[Field]): Array[Byte] = { - val schema = new Schema(fields.asJava) - ConverterUtils.getSchemaBytesBuf(schema) - } - - def nativeOp(schema: StructType, rowIterator: Iterator[InternalRow]): ColumnarBatch = { - val bufferSize = 1024 // 128M can estimator the buffer size based on the data type - val allocator = SparkMemoryUtils.contextAllocator() - val arrowBuf = allocator.buffer(bufferSize) - - val rowLength = new ListBuffer[Long]() - var rowCount = 0 - var offset = 0 - while (rowIterator.hasNext) { - val row = rowIterator.next() // UnsafeRow - assert(row.isInstanceOf[UnsafeRow]) - val unsafeRow = row.asInstanceOf[UnsafeRow] - val sizeInBytes = unsafeRow.getSizeInBytes - Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset, - null, arrowBuf.memoryAddress() + offset, sizeInBytes) - offset += sizeInBytes - rowLength += sizeInBytes.toLong - rowCount += 1 - } - val timeZoneId = SparkSchemaUtils.getLocalTimezoneID() - val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) - val schemaBytes: Array[Byte] = ConverterUtils.getSchemaBytesBuf(arrowSchema) - val jniWrapper = new ArrowRowToColumnarJniWrapper() - val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar( - schemaBytes, rowLength.toArray, arrowBuf.memoryAddress(), - SparkMemoryUtils.contextMemoryPool().getNativeInstanceId) - val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch) - val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb) - val outputNumRows = rb.getLength - ConverterUtils.releaseArrowRecordBatch(rb) - new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows) - } -} - diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc index 9f754ce2c..351620631 100644 --- a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc +++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc @@ -26,11 +26,11 @@ namespace sparkcolumnarplugin { namespace rowtocolumnar { -int64_t CalculateBitSetWidthInBytes(int32_t numFields) { +inline int64_t CalculateBitSetWidthInBytes(int32_t numFields) { return ((numFields + 63) / 64) * 8; } -int64_t GetFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { +inline int64_t GetFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { return nullBitsetWidthInBytes + 8L * index; } @@ -41,948 +41,193 @@ inline bool IsNull(uint8_t* buffer_address, int32_t index) { return (value & mask) != 0; } -int32_t CalculateHeaderPortionInBytes(int32_t num_elements) { +inline int32_t CalculateHeaderPortionInBytes(int32_t num_elements) { return 8 + ((num_elements + 63) / 64) * 8; } -arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num_rows, - int32_t columnar_id, int64_t fieldOffset, - std::vector& offsets, uint8_t* memory_address_, - std::shared_ptr* array, - arrow::MemoryPool* pool, bool support_avx512) { - auto field = schema->field(columnar_id); - auto type = field->type(); +inline arrow::Status CreateArrayData( + int32_t row_start, std::shared_ptr schema, int32_t batch_rows, + std::vector& offsets, uint8_t* memory_address_, arrow::MemoryPool* pool, + bool support_avx512, std::vector& typevec, + std::vector& typewidth, + std::vector>& columns, int num_fields, + std::vector& field_offset_vec) { + for (auto columnar_id = 0; columnar_id < num_fields; columnar_id++) { + auto& array = (columns[columnar_id]); + int64_t fieldOffset = field_offset_vec[columnar_id]; + switch (typevec[columnar_id]) { + case arrow::BooleanType::type_id: { + auto array_data = array->buffers[1]->mutable_data(); + int64_t position = row_start; + int64_t null_count = 0; - switch (type->id()) { - case arrow::BooleanType::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - - ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], AllocateBitmap(num_rows, pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - auto array_data = out_data.buffers[1]->mutable_data(); - int64_t position = 0; - int64_t null_count = 0; - - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - } else { - bool value = *(bool*)(memory_address_ + offsets[position] + fieldOffset); - arrow::BitUtil::SetBitTo(array_data, position, value); - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - return arrow::Status::OK(); - break; - } - case arrow::Int8Type::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(int8_t*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - return arrow::Status::OK(); - break; - } - case arrow::Int16Type::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(int16_t*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - return arrow::Status::OK(); - break; - } - case arrow::Int32Type::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::Int64Type::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::FloatType::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(float*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::DoubleType::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(double*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::BinaryType::type_id: - case arrow::StringType::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(3); - out_data.type = field->type(); - using offset_type = typename arrow::StringType::offset_type; - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], - AllocateBuffer(sizeof(offset_type) * (num_rows + 1), pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[2], - AllocateResizableBuffer(20 * num_rows, pool)); - auto validity_buffer = out_data.buffers[0]->mutable_data(); - // initialize all true once allocated - memset(validity_buffer, 0xff, out_data.buffers[0]->capacity()); - auto array_offset = out_data.GetMutableValues(1); - auto array_data = out_data.buffers[2]->mutable_data(); - int64_t null_count = 0; - - array_offset[0] = 0; - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - arrow::BitUtil::SetBitTo(validity_buffer, position, false); - array_offset[position + 1] = array_offset[position]; - null_count++; - } else { - int64_t offsetAndSize = - *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); - offset_type length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - auto value_offset = array_offset[position + 1] = - array_offset[position] + length; - uint64_t capacity = out_data.buffers[2]->capacity(); - - if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { - // allocate value buffer again - // enlarge the buffer by 1.5x - capacity = capacity + std::max((capacity >> 1), (uint64_t)length); - auto value_buffer = - std::static_pointer_cast(out_data.buffers[2]); - value_buffer->Reserve(capacity); - array_data = value_buffer->mutable_data(); - } - - auto dst_value_base = array_data + array_offset[position]; - auto value_src_ptr = memory_address_ + offsets[position] + wordoffset; -#ifdef __AVX512BW__ - if (ARROW_PREDICT_TRUE(support_avx512)) { - // write the variable value - uint32_t k; - for (k = 0; k + 32 < length; k += 32) { - __m256i v = _mm256_loadu_si256((const __m256i*)(value_src_ptr + k)); - _mm256_storeu_si256((__m256i*)(dst_value_base + k), v); - } - auto mask = (1L << (length - k)) - 1; - __m256i v = _mm256_maskz_loadu_epi8(mask, value_src_ptr + k); - _mm256_mask_storeu_epi8(dst_value_base + k, mask, v); - } else -#endif - { - memcpy(dst_value_base, value_src_ptr, length); + auto out_is_valid = array->buffers[0]->mutable_data(); + while (position < row_start + batch_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + } else { + bool value = *(bool*)(memory_address_ + offsets[position] + fieldOffset); + arrow::BitUtil::SetBitTo(array_data, position, value); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } + position++; } + array->null_count += null_count; + break; } - out_data.null_count = null_count; - if (null_count == 0) { - out_data.buffers[0] == nullptr; - } - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::Decimal128Type::type_id: { - auto dtype = std::dynamic_pointer_cast(type); - int32_t precision = dtype->precision(); - int32_t scale = dtype->scale(); - - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::decimal128(precision, scale); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], AllocateBuffer(16 * num_rows, pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - auto array_data = out_data.GetMutableValues(1); + case arrow::Decimal128Type::type_id: { + auto field = schema->field(columnar_id); + auto type = field->type(); + auto dtype = std::dynamic_pointer_cast(type); + int32_t precision = dtype->precision(); + int32_t scale = dtype->scale(); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::Decimal128{}; - } else { - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - if (precision <= 18) { - int64_t low_value; - memcpy(&low_value, memory_address_ + offsets[position] + fieldOffset, 8); - arrow::Decimal128 value = - arrow::Decimal128(arrow::BasicDecimal128(low_value)); - array_data[position] = value; + auto array_data = array->GetMutableValues(1); + int64_t position = row_start; + int64_t null_count = 0; + auto out_is_valid = array->buffers[0]->mutable_data(); + while (position < row_start + batch_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::Decimal128{}; } else { - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - uint8_t bytesValue[length]; - memcpy(bytesValue, memory_address_ + offsets[position] + wordoffset, length); - uint8_t bytesValue2[16]{}; - for (int k = length - 1; k >= 0; k--) { - bytesValue2[length - 1 - k] = bytesValue[k]; - } - if (int8_t(bytesValue[0]) < 0) { - for (int k = length; k < 16; k++) { - bytesValue2[k] = 255; - } - } - arrow::Decimal128 value = - arrow::Decimal128(arrow::BasicDecimal128(bytesValue2)); - array_data[position] = value; - } - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::Date32Type::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::TypeTraits::type_singleton(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, - pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::TimestampType::type_id: { - arrow::ArrayData out_data; - out_data.length = num_rows; - out_data.buffers.resize(2); - out_data.type = arrow::int64(); - ARROW_ASSIGN_OR_RAISE( - out_data.buffers[1], - AllocateBuffer( - sizeof(arrow::TypeTraits::CType) * num_rows, pool)); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); - // auto array_data = out_data.buffers[1]->mutable_data(); - auto array_data = - out_data.GetMutableValues::CType>(1); - int64_t position = 0; - int64_t null_count = 0; - auto out_is_valid = out_data.buffers[0]->mutable_data(); - while (position < num_rows) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); - array_data[position] = arrow::TypeTraits::CType{}; - } else { - auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); - array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); - } - position++; - } - out_data.null_count = null_count; - *array = MakeArray(std::make_shared(std::move(out_data))); - break; - } - case arrow::ListType::type_id: { - auto list_type = std::dynamic_pointer_cast(type); - auto child_type = list_type->value_type(); - switch (child_type->id()) { - case arrow::BooleanType::type_id: { - arrow::ListBuilder parent_builder( - pool, std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::BooleanBuilder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + if (precision <= 18) { + int64_t low_value; + memcpy(&low_value, memory_address_ + offsets[position] + fieldOffset, 8); + arrow::Decimal128 value = + arrow::Decimal128(arrow::BasicDecimal128(low_value)); + array_data[position] = value; } else { - RETURN_NOT_OK(parent_builder.Append()); int64_t offsetAndSize; memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, sizeof(int64_t)); int32_t length = int32_t(offsetAndSize); int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - bool value = *(bool*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(bool)); - RETURN_NOT_OK(child_builder.Append(value)); - } + uint8_t bytesValue[length]; + memcpy(bytesValue, memory_address_ + offsets[position] + wordoffset, + length); + uint8_t bytesValue2[16]{}; + for (int k = length - 1; k >= 0; k--) { + bytesValue2[length - 1 - k] = bytesValue[k]; } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::Int8Type::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::Int8Builder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(int8_t*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(int8_t)); - RETURN_NOT_OK(child_builder.Append(value)); + if (int8_t(bytesValue[0]) < 0) { + for (int k = length; k < 16; k++) { + bytesValue2[k] = 255; } } + arrow::Decimal128 value = + arrow::Decimal128(arrow::BasicDecimal128(bytesValue2)); + array_data[position] = value; } } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; + position++; } - case arrow::Int16Type::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::Int16Builder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(int16_t*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(int16_t)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::Int32Type::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::Int32Builder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - RETURN_NOT_OK(child_builder.AppendNull()); - } else { - auto value = - *(int32_t*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(int32_t)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::Int64Type::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::Int64Builder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(int64_t)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::FloatType::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::FloatBuilder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(float*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(float)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::DoubleType::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::DoubleBuilder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(double*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(double)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::Date32Type::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::Date32Builder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(int32_t*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(int32_t)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::TimestampType::type_id: { - arrow::ListBuilder parent_builder( - pool, std::make_shared(arrow::int64(), pool)); - // The following builder is owned by components_builder. - arrow::TimestampBuilder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - auto value = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset + - header_in_bytes + j * sizeof(int64_t)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } + array->null_count += null_count; + break; + } + case arrow::BinaryType::type_id: + case arrow::StringType::type_id: { + using offset_type = typename arrow::StringType::offset_type; + auto validity_buffer = array->buffers[0]->mutable_data(); + auto array_offset = array->GetMutableValues(1); + auto array_data = array->buffers[2]->mutable_data(); + int64_t null_count = 0; + + array_offset[0] = 0; + for (int64_t position = row_start; position < row_start + batch_rows; + position++) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (!is_null) { + int64_t offsetAndSize = + *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); + offset_type length = int32_t(offsetAndSize); + int32_t wordoffset = int32_t(offsetAndSize >> 32); + auto value_offset = array_offset[position + 1] = + array_offset[position] + length; + uint64_t capacity = array->buffers[2]->capacity(); + + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { + // allocate value buffer again + // enlarge the buffer by 1.5x + capacity = capacity + std::max((capacity >> 1), (uint64_t)length); + auto value_buffer = + std::static_pointer_cast(array->buffers[2]); + value_buffer->Reserve(capacity); + array_data = value_buffer->mutable_data(); } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; - } - case arrow::BinaryType::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::BinaryBuilder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - using offset_type = typename arrow::BinaryType::offset_type; - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - int64_t elementOffsetAndSize; - memcpy(&elementOffsetAndSize, - memory_address_ + offsets[position] + wordoffset + - header_in_bytes + 8 * j, - sizeof(int64_t)); - offset_type elementLength = int32_t(elementOffsetAndSize); - int32_t elementOffset = int32_t(elementOffsetAndSize >> 32); - RETURN_NOT_OK(child_builder.Append( - memory_address_ + offsets[position] + wordoffset + elementOffset, - elementLength)); - } + + auto dst_value_base = array_data + array_offset[position]; + auto value_src_ptr = memory_address_ + offsets[position] + wordoffset; +#ifdef __AVX512BW__ + if (ARROW_PREDICT_TRUE(support_avx512)) { + // write the variable value + uint32_t k; + for (k = 0; k + 32 < length; k += 32) { + __m256i v = _mm256_loadu_si256((const __m256i*)(value_src_ptr + k)); + _mm256_storeu_si256((__m256i*)(dst_value_base + k), v); } + auto mask = (1L << (length - k)) - 1; + __m256i v = _mm256_maskz_loadu_epi8(mask, value_src_ptr + k); + _mm256_mask_storeu_epi8(dst_value_base + k, mask, v); + } else +#endif + { + memcpy(dst_value_base, value_src_ptr, length); } + } else { + arrow::BitUtil::SetBitTo(validity_buffer, position, false); + array_offset[position + 1] = array_offset[position]; + null_count++; } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; } - case arrow::StringType::type_id: { - arrow::ListBuilder parent_builder(pool, - std::make_shared(pool)); - // The following builder is owned by components_builder. - arrow::StringBuilder& child_builder = - *(static_cast(parent_builder.value_builder())); - for (int64_t position = 0; position < num_rows; position++) { - bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - using offset_type = typename arrow::StringType::offset_type; - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - int64_t elementOffsetAndSize; - memcpy(&elementOffsetAndSize, - memory_address_ + offsets[position] + wordoffset + - header_in_bytes + 8 * j, - sizeof(int64_t)); - offset_type elementLength = int32_t(elementOffsetAndSize); - int32_t elementOffset = int32_t(elementOffsetAndSize >> 32); - RETURN_NOT_OK(child_builder.Append( - memory_address_ + offsets[position] + wordoffset + elementOffset, - elementLength)); - } - } - } - } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); - break; + array->null_count += null_count; + if (null_count == 0) { + array->buffers[0] == nullptr; } - case arrow::Decimal128Type::type_id: { - std::shared_ptr dtype = - std::dynamic_pointer_cast(child_type); - int32_t precision = dtype->precision(); - int32_t scale = dtype->scale(); - arrow::ListBuilder parent_builder( - pool, std::make_shared(dtype, pool)); - // The following builder is owned by components_builder. - arrow::Decimal128Builder& child_builder = - *(static_cast(parent_builder.value_builder())); + break; + } + default: { + auto array_data = array->buffers[1]->mutable_data(); + int64_t position = row_start; + int64_t null_count = 0; + auto out_is_valid = array->buffers[0]->mutable_data(); - for (int64_t position = 0; position < num_rows; position++) { + if (typewidth[columnar_id] > 0) { + while (position < row_start + batch_rows) { + const uint8_t* srcptr = (memory_address_ + offsets[position] + fieldOffset); bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); - if (is_null) { - RETURN_NOT_OK(parent_builder.AppendNull()); - } else { - RETURN_NOT_OK(parent_builder.Append()); - int64_t offsetAndSize; - memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, - sizeof(int64_t)); - int32_t length = int32_t(offsetAndSize); - int32_t wordoffset = int32_t(offsetAndSize >> 32); - int64_t num_elements = - *(int64_t*)(memory_address_ + offsets[position] + wordoffset); - int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements); - for (auto j = 0; j < num_elements; j++) { - bool is_null = - IsNull(memory_address_ + offsets[position] + wordoffset + 8, j); - if (is_null) { - child_builder.AppendNull(); - } else { - if (precision <= 18) { - int64_t low_value; - memcpy(&low_value, - memory_address_ + offsets[position] + wordoffset + - header_in_bytes + 8 * j, - sizeof(int64_t)); - auto value = arrow::Decimal128(arrow::BasicDecimal128(low_value)); - RETURN_NOT_OK(child_builder.Append(value)); - } else { - int64_t elementOffsetAndSize; - memcpy(&elementOffsetAndSize, - memory_address_ + offsets[position] + wordoffset + - header_in_bytes + 8 * j, - sizeof(int64_t)); - int32_t elementLength = int32_t(elementOffsetAndSize); - int32_t elementOffset = int32_t(elementOffsetAndSize >> 32); - uint8_t bytesValue[elementLength]; - memcpy( - bytesValue, - memory_address_ + offsets[position] + wordoffset + elementOffset, - elementLength); - uint8_t bytesValue2[16]{}; - for (int k = elementLength - 1; k >= 0; k--) { - bytesValue2[elementLength - 1 - k] = bytesValue[k]; - } - if (int8_t(bytesValue[0]) < 0) { - for (int k = elementLength; k < 16; k++) { - bytesValue2[k] = 255; - } - } - arrow::Decimal128 value = - arrow::Decimal128(arrow::BasicDecimal128(bytesValue2)); - RETURN_NOT_OK(child_builder.Append(value)); - } - } + auto mask = (1L << (typewidth[columnar_id])) - 1; + auto shift = _tzcnt_u32(typewidth[columnar_id]); + uint8_t* destptr = array_data + (position << shift); + if (!is_null) { +#ifdef __AVX512BW__ + if (ARROW_PREDICT_TRUE(support_avx512)) { + __m256i v = _mm256_maskz_loadu_epi8(mask, srcptr); + _mm256_mask_storeu_epi8(destptr, mask, v); + } else +#endif + { + memcpy(destptr, srcptr, typewidth[columnar_id]); } + } else { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + memset(destptr, 0, typewidth[columnar_id]); } + position++; } - ARROW_RETURN_NOT_OK(parent_builder.Finish(array)); + array->null_count += null_count; break; + } else { + return arrow::Status::Invalid("Unsupported data type: " + typevec[columnar_id]); } - default: - return arrow::Status::Invalid("Unsupported data type: " + child_type->id()); } - break; } - default: - return arrow::Status::Invalid("Unsupported data type: " + type->id()); } return arrow::Status::OK(); } @@ -999,15 +244,115 @@ arrow::Status RowToColumnarConverter::Init(std::shared_ptr* std::vector> arrays; auto num_fields = schema_->num_fields(); + std::vector typevec; + std::vector typewidth; + std::vector field_offset_vec; + typevec.resize(num_fields); + // Store bytes for different fixed width types + typewidth.resize(num_fields); + field_offset_vec.resize(num_fields); + + std::vector> columns; + columns.resize(num_fields); + + // Allocate Buffers firstly for (auto i = 0; i < num_fields; i++) { auto field = schema_->field(i); - std::shared_ptr array_data; - int64_t field_offset = GetFieldOffset(nullBitsetWidthInBytes, i); - RETURN_NOT_OK(CreateArrayData(schema_, num_rows_, i, field_offset, offsets_, - memory_address_, &array_data, m_pool_, - support_avx512_)); - arrays.push_back(array_data); + typevec[i] = field->type()->id(); + typewidth[i] = arrow::bit_width(typevec[i]) >> 3; + field_offset_vec[i] = GetFieldOffset(nullBitsetWidthInBytes, i); + + switch (typevec[i]) { + case arrow::BooleanType::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows_; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + out_data.null_count = 0; + + ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], AllocateBitmap(num_rows_, m_pool_)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows_, m_pool_)); + auto validity_buffer = out_data.buffers[0]->mutable_data(); + // initialize all true once allocated + memset(validity_buffer, 0xff, out_data.buffers[0]->capacity()); + columns[i] = std::make_shared(std::move(out_data)); + break; + } + case arrow::BinaryType::type_id: + case arrow::StringType::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows_; + out_data.buffers.resize(3); + out_data.type = field->type(); + out_data.null_count = 0; + using offset_type = typename arrow::StringType::offset_type; + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows_, m_pool_)); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(offset_type) * (num_rows_ + 1), m_pool_)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[2], + AllocateResizableBuffer(20 * num_rows_, m_pool_)); + auto validity_buffer = out_data.buffers[0]->mutable_data(); + // initialize all true once allocated + memset(validity_buffer, 0xff, out_data.buffers[0]->capacity()); + columns[i] = std::make_shared(std::move(out_data)); + break; + } + case arrow::Decimal128Type::type_id: { + auto dtype = std::dynamic_pointer_cast(field->type()); + int32_t precision = dtype->precision(); + int32_t scale = dtype->scale(); + + arrow::ArrayData out_data; + out_data.length = num_rows_; + out_data.buffers.resize(2); + out_data.type = arrow::decimal128(precision, scale); + out_data.null_count = 0; + ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], + AllocateBuffer(16 * num_rows_, m_pool_)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows_, m_pool_)); + auto validity_buffer = out_data.buffers[0]->mutable_data(); + // initialize all true once allocated + memset(validity_buffer, 0xff, out_data.buffers[0]->capacity()); + columns[i] = std::make_shared(std::move(out_data)); + break; + } + default: { + arrow::ArrayData out_data; + out_data.length = num_rows_; + out_data.buffers.resize(2); + out_data.type = field->type(); + out_data.null_count = 0; + ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], + AllocateBuffer(typewidth[i] * num_rows_, m_pool_)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows_, m_pool_)); + auto validity_buffer = out_data.buffers[0]->mutable_data(); + // initialize all true once allocated + memset(validity_buffer, 0xff, out_data.buffers[0]->capacity()); + columns[i] = std::make_shared(std::move(out_data)); + break; + } + } } + +#define BATCH_ROW_NUM 16 + int row = 0; + for (row; row + BATCH_ROW_NUM < num_rows_; row += BATCH_ROW_NUM) { + RETURN_NOT_OK(CreateArrayData(row, schema_, BATCH_ROW_NUM, offsets_, memory_address_, + m_pool_, support_avx512_, typevec, typewidth, columns, + num_fields, field_offset_vec)); + } + for (row; row < num_rows_; row++) { + RETURN_NOT_OK(CreateArrayData(row, schema_, 1, offsets_, memory_address_, m_pool_, + support_avx512_, typevec, typewidth, columns, + num_fields, field_offset_vec)); + } + + for (auto i = 0; i < num_fields; i++) { + auto array = MakeArray(columns[i]); + arrays.push_back(array); + } + *batch = arrow::RecordBatch::Make(schema_, num_rows_, arrays); return arrow::Status::OK(); }