From 2b171183881dd0d089ef8844b74437a212f23d9d Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Wed, 7 Jul 2021 16:57:10 +0800 Subject: [PATCH 01/19] ColumnarToRow optimization --- .../ArrowColumnarToRowJniWrapper.java | 20 ++ .../scala/com/intel/oap/ColumnarPlugin.scala | 6 +- .../execution/ArrowColumnarToRowExec.scala | 88 ++++++ native-sql-engine/cpp/src/CMakeLists.txt | 1 + native-sql-engine/cpp/src/jni/jni_wrapper.cc | 150 ++++++++- .../operators/unsafe_row_writer_and_reader.cc | 293 ++++++++++++++++++ .../operators/unsafe_row_writer_and_reader.h | 55 ++++ .../cpp/src/tests/CMakeLists.txt | 1 + native-sql-engine/cpp/src/tests/test_utils.h | 28 ++ .../cpp/src/tests/unsaferow_test.cc | 168 ++++++++++ 10 files changed, 808 insertions(+), 2 deletions(-) create mode 100644 native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala create mode 100644 native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc create mode 100644 native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h create mode 100644 native-sql-engine/cpp/src/tests/unsaferow_test.cc diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java new file mode 100644 index 000000000..82ea2340c --- /dev/null +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java @@ -0,0 +1,20 @@ +package com.intel.oap.vectorized; + + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; + +import java.io.IOException; + +public class ArrowColumnarToRowJniWrapper { + + public ArrowColumnarToRowJniWrapper() throws IOException { + JniUtils.getInstance(); + } + + public native long nativeConvertColumnarToRow( + byte[] schema, int numRows, long[] bufAddrs, long[] bufSizes, long memory_pool_id) throws RuntimeException; + + public native boolean nativeHasNext(long instanceID); + + public native UnsafeRow nativeNext(long instanceID); +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala index 62656ce30..4e9a7e893 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala @@ -292,6 +292,9 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { val child = replaceWithColumnarPlan(plan.child) logDebug(s"ColumnarPostOverrides RowToArrowColumnarExec(${child.getClass})") RowToArrowColumnarExec(child) + case plan: ColumnarToRowExec => + val child = replaceWithColumnarPlan(plan.child) + new ArrowColumnarToRowExec(child) case ColumnarToRowExec(child: ColumnarShuffleExchangeAdaptor) => replaceWithColumnarPlan(child) case ColumnarToRowExec(child: ColumnarBroadcastExchangeAdaptor) => @@ -305,7 +308,8 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { val children = r.children.map(c => c match { case c: ColumnarToRowExec => - c.withNewChildren(c.children.map(replaceWithColumnarPlan)) + val child = replaceWithColumnarPlan(c.child) + new ArrowColumnarToRowExec(child) case other => replaceWithColumnarPlan(other) }) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala 
b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala new file mode 100644 index 000000000..7ad962530 --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -0,0 +1,88 @@ +package com.intel.oap.execution + +import com.intel.oap.expression.ConverterUtils +import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector} +import org.apache.arrow.vector.types.pojo.{Field, Schema} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { + override def nodeName: String = "ArrowColumnarToRow" + + assert(child.supportsColumnar) + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def doExecute(): RDD[InternalRow] = { + + + child.executeColumnar().mapPartitions { batches => + // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast ? + // currently only for debug. + val jniWrapper = new ArrowColumnarToRowJniWrapper() + var arrowSchema: Array[Byte] = null + + def serializeSchema(fields: Seq[Field]): Array[Byte] = { + val schema = new Schema(fields.asJava) + ConverterUtils.getSchemaBytesBuf(schema) + } + + batches.flatMap { batch => + if (batch.numRows == 0 || batch.numCols == 0) { + logInfo(s"Skip ColumnarBatch of ${batch.numRows} rows, ${batch.numCols} cols") + Iterator.empty + } else { + val bufAddrs = new ListBuffer[Long]() + val bufSizes = new ListBuffer[Long]() + val fields = new ListBuffer[Field]() + (0 until batch.numCols).foreach { idx => + val column = batch.column(idx).asInstanceOf[ArrowWritableColumnVector] + fields += column.getValueVector.getField + column.getValueVector + .getBuffers(false) + .foreach { buffer => + bufAddrs += buffer.memoryAddress() + bufSizes += buffer.readableBytes() + } + } + + if (arrowSchema == null) { + arrowSchema = serializeSchema(fields) + } + + val instanceID = jniWrapper.nativeConvertColumnarToRow( + arrowSchema, batch.numRows, bufAddrs.toArray, bufSizes.toArray, + SparkMemoryUtils.contextMemoryPool().getNativeInstanceId) + + new Iterator[InternalRow] { + override def hasNext: Boolean = { + jniWrapper.nativeHasNext(instanceID) + } + override def next: UnsafeRow = { + jniWrapper.nativeNext(instanceID) + } + } + } + } + } + } + + override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowColumnarToRowExec] + + override def equals(other: Any): Boolean = other match { + case that: ArrowColumnarToRowExec => + (that canEqual this) && super.equals(that) + case _ => false + } +} \ No newline at end of file diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index 930985bc7..ef6ebb1d5 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -467,6 +467,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS codegen/arrow_compute/ext/sort_kernel.cc codegen/arrow_compute/ext/kernels_ext.cc 
shuffle/splitter.cc + operators/unsafe_row_writer_and_reader.cc precompile/hash_map.cc precompile/sparse_hash_map.cc precompile/builder.cc diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 52ba27dfd..7c9aa1e32 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -39,6 +39,7 @@ #include "jni/jni_common.h" #include "proto/protobuf_utils.h" #include "shuffle/splitter.h" +#include "operators/unsafe_row_writer_and_reader.h" namespace types { class ExpressionList; @@ -62,16 +63,24 @@ static jmethodID split_result_constructor; static jclass metrics_builder_class; static jmethodID metrics_builder_constructor; + +static jclass unsafe_row_class; +static jmethodID unsafe_row_class_constructor; +static jmethodID unsafe_row_class_point_to; using arrow::jni::ConcurrentMap; static ConcurrentMap> buffer_holder_; static jint JNI_VERSION = JNI_VERSION_1_8; using CodeGenerator = sparkcolumnarplugin::codegen::CodeGenerator; +using UnsafeRowWriterAndReader = sparkcolumnarplugin::unsaferow::UnsafeRowWriterAndReader; static arrow::jni::ConcurrentMap> handler_holder_; static arrow::jni::ConcurrentMap> batch_iterator_holder_; +static arrow::jni::ConcurrentMap> + unsafe_writer_and_reader_holder_; + using sparkcolumnarplugin::shuffle::SplitOptions; using sparkcolumnarplugin::shuffle::Splitter; static arrow::jni::ConcurrentMap> shuffle_splitter_holder_; @@ -198,7 +207,13 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { CreateGlobalClassReference(env, "Lcom/intel/oap/vectorized/MetricsObject;"); metrics_builder_constructor = GetMethodID(env, metrics_builder_class, "", "([J[J)V"); - + + unsafe_row_class = + CreateGlobalClassReference(env, "Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;"); + unsafe_row_class_constructor = + GetMethodID(env, unsafe_row_class, "", "(I)V"); + unsafe_row_class_point_to = + GetMethodID(env, unsafe_row_class, "pointTo", "([BI)V"); return JNI_VERSION; } @@ -217,10 +232,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(arrow_record_batch_builder_class); env->DeleteGlobalRef(serializable_obj_builder_class); env->DeleteGlobalRef(split_result_class); + env->DeleteGlobalRef(unsafe_row_class); buffer_holder_.Clear(); handler_holder_.Clear(); batch_iterator_holder_.Clear(); + unsafe_writer_and_reader_holder_.Clear(); shuffle_splitter_holder_.Clear(); decompression_schema_holder_.Clear(); } @@ -1350,6 +1367,137 @@ JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrap decompression_schema_holder_.Erase(schema_holder_id); } +JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnarToRow( + JNIEnv* env, jobject, jbyteArray schema_arr, + jint num_rows, jlongArray buf_addrs, jlongArray buf_sizes, jlong memory_pool_id) { + if (schema_arr == NULL) { + env->ThrowNew(illegal_argument_exception_class, + std::string("Native convert columnar to row schema can't be null").c_str()); + return -1; + } + if (buf_addrs == NULL) { + env->ThrowNew(illegal_argument_exception_class, + std::string("Native convert columnar to row: buf_addrs can't be null").c_str()); + return -1; + } + if (buf_sizes == NULL) { + env->ThrowNew(illegal_argument_exception_class, + std::string("Native convert columnar to row: buf_sizes can't be null").c_str()); + return -1; + } + + int in_bufs_len = env->GetArrayLength(buf_addrs); + if (in_bufs_len != env->GetArrayLength(buf_sizes)) { + env->ThrowNew( + 
illegal_argument_exception_class, + std::string("Native convert columnar to row: length of buf_addrs and buf_sizes mismatch").c_str()); + return -1; + } + + std::shared_ptr schema; + // ValueOrDie in MakeSchema + MakeSchema(env, schema_arr, &schema); + + jlong* in_buf_addrs = env->GetLongArrayElements(buf_addrs, JNI_FALSE); + jlong* in_buf_sizes = env->GetLongArrayElements(buf_sizes, JNI_FALSE); + + std::shared_ptr rb; + auto status = + MakeRecordBatch(schema, num_rows, (int64_t*)in_buf_addrs, + (int64_t*)in_buf_sizes, in_bufs_len, &rb); + + env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT); + env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); + + if (!status.ok()) { + env->ThrowNew( + illegal_argument_exception_class, + std::string("Native convert columnar to row: make record batch failed, error message is " + + status.message()) + .c_str()); + return -1; + } + + // convert the record batch to spark unsafe row. + jlong result; + try { + auto* pool = reinterpret_cast(memory_pool_id); + if (pool == nullptr) { + env->ThrowNew(illegal_argument_exception_class, + "Memory pool does not exist or has been closed"); + return -1; + } + + std::shared_ptr unsafe_row_writer_reader = + std::make_shared(rb, pool); + auto status = unsafe_row_writer_reader->Init(); + if (!status.ok()) { + env->ThrowNew( + illegal_argument_exception_class, + std::string("Native convert columnar to row: Init UnsafeRowWriterAndReader failed, error message is " + + status.message()) + .c_str()); + return -1; + } + + status = unsafe_row_writer_reader->Write(); + + if (!status.ok()) { + env->ThrowNew( + illegal_argument_exception_class, + std::string("Native convert columnar to row: UnsafeRowWriterAndReader write failed, error message is " + + status.message()) + .c_str()); + return -1; + } + + result = unsafe_writer_and_reader_holder_.Insert(std::move(unsafe_row_writer_reader)); + + } catch (const std::runtime_error& error) { + env->ThrowNew(unsupportedoperation_exception_class, error.what()); + } catch (const std::exception& error) { + env->ThrowNew(io_exception_class, error.what()); + } + + return result; +} + +std::shared_ptr GetUnsafeRowWriterAndReader(JNIEnv* env, jlong id) { + auto unsafe_writer_and_reader = unsafe_writer_and_reader_holder_.Lookup(id); + if (!unsafe_writer_and_reader) { + std::string error_message = "invalid unsafe_writer_and_reader id" + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + } + return unsafe_writer_and_reader; +} + +JNIEXPORT jboolean JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeHasNext( + JNIEnv* env, jobject, jlong instanceID) { + auto unsafe_writer_and_reader = GetUnsafeRowWriterAndReader(env, instanceID); + return unsafe_writer_and_reader->HasNext(); +} + +JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeNext( + JNIEnv* env, jobject, jlong instanceID) { + auto unsafe_writer_and_reader = GetUnsafeRowWriterAndReader(env, instanceID); + int64_t num_cols = unsafe_writer_and_reader->GetNumCols(); + int64_t length; + std::shared_ptr buffer; + unsafe_writer_and_reader->Next(&length, &buffer); + uint8_t* data = buffer->mutable_data(); + // create the UnsafeRow Object + jobject unsafe_row = env->NewObject( + unsafe_row_class, unsafe_row_class_constructor, num_cols); + + // convert the data to jbyteArray + jbyteArray byteArray = env->NewByteArray(length); + env->SetByteArrayRegion(byteArray, 0, length, reinterpret_cast(data)); + // call the pointer to 
method to pointer the byte array + env->CallVoidMethod(unsafe_row, unsafe_row_class_point_to, byteArray, length); + + return unsafe_row; +} + JNIEXPORT void JNICALL Java_com_intel_oap_tpc_MallocUtils_mallocTrim(JNIEnv* env, jobject obj) { // malloc_stats_print(statsPrint, nullptr, nullptr); diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc new file mode 100644 index 000000000..79e8e975f --- /dev/null +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "operators/unsafe_row_writer_and_reader.h" + +namespace sparkcolumnarplugin { +namespace unsaferow { + + int64_t CalculateBitSetWidthInBytes(int32_t numFields) { + return ((numFields + 63) / 64) * 8; + } + + int32_t GetVariableColsNum(std::shared_ptr schema, int64_t num_cols) { + std::vector> fields = schema->fields(); + int32_t count = 0; + for (auto i = 0; i < num_cols; i ++) { + auto type_id = fields[i]->type()->id(); + if ((type_id == arrow::BinaryType::type_id) || (type_id == arrow::StringType::type_id) || + (type_id == arrow::Decimal128Type::type_id)) count++; + } + return count; + } + + arrow::Status UnsafeRowWriterAndReader::Init() { + num_rows_ = rb_->num_rows(); + num_cols_ = rb_->num_columns(); + + // Calculate the initial size + nullBitsetWidthInBytes_ = CalculateBitSetWidthInBytes(num_cols_); + + int64_t fixedSize = nullBitsetWidthInBytes_ + 8 * num_cols_; // not contain the variable size + + int32_t num_variable_cols = GetVariableColsNum(rb_->schema(), num_cols_); + + // Initialize the buffers_ with the initial size. + for(auto i = 0; i < num_rows_; i ++) { + int64_t new_size = fixedSize + num_variable_cols * 32; // 32 is same with vanilla spark. 
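+      // Worked example (illustrative schema, not part of this patch): for a
+      // row of 9 fields of which 2 are variable-length (say one binary and
+      // one decimal column), the null bitset takes ((9 + 63) / 64) * 8 = 8
+      // bytes, the fixed-width region 8 * 9 = 72 bytes, and the initial
+      // variable-length reservation 2 * 32 = 64 bytes, so each row buffer
+      // starts at 8 + 72 + 64 = 144 bytes before any Resize.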
+      std::shared_ptr<arrow::ResizableBuffer> buffer;
+      ARROW_ASSIGN_OR_RAISE(
+          buffer,
+          arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size * 8),
+                                         memory_pool_));
+      memset(buffer->mutable_data(), 0, new_size);
+      buffer_cursor_.push_back(fixedSize);
+      buffers_.push_back(std::move(buffer));
+    }
+
+    row_cursor_ = 0;
+    return arrow::Status::OK();
+  }
+
+  bool UnsafeRowWriterAndReader::HasNext() {
+    if (row_cursor_ < num_rows_) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  arrow::Status UnsafeRowWriterAndReader::Next(int64_t* length,
+      std::shared_ptr<arrow::ResizableBuffer>* buffer) {
+    *buffer = buffers_[row_cursor_];
+    *length = buffer_cursor_[row_cursor_];
+    row_cursor_ ++;
+    return arrow::Status::OK();
+  }
+
+  void BitSet(uint8_t* data, int32_t index) {
+    int64_t mask = 1L << (index & 0x3f);  // mod 64 and shift
+    int64_t wordOffset = (index >> 6) * 8;
+    int64_t word;
+    memcpy(&word, data + wordOffset, sizeof(int64_t));
+    int64_t value = word | mask;
+    memcpy(data + wordOffset, &value, sizeof(int64_t));
+  }
+
+  int64_t GetFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) {
+    return nullBitsetWidthInBytes + 8L * index;
+  }
+
+  void SetNullAt(uint8_t* data, int64_t offset, int32_t col_index) {
+    BitSet(data, col_index);
+    // set the value to 0
+    memset(data + offset, 0, sizeof(int64_t));
+    return ;
+  }
+
+  int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) {
+    int64_t remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
+    if (remainder == 0) {
+      return numBytes;
+    } else {
+      return numBytes + (8 - remainder);
+    }
+  }
+
+  arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer,
+      int64_t offset, int32_t row_index, std::shared_ptr<arrow::Array> array,
+      int64_t currentCursor, int32_t col_index,
+      int64_t* updatedCursor) {
+    auto data = buffer->mutable_data();
+    // Write the value into the buffer
+    switch(array->type_id()) {
+      case arrow::BooleanType::type_id:
+      {
+        // Boolean type
+        auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(array);
+        auto value = boolArray->Value(row_index);
+        memcpy(data + offset, &value, sizeof(bool));
+        break;
+      }
+      case arrow::Int8Type::type_id:
+      {
+        // Byte type
+        auto int8Array = std::static_pointer_cast<arrow::Int8Array>(array);
+        auto value = int8Array->Value(row_index);
+        memcpy(data + offset, &value, sizeof(int8_t));
+        break;
+      }
+      case arrow::Int16Type::type_id:
+      {
+        // Short type
+        auto int16Array = std::static_pointer_cast<arrow::Int16Array>(array);
+        auto value = int16Array->Value(row_index);
+        memcpy(data + offset, &value, sizeof(int16_t));
+        break;
+      }
+      case arrow::Int32Type::type_id:
+      {
+        // Integer type
+        auto int32Array = std::static_pointer_cast<arrow::Int32Array>(array);
+        auto value = int32Array->Value(row_index);
+        memcpy(data + offset, &value, sizeof(int32_t));
+        break;
+      }
+      case arrow::Int64Type::type_id:
+      {
+        // Long type
+        auto int64Array = std::static_pointer_cast<arrow::Int64Array>(array);
+        auto value = int64Array->Value(row_index);
+        memcpy(data + offset, &value, sizeof(int64_t));
+        break;
+      }
+      case arrow::FloatType::type_id:
+      {
+        // Float type
+        auto floatArray = std::static_pointer_cast<arrow::FloatArray>(array);
+        auto value = floatArray->Value(row_index);
+        memcpy(data + offset, &value, sizeof(float));
+        break;
+      }
+      case arrow::DoubleType::type_id:
+      {
+        // Double type
+        auto doubleArray = std::static_pointer_cast<arrow::DoubleArray>(array);
+        auto value = doubleArray->Value(row_index);
+        memcpy(data + offset, &value, sizeof(double));
+        break;
+      }
+      case arrow::BinaryType::type_id:
+      {
+        // Binary type
+        auto binaryArray = std::static_pointer_cast<arrow::BinaryArray>(array);
+        using offset_type = typename arrow::BinaryType::offset_type;
+        offset_type length;
+        auto value = binaryArray->GetValue(row_index, &length);
+        int64_t roundedSize = RoundNumberOfBytesToNearestWord(length);
+        if (roundedSize > 32) {
+          // after call the resize method , the buffer is wrong.
+          RETURN_NOT_OK(buffer->Resize(roundedSize));
+        }
+        // After resize buffer, the data address is changed and the value is reset to 0.
+        auto new_data = buffer->mutable_data();
+        // write the variable value
+        memcpy(new_data + currentCursor, value, length);
+        // write the offset and size
+        int64_t offsetAndSize = (currentCursor << 32) | length;
+        memcpy(new_data + offset, &offsetAndSize, sizeof(int64_t));
+        // Update the cursor of the buffer.
+        *updatedCursor = currentCursor + roundedSize;
+        break;
+      }
+      case arrow::StringType::type_id:
+      {
+        // String type
+        auto stringArray = std::static_pointer_cast<arrow::StringArray>(array);
+        using offset_type = typename arrow::StringType::offset_type;
+        offset_type length;
+        auto value = stringArray->GetValue(row_index, &length);
+        int64_t roundedSize = RoundNumberOfBytesToNearestWord(length);
+
+        if (roundedSize > 32) {
+          // after call the resize method , the buffer is wrong.
+          RETURN_NOT_OK(buffer->Resize(roundedSize));
+        }
+
+        // After resize buffer, the data address is changed and the value is reset to 0.
+        auto new_data = buffer->mutable_data();
+        // write the variable value
+        memcpy(new_data + currentCursor, value, length);
+        // write the offset and size
+        int64_t offsetAndSize = (currentCursor << 32) | length;
+        memcpy(new_data + offset, &offsetAndSize, sizeof(int64_t));
+        // Update the cursor of the buffer.
+        *updatedCursor = currentCursor + roundedSize;
+        break;
+      }
+      case arrow::Decimal128Type::type_id:
+      {
+
+        auto out_array = dynamic_cast<arrow::Decimal128Array*>(array.get());
+        auto dtype = dynamic_cast<arrow::Decimal128Type*>(out_array->type().get());
+
+        int32_t precision = dtype->precision();
+        int32_t scale = dtype->scale();
+
+        const arrow::Decimal128 out_value(out_array->GetValue(row_index));
+
+        if (precision <= 18) {
+          if (out_value != NULL) {
+            // Get the long value and write the long value
+            // Refer to the int64_t() method of Decimal128
+            int64_t long_value = static_cast<int64_t>(out_value.low_bits());
+            memcpy(data + offset, &long_value, sizeof(long));
+          } else {
+            SetNullAt(data, offset, col_index);
+          }
+        } else {
+          if (out_value == NULL) {
+            SetNullAt(data, offset, col_index);
+          } else {
+            std::string value = out_value.ToIntegerString();
+            const char* by = value.c_str();
+            int32_t size =strlen(by);
+            assert(size <= 16);
+
+            // write the variable value
+            memcpy(data + currentCursor, by, size);
+            // write the offset and size
+            int64_t offsetAndSize = (currentCursor << 32) | size;
+            memcpy(data + offset, &offsetAndSize, sizeof(int64_t));
+          }
+
+          // Update the cursor of the buffer.
+          *updatedCursor = currentCursor + 16;
+        }
+        break;
+      }
+      default:
+        return arrow::Status::Invalid("Unsupported data type: " + array->type_id());
+    }
+    return arrow::Status::OK();;
+  }
+
+  arrow::Status UnsafeRowWriterAndReader::Write() {
+    int64_t num_rows = rb_->num_rows();
+    // Get each row value and write to the buffer
+    for (auto i = 0; i < num_rows; i++) {
+      auto buffer = buffers_[i];
+      for(auto j = 0; j < num_cols_; j++) {
+        uint8_t* data = buffer->mutable_data();
+        auto array = rb_->column(j);
+        // for each column, get the current row value, check whether it is null, and then write it to data.
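+        // On the null path, SetNullAt sets bit j of the leading bitset and
+        // zeroes the 8-byte fixed-width slot; on the non-null path, WriteValue
+        // fills the slot and, for variable-length types, advances the cursor.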
+ bool is_null = array->IsNull(i); + int64_t offset = GetFieldOffset(nullBitsetWidthInBytes_, j); + if (is_null) { + SetNullAt(data, offset, j); + } else { + // Write the value to the buffer + int64_t updatedCursor = buffer_cursor_[i]; + WriteValue(buffer, offset, i, array, buffer_cursor_[i], j, &updatedCursor); + buffer_cursor_[i] = updatedCursor; + } + } + } + return arrow::Status::OK(); + } + +} // namespace unsaferow +} // namespace sparkcolumnarplugin diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h new file mode 100644 index 000000000..a9848fb57 --- /dev/null +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "gandiva/decimal_type_util.h" + +namespace sparkcolumnarplugin { +namespace unsaferow { + +class UnsafeRowWriterAndReader { + + public: + UnsafeRowWriterAndReader(std::shared_ptr rb, arrow::MemoryPool* memory_pool) + : rb_(rb), memory_pool_(memory_pool){} + arrow::Status Init(); + arrow::Status Write(); + bool HasNext(); + arrow::Status Next(int64_t* length, std::shared_ptr* buffer); + int64_t GetNumCols() { return num_cols_; } + + protected: + std::vector> buffers_; + std::vector buffer_cursor_; + std::shared_ptr rb_; + int64_t nullBitsetWidthInBytes_; + int64_t row_cursor_; + int64_t num_cols_; + int64_t num_rows_; + arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool(); +}; + +} // namespace unsaferow +} // namespace sparkcolumnarplugin diff --git a/native-sql-engine/cpp/src/tests/CMakeLists.txt b/native-sql-engine/cpp/src/tests/CMakeLists.txt index 2d677f18a..885b7508e 100644 --- a/native-sql-engine/cpp/src/tests/CMakeLists.txt +++ b/native-sql-engine/cpp/src/tests/CMakeLists.txt @@ -8,4 +8,5 @@ package_add_test(TestArrowComputeWSCG arrow_compute_test_wscg.cc) package_add_test(TestArrowComputeJoinWOCG arrow_compute_test_join_wocg.cc) package_add_test(TestShuffleSplit shuffle_split_test.cc) package_add_test(TestArrowComputeWindow arrow_compute_test_window.cc) +package_add_test(TestUnsafeRowWriterAndReader unsaferow_test.cc) diff --git a/native-sql-engine/cpp/src/tests/test_utils.h b/native-sql-engine/cpp/src/tests/test_utils.h index 26f25ef65..d5a87f159 100644 --- a/native-sql-engine/cpp/src/tests/test_utils.h +++ b/native-sql-engine/cpp/src/tests/test_utils.h @@ -100,3 +100,31 @@ void MakeInputBatch(std::vector input_data, *input_batch = RecordBatch::Make(sch, length, array_list); return; } + +void ConstructNullInputBatch(std::shared_ptr* null_batch) { + std::vector> columns; + arrow::Int64Builder builder1; + builder1.AppendNull(); + 
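+  // col1 becomes [null, 1]; builder2 below fills col2 = [1, null], so each
+  // row carries exactly one null and exercises the null-bitset path.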
builder1.Append(1); + + arrow::Int64Builder builder2; + builder2.Append(1); + builder2.AppendNull(); + + std::shared_ptr array1; + builder1.Finish(&array1); + std::shared_ptr array2; + builder2.Finish(&array2); + + columns.push_back(array1); + columns.push_back(array2); + + std::vector> schema_vec{ + arrow::field("col1", arrow::int64()), + arrow::field("col2", arrow::int64()),}; + + std::shared_ptr schema{ + std::make_shared(schema_vec)}; + *null_batch = arrow::RecordBatch::Make(schema, 2, columns); + return; +} diff --git a/native-sql-engine/cpp/src/tests/unsaferow_test.cc b/native-sql-engine/cpp/src/tests/unsaferow_test.cc new file mode 100644 index 000000000..a158f2940 --- /dev/null +++ b/native-sql-engine/cpp/src/tests/unsaferow_test.cc @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "operators/unsafe_row_writer_and_reader.h" +#include "tests/test_utils.h" + +namespace sparkcolumnarplugin { +namespace unsaferow { + +class MyMemoryPool : public arrow::MemoryPool { + public: + explicit MyMemoryPool(int64_t capacity) : capacity_(capacity) {} + + Status Allocate(int64_t size, uint8_t** out) override { + if (bytes_allocated() + size > capacity_) { + return Status::OutOfMemory("malloc of size ", size, " failed"); + } + RETURN_NOT_OK(pool_->Allocate(size, out)); + stats_.UpdateAllocatedBytes(size); + return arrow::Status::OK(); + } + + Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override { + if (new_size > capacity_) { + return Status::OutOfMemory("malloc of size ", new_size, " failed"); + } + RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr)); + stats_.UpdateAllocatedBytes(new_size - old_size); + return arrow::Status::OK(); + } + + void Free(uint8_t* buffer, int64_t size) override { + pool_->Free(buffer, size); + stats_.UpdateAllocatedBytes(-size); + } + + int64_t bytes_allocated() const override { return stats_.bytes_allocated(); } + + int64_t max_memory() const override { return pool_->max_memory(); } + + std::string backend_name() const override { return pool_->backend_name(); } + + private: + MemoryPool* pool_ = arrow::default_memory_pool(); + int64_t capacity_; + arrow::internal::MemoryPoolStats stats_; +}; + +class UnsaferowTest : public ::testing::Test { + protected: + void SetUp() { + auto f_int8 = field("f_int8_a", arrow::int8()); + auto f_int16 = field("f_int16", arrow::int16()); + auto f_int32 = field("f_int32", arrow::int32()); + auto f_int64 = field("f_int64", arrow::int64()); + auto f_double = field("f_double", arrow::float64()); + auto f_float = field("f_float", arrow::float32()); + auto f_bool = field("f_bool", arrow::boolean()); + auto f_string = field("f_string", 
arrow::utf8()); + auto f_binary = field("f_binary", arrow::binary()); + auto f_decimal = field("f_decimal128", arrow::decimal(10, 2)); + + schema_ = arrow::schema({f_bool, f_int8, f_int16, f_int32, + f_int64, f_float, f_double, f_binary, f_decimal}); + + MakeInputBatch(input_data_, schema_, &input_batch_); + ConstructNullInputBatch(&nullable_input_batch_); + } + + static const std::vector input_data_1; + static const std::vector input_data_; + + std::shared_ptr schema_; + + std::shared_ptr input_batch_1_; + std::shared_ptr input_batch_; + std::shared_ptr nullable_input_batch_; + +}; + +const std::vector UnsaferowTest::input_data_ = { + "[true, true]", + "[1, 1]", + "[1, 1]", + "[1, 1]", + "[1, 1]", + "[3.5, 3.5]", + "[1, 1]", + R"(["abc", "abc"])", + R"(["100.00"])" + }; + + +TEST_F(UnsaferowTest, TestNullTypeCheck) { + std::shared_ptr pool = std::make_shared(4000); + std::shared_ptr unsafe_row_writer_reader = + std::make_shared(nullable_input_batch_, pool.get()); + + unsafe_row_writer_reader->Init(); + unsafe_row_writer_reader->Write(); + + long expected[2][3] = {{1, 0, 1}, {2, 1, 0}}; + int32_t count = 0; + while(unsafe_row_writer_reader->HasNext()) { + int64_t length; + std::shared_ptr buffer; + unsafe_row_writer_reader->Next(&length, &buffer); + + auto data = buffer->mutable_data(); + long value = 0; + long result[3] = {0, 0, 0}; + int32_t k = 0; + for(int32_t i =0; i < length; i+=sizeof(long)) { + memcpy(&value, data + i, sizeof(long)); + result[k ++] = value; + } + + int32_t result_size = sizeof(result) / sizeof(result[0]); + ASSERT_EQ(result_size, 3); + + for (int32_t i = 0; i < result_size; i ++) { + ASSERT_EQ(result[i], expected[count][i]); + } + count ++; + } +} + +TEST_F(UnsaferowTest, TestUnsaferowWriterandReader) { + std::shared_ptr pool = std::make_shared(4000); + std::shared_ptr unsafe_row_writer_reader = + std::make_shared(input_batch_, pool.get()); + + unsafe_row_writer_reader->Init(); + unsafe_row_writer_reader->Write(); + + while(unsafe_row_writer_reader->HasNext()) { + int64_t length; + std::shared_ptr buffer; + unsafe_row_writer_reader->Next(&length, &buffer); + } +} +} // namespace shuffle +} // namespace sparkcolumnarplugin From 4cbfc96982bd3a1e896a2ef4417a23c1c01ab952 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Wed, 7 Jul 2021 16:55:17 +0800 Subject: [PATCH 02/19] clang format --- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 104 ++-- .../operators/unsafe_row_writer_and_reader.cc | 477 +++++++++--------- .../operators/unsafe_row_writer_and_reader.h | 26 +- native-sql-engine/cpp/src/tests/test_utils.h | 12 +- .../cpp/src/tests/unsaferow_test.cc | 97 ++-- 5 files changed, 355 insertions(+), 361 deletions(-) diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 7c9aa1e32..5cb147358 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -37,9 +37,9 @@ #include "codegen/common/result_iterator.h" #include "jni/concurrent_map.h" #include "jni/jni_common.h" +#include "operators/unsafe_row_writer_and_reader.h" #include "proto/protobuf_utils.h" #include "shuffle/splitter.h" -#include "operators/unsafe_row_writer_and_reader.h" namespace types { class ExpressionList; @@ -63,7 +63,6 @@ static jmethodID split_result_constructor; static jclass metrics_builder_class; static jmethodID metrics_builder_constructor; - static jclass unsafe_row_class; static jmethodID unsafe_row_class_constructor; static jmethodID unsafe_row_class_point_to; @@ -207,13 +206,11 @@ jint 
JNI_OnLoad(JavaVM* vm, void* reserved) { CreateGlobalClassReference(env, "Lcom/intel/oap/vectorized/MetricsObject;"); metrics_builder_constructor = GetMethodID(env, metrics_builder_class, "", "([J[J)V"); - - unsafe_row_class = - CreateGlobalClassReference(env, "Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;"); - unsafe_row_class_constructor = - GetMethodID(env, unsafe_row_class, "", "(I)V"); - unsafe_row_class_point_to = - GetMethodID(env, unsafe_row_class, "pointTo", "([BI)V"); + + unsafe_row_class = CreateGlobalClassReference( + env, "Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;"); + unsafe_row_class_constructor = GetMethodID(env, unsafe_row_class, "", "(I)V"); + unsafe_row_class_point_to = GetMethodID(env, unsafe_row_class, "pointTo", "([BI)V"); return JNI_VERSION; } @@ -1367,22 +1364,26 @@ JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrap decompression_schema_holder_.Erase(schema_holder_id); } -JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnarToRow( - JNIEnv* env, jobject, jbyteArray schema_arr, - jint num_rows, jlongArray buf_addrs, jlongArray buf_sizes, jlong memory_pool_id) { +JNIEXPORT jlong JNICALL +Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnarToRow( + JNIEnv* env, jobject, jbyteArray schema_arr, jint num_rows, jlongArray buf_addrs, + jlongArray buf_sizes, jlong memory_pool_id) { if (schema_arr == NULL) { - env->ThrowNew(illegal_argument_exception_class, - std::string("Native convert columnar to row schema can't be null").c_str()); + env->ThrowNew( + illegal_argument_exception_class, + std::string("Native convert columnar to row schema can't be null").c_str()); return -1; - } + } if (buf_addrs == NULL) { - env->ThrowNew(illegal_argument_exception_class, - std::string("Native convert columnar to row: buf_addrs can't be null").c_str()); + env->ThrowNew( + illegal_argument_exception_class, + std::string("Native convert columnar to row: buf_addrs can't be null").c_str()); return -1; } if (buf_sizes == NULL) { - env->ThrowNew(illegal_argument_exception_class, - std::string("Native convert columnar to row: buf_sizes can't be null").c_str()); + env->ThrowNew( + illegal_argument_exception_class, + std::string("Native convert columnar to row: buf_sizes can't be null").c_str()); return -1; } @@ -1390,10 +1391,12 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapp if (in_bufs_len != env->GetArrayLength(buf_sizes)) { env->ThrowNew( illegal_argument_exception_class, - std::string("Native convert columnar to row: length of buf_addrs and buf_sizes mismatch").c_str()); + std::string( + "Native convert columnar to row: length of buf_addrs and buf_sizes mismatch") + .c_str()); return -1; } - + std::shared_ptr schema; // ValueOrDie in MakeSchema MakeSchema(env, schema_arr, &schema); @@ -1402,19 +1405,18 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapp jlong* in_buf_sizes = env->GetLongArrayElements(buf_sizes, JNI_FALSE); std::shared_ptr rb; - auto status = - MakeRecordBatch(schema, num_rows, (int64_t*)in_buf_addrs, - (int64_t*)in_buf_sizes, in_bufs_len, &rb); + auto status = MakeRecordBatch(schema, num_rows, (int64_t*)in_buf_addrs, + (int64_t*)in_buf_sizes, in_bufs_len, &rb); env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); if (!status.ok()) { - env->ThrowNew( - illegal_argument_exception_class, - 
std::string("Native convert columnar to row: make record batch failed, error message is " + - status.message()) - .c_str()); + env->ThrowNew(illegal_argument_exception_class, + std::string("Native convert columnar to row: make record batch failed, " + "error message is " + + status.message()) + .c_str()); return -1; } @@ -1428,15 +1430,15 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapp return -1; } - std::shared_ptr unsafe_row_writer_reader = - std::make_shared(rb, pool); + std::shared_ptr unsafe_row_writer_reader = + std::make_shared(rb, pool); auto status = unsafe_row_writer_reader->Init(); if (!status.ok()) { - env->ThrowNew( - illegal_argument_exception_class, - std::string("Native convert columnar to row: Init UnsafeRowWriterAndReader failed, error message is " + - status.message()) - .c_str()); + env->ThrowNew(illegal_argument_exception_class, + std::string("Native convert columnar to row: Init " + "UnsafeRowWriterAndReader failed, error message is " + + status.message()) + .c_str()); return -1; } @@ -1444,10 +1446,11 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapp if (!status.ok()) { env->ThrowNew( - illegal_argument_exception_class, - std::string("Native convert columnar to row: UnsafeRowWriterAndReader write failed, error message is " + - status.message()) - .c_str()); + illegal_argument_exception_class, + std::string("Native convert columnar to row: UnsafeRowWriterAndReader write " + "failed, error message is " + + status.message()) + .c_str()); return -1; } @@ -1462,23 +1465,28 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapp return result; } -std::shared_ptr GetUnsafeRowWriterAndReader(JNIEnv* env, jlong id) { +std::shared_ptr GetUnsafeRowWriterAndReader(JNIEnv* env, + jlong id) { auto unsafe_writer_and_reader = unsafe_writer_and_reader_holder_.Lookup(id); if (!unsafe_writer_and_reader) { - std::string error_message = "invalid unsafe_writer_and_reader id" + std::to_string(id); + std::string error_message = + "invalid unsafe_writer_and_reader id" + std::to_string(id); env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); } return unsafe_writer_and_reader; } -JNIEXPORT jboolean JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeHasNext( +JNIEXPORT jboolean JNICALL +Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeHasNext( JNIEnv* env, jobject, jlong instanceID) { auto unsafe_writer_and_reader = GetUnsafeRowWriterAndReader(env, instanceID); return unsafe_writer_and_reader->HasNext(); } -JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeNext( - JNIEnv* env, jobject, jlong instanceID) { +JNIEXPORT jobject JNICALL +Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeNext(JNIEnv* env, + jobject, + jlong instanceID) { auto unsafe_writer_and_reader = GetUnsafeRowWriterAndReader(env, instanceID); int64_t num_cols = unsafe_writer_and_reader->GetNumCols(); int64_t length; @@ -1486,9 +1494,9 @@ JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWra unsafe_writer_and_reader->Next(&length, &buffer); uint8_t* data = buffer->mutable_data(); // create the UnsafeRow Object - jobject unsafe_row = env->NewObject( - unsafe_row_class, unsafe_row_class_constructor, num_cols); - + jobject unsafe_row = + env->NewObject(unsafe_row_class, unsafe_row_class_constructor, num_cols); + // convert the data to jbyteArray jbyteArray byteArray = env->NewByteArray(length); 
env->SetByteArrayRegion(byteArray, 0, length, reinterpret_cast(data)); diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc index 79e8e975f..715a86167 100644 --- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc @@ -20,274 +20,269 @@ namespace sparkcolumnarplugin { namespace unsaferow { - int64_t CalculateBitSetWidthInBytes(int32_t numFields) { - return ((numFields + 63) / 64) * 8; - } +int64_t CalculateBitSetWidthInBytes(int32_t numFields) { + return ((numFields + 63) / 64) * 8; +} - int32_t GetVariableColsNum(std::shared_ptr schema, int64_t num_cols) { - std::vector> fields = schema->fields(); - int32_t count = 0; - for (auto i = 0; i < num_cols; i ++) { - auto type_id = fields[i]->type()->id(); - if ((type_id == arrow::BinaryType::type_id) || (type_id == arrow::StringType::type_id) || - (type_id == arrow::Decimal128Type::type_id)) count++; - } - return count; +int32_t GetVariableColsNum(std::shared_ptr schema, int64_t num_cols) { + std::vector> fields = schema->fields(); + int32_t count = 0; + for (auto i = 0; i < num_cols; i++) { + auto type_id = fields[i]->type()->id(); + if ((type_id == arrow::BinaryType::type_id) || + (type_id == arrow::StringType::type_id) || + (type_id == arrow::Decimal128Type::type_id)) + count++; } + return count; +} - arrow::Status UnsafeRowWriterAndReader::Init() { - num_rows_ = rb_->num_rows(); - num_cols_ = rb_->num_columns(); - - // Calculate the initial size - nullBitsetWidthInBytes_ = CalculateBitSetWidthInBytes(num_cols_); - - int64_t fixedSize = nullBitsetWidthInBytes_ + 8 * num_cols_; // not contain the variable size +arrow::Status UnsafeRowWriterAndReader::Init() { + num_rows_ = rb_->num_rows(); + num_cols_ = rb_->num_columns(); - int32_t num_variable_cols = GetVariableColsNum(rb_->schema(), num_cols_); - - // Initialize the buffers_ with the initial size. - for(auto i = 0; i < num_rows_; i ++) { - int64_t new_size = fixedSize + num_variable_cols * 32; // 32 is same with vanilla spark. - std::shared_ptr buffer; - ARROW_ASSIGN_OR_RAISE( - buffer, - arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size * 8), - memory_pool_)); - memset(buffer->mutable_data(), 0, new_size); - buffer_cursor_.push_back(fixedSize); - buffers_.push_back(std::move(buffer)); - } + // Calculate the initial size + nullBitsetWidthInBytes_ = CalculateBitSetWidthInBytes(num_cols_); - row_cursor_ = 0; - return arrow::Status::OK(); - } + int64_t fixedSize = + nullBitsetWidthInBytes_ + 8 * num_cols_; // not contain the variable size - bool UnsafeRowWriterAndReader::HasNext() { - if (row_cursor_ < num_rows_) { - return true; - } else { - return false; - } - } + int32_t num_variable_cols = GetVariableColsNum(rb_->schema(), num_cols_); - arrow::Status UnsafeRowWriterAndReader::Next(int64_t* length, - std::shared_ptr* buffer) { - *buffer = buffers_[row_cursor_]; - *length = buffer_cursor_[row_cursor_]; - row_cursor_ ++; - return arrow::Status::OK(); + // Initialize the buffers_ with the initial size. + for (auto i = 0; i < num_rows_; i++) { + int64_t new_size = + fixedSize + num_variable_cols * 32; // 32 is same with vanilla spark. 
+ std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE( + buffer, arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size * 8), + memory_pool_)); + memset(buffer->mutable_data(), 0, new_size); + buffer_cursor_.push_back(fixedSize); + buffers_.push_back(std::move(buffer)); } - void BitSet(uint8_t* data, int32_t index) { - int64_t mask = 1L << (index & 0x3f); // mod 64 and shift - int64_t wordOffset = (index >> 6) * 8; - int64_t word; - memcpy(&word, data + wordOffset, sizeof(int64_t)); - int64_t value = word | mask; - memcpy(data + wordOffset, &value, sizeof(int64_t)); - } + row_cursor_ = 0; + return arrow::Status::OK(); +} - int64_t GetFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { - return nullBitsetWidthInBytes + 8L * index; +bool UnsafeRowWriterAndReader::HasNext() { + if (row_cursor_ < num_rows_) { + return true; + } else { + return false; } +} - void SetNullAt(uint8_t* data, int64_t offset, int32_t col_index) { - BitSet(data, col_index); - // set the value to 0 - memset(data + offset, 0, sizeof(int64_t)); - return ; - } +arrow::Status UnsafeRowWriterAndReader::Next( + int64_t* length, std::shared_ptr* buffer) { + *buffer = buffers_[row_cursor_]; + *length = buffer_cursor_[row_cursor_]; + row_cursor_++; + return arrow::Status::OK(); +} - int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) { - int64_t remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8` - if (remainder == 0) { - return numBytes; - } else { - return numBytes + (8 - remainder); - } +void BitSet(uint8_t* data, int32_t index) { + int64_t mask = 1L << (index & 0x3f); // mod 64 and shift + int64_t wordOffset = (index >> 6) * 8; + int64_t word; + memcpy(&word, data + wordOffset, sizeof(int64_t)); + int64_t value = word | mask; + memcpy(data + wordOffset, &value, sizeof(int64_t)); +} + +int64_t GetFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { + return nullBitsetWidthInBytes + 8L * index; +} + +void SetNullAt(uint8_t* data, int64_t offset, int32_t col_index) { + BitSet(data, col_index); + // set the value to 0 + memset(data + offset, 0, sizeof(int64_t)); + return; +} + +int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) { + int64_t remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8` + if (remainder == 0) { + return numBytes; + } else { + return numBytes + (8 - remainder); } +} - arrow::Status WriteValue(std::shared_ptr buffer, - int64_t offset, int32_t row_index, std::shared_ptr array, int64_t currentCursor, int32_t col_index, - int64_t* updatedCursor) { - auto data = buffer->mutable_data(); - // Write the value into the buffer - switch(array->type_id()) { - case arrow::BooleanType::type_id: - { - // Boolean type - auto boolArray = std::static_pointer_cast(array); - auto value = boolArray->Value(row_index); - memcpy(data + offset, &value, sizeof(bool)); - break; - } - case arrow::Int8Type::type_id: - { - // Byte type - auto int8Array = std::static_pointer_cast(array); - auto value = int8Array->Value(row_index); - memcpy(data + offset, &value, sizeof(int8_t)); - break; - } - case arrow::Int16Type::type_id: - { - // Short type - auto int16Array = std::static_pointer_cast(array); - auto value = int16Array->Value(row_index); - memcpy(data + offset, &value, sizeof(int16_t)); - break; - } - case arrow::Int32Type::type_id: - { - // Integer type - auto int32Array = std::static_pointer_cast(array); - auto value = int32Array->Value(row_index); - memcpy(data + offset, &value, sizeof(int32_t)); - break; - } - case arrow::Int64Type::type_id: - { - // Long type 
- auto int64Array = std::static_pointer_cast(array); - auto value = int64Array->Value(row_index); - memcpy(data + offset, &value, sizeof(int64_t)); - break; - } - case arrow::FloatType::type_id: - { - // Float type - auto floatArray = std::static_pointer_cast(array); - auto value = floatArray->Value(row_index); - memcpy(data + offset, &value, sizeof(float)); - break; - } - case arrow::DoubleType::type_id: - { - // Double type - auto doubleArray = std::static_pointer_cast(array); - auto value = doubleArray->Value(row_index); - memcpy(data + offset, &value, sizeof(double)); - break; - } - case arrow::BinaryType::type_id: - { - // Binary type - auto binaryArray = std::static_pointer_cast(array); - using offset_type = typename arrow::BinaryType::offset_type; - offset_type length; - auto value = binaryArray->GetValue(row_index, &length); - int64_t roundedSize = RoundNumberOfBytesToNearestWord(length); - if (roundedSize > 32) { - // after call the resize method , the buffer is wrong. - RETURN_NOT_OK(buffer->Resize(roundedSize)); - } - // After resize buffer, the data address is changed and the value is reset to 0. - auto new_data = buffer->mutable_data(); - // write the variable value - memcpy(new_data + currentCursor, value, length); - // write the offset and size - int64_t offsetAndSize = (currentCursor << 32) | length; - memcpy(new_data + offset, &offsetAndSize, sizeof(int64_t)); - // Update the cursor of the buffer. - *updatedCursor = currentCursor + roundedSize; - break; - } - case arrow::StringType::type_id: - { - // String type - auto stringArray = std::static_pointer_cast(array); - using offset_type = typename arrow::StringType::offset_type; - offset_type length; - auto value = stringArray->GetValue(row_index, &length); - int64_t roundedSize = RoundNumberOfBytesToNearestWord(length); - - if (roundedSize > 32) { - // after call the resize method , the buffer is wrong. - RETURN_NOT_OK(buffer->Resize(roundedSize)); - } - - // After resize buffer, the data address is changed and the value is reset to 0. - auto new_data = buffer->mutable_data(); - // write the variable value - memcpy(new_data + currentCursor, value, length); - // write the offset and size - int64_t offsetAndSize = (currentCursor << 32) | length; - memcpy(new_data + offset, &offsetAndSize, sizeof(int64_t)); - // Update the cursor of the buffer. 
- *updatedCursor = currentCursor + roundedSize; - break; - } - case arrow::Decimal128Type::type_id: - { +arrow::Status WriteValue(std::shared_ptr buffer, int64_t offset, + int32_t row_index, std::shared_ptr array, + int64_t currentCursor, int32_t col_index, + int64_t* updatedCursor) { + auto data = buffer->mutable_data(); + // Write the value into the buffer + switch (array->type_id()) { + case arrow::BooleanType::type_id: { + // Boolean type + auto boolArray = std::static_pointer_cast(array); + auto value = boolArray->Value(row_index); + memcpy(data + offset, &value, sizeof(bool)); + break; + } + case arrow::Int8Type::type_id: { + // Byte type + auto int8Array = std::static_pointer_cast(array); + auto value = int8Array->Value(row_index); + memcpy(data + offset, &value, sizeof(int8_t)); + break; + } + case arrow::Int16Type::type_id: { + // Short type + auto int16Array = std::static_pointer_cast(array); + auto value = int16Array->Value(row_index); + memcpy(data + offset, &value, sizeof(int16_t)); + break; + } + case arrow::Int32Type::type_id: { + // Integer type + auto int32Array = std::static_pointer_cast(array); + auto value = int32Array->Value(row_index); + memcpy(data + offset, &value, sizeof(int32_t)); + break; + } + case arrow::Int64Type::type_id: { + // Long type + auto int64Array = std::static_pointer_cast(array); + auto value = int64Array->Value(row_index); + memcpy(data + offset, &value, sizeof(int64_t)); + break; + } + case arrow::FloatType::type_id: { + // Float type + auto floatArray = std::static_pointer_cast(array); + auto value = floatArray->Value(row_index); + memcpy(data + offset, &value, sizeof(float)); + break; + } + case arrow::DoubleType::type_id: { + // Double type + auto doubleArray = std::static_pointer_cast(array); + auto value = doubleArray->Value(row_index); + memcpy(data + offset, &value, sizeof(double)); + break; + } + case arrow::BinaryType::type_id: { + // Binary type + auto binaryArray = std::static_pointer_cast(array); + using offset_type = typename arrow::BinaryType::offset_type; + offset_type length; + auto value = binaryArray->GetValue(row_index, &length); + int64_t roundedSize = RoundNumberOfBytesToNearestWord(length); + if (roundedSize > 32) { + // after call the resize method , the buffer is wrong. + RETURN_NOT_OK(buffer->Resize(roundedSize)); + } + // After resize buffer, the data address is changed and the value is reset to 0. + auto new_data = buffer->mutable_data(); + // write the variable value + memcpy(new_data + currentCursor, value, length); + // write the offset and size + int64_t offsetAndSize = (currentCursor << 32) | length; + memcpy(new_data + offset, &offsetAndSize, sizeof(int64_t)); + // Update the cursor of the buffer. + *updatedCursor = currentCursor + roundedSize; + break; + } + case arrow::StringType::type_id: { + // String type + auto stringArray = std::static_pointer_cast(array); + using offset_type = typename arrow::StringType::offset_type; + offset_type length; + auto value = stringArray->GetValue(row_index, &length); + int64_t roundedSize = RoundNumberOfBytesToNearestWord(length); - auto out_array = dynamic_cast(array.get()); - auto dtype = dynamic_cast(out_array->type().get()); + if (roundedSize > 32) { + // after call the resize method , the buffer is wrong. + RETURN_NOT_OK(buffer->Resize(roundedSize)); + } - int32_t precision = dtype->precision(); - int32_t scale = dtype->scale(); + // After resize buffer, the data address is changed and the value is reset to 0. 
+ auto new_data = buffer->mutable_data(); + // write the variable value + memcpy(new_data + currentCursor, value, length); + // write the offset and size + int64_t offsetAndSize = (currentCursor << 32) | length; + memcpy(new_data + offset, &offsetAndSize, sizeof(int64_t)); + // Update the cursor of the buffer. + *updatedCursor = currentCursor + roundedSize; + break; + } + case arrow::Decimal128Type::type_id: { + auto out_array = dynamic_cast(array.get()); + auto dtype = dynamic_cast(out_array->type().get()); - const arrow::Decimal128 out_value(out_array->GetValue(row_index)); + int32_t precision = dtype->precision(); + int32_t scale = dtype->scale(); - if (precision <= 18) { - if (out_value != NULL) { - // Get the long value and write the long value - // Refer to the int64_t() method of Decimal128 - int64_t long_value = static_cast(out_value.low_bits()); - memcpy(data + offset, &long_value, sizeof(long)); - } else { - SetNullAt(data, offset, col_index); - } - } else { - if (out_value == NULL) { - SetNullAt(data, offset, col_index); - } else { - std::string value = out_value.ToIntegerString(); - const char* by = value.c_str(); - int32_t size =strlen(by); - assert(size <= 16); + const arrow::Decimal128 out_value(out_array->GetValue(row_index)); - // write the variable value - memcpy(data + currentCursor, by, size); - // write the offset and size - int64_t offsetAndSize = (currentCursor << 32) | size; - memcpy(data + offset, &offsetAndSize, sizeof(int64_t)); - } + if (precision <= 18) { + if (out_value != NULL) { + // Get the long value and write the long value + // Refer to the int64_t() method of Decimal128 + int64_t long_value = static_cast(out_value.low_bits()); + memcpy(data + offset, &long_value, sizeof(long)); + } else { + SetNullAt(data, offset, col_index); + } + } else { + if (out_value == NULL) { + SetNullAt(data, offset, col_index); + } else { + std::string value = out_value.ToIntegerString(); + const char* by = value.c_str(); + int32_t size = strlen(by); + assert(size <= 16); - // Update the cursor of the buffer. - *updatedCursor = currentCursor + 16; - } - break; + // write the variable value + memcpy(data + currentCursor, by, size); + // write the offset and size + int64_t offsetAndSize = (currentCursor << 32) | size; + memcpy(data + offset, &offsetAndSize, sizeof(int64_t)); } - default: - return arrow::Status::Invalid("Unsupported data type: " + array->type_id()); + + // Update the cursor of the buffer. + *updatedCursor = currentCursor + 16; + } + break; } - return arrow::Status::OK();; + default: + return arrow::Status::Invalid("Unsupported data type: " + array->type_id()); } + return arrow::Status::OK(); + ; +} - arrow::Status UnsafeRowWriterAndReader::Write() { - int64_t num_rows = rb_->num_rows(); - // Get each row value and write to the buffer - for (auto i = 0; i < num_rows; i++) { - auto buffer = buffers_[i]; - for(auto j = 0; j < num_cols_; j++) { - uint8_t* data = buffer->mutable_data(); - auto array = rb_->column(j); - // for each column, get the current row value, check whether it is null, and then write it to data. 
-        bool is_null = array->IsNull(i);
-        int64_t offset = GetFieldOffset(nullBitsetWidthInBytes_, j);
-        if (is_null) {
-          SetNullAt(data, offset, j);
-        } else {
-          // Write the value to the buffer
-          int64_t updatedCursor = buffer_cursor_[i];
-          WriteValue(buffer, offset, i, array, buffer_cursor_[i], j, &updatedCursor);
-          buffer_cursor_[i] = updatedCursor;
-        }
-      }
+arrow::Status UnsafeRowWriterAndReader::Write() {
+  int64_t num_rows = rb_->num_rows();
+  // Get each row value and write to the buffer
+  for (auto i = 0; i < num_rows; i++) {
+    auto buffer = buffers_[i];
+    for (auto j = 0; j < num_cols_; j++) {
+      uint8_t* data = buffer->mutable_data();
+      auto array = rb_->column(j);
+      // for each column, get the current row value, check whether it is null, and then
+      // write it to data.
+      bool is_null = array->IsNull(i);
+      int64_t offset = GetFieldOffset(nullBitsetWidthInBytes_, j);
+      if (is_null) {
+        SetNullAt(data, offset, j);
+      } else {
+        // Write the value to the buffer
+        int64_t updatedCursor = buffer_cursor_[i];
+        WriteValue(buffer, offset, i, array, buffer_cursor_[i], j, &updatedCursor);
+        buffer_cursor_[i] = updatedCursor;
+      }
     }
-    return arrow::Status::OK();
   }
+  return arrow::Status::OK();
+}

 } // namespace unsaferow
 } // namespace sparkcolumnarplugin
diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h
index a9848fb57..897ea7324 100644
--- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h
+++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h
@@ -17,28 +17,28 @@

 #pragma once

+#include
+#include
+#include
+#include
 #include
 #include
-#include
 #include
-#include
-#include
-#include

 #include "gandiva/decimal_type_util.h"

 namespace sparkcolumnarplugin {
 namespace unsaferow {

 class UnsafeRowWriterAndReader {
  public:
-  UnsafeRowWriterAndReader(std::shared_ptr<arrow::RecordBatch> rb, arrow::MemoryPool* memory_pool)
-      : rb_(rb), memory_pool_(memory_pool){}
-  arrow::Status Init();
-  arrow::Status Write();
-  bool HasNext();
-  arrow::Status Next(int64_t* length, std::shared_ptr<arrow::ResizableBuffer>* buffer);
-  int64_t GetNumCols() { return num_cols_; }
+  UnsafeRowWriterAndReader(std::shared_ptr<arrow::RecordBatch> rb,
+                           arrow::MemoryPool* memory_pool)
+      : rb_(rb), memory_pool_(memory_pool) {}
+  arrow::Status Init();
+  arrow::Status Write();
+  bool HasNext();
+  arrow::Status Next(int64_t* length, std::shared_ptr<arrow::ResizableBuffer>* buffer);
+  int64_t GetNumCols() { return num_cols_; }

  protected:
   std::vector<std::shared_ptr<arrow::ResizableBuffer>> buffers_;
@@ -48,7 +48,7 @@ class UnsafeRowWriterAndReader {
   int64_t row_cursor_;
   int64_t num_cols_;
   int64_t num_rows_;
-  arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool();
+  arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool();
 };

 } // namespace unsaferow
diff --git a/native-sql-engine/cpp/src/tests/test_utils.h b/native-sql-engine/cpp/src/tests/test_utils.h
index d5a87f159..711b69dd4 100644
--- a/native-sql-engine/cpp/src/tests/test_utils.h
+++ b/native-sql-engine/cpp/src/tests/test_utils.h
@@ -120,11 +120,11 @@ void ConstructNullInputBatch(std::shared_ptr<arrow::RecordBatch>* null_batch) {
   columns.push_back(array2);

   std::vector<std::shared_ptr<arrow::Field>> schema_vec{
-      arrow::field("col1", arrow::int64()),
-      arrow::field("col2", arrow::int64()),};
-
-  std::shared_ptr<arrow::Schema> schema{
-      std::make_shared<arrow::Schema>(schema_vec)};
-  *null_batch = arrow::RecordBatch::Make(schema, 2, columns);
+      arrow::field("col1", arrow::int64()),
+      arrow::field("col2", arrow::int64()),
+  };
+
+  std::shared_ptr<arrow::Schema> schema{std::make_shared<arrow::Schema>(schema_vec)};
+  *null_batch = arrow::RecordBatch::Make(schema, 2,
columns); return; } diff --git a/native-sql-engine/cpp/src/tests/unsaferow_test.cc b/native-sql-engine/cpp/src/tests/unsaferow_test.cc index a158f2940..af638f9ea 100644 --- a/native-sql-engine/cpp/src/tests/unsaferow_test.cc +++ b/native-sql-engine/cpp/src/tests/unsaferow_test.cc @@ -79,19 +79,19 @@ class UnsaferowTest : public ::testing::Test { auto f_int32 = field("f_int32", arrow::int32()); auto f_int64 = field("f_int64", arrow::int64()); auto f_double = field("f_double", arrow::float64()); - auto f_float = field("f_float", arrow::float32()); + auto f_float = field("f_float", arrow::float32()); auto f_bool = field("f_bool", arrow::boolean()); auto f_string = field("f_string", arrow::utf8()); auto f_binary = field("f_binary", arrow::binary()); auto f_decimal = field("f_decimal128", arrow::decimal(10, 2)); - schema_ = arrow::schema({f_bool, f_int8, f_int16, f_int32, - f_int64, f_float, f_double, f_binary, f_decimal}); + schema_ = arrow::schema({f_bool, f_int8, f_int16, f_int32, f_int64, f_float, f_double, + f_binary, f_decimal}); MakeInputBatch(input_data_, schema_, &input_batch_); ConstructNullInputBatch(&nullable_input_batch_); } - + static const std::vector input_data_1; static const std::vector input_data_; @@ -100,69 +100,60 @@ class UnsaferowTest : public ::testing::Test { std::shared_ptr input_batch_1_; std::shared_ptr input_batch_; std::shared_ptr nullable_input_batch_; - }; const std::vector UnsaferowTest::input_data_ = { - "[true, true]", - "[1, 1]", - "[1, 1]", - "[1, 1]", - "[1, 1]", - "[3.5, 3.5]", - "[1, 1]", - R"(["abc", "abc"])", - R"(["100.00"])" - }; - - -TEST_F(UnsaferowTest, TestNullTypeCheck) { + "[true, true]", "[1, 1]", "[1, 1]", "[1, 1]", "[1, 1]", "[3.5, 3.5]", "[1, 1]", + R"(["abc", "abc"])", + R"(["100.00"])"}; + +TEST_F(UnsaferowTest, TestNullTypeCheck) { std::shared_ptr pool = std::make_shared(4000); - std::shared_ptr unsafe_row_writer_reader = + std::shared_ptr unsafe_row_writer_reader = std::make_shared(nullable_input_batch_, pool.get()); - + unsafe_row_writer_reader->Init(); unsafe_row_writer_reader->Write(); - + long expected[2][3] = {{1, 0, 1}, {2, 1, 0}}; int32_t count = 0; - while(unsafe_row_writer_reader->HasNext()) { - int64_t length; - std::shared_ptr buffer; - unsafe_row_writer_reader->Next(&length, &buffer); - - auto data = buffer->mutable_data(); - long value = 0; - long result[3] = {0, 0, 0}; - int32_t k = 0; - for(int32_t i =0; i < length; i+=sizeof(long)) { - memcpy(&value, data + i, sizeof(long)); - result[k ++] = value; - } - - int32_t result_size = sizeof(result) / sizeof(result[0]); - ASSERT_EQ(result_size, 3); - - for (int32_t i = 0; i < result_size; i ++) { - ASSERT_EQ(result[i], expected[count][i]); - } - count ++; + while (unsafe_row_writer_reader->HasNext()) { + int64_t length; + std::shared_ptr buffer; + unsafe_row_writer_reader->Next(&length, &buffer); + + auto data = buffer->mutable_data(); + long value = 0; + long result[3] = {0, 0, 0}; + int32_t k = 0; + for (int32_t i = 0; i < length; i += sizeof(long)) { + memcpy(&value, data + i, sizeof(long)); + result[k++] = value; + } + + int32_t result_size = sizeof(result) / sizeof(result[0]); + ASSERT_EQ(result_size, 3); + + for (int32_t i = 0; i < result_size; i++) { + ASSERT_EQ(result[i], expected[count][i]); + } + count++; } } -TEST_F(UnsaferowTest, TestUnsaferowWriterandReader) { - std::shared_ptr pool = std::make_shared(4000); - std::shared_ptr unsafe_row_writer_reader = +TEST_F(UnsaferowTest, TestUnsaferowWriterandReader) { + std::shared_ptr pool = std::make_shared(4000); + 
std::shared_ptr unsafe_row_writer_reader = std::make_shared(input_batch_, pool.get()); - unsafe_row_writer_reader->Init(); - unsafe_row_writer_reader->Write(); + unsafe_row_writer_reader->Init(); + unsafe_row_writer_reader->Write(); - while(unsafe_row_writer_reader->HasNext()) { - int64_t length; - std::shared_ptr buffer; - unsafe_row_writer_reader->Next(&length, &buffer); - } + while (unsafe_row_writer_reader->HasNext()) { + int64_t length; + std::shared_ptr buffer; + unsafe_row_writer_reader->Next(&length, &buffer); + } } -} // namespace shuffle +} // namespace unsaferow } // namespace sparkcolumnarplugin From 1da4603c5758b5f3880fca5bb41a417b05d8840b Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Wed, 7 Jul 2021 20:06:52 +0800 Subject: [PATCH 03/19] code style --- .../ArrowColumnarToRowJniWrapper.java | 18 +++++++++++++++- .../execution/ArrowColumnarToRowExec.scala | 21 ++++++++++++++++--- .../cpp/src/tests/unsaferow_test.cc | 14 ++++++++----- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java index 82ea2340c..be9f282d3 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowColumnarToRowJniWrapper.java @@ -1,5 +1,21 @@ -package com.intel.oap.vectorized; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.intel.oap.vectorized; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index 7ad962530..66472d064 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.intel.oap.execution import com.intel.oap.expression.ConverterUtils @@ -26,10 +43,8 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { override def doExecute(): RDD[InternalRow] = { - child.executeColumnar().mapPartitions { batches => - // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast ? - // currently only for debug. + // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast val jniWrapper = new ArrowColumnarToRowJniWrapper() var arrowSchema: Array[Byte] = null diff --git a/native-sql-engine/cpp/src/tests/unsaferow_test.cc b/native-sql-engine/cpp/src/tests/unsaferow_test.cc index af638f9ea..240d21450 100644 --- a/native-sql-engine/cpp/src/tests/unsaferow_test.cc +++ b/native-sql-engine/cpp/src/tests/unsaferow_test.cc @@ -97,15 +97,19 @@ class UnsaferowTest : public ::testing::Test { std::shared_ptr schema_; - std::shared_ptr input_batch_1_; std::shared_ptr input_batch_; std::shared_ptr nullable_input_batch_; }; -const std::vector UnsaferowTest::input_data_ = { - "[true, true]", "[1, 1]", "[1, 1]", "[1, 1]", "[1, 1]", "[3.5, 3.5]", "[1, 1]", - R"(["abc", "abc"])", - R"(["100.00"])"}; +const std::vector UnsaferowTest::input_data_ = {"[true, true]", + "[1, 1]", + "[1, 1]", + "[1, 1]", + "[1, 1]", + "[3.5, 3.5]", + "[1, 1]", + R"(["abc", "abc"])", + R"(["100.00", "100.00"])"}; TEST_F(UnsaferowTest, TestNullTypeCheck) { std::shared_ptr pool = std::make_shared(4000); From 45fbc7064c92f64d440c5861a19d5b6e67da0eb2 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 8 Jul 2021 06:15:23 +0000 Subject: [PATCH 04/19] clang format 10 --- .../cpp/src/operators/unsafe_row_writer_and_reader.h | 1 + 1 file changed, 1 insertion(+) diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h index 897ea7324..8a85677d7 100644 --- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.h @@ -24,6 +24,7 @@ #include #include #include + #include "gandiva/decimal_type_util.h" namespace sparkcolumnarplugin { From 382cb9ec6e93cf5886920c3db4413da5a98584cb Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Sun, 11 Jul 2021 17:41:56 +0800 Subject: [PATCH 05/19] Fix the Decimal type with precision > 18 --- .../operators/unsafe_row_writer_and_reader.cc | 81 ++++++++++++++++++- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc index 715a86167..5af2c223e 100644 --- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc @@ -17,6 +17,8 @@ #include "operators/unsafe_row_writer_and_reader.h" +#include + namespace sparkcolumnarplugin { namespace unsaferow { @@ -111,6 +113,78 @@ int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) { } } +int32_t FirstNonzeroLongNum(std::array mag) { + int32_t fn = 0; + int32_t i; + int32_t mlen = mag.size(); + for ( i = mlen - 1; i >= 0 && mag[i] == 0; i--); + fn = mlen - i - 1; + return fn; + } + + +int64_t GetLong(int32_t n, int32_t sig, std::array mag) { + if (n < 0) + return 0; + if (n >= mag.size()) + return sig; + + int magInt = mag[mag.size()-n-1]; + + return (sig >= 0 
? magInt : + (n <= FirstNonzeroLongNum(mag) ? -magInt : ~magInt)); + } + + int32_t GetBitLen(int64_t high, uint64_t low) { + if (high != 0) { + return log(high) + 64; + } else { + return log(low); + } + } +/* +* This method refer to the BigInterger#toByteArray() method in Java side. +*/ +std::array ToByteArray( arrow::Decimal128 value, int32_t* length) { + int64_t high = value.high_bits(); + uint64_t low = value.low_bits(); + arrow::Decimal128 new_value; + int32_t sig; + if (value > 0) { + new_value = value; + sig = 1; + } else if (value <0){ + new_value = value.Abs(); + sig = -1; + } else { + new_value = value; + sig = 0; + } + + int64_t new_high = new_value.high_bits(); + uint64_t new_low = new_value.low_bits(); + + std::array mag{{0}}; + mag[0] = new_value.high_bits(); + mag[1] = new_value.low_bits(); + int32_t byte_length = GetBitLen(new_high, new_low) / 8 + 1; + std::array out{{0}}; + int64_t nextLong=0; + for (int32_t i=byte_length-1, bytesCopied=8 , intIndex=0; i >= 0; i--) { + if (bytesCopied == 8) { + nextLong = GetLong(intIndex++, sig, mag); + bytesCopied = 1; + } else { + nextLong>>= 8; + bytesCopied++; + } + out[i] = (uint8_t)nextLong; + } + *length = byte_length; + return out; +} + + arrow::Status WriteValue(std::shared_ptr buffer, int64_t offset, int32_t row_index, std::shared_ptr array, int64_t currentCursor, int32_t col_index, @@ -235,13 +309,12 @@ arrow::Status WriteValue(std::shared_ptr buffer, int64_t if (out_value == NULL) { SetNullAt(data, offset, col_index); } else { - std::string value = out_value.ToIntegerString(); - const char* by = value.c_str(); - int32_t size = strlen(by); + int32_t size; + auto out = ToByteArray(out_value, &size); assert(size <= 16); // write the variable value - memcpy(data + currentCursor, by, size); + memcpy(data + currentCursor, &out[0], size); // write the offset and size int64_t offsetAndSize = (currentCursor << 32) | size; memcpy(data + offset, &offsetAndSize, sizeof(int64_t)); From f390bbdf88107ce2cd486fb51a88b9f4e2eaec1b Mon Sep 17 00:00:00 2001 From: root Date: Sun, 11 Jul 2021 09:46:21 +0000 Subject: [PATCH 06/19] clang format --- .../operators/unsafe_row_writer_and_reader.cc | 116 +++++++++--------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc index 5af2c223e..95fae16b6 100644 --- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc @@ -114,77 +114,73 @@ int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) { } int32_t FirstNonzeroLongNum(std::array mag) { - int32_t fn = 0; - int32_t i; - int32_t mlen = mag.size(); - for ( i = mlen - 1; i >= 0 && mag[i] == 0; i--); - fn = mlen - i - 1; - return fn; - } - + int32_t fn = 0; + int32_t i; + int32_t mlen = mag.size(); + for (i = mlen - 1; i >= 0 && mag[i] == 0; i--) + ; + fn = mlen - i - 1; + return fn; +} int64_t GetLong(int32_t n, int32_t sig, std::array mag) { - if (n < 0) - return 0; - if (n >= mag.size()) - return sig; + if (n < 0) return 0; + if (n >= mag.size()) return sig; - int magInt = mag[mag.size()-n-1]; + int magInt = mag[mag.size() - n - 1]; - return (sig >= 0 ? magInt : - (n <= FirstNonzeroLongNum(mag) ? -magInt : ~magInt)); - } + return (sig >= 0 ? magInt : (n <= FirstNonzeroLongNum(mag) ? 
-magInt : ~magInt)); +} - int32_t GetBitLen(int64_t high, uint64_t low) { - if (high != 0) { - return log(high) + 64; - } else { - return log(low); - } +int32_t GetBitLen(int64_t high, uint64_t low) { + if (high != 0) { + return log(high) + 64; + } else { + return log(low); } +} /* -* This method refer to the BigInterger#toByteArray() method in Java side. -*/ -std::array ToByteArray( arrow::Decimal128 value, int32_t* length) { - int64_t high = value.high_bits(); - uint64_t low = value.low_bits(); - arrow::Decimal128 new_value; - int32_t sig; - if (value > 0) { - new_value = value; - sig = 1; - } else if (value <0){ - new_value = value.Abs(); - sig = -1; - } else { - new_value = value; - sig = 0; - } + * This method refer to the BigInterger#toByteArray() method in Java side. + */ +std::array ToByteArray(arrow::Decimal128 value, int32_t* length) { + int64_t high = value.high_bits(); + uint64_t low = value.low_bits(); + arrow::Decimal128 new_value; + int32_t sig; + if (value > 0) { + new_value = value; + sig = 1; + } else if (value < 0) { + new_value = value.Abs(); + sig = -1; + } else { + new_value = value; + sig = 0; + } - int64_t new_high = new_value.high_bits(); - uint64_t new_low = new_value.low_bits(); - - std::array mag{{0}}; - mag[0] = new_value.high_bits(); - mag[1] = new_value.low_bits(); - int32_t byte_length = GetBitLen(new_high, new_low) / 8 + 1; - std::array out{{0}}; - int64_t nextLong=0; - for (int32_t i=byte_length-1, bytesCopied=8 , intIndex=0; i >= 0; i--) { - if (bytesCopied == 8) { - nextLong = GetLong(intIndex++, sig, mag); - bytesCopied = 1; - } else { - nextLong>>= 8; - bytesCopied++; - } - out[i] = (uint8_t)nextLong; + int64_t new_high = new_value.high_bits(); + uint64_t new_low = new_value.low_bits(); + + std::array mag{{0}}; + mag[0] = new_value.high_bits(); + mag[1] = new_value.low_bits(); + int32_t byte_length = GetBitLen(new_high, new_low) / 8 + 1; + std::array out{{0}}; + int64_t nextLong = 0; + for (int32_t i = byte_length - 1, bytesCopied = 8, intIndex = 0; i >= 0; i--) { + if (bytesCopied == 8) { + nextLong = GetLong(intIndex++, sig, mag); + bytesCopied = 1; + } else { + nextLong >>= 8; + bytesCopied++; } - *length = byte_length; - return out; + out[i] = (uint8_t)nextLong; + } + *length = byte_length; + return out; } - arrow::Status WriteValue(std::shared_ptr buffer, int64_t offset, int32_t row_index, std::shared_ptr array, int64_t currentCursor, int32_t col_index, From c0cb677241b81a3163c8bab0bf589cafd7ff9461 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 12 Jul 2021 06:45:24 +0000 Subject: [PATCH 07/19] fix the decimal type with precision > 18 bug --- .../operators/unsafe_row_writer_and_reader.cc | 151 ++++++++++++++---- 1 file changed, 123 insertions(+), 28 deletions(-) diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc index 95fae16b6..de6af32fb 100644 --- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc +++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc @@ -17,8 +17,6 @@ #include "operators/unsafe_row_writer_and_reader.h" -#include - namespace sparkcolumnarplugin { namespace unsaferow { @@ -113,32 +111,120 @@ int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) { } } -int32_t FirstNonzeroLongNum(std::array mag) { +int32_t FirstNonzeroLongNum(int32_t* mag, int32_t length) { int32_t fn = 0; int32_t i; - int32_t mlen = mag.size(); - for (i = mlen - 1; i >= 0 && mag[i] == 0; i--) + for (i = length - 1; i >= 0 && 
mag[i] == 0; i--)
     ;
-  fn = mlen - i - 1;
+  fn = length - i - 1;
   return fn;
 }

-int64_t GetLong(int32_t n, int32_t sig, std::array<int64_t, 2> mag) {
-  if (n < 0) return 0;
-  if (n >= mag.size()) return sig;
+int32_t GetInt(int32_t n, int32_t sig, int32_t* mag, int32_t length) {
+  if (n < 0) return 0;
+  if (n >= length) return sig < 0 ? -1 : 0;
+
+  int32_t magInt = mag[length - n - 1];
+  return (sig >= 0 ? magInt
+                   : (n <= FirstNonzeroLongNum(mag, length) ? -magInt : ~magInt));
+}
+
+int32_t GetNumberOfLeadingZeros(uint32_t i) {
+  // HD, Figure 5-6
+  if (i == 0) return 32;
+  int32_t n = 1;
+  if (i >> 16 == 0) {
+    n += 16;
+    i <<= 16;
+  }
+  if (i >> 24 == 0) {
+    n += 8;
+    i <<= 8;
+  }
+  if (i >> 28 == 0) {
+    n += 4;
+    i <<= 4;
+  }
+  if (i >> 30 == 0) {
+    n += 2;
+    i <<= 2;
+  }
+  n -= i >> 31;
+  return n;
+}
-  int magInt = mag[mag.size() - n - 1];
+int32_t GetBitLengthForInt(uint32_t n) { return 32 - GetNumberOfLeadingZeros(n); }
-  return (sig >= 0 ? magInt : (n <= FirstNonzeroLongNum(mag) ? -magInt : ~magInt));
+int32_t GetBitCount(uint32_t i) {
+  // HD, Figure 5-2
+  i = i - ((i >> 1) & 0x55555555);
+  i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
+  i = (i + (i >> 4)) & 0x0f0f0f0f;
+  i = i + (i >> 8);
+  i = i + (i >> 16);
+  return i & 0x3f;
+}
-int32_t GetBitLen(int64_t high, uint64_t low) {
-  if (high != 0) {
-    return log(high) + 64;
+int32_t GetBitLength(int32_t sig, int32_t* mag, int32_t len) {
+  int32_t n = -1;
+  if (len == 0) {
+    n = 0;
   } else {
-    return log(low);
+    // Calculate the bit length of the magnitude
+    int32_t mag_bit_length = ((len - 1) << 5) + GetBitLengthForInt((uint32_t)mag[0]);
+    if (sig < 0) {
+      // Check if magnitude is a power of two
+      bool pow2 = (GetBitCount((uint32_t)mag[0]) == 1);
+      for (int i = 1; i < len && pow2; i++) pow2 = (mag[i] == 0);
+
+      n = (pow2 ? mag_bit_length - 1 : mag_bit_length);
+    } else {
+      n = mag_bit_length;
+    }
   }
+  return n;
+}
+
+uint32_t* RevertArray(int64_t new_high, uint64_t new_low, int32_t* size) {
+  std::array<uint32_t, 4> mag{{0}};
+  memcpy(&mag[0], &new_high, 8);
+  memcpy(&mag[2], &new_low, 8);
+
+  int32_t start = 0;
+  // remove the front 0
+  for (int32_t i = 0; i < 4; i++) {
+    if (mag[i] == 0) start++;
+    if (mag[i] != 0) break;
+  }
+
+  int32_t length = 4 - start;
+  uint32_t* new_mag = new uint32_t[length];
+  int32_t k = 0;
+  // revert the mag after remove the high 0
+  for (int32_t i = 3; i >= start; i--) {
+    new_mag[k++] = mag[i];
+  }
+
+  start = 0;
+  // remove the front 0
+  for (int32_t i = 0; i < length; i++) {
+    if (new_mag[i] == 0) start++;
+    if (new_mag[i] != 0) break;
+  }
+  int32_t final_length = length - start;
+
+  uint32_t* final_mag = new uint32_t[final_length];
+  k = 0;
+  // copy the non-0 value to final mag.
+  for (int32_t i = start; i < length; i++) {
+    final_mag[k++] = new_mag[i];
+  }
+
+  delete[] new_mag;
+  *size = final_length;
+  return final_mag;
+}

 /*
  * This method refers to the BigInteger#toByteArray() method on the Java side.
  */
 std::array<uint8_t, 16> ToByteArray(arrow::Decimal128 value, int32_t* length) {
   int64_t high = value.high_bits();
   uint64_t low = value.low_bits();
   arrow::Decimal128 new_value;
   int32_t sig;
   if (value > 0) {
     new_value = value;
     sig = 1;
   } else if (value < 0) {
     new_value = value.Abs();
     sig = -1;
   } else {
     new_value = value;
     sig = 0;
   }

   int64_t new_high = new_value.high_bits();
   uint64_t new_low = new_value.low_bits();

-  std::array<int64_t, 2> mag{{0}};
-  mag[0] = new_value.high_bits();
-  mag[1] = new_value.low_bits();
-  int32_t byte_length = GetBitLen(new_high, new_low) / 8 + 1;
+  uint32_t* mag;
+  int32_t size;
+  mag = RevertArray(new_high, new_low, &size);
+
+  int32_t* final_mag = new int32_t[size];
+  memcpy(final_mag, mag, size * 4);
+
+  int32_t byte_length = GetBitLength(sig, final_mag, size) / 8 + 1;
+
   std::array<uint8_t, 16> out{{0}};
-  int64_t nextLong = 0;
-  for (int32_t i = byte_length - 1, bytesCopied = 8, intIndex = 0; i >= 0; i--) {
-    if (bytesCopied == 8) {
-      nextLong = GetLong(intIndex++, sig, mag);
-      bytesCopied = 1;
+  uint32_t next_int = 0;
+  for (int32_t i = byte_length - 1, bytes_copied = 4, int_index = 0; i >= 0; i--) {
+    if (bytes_copied == 4) {
+      next_int = GetInt(int_index++, sig, final_mag, size);
+      bytes_copied = 1;
     } else {
-      nextLong >>= 8;
-      bytesCopied++;
+      next_int >>= 8;
+      bytes_copied++;
     }
-    out[i] = (uint8_t)nextLong;
+
+    out[i] = (uint8_t)next_int;
   }
   *length = byte_length;
+
+  delete[] mag;
+  delete[] final_mag;
   return out;
 }

@@ -291,9 +385,10 @@ arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer, int64_t
   int32_t precision = dtype->precision();
   int32_t scale = dtype->scale();

   const arrow::Decimal128 out_value(out_array->GetValue(row_index));
+  bool flag = out_array->IsNull(row_index);

   if (precision <= 18) {
-    if (out_value != NULL) {
+    if (!flag) {
       // Get the long value and write the long value
       // Refer to the int64_t() method of Decimal128
       int64_t long_value = static_cast<int64_t>(out_value.low_bits());
       memcpy(data + offset, &long_value, sizeof(long));
@@ -302,7 +397,7 @@ arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer, int64_t
       SetNullAt(data, offset, col_index);
     }
   } else {
-    if (out_value == NULL) {
+    if (flag) {
       SetNullAt(data, offset, col_index);
     } else {
       int32_t size;

From 8be590587754d06abb3a115c3d02d9b5bfed7750 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Mon, 12 Jul 2021 16:17:01 +0800
Subject: [PATCH 08/19] fix the AQE issue and fall back to ColumnarToRow when
 the plan contains CalendarIntervalType

---
 .../scala/com/intel/oap/ColumnarPlugin.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
index 4e9a7e893..dbcfd0b8d 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -19,7 +19,6 @@ package com.intel.oap

 import com.intel.oap.execution._
 import com.intel.oap.sql.execution.RowToArrowColumnarExec
-
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
@@ -35,6 +34,7 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.CalendarIntervalType

 case class ColumnarPreOverrides() extends Rule[SparkPlan] {
   val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf
@@ -292,9 +292,6 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"ColumnarPostOverrides RowToArrowColumnarExec(${child.getClass})")
       RowToArrowColumnarExec(child)
-    case plan: ColumnarToRowExec =>
-      val child = replaceWithColumnarPlan(plan.child)
-      new ArrowColumnarToRowExec(child)
     case ColumnarToRowExec(child: ColumnarShuffleExchangeAdaptor) =>
       replaceWithColumnarPlan(child)
     case ColumnarToRowExec(child: ColumnarBroadcastExchangeAdaptor) =>
@@ -308,8 +305,15 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       val children = r.children.map(c =>
         c match {
           case c: ColumnarToRowExec =>
-            val child = replaceWithColumnarPlan(c.child)
-            new ArrowColumnarToRowExec(child)
+            val containCalendarType = c.schema.exists(
+              _.dataType.isInstanceOf[CalendarIntervalType])
+            if (containCalendarType) {
+              // ArrowColumnarToRowExec does not support CalendarIntervalType type.
+              c.withNewChildren(c.children.map(replaceWithColumnarPlan))
+            } else {
+              val child = replaceWithColumnarPlan(c.child)
+              new ArrowColumnarToRowExec(child)
+            }
           case other =>
             replaceWithColumnarPlan(other)
         })

From 24be2c1a8863f64dccf241fceb18360424e9213f Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Tue, 13 Jul 2021 10:23:48 +0800
Subject: [PATCH 09/19] refine the convert mag array method for decimal type

---
 .../operators/unsafe_row_writer_and_reader.cc | 38 +++++++------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
index de6af32fb..4673ea630 100644
--- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
+++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
@@ -185,10 +185,13 @@ int32_t GetBitLength(int32_t sig, int32_t* mag, int32_t len) {
   return n;
 }

-uint32_t* RevertArray(int64_t new_high, uint64_t new_low, int32_t* size) {
-  std::array<uint32_t, 4> mag{{0}};
-  memcpy(&mag[0], &new_high, 8);
-  memcpy(&mag[2], &new_low, 8);
+uint32_t* ConvertMagArray(int64_t new_high, uint64_t new_low, int32_t* size) {
+  // convert the new_high and new_low to 4 int values.
+  uint32_t* mag = new uint32_t[4];
+  mag[3] = (uint32_t)new_low;
+  mag[2] = new_low >>= 32;
+  mag[1] = (uint32_t)new_high;
+  mag[0] = new_high >>= 32;

   int32_t start = 0;
   // remove the front 0
   for (int32_t i = 0; i < 4; i++) {
     if (mag[i] == 0) start++;
     if (mag[i] != 0) break;
   }

   int32_t length = 4 - start;
   uint32_t* new_mag = new uint32_t[length];
   int32_t k = 0;
-  // revert the mag after remove the high 0
-  for (int32_t i = 3; i >= start; i--) {
+  // get the mag after remove the high 0
+  for (int32_t i = start; i < 4; i++) {
     new_mag[k++] = mag[i];
   }

-  start = 0;
-  // remove the front 0
-  for (int32_t i = 0; i < length; i++) {
-    if (new_mag[i] == 0) start++;
-    if (new_mag[i] != 0) break;
-  }
-  int32_t final_length = length - start;
-
-  uint32_t* final_mag = new uint32_t[final_length];
-  k = 0;
-  // copy the non-0 value to final mag.
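// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch series.]
// ConvertMagArray() above splits the 128-bit magnitude into four big-endian
// 32-bit words and strips leading zero words, mirroring how
// java.math.BigInteger stores its internal `mag` array. The same idea with
// std::vector instead of raw new[] (so nothing to delete):

#include <cstdint>
#include <vector>

std::vector<uint32_t> ToMagWords(int64_t high, uint64_t low) {
  // Most significant word first, exactly as BigInteger orders its words.
  std::vector<uint32_t> words = {
      static_cast<uint32_t>(static_cast<uint64_t>(high) >> 32),
      static_cast<uint32_t>(high), static_cast<uint32_t>(low >> 32),
      static_cast<uint32_t>(low)};
  std::size_t start = 0;
  while (start < words.size() && words[start] == 0) start++;
  // For zero the result is empty, matching BigInteger.ZERO's empty mag.
  return std::vector<uint32_t>(words.begin() + start, words.end());
}
// e.g. ToMagWords(0, 10000) -> {10000}; ToMagWords(1, 0) -> {1, 0, 0}.
// ---------------------------------------------------------------------------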
-  for (int32_t i = start; i < length; i++) {
-    final_mag[k++] = new_mag[i];
-  }
-
-  delete[] new_mag;
-  *size = final_length;
-  return final_mag;
+  delete[] mag;
+  *size = length;
+  return new_mag;
 }

@@ -249,7 +237,7 @@ std::array<uint8_t, 16> ToByteArray(arrow::Decimal128 value, int32_t* length) {
   uint32_t* mag;
   int32_t size;
-  mag = RevertArray(new_high, new_low, &size);
+  mag = ConvertMagArray(new_high, new_low, &size);

   int32_t* final_mag = new int32_t[size];
   memcpy(final_mag, mag, size * 4);

From e5461f514fa3b69622ec57a871542e8332cb0437 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Tue, 13 Jul 2021 18:11:28 +0800
Subject: [PATCH 10/19] fix the buffer resize issue

---
 .../cpp/src/operators/unsafe_row_writer_and_reader.cc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
index 4673ea630..30b2e8ed0 100644
--- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
+++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
@@ -327,8 +327,8 @@ arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer, int64_t
       auto value = binaryArray->GetValue(row_index, &length);
       int64_t roundedSize = RoundNumberOfBytesToNearestWord(length);
       if (roundedSize > 32) {
-        // after calling the resize method, the buffer is wrong.
-        RETURN_NOT_OK(buffer->Resize(roundedSize));
+        int64_t new_size = buffer->size() + roundedSize;
+        RETURN_NOT_OK(buffer->Resize(new_size));
       }
       // After resizing the buffer, the data address is changed and the value is reset to 0.
       auto new_data = buffer->mutable_data();
@@ -350,8 +350,8 @@ arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer, int64_t
       int64_t roundedSize = RoundNumberOfBytesToNearestWord(length);

       if (roundedSize > 32) {
-        // after calling the resize method, the buffer is wrong.
-        RETURN_NOT_OK(buffer->Resize(roundedSize));
+        int64_t new_size = buffer->size() + roundedSize;
+        RETURN_NOT_OK(buffer->Resize(new_size));
       }

       // After resizing the buffer, the data address is changed and the value is reset to 0.
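// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch series.] The
// two hunks above are the actual bug fix: Resize(roundedSize) shrank the row
// buffer down to just the variable-length payload, clobbering the null bitset
// and fixed-length slots already written. The corrected invariant is "grow by
// the payload size", e.g.:

#include <arrow/buffer.h>

#include <memory>

arrow::Status GrowForVariableField(
    const std::shared_ptr<arrow::ResizableBuffer>& buffer, int64_t rounded_size) {
  // Resize *to* current size + payload, so bytes below the old size survive.
  return buffer->Resize(buffer->size() + rounded_size);
}
// ---------------------------------------------------------------------------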
From f16c59e8a3e5e65e466b1e3fb11b883156e05ac0 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Wed, 14 Jul 2021 13:44:22 +0800
Subject: [PATCH 11/19] support the date type

---
 .../cpp/src/operators/unsafe_row_writer_and_reader.cc | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
index 30b2e8ed0..1e995f4ef 100644
--- a/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
+++ b/native-sql-engine/cpp/src/operators/unsafe_row_writer_and_reader.cc
@@ -404,6 +404,12 @@ arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer, int64_t
       }
       break;
     }
+    case arrow::Date32Type::type_id: {
+      auto date32Array = std::static_pointer_cast<arrow::Date32Array>(array);
+      auto value = date32Array->Value(row_index);
+      memcpy(data + offset, &value, sizeof(int32_t));
+      break;
+    }
     default:
       return arrow::Status::Invalid("Unsupported data type: " + std::to_string(array->type_id()));
   }
@@ -412,9 +418,8 @@ arrow::Status WriteValue(std::shared_ptr<arrow::ResizableBuffer> buffer, int64_t
 }

 arrow::Status UnsafeRowWriterAndReader::Write() {
-  int64_t num_rows = rb_->num_rows();
   // Get each row value and write to the buffer
-  for (auto i = 0; i < num_rows; i++) {
+  for (auto i = 0; i < num_rows_; i++) {
     auto buffer = buffers_[i];
     for (auto j = 0; j < num_cols_; j++) {
       uint8_t* data = buffer->mutable_data();

From bd47d084a38bd88ac3bc4f38404202c8dcfa38e7 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Wed, 14 Jul 2021 17:15:57 +0800
Subject: [PATCH 12/19] add the buildCheck() method and metrics

---
 .../scala/com/intel/oap/ColumnarPlugin.scala  | 17 +++++-----
 .../com/intel/oap/ColumnarPluginConfig.scala  |  3 +++
 .../execution/ArrowColumnarToRowExec.scala    | 26 +++++++++++++++++++
 3 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
index dbcfd0b8d..8cff1f60c 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -305,14 +305,17 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       val children = r.children.map(c =>
         c match {
           case c: ColumnarToRowExec =>
-            val containCalendarType = c.schema.exists(
-              _.dataType.isInstanceOf[CalendarIntervalType])
-            if (containCalendarType) {
-              // ArrowColumnarToRowExec does not support CalendarIntervalType type.
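// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch series.]
// Patch 12 replaces the one-off CalendarIntervalType test above with a
// buildCheck() that walks the whole output schema and throws for any
// unsupported field, letting the rule's try/catch fall back to the vanilla
// ColumnarToRowExec. The shape of that check, with a hypothetical
// `supported` predicate standing in for ConverterUtils.checkIfTypeSupported:

import org.apache.spark.sql.types.{CalendarIntervalType, DataType, StructType}

def validateSchemaSketch(schema: StructType): Unit = {
  def supported(dt: DataType): Boolean = !dt.isInstanceOf[CalendarIntervalType]
  for (field <- schema.fields if !supported(field.dataType)) {
    throw new UnsupportedOperationException(
      s"${field.dataType} is not supported in ArrowColumnarToRowExec.")
  }
}
// ---------------------------------------------------------------------------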
- c.withNewChildren(c.children.map(replaceWithColumnarPlan)) + if (columnarConf.enableArrowColumnarToRow) { + try { + val child = replaceWithColumnarPlan(c.child) + ArrowColumnarToRowExec(child) + } catch { + case _: Throwable => + logInfo("ArrowColumnarToRow : Falling back to ColumnarToRow...") + c.withNewChildren(c.children.map(replaceWithColumnarPlan)) + } } else { - val child = replaceWithColumnarPlan(c.child) - new ArrowColumnarToRowExec(child) + c.withNewChildren(c.children.map(replaceWithColumnarPlan)) } case other => replaceWithColumnarPlan(other) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala index 7f084b0b7..91d4cdc6b 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala @@ -71,6 +71,9 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val enableColumnarShuffledHashJoin: Boolean = conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu + val enableArrowColumnarToRow: Boolean = + conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu + // enable or disable columnar sortmergejoin // this should be set with preferSortMergeJoin=false val enableColumnarSortMergeJoin: Boolean = diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index 66472d064..b86be1bdc 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeRow} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import scala.collection.JavaConverters._ @@ -41,7 +42,29 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { override def outputOrdering: Seq[SortOrder] = child.outputOrdering + buildCheck() + + def buildCheck(): Unit = { + val schema = child.schema + for (field <- schema.fields) { + try { + ConverterUtils.checkIfTypeSupported(field.dataType) + } catch { + case e: UnsupportedOperationException => + throw new UnsupportedOperationException( + s"${field.dataType} is not supported in ArrowColumnarToRowExec.") + } + } + } + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches") + ) + override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + val numInputBatches = longMetric("numInputBatches") child.executeColumnar().mapPartitions { batches => // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast @@ -54,6 +77,9 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { } batches.flatMap { batch => + numInputBatches += 1 + numOutputRows += batch.numRows() + if 
(batch.numRows == 0 || batch.numCols == 0) { logInfo(s"Skip ColumnarBatch of ${batch.numRows} rows, ${batch.numCols} cols") Iterator.empty From ea4103e34f8a2048bd7ecc97521c5423cc215be1 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 16 Jul 2021 17:54:51 +0800 Subject: [PATCH 13/19] add the time metric in ArrowColumnarToRowExec operator --- .../execution/ArrowColumnarToRowExec.scala | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index b86be1bdc..2abe413c8 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer +import scala.concurrent.duration._ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { override def nodeName: String = "ArrowColumnarToRow" @@ -59,12 +60,18 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { override lazy val metrics: Map[String, SQLMetric] = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches") + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), + "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert"), + "hasNextTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to has next"), + "nextTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to next") ) override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val numInputBatches = longMetric("numInputBatches") + val convertTime = longMetric("convertTime") + val hasNextTime = longMetric("hasNextTime") + val nextTime = longMetric("nextTime") child.executeColumnar().mapPartitions { batches => // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast @@ -102,16 +109,28 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { arrowSchema = serializeSchema(fields) } + val beforeConvert = System.nanoTime() + val instanceID = jniWrapper.nativeConvertColumnarToRow( arrowSchema, batch.numRows, bufAddrs.toArray, bufSizes.toArray, SparkMemoryUtils.contextMemoryPool().getNativeInstanceId) + convertTime += NANOSECONDS.toMillis(System.nanoTime() - beforeConvert) + new Iterator[InternalRow] { override def hasNext: Boolean = { - jniWrapper.nativeHasNext(instanceID) + val beforeHasNext = System.nanoTime() + val result = jniWrapper.nativeHasNext(instanceID) + + hasNextTime += NANOSECONDS.toMillis(System.nanoTime() - beforeHasNext) + result + } override def next: UnsafeRow = { - jniWrapper.nativeNext(instanceID) + val beforeNext = System.nanoTime() + val row = jniWrapper.nativeNext(instanceID) + nextTime += NANOSECONDS.toMillis(System.nanoTime() - beforeNext) + row } } } From 43f0a80330ed8302d904f9f1cc7b71d32530f3ff Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Mon, 19 Jul 2021 13:14:40 +0800 Subject: [PATCH 14/19] print the failed uts in travis --- .github/workflows/unittests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittests.yml 
b/.github/workflows/unittests.yml index 708db07ee..5bd73f630 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -101,7 +101,7 @@ jobs: mvn clean install -DskipTests -Dbuild_arrow=OFF cd .. mvn clean package -P full-scala-compiler -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF - mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop2.7" &> log-file.log + mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop2.7" echo '#!/bin/bash' > grep.sh echo "module_tested=0; module_should_test=8; tests_total=0; while read -r line; do num=\$(echo \"\$line\" | grep -o -E '[0-9]+'); tests_total=\$((tests_total+num)); done <<<\"\$(grep \"Total number of tests run:\" log-file.log)\"; succeed_total=0; while read -r line; do [[ \$line =~ [^0-9]*([0-9]+)\, ]]; num=\${BASH_REMATCH[1]}; succeed_total=\$((succeed_total+num)); let module_tested++; done <<<\"\$(grep \"succeeded\" log-file.log)\"; if test \$tests_total -eq \$succeed_total -a \$module_tested -eq \$module_should_test; then echo \"All unit tests succeed\"; else echo \"Unit tests failed\"; exit 1; fi" >> grep.sh bash grep.sh From ed6edb3e0c91f3564b6af5d640f78b4bebf28915 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Mon, 19 Jul 2021 17:50:10 +0800 Subject: [PATCH 15/19] fix value count --- .../com/intel/oap/execution/ColumnarHashAggregateExec.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala index ac7050159..169bfd443 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala @@ -279,6 +279,9 @@ case class ColumnarHashAggregateExec( def getResForAggregateAndGroupingLiteral: ColumnarBatch = { val resultColumnVectors = ArrowWritableColumnVector.allocateColumns(1, resultStructType) + for (vector <- resultColumnVectors) { + vector.getValueVector.setValueCount(1) + } var idx = 0 for (exp <- groupingExpressions) { val out_res = exp.children.head.asInstanceOf[Literal].value @@ -376,6 +379,9 @@ case class ColumnarHashAggregateExec( } val resultColumnVectors = ArrowWritableColumnVector.allocateColumns(1, resultStructType) + for (vector <- resultColumnVectors) { + vector.getValueVector.setValueCount(1) + } // If groupby is not required, for Final mode, a default value will be // returned if input is empty. 
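// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch series.] The
// setValueCount(1) calls added above matter because Arrow value vectors track
// their own logical length: a vector whose slots were written but whose count
// was never set still reports zero values to downstream readers. A minimal
// standalone reproduction against the Java Arrow API (assumed to be on the
// classpath, as it is for this project):

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.IntVector

val allocator = new RootAllocator(Long.MaxValue)
val vec = new IntVector("v", allocator)
vec.allocateNew(1)
vec.set(0, 42)
// Without the next line, vec.getValueCount == 0 and the row appears empty.
vec.setValueCount(1)
assert(vec.getValueCount == 1)
// ---------------------------------------------------------------------------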
var idx = 0 From 7a6d9a98353074b4d00891341cf192dd2bd79c1b Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Wed, 21 Jul 2021 12:23:58 +0800 Subject: [PATCH 16/19] fix the failed ut and set the default value from true to false --- .../src/main/scala/com/intel/oap/ColumnarPluginConfig.scala | 2 +- .../com/intel/oap/execution/ArrowColumnarToRowExec.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala index 91d4cdc6b..cc4c16ed9 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala @@ -72,7 +72,7 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu val enableArrowColumnarToRow: Boolean = - conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu + conf.getConfString("spark.oap.sql.columnar.columnartorow", "false").toBoolean && enableCpu // enable or disable columnar sortmergejoin // this should be set with preferSortMergeJoin=false diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index 2abe413c8..f3ce947aa 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -142,7 +142,8 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode { override def equals(other: Any): Boolean = other match { case that: ArrowColumnarToRowExec => - (that canEqual this) && super.equals(that) + (that canEqual this) case _ => false } -} \ No newline at end of file +} + From 2c2ffd3db73ea1342ad44c9d26a572efeb8b70a6 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Wed, 21 Jul 2021 12:24:14 +0800 Subject: [PATCH 17/19] Revert "print the failed uts in travis" This reverts commit 43f0a80330ed8302d904f9f1cc7b71d32530f3ff. --- .github/workflows/unittests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 5bd73f630..708db07ee 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -101,7 +101,7 @@ jobs: mvn clean install -DskipTests -Dbuild_arrow=OFF cd .. 
          mvn clean package -P full-scala-compiler -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
-          mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop2.7"
+          mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop2.7" &> log-file.log
           echo '#!/bin/bash' > grep.sh
           echo "module_tested=0; module_should_test=8; tests_total=0; while read -r line; do num=\$(echo \"\$line\" | grep -o -E '[0-9]+'); tests_total=\$((tests_total+num)); done <<<\"\$(grep \"Total number of tests run:\" log-file.log)\"; succeed_total=0; while read -r line; do [[ \$line =~ [^0-9]*([0-9]+)\, ]]; num=\${BASH_REMATCH[1]}; succeed_total=\$((succeed_total+num)); let module_tested++; done <<<\"\$(grep \"succeeded\" log-file.log)\"; if test \$tests_total -eq \$succeed_total -a \$module_tested -eq \$module_should_test; then echo \"All unit tests succeed\"; else echo \"Unit tests failed\"; exit 1; fi" >> grep.sh
           bash grep.sh

From a926a26bb27a9a740f9a8a18b4a06f2728eca12e Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Thu, 22 Jul 2021 21:07:06 +0800
Subject: [PATCH 18/19] fix the failed UT and turn on ArrowColumnarToRow for
 test

---
 .../scala/com/intel/oap/ColumnarPlugin.scala  |  2 +-
 .../com/intel/oap/ColumnarPluginConfig.scala  |  2 +-
 .../execution/ArrowColumnarToRowExec.scala    | 19 +++++-------------
 3 files changed, 7 insertions(+), 16 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
index 8cff1f60c..c077a3f16 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -308,7 +308,7 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
             if (columnarConf.enableArrowColumnarToRow) {
               try {
                 val child = replaceWithColumnarPlan(c.child)
-                ArrowColumnarToRowExec(child)
+                new ArrowColumnarToRowExec(child)
               } catch {
                 case _: Throwable =>
                   logInfo("ArrowColumnarToRow : Falling back to ColumnarToRow...")
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
index cc4c16ed9..91d4cdc6b 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
@@ -72,7 +72,7 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
     conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu

   val enableArrowColumnarToRow: Boolean =
-    conf.getConfString("spark.oap.sql.columnar.columnartorow", "false").toBoolean && enableCpu
+    conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu

   // enable or disable columnar sortmergejoin
   // this should be set with preferSortMergeJoin=false
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala
index f3ce947aa..2fab06dd6 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala
+++
b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala
@@ -22,27 +22,18 @@ import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColu
 import org.apache.arrow.vector.types.pojo.{Field, Schema}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeRow}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration._

-case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode {
+class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child = child) {
   override def nodeName: String = "ArrowColumnarToRow"

-  assert(child.supportsColumnar)
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
   buildCheck()

   def buildCheck(): Unit = {
@@ -87,7 +78,7 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode {
         numInputBatches += 1
         numOutputRows += batch.numRows()

-        if (batch.numRows == 0 || batch.numCols == 0) {
+        if (batch.numRows == 0) {
           logInfo(s"Skip ColumnarBatch of ${batch.numRows} rows, ${batch.numCols} cols")
           Iterator.empty
         } else {
@@ -142,7 +133,7 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends UnaryExecNode {

   override def equals(other: Any): Boolean = other match {
     case that: ArrowColumnarToRowExec =>
-      (that canEqual this)
+      (that canEqual this) && super.equals(that)
     case _ => false
   }
 }

From ccc913136624ee346b7c3eaded046a3f065abaca Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Thu, 22 Jul 2021 22:32:58 +0800
Subject: [PATCH 19/19] disable columnar to row

---
 .../src/main/scala/com/intel/oap/ColumnarPluginConfig.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
index 91d4cdc6b..cc4c16ed9 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
@@ -72,7 +72,7 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
     conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu

   val enableArrowColumnarToRow: Boolean =
-    conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu
+    conf.getConfString("spark.oap.sql.columnar.columnartorow", "false").toBoolean && enableCpu

   // enable or disable columnar sortmergejoin
   // this should be set with preferSortMergeJoin=false
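---

Editor's note: the series ends with `spark.oap.sql.columnar.columnartorow`
defaulted to `false`. A hedged sketch of how one would switch the optimization
on for experiments - only the config key comes from the patches; the session
setup below is illustrative and assumes the OAP extensions are already on the
classpath:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("arrow-columnar-to-row-experiment")
      // Key added by this series; PATCH 19/19 leaves the default at "false".
      .config("spark.oap.sql.columnar.columnartorow", "true")
      .getOrCreate()

    // With the flag on, ColumnarPostOverrides rewrites eligible
    // ColumnarToRowExec nodes into ArrowColumnarToRowExec, so explain()
    // should show the "ArrowColumnarToRow" node from the patches.
    spark.range(100).selectExpr("id", "id * 2 as twice").explain()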