diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala index bbffab14d..4ef604114 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala @@ -94,6 +94,7 @@ object ArrowUtils { val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, partitionSchema) (0 until partitionColumns.length).foreach(i => { ColumnVectorUtils.populate(partitionColumns(i), partitionValues, i) + partitionColumns(i).setValueCount(rowCount) partitionColumns(i).setIsConstant() }) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java b/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java index 0b0ef0fd8..fcbad8090 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java @@ -119,6 +119,9 @@ public ArrowRecordBatch readNext() throws IOException { ArrowRecordBatchBuilderImpl recordBatchBuilderImpl = new ArrowRecordBatchBuilderImpl(recordBatchBuilder); ArrowRecordBatch batch = recordBatchBuilderImpl.build(); + if (batch == null) { + throw new IllegalArgumentException("failed to build record batch"); + } this.lastReadLength = batch.getLength(); return batch; } diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java index cd5f774dc..ebca1bff7 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java @@ -183,11 +183,10 @@ private static File moveFileFromJarToTemp(String tmpDir, String libraryToLoad) t try (final InputStream is = JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad)) { if (is == null) { throw new FileNotFoundException(libraryToLoad); - } else { - try { - Files.copy(is, temp.toPath()); - } catch (Exception e) { - } + } + try { + Files.copy(is, temp.toPath()); + } catch (Exception e) { } } return temp; diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionProjector.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionProjector.scala index 439fdf013..8016bdfd2 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionProjector.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionProjector.scala @@ -17,23 +17,24 @@ package com.intel.oap.expression -import java.util.Collections +import java.util +import java.util.Objects import java.util.concurrent.TimeUnit import com.google.common.collect.Lists +import com.intel.oap.expression.ColumnarConditionProjector.{FieldOptimizedProjector, FilterProjector, ProjectorWrapper} import com.intel.oap.vectorized.ArrowWritableColumnVector - import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.types._ -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} - import org.apache.arrow.gandiva.evaluator._ import org.apache.arrow.gandiva.exceptions.GandivaException import org.apache.arrow.gandiva.expression._ +import org.apache.arrow.gandiva.ipc.GandivaTypes import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.memory.RootAllocator @@ -42,9 +43,13 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch import org.apache.arrow.vector.types.pojo.Schema import org.apache.arrow.vector.types.pojo.Field import org.apache.arrow.vector.types.pojo.ArrowType - import org.apache.arrow.memory.ArrowBuf +import org.apache.arrow.util.AutoCloseables +import org.apache.arrow.vector.ValueVector + import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.control.Breaks._ class ColumnarConditionProjector( condPrepareList: (TreeNode, ArrowType), @@ -115,7 +120,7 @@ class ColumnarConditionProjector( false } val projector = if (skip == false) { - createProjector(projectionArrowSchema, projPrepareList, withCond) + createProjector(projectionArrowSchema, resultArrowSchema, projPrepareList, withCond) } else { null } @@ -134,24 +139,25 @@ class ColumnarConditionProjector( } def createProjector( - arrowSchema: Schema, + projectionSchema: Schema, + resultSchema: Schema, prepareList: Seq[(ExpressionTree, ArrowType)], - withCond: Boolean): Projector = synchronized { + withCond: Boolean): ProjectorWrapper = synchronized { if (projector != null) { return projector } val fieldNodesList = prepareList.map(_._1).toList.asJava try { if (withCond) { - Projector.make(arrowSchema, fieldNodesList, SelectionVectorType.SV_INT16) + new FilterProjector(projectionSchema, resultSchema, fieldNodesList, SelectionVectorType.SV_INT16) } else { - Projector.make(arrowSchema, fieldNodesList) + new FieldOptimizedProjector(projectionSchema, resultSchema, fieldNodesList) } } catch { case e => logError( s"\noriginalInputAttributes is ${originalInputAttributes} ${originalInputAttributes.map( - _.dataType)}, \narrowSchema is ${arrowSchema}, \nProjection is ${prepareList.map(_._1.toProtobuf)}") + _.dataType)}, \nprojectionSchema is ${projectionSchema}, \nresultSchema is ${resultSchema}, \nProjection is ${prepareList.map(_._1.toProtobuf)}") throw e } } @@ -258,28 +264,19 @@ class ColumnarConditionProjector( // for now, we either filter one columnarBatch who has valid rows or we only need to do project // either scenario we will need to output one columnarBatch. beforeEval = System.nanoTime() - val resultColumnVectors = - ArrowWritableColumnVector.allocateColumns(numRows, resultSchema).toArray - val outputVectors = resultColumnVectors - .map(columnVector => { - columnVector.getValueVector() - }) - .toList - .asJava val cols = projectOrdinalList.map(i => { columnarBatch.column(i).asInstanceOf[ArrowWritableColumnVector].getValueVector() }) input = ConverterUtils.createArrowRecordBatch(columnarBatch.numRows, cols) - if (conditioner != null) { - projector.evaluate(input, selectionVector, outputVectors); + val outputBatch = if (conditioner != null) { + projector.evaluate(input, numRows, selectionVector); } else { - projector.evaluate(input, outputVectors); + projector.evaluate(input); } ConverterUtils.releaseArrowRecordBatch(input) - val outputBatch = - new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), numRows) + proc_time += ((System.nanoTime() - beforeEval) / (1000 * 1000)) resColumnarBatch = outputBatch true @@ -448,4 +445,190 @@ object ColumnarConditionProjector extends Logging { numOutputRows, procTime) } + + trait ProjectorWrapper { + def evaluate(recordBatch: ArrowRecordBatch): ColumnarBatch = { + throw new UnsupportedOperationException + } + + def evaluate(recordBatch: ArrowRecordBatch, numRows: Int, selectionVector: SelectionVector): ColumnarBatch = { + throw new UnsupportedOperationException + } + + def close(): Unit + } + + /** + * Proxy projector that is optimized for field projections. + */ + class FieldOptimizedProjector(projectionSchema: Schema, resultSchema: Schema, + exprs: java.util.List[ExpressionTree]) extends ProjectorWrapper { + + val fieldExprs = ListBuffer[(ExpressionTree, Int)]() + val fieldExprNames = new util.HashSet[String]() + + /** + * nonFieldExprs may include fields that are already appeared in projection list. + * To avoid sharing same buffers over output columns. + */ + val nonFieldExprs = ListBuffer[(ExpressionTree, Int)]() + + exprs.asScala.zipWithIndex.foreach { + case (expr, i) => + val root = getRoot(expr) + if (fieldClazz.isInstance(root) && !fieldExprNames.contains(getField(root).getName)) { + fieldExprs.append((expr, i)) + fieldExprNames.add(getField(root).getName) + } else { + nonFieldExprs.append((expr, i)) + } + } + + val fieldResultSchema = new Schema( + fieldExprs.map { + case (_, i) => + resultSchema.getFields.get(i) + }.asJava) + + val nonFieldResultSchema = new Schema( + nonFieldExprs.map { + case (_, i) => + resultSchema.getFields.get(i) + }.asJava) + + val nonFieldProjector: Option[Projector] = + if (nonFieldExprs.isEmpty) { + None + } else { + Some( + Projector.make( + projectionSchema, nonFieldExprs.map { + case (e, _) => e + }.toList.asJava)) + } + + override def evaluate(recordBatch: ArrowRecordBatch): ColumnarBatch = { + val numRows = recordBatch.getLength + val projectedAVs = new Array[ArrowWritableColumnVector](exprs.size()) + + // Execute expression-based projections + val nonFieldResultColumnVectors = + ArrowWritableColumnVector.allocateColumns(numRows, + ArrowUtils.fromArrowSchema(nonFieldResultSchema)) + + val outputVectors = nonFieldResultColumnVectors + .map(columnVector => { + columnVector.getValueVector + }) + .toList + .asJava + + nonFieldProjector.foreach { + _.evaluate(recordBatch, outputVectors) + } + + var k: Int = 0 + nonFieldExprs.foreach { + case (_, i) => + projectedAVs(i) = nonFieldResultColumnVectors(k) + k += 1 + } + + val inAVs = ArrowWritableColumnVector.loadColumns(numRows, projectionSchema, recordBatch) + + fieldExprs.foreach { + case (fieldExpr, i) => + val field = getField(getRoot(fieldExpr)) + var found = false + breakable { + for (j <- 0 until projectionSchema.getFields.size()) { + val projField = projectionSchema.getFields.get(j) + if (Objects.equals(field.getName, projField.getName)) { + // Found field in input schema + if (projectedAVs(i) != null) { + throw new IllegalStateException() + } + val vector = inAVs(j) + projectedAVs(i) = vector + vector.retain() + found = true + break + } + } + } + if (!found) { + throw new IllegalArgumentException("Field not found for projection: " + field.getName) + } + } + + inAVs.foreach(_.close()) + + // Projected vector count check + projectedAVs.foreach { + arrowVector => + if (arrowVector == null) { + throw new IllegalStateException() + } + } + + val outputBatch = + new ColumnarBatch(projectedAVs.map(_.asInstanceOf[ColumnVector]), numRows) + + outputBatch + } + + override def close() = { + nonFieldProjector.foreach(_.close()) + } + } + + class FilterProjector(projectionSchema: Schema, resultSchema: Schema, + exprs: java.util.List[ExpressionTree], + selectionVectorType: GandivaTypes.SelectionVectorType) extends ProjectorWrapper { + val projector = Projector.make(projectionSchema, exprs, selectionVectorType) + + override def evaluate(recordBatch: ArrowRecordBatch, numRows: Int, + selectionVector: SelectionVector): ColumnarBatch = { + val resultColumnVectors = + ArrowWritableColumnVector.allocateColumns(numRows, ArrowUtils.fromArrowSchema(resultSchema)) + + val outputVectors = resultColumnVectors + .map(columnVector => { + columnVector.getValueVector + }) + .toList + .asJava + + projector.evaluate(recordBatch, selectionVector, outputVectors) + + val outputBatch = + new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), numRows) + + outputBatch + } + + override def close(): Unit = { + projector.close() + } + } + + val treeClazz = classOf[ExpressionTree] + val rootField = treeClazz.getDeclaredField("root") + val fieldClazz = Class.forName("org.apache.arrow.gandiva.expression.FieldNode") + val fieldField = fieldClazz.getDeclaredField("field") + + rootField.setAccessible(true) + fieldField.setAccessible(true) + + def getRoot(expressionTree: ExpressionTree): TreeNode = { + rootField.get(expressionTree).asInstanceOf[TreeNode] + } + + def getField(fieldNode: Any): Field = { + if (!fieldClazz.isInstance(fieldNode)) { + throw new IllegalArgumentException + } + fieldField.get(fieldNode).asInstanceOf[Field] + + } } diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index b9cec3729..f95c89a1a 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -35,10 +35,10 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { val conf = super.sparkConf conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY)) .set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin") - .set("spark.sql.codegen.wholeStage", "false") + .set("spark.sql.codegen.wholeStage", "true") .set("spark.sql.sources.useV1SourceList", "") .set("spark.sql.columnar.tmp_dir", "/tmp/") - .set("spark.sql.adaptive.enabled", "false") + .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.columnar.sort.broadcastJoin", "true") .set("spark.storage.blockManagerSlaveTimeoutMs", "3600000") .set("spark.executor.heartbeatInterval", "3600000") @@ -50,6 +50,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.unsafe.exceptionOnMemoryLeak", "false") .set("spark.network.io.preferDirectBufs", "false") + .set("spark.sql.sources.useV1SourceList", "arrow,parquet") return conf } diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala index 0cbd13232..2eb8ae828 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala @@ -63,10 +63,16 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) case "web_page" => webPageSchema case "web_site" => webSiteSchema } - writeParquetTable(name, rows, schema) + val partitionBy: List[String] = name match { + case "catalog_sales" => List("cs_sold_date_sk") + case "web_sales" => List("ws_sold_date_sk") + case _ => List[String]() + } + writeParquetTable(name, rows, schema, partitionBy) } - private def writeParquetTable(tableName: String, rows: List[Row], schema: StructType): Unit = { + private def writeParquetTable(tableName: String, rows: List[Row], schema: StructType, + partitionBy: List[String]): Unit = { if (rows.isEmpty) { return } @@ -87,10 +93,11 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) } convertedData.coalesce(1) - .write - .format("parquet") - .mode("overwrite") - .save(path + File.separator + tableName) + .write + .format("parquet") + .mode("overwrite") + .partitionBy(partitionBy.toArray: _*) + .save(path + File.separator + tableName) } override def gen(): Unit = { diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/code_generator.h b/native-sql-engine/cpp/src/codegen/arrow_compute/code_generator.h index 2015499c2..0f06d445c 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/code_generator.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/code_generator.h @@ -67,7 +67,7 @@ class ArrowComputeCodeGenerator : public CodeGenerator { #endif } - ~ArrowComputeCodeGenerator() { + virtual ~ArrowComputeCodeGenerator() { expr_visitor_cache_.clear(); visitor_list_.clear(); } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/basic_physical_kernels.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/basic_physical_kernels.cc index ba4b430cc..5fdf019e8 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/basic_physical_kernels.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/basic_physical_kernels.cc @@ -54,6 +54,7 @@ class ProjectKernel::Impl { auto field_node = std::dynamic_pointer_cast(node); input_field_list_.push_back(field_node->field()); } + pool_ = nullptr; } arrow::Status MakeResultIterator( @@ -125,6 +126,7 @@ ProjectKernel::ProjectKernel(arrow::compute::ExecContext* ctx, const gandiva::NodeVector& project_list) { impl_.reset(new Impl(ctx, input_field_node_list, project_list)); kernel_name_ = "ProjectKernel"; + ctx_ = nullptr; } arrow::Status ProjectKernel::MakeResultIterator( @@ -153,6 +155,7 @@ class FilterKernel::Impl { auto field_node = std::dynamic_pointer_cast(node); input_field_list_.push_back(field_node->field()); } + pool_ = nullptr; } arrow::Status MakeResultIterator( @@ -178,7 +181,8 @@ class FilterKernel::Impl { &condition_node_visitor)); codegen_ctx->process_codes += condition_node_visitor->GetPrepare(); for (auto header : condition_node_visitor->GetHeaders()) { - if (std::find(codegen_ctx->header_codes.begin(), codegen_ctx->header_codes.end(), + if (!header.empty() && + std::find(codegen_ctx->header_codes.begin(), codegen_ctx->header_codes.end(), header) == codegen_ctx->header_codes.end()) { codegen_ctx->header_codes.push_back(header); } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc index ea2f5f1f7..702628509 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc @@ -332,7 +332,7 @@ gandiva::ExpressionPtr GetHash32Kernel(std::vector key_list, seed = func_node; } func_node_list.push_back(func_node); - return gandiva::TreeExprBuilder::MakeExpression(func_node_list[0], + return gandiva::TreeExprBuilder::MakeExpression(func_node_list.at(0), arrow::field("hash_key", ret_type)); } @@ -605,12 +605,19 @@ arrow::Status CompileCodes(std::string codes, std::string signature) { std::string exec(const char* cmd) { std::array buffer; std::string result; - std::unique_ptr pipe(popen(cmd, "r"), pclose); + FILE* file = popen(cmd, "r"); + std::unique_ptr pipe(file, pclose); if (!pipe) { + pclose(file); throw std::runtime_error("popen() failed!"); } - while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) { - result += buffer.data(); + try { + while (fgets(buffer.data(), sizeof buffer, pipe.get()) != nullptr) { + result += buffer.data(); + } + } catch (...) { + pclose(file); + throw; } return result; } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_node_visitor.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_node_visitor.cc index 309f2fd5a..e4fbfcfc3 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_node_visitor.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_node_visitor.cc @@ -298,13 +298,50 @@ arrow::Status CodeGenNodeVisitor::Visit(const gandiva::FunctionNode& node) { RETURN_NOT_OK(AppendProjectList(child_visitor_list, i)); } codes_str_ = ss.str(); - } else if (func_name.find("cast") != std::string::npos) { + } else if (func_name.find("cast") != std::string::npos && + func_name.compare("castDATE") != 0 && + func_name.compare("castDECIMAL") != 0 && + func_name.compare("castDECIMALNullOnOverflow") != 0) { ss << child_visitor_list[0]->GetResult(); check_str_ = child_visitor_list[0]->GetPreCheck(); for (int i = 0; i < 1; i++) { RETURN_NOT_OK(AppendProjectList(child_visitor_list, i)); } codes_str_ = ss.str(); + } else if (func_name.compare("castDECIMAL") == 0) { + codes_str_ = func_name + "_" + std::to_string(cur_func_id); + auto validity = codes_str_ + "_validity"; + std::stringstream fix_ss; + auto decimal_type = + std::dynamic_pointer_cast(node.return_type()); + auto childNode = node.children().at(0); + if (childNode->return_type()->id() != arrow::Type::DECIMAL) { + // if not casting from Decimal + fix_ss << ", " << decimal_type->precision() << ", " << decimal_type->scale(); + } else { + // if casting from Decimal + auto childType = + std::dynamic_pointer_cast(childNode->return_type()); + fix_ss << ", " << childType->precision() << ", " << childType->scale() << ", " + << decimal_type->precision() << ", " << decimal_type->scale(); + } + std::stringstream prepare_ss; + prepare_ss << GetCTypeString(node.return_type()) << " " << codes_str_ << ";" + << std::endl; + prepare_ss << "bool " << validity << " = " << child_visitor_list[0]->GetPreCheck() + << ";" << std::endl; + prepare_ss << "if (" << validity << ") {" << std::endl; + prepare_ss << codes_str_ << " = " << func_name << "(" + << child_visitor_list[0]->GetResult() << fix_ss.str() << ");" + << std::endl; + prepare_ss << "}" << std::endl; + + for (int i = 0; i < 1; i++) { + prepare_str_ += child_visitor_list[i]->GetPrepare(); + } + prepare_str_ += prepare_ss.str(); + check_str_ = validity; + } else if (func_name.compare("add") == 0) { codes_str_ = "add_" + std::to_string(cur_func_id); auto validity = "add_validity_" + std::to_string(cur_func_id); @@ -768,7 +805,8 @@ std::string CodeGenNodeVisitor::GetNaNCheckStr(std::string left, std::string rig std::string CodeGenNodeVisitor::CombineValidity(std::vector validity_list) { bool first = true; std::stringstream out; - for (auto validity : validity_list) { + for (int i = 0; i < validity_list.size(); i++) { + auto validity = validity_list[i]; if (first) { if (validity.compare("true") != 0) { out << validity; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc index 7285edc3c..73cedd271 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc @@ -88,6 +88,8 @@ class ConditionedMergeJoinKernel::Impl { THROW_NOT_OK(GetIndexList(result_schema_, left_field_list_, right_field_list_, true, &exist_index_, &result_schema_index_list_)); } + + pool_ = nullptr; } arrow::Status MakeResultIterator( diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc index ef35c102b..d6b4a7dd0 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc @@ -131,6 +131,8 @@ class ConditionedProbeKernel::Impl { THROW_NOT_OK(GetIndexList(result_schema_, left_field_list_, right_field_list_, true, &exist_index_, &result_schema_index_list_)); } + + pool_ = nullptr; } arrow::Status MakeResultIterator( @@ -412,6 +414,9 @@ class ConditionedProbeKernel::Impl { auto iter = dependent_iter_list[0]; auto typed_dependent = std::dynamic_pointer_cast>(iter); + if (typed_dependent == nullptr) { + throw std::runtime_error("casting on hash relation iterator failed"); + } RETURN_NOT_OK(typed_dependent->Next(&hash_relation_)); // chendi: previous result_schema_index_list design is little tricky, it @@ -768,40 +773,44 @@ class ConditionedProbeKernel::Impl { case TypeTraits::type_id: { \ using ArrayType_ = precompile::TypeTraits::ArrayType; \ auto typed_first_key_arr = std::make_shared(key_payloads[0]); \ - if (typed_first_key_arr->null_count() == 0) { \ - fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { \ - return hash_relation_->Get(typed_key_array->GetView(i), \ - typed_first_key_arr->GetView(i)); \ - }; \ - } else { \ - fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { \ - if (typed_first_key_arr->IsNull(i)) { \ - return hash_relation_->GetNull(); \ - } else { \ + if (typed_first_key_arr) { \ + if (typed_first_key_arr->null_count() == 0) { \ + fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { \ return hash_relation_->Get(typed_key_array->GetView(i), \ typed_first_key_arr->GetView(i)); \ - } \ - }; \ + }; \ + } else { \ + fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { \ + if (typed_first_key_arr->IsNull(i)) { \ + return hash_relation_->GetNull(); \ + } else { \ + return hash_relation_->Get(typed_key_array->GetView(i), \ + typed_first_key_arr->GetView(i)); \ + } \ + }; \ + } \ } \ } break; PROCESS_SUPPORTED_TYPES(PROCESS) #undef PROCESS case TypeTraits::type_id: { auto typed_first_key_arr = std::make_shared(key_payloads[0]); - if (typed_first_key_arr->null_count() == 0) { - fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { - return hash_relation_->Get(typed_key_array->GetView(i), - typed_first_key_arr->GetString(i)); - }; - } else { - fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { - if (typed_first_key_arr->IsNull(i)) { - return hash_relation_->GetNull(); - } else { + if (typed_first_key_arr) { + if (typed_first_key_arr->null_count() == 0) { + fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { return hash_relation_->Get(typed_key_array->GetView(i), typed_first_key_arr->GetString(i)); - } - }; + }; + } else { + fast_probe = [this, typed_key_array, typed_first_key_arr](int i) { + if (typed_first_key_arr->IsNull(i)) { + return hash_relation_->GetNull(); + } else { + return hash_relation_->Get(typed_key_array->GetView(i), + typed_first_key_arr->GetString(i)); + } + }; + } } } break; default: { @@ -1001,6 +1010,7 @@ class ConditionedProbeKernel::Impl { uint64_t Evaluate(std::shared_ptr key_array, const arrow::ArrayVector& key_payloads) override { auto typed_key_array = std::dynamic_pointer_cast(key_array); + assert(typed_key_array != nullptr); std::vector> payloads; int i = 0; bool do_unsafe_row = true; @@ -1071,7 +1081,9 @@ class ConditionedProbeKernel::Impl { if (!do_unsafe_row) { index = fast_probe(i); } else { - unsafe_key_row->reset(); + if (unsafe_key_row) { + unsafe_key_row->reset(); + } for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } @@ -1853,6 +1865,7 @@ ConditionedProbeKernel::ConditionedProbeKernel( right_schema_list, condition, join_type, result_schema, hash_configuration_list, hash_relation_idx)); kernel_name_ = "ConditionedProbeKernel"; + ctx_ = nullptr; } arrow::Status ConditionedProbeKernel::MakeResultIterator( diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc index 7b0483203..4502ea474 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc @@ -1026,8 +1026,8 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::BooleanNode& node) field_type_ = mixed; } for (auto header : child_visitor->GetHeaders()) { - if (std::find(header_list_.begin(), header_list_.end(), header) == - header_list_.end()) { + if (!header.empty() && std::find(header_list_.begin(), header_list_.end(), + header) == header_list_.end()) { header_list_.push_back(header); } } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc index 8e19adfeb..09aca707b 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc @@ -129,6 +129,7 @@ class HashAggregateKernel::Impl { if (no_result_project) return; } result_expr_list_ = result_expr_node_list; + pool_ = nullptr; } virtual arrow::Status MakeResultIterator( @@ -491,8 +492,8 @@ class HashAggregateKernel::Impl { finish_ss << "if(do_hash_aggr_finish_" << level << ") {"; for (int i = 0; i < action_idx; i++) { finish_ss << "aggr_action_list_" << level << "[" << i - << "]->Finish(do_hash_aggr_finish_" << level - << "_offset, 10000, &do_hash_aggr_finish_" << level << "_out);" + << "]->Finish(do_hash_aggr_finish_" << level << "_offset," + << GetBatchSize() << ", &do_hash_aggr_finish_" << level << "_out);" << std::endl; } finish_ss << "if (do_hash_aggr_finish_" << level << "_out.size() > 0) {" << std::endl; @@ -755,7 +756,7 @@ class HashAggregateKernel::Impl { int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { - action->Finish(offset_, 10000, &outputs); + action->Finish(offset_, GetBatchSize(), &outputs); } if (outputs.size() > 0) { out_length += outputs[0]->length(); @@ -911,7 +912,7 @@ class HashAggregateKernel::Impl { int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { - action->Finish(offset_, 10000, &outputs); + action->Finish(offset_, GetBatchSize(), &outputs); } if (outputs.size() > 0) { out_length += outputs[0]->length(); @@ -1065,7 +1066,7 @@ class HashAggregateKernel::Impl { int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { - action->Finish(offset_, 10000, &outputs); + action->Finish(offset_, GetBatchSize(), &outputs); } if (outputs.size() > 0) { out_length += outputs[0]->length(); @@ -1113,6 +1114,7 @@ HashAggregateKernel::HashAggregateKernel( impl_.reset(new Impl(ctx, input_field_list, action_list, result_field_node_list, result_expr_node_list)); kernel_name_ = "HashAggregateKernelKernel"; + ctx_ = ctx; } #undef PROCESS_SUPPORTED_TYPES diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc index 00276e57e..57c1d57d3 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc @@ -54,6 +54,7 @@ class HashRelationKernel::Impl { std::shared_ptr root_node, const std::vector>& output_field_list) : ctx_(ctx), input_field_list_(input_field_list) { + pool_ = ctx_->memory_pool(); std::vector> hash_relation_list; for (auto field : input_field_list) { std::shared_ptr hash_relation_column; @@ -129,7 +130,11 @@ class HashRelationKernel::Impl { // If single key case, we can put key in KeyArray auto key_type = std::dynamic_pointer_cast( key_hash_field_list[0]->type()); - key_size_ = key_type->bit_width() / 8; + if (key_type) { + key_size_ = key_type->bit_width() / 8; + } else { + key_size_ = 0; + } hash_relation_ = std::make_shared(ctx_, hash_relation_list, key_size_); } else { diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc index 296b204bb..c0ac168f7 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc @@ -195,6 +195,7 @@ class HashArrayKernel::Impl { } index++; } + assert(func_node_list.size() > 0); expr = gandiva::TreeExprBuilder::MakeExpression(func_node_list[0], arrow::field("res", arrow::int64())); #ifdef DEBUG @@ -307,6 +308,7 @@ ConcatArrayKernel::ConcatArrayKernel( std::vector> type_list) { impl_.reset(new Impl(ctx, type_list)); kernel_name_ = "ConcatArrayKernel"; + ctx_ = nullptr; } arrow::Status ConcatArrayKernel::Evaluate(const ArrayList& in, @@ -323,6 +325,7 @@ class CachedRelationKernel::Impl { result_schema_(result_schema), key_field_list_(key_field_list), result_type_(result_type) { + pool_ = ctx_->memory_pool(); for (auto field : key_field_list) { auto indices = result_schema->GetAllFieldIndices(field->name()); if (indices.size() != 1) { diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h index e2b6dd972..4921141c6 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h @@ -115,7 +115,7 @@ class EncodeArrayKernel : public KernalBase { private: class Impl; std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class WindowAggregateFunctionKernel : public KernalBase { @@ -147,7 +147,7 @@ class WindowAggregateFunctionKernel : public KernalBase { typename arrow::enable_if_number>> createBuilder(std::shared_ptr data_type); - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; std::shared_ptr action_; std::vector> accumulated_group_ids_; std::vector> type_list_; @@ -167,7 +167,7 @@ class HashArrayKernel : public KernalBase { private: class Impl; std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class SortArraysToIndicesKernel : public KernalBase { @@ -200,7 +200,7 @@ class SortArraysToIndicesKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class CachedRelationKernel : public KernalBase { @@ -223,7 +223,7 @@ class CachedRelationKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class WindowSortKernel : public KernalBase { @@ -243,7 +243,7 @@ class WindowSortKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class HashAggregateKernel : public KernalBase { @@ -273,7 +273,7 @@ class HashAggregateKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class WindowRankKernel : public KernalBase { @@ -299,7 +299,7 @@ class WindowRankKernel : public KernalBase { private: std::shared_ptr sorter_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; std::vector input_cache_; std::vector> type_list_; bool desc_; @@ -347,7 +347,7 @@ class ConditionedProbeArraysKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class ConditionedJoinArraysKernel : public KernalBase { public: @@ -377,7 +377,7 @@ class ConditionedJoinArraysKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class WholeStageCodeGenKernel : public KernalBase { public: @@ -401,7 +401,7 @@ class WholeStageCodeGenKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class HashRelationKernel : public KernalBase { public: @@ -425,7 +425,7 @@ class HashRelationKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class ConcatArrayListKernel : public KernalBase { public: @@ -449,7 +449,7 @@ class ConcatArrayListKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class ConditionedProbeKernel : public KernalBase { public: @@ -484,7 +484,7 @@ class ConditionedProbeKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class ConditionedMergeJoinKernel : public KernalBase { public: @@ -518,7 +518,7 @@ class ConditionedMergeJoinKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class ProjectKernel : public KernalBase { public: @@ -542,7 +542,7 @@ class ProjectKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class FilterKernel : public KernalBase { public: @@ -566,7 +566,7 @@ class FilterKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; class ConcatArrayKernel : public KernalBase { public: @@ -581,7 +581,7 @@ class ConcatArrayKernel : public KernalBase { private: std::unique_ptr impl_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; }; } // namespace extra } // namespace arrowcompute diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/merge_join_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/merge_join_kernel.cc index 01e19397c..9416e1ecc 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/merge_join_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/merge_join_kernel.cc @@ -1506,6 +1506,7 @@ ConditionedJoinArraysKernel::ConditionedJoinArraysKernel( const std::shared_ptr& result_schema) { impl_.reset(new Impl(ctx, left_key_list, right_key_list, func_node, join_type, left_field_list, right_field_list, result_schema)); + ctx_ = ctx; kernel_name_ = "ConditionedJoinArraysKernel"; } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/probe_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/probe_kernel.cc index 2bb7b5600..283444d6e 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/probe_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/probe_kernel.cc @@ -402,6 +402,9 @@ class ConditionedProbeArraysKernel::Impl { left_projected_batch_list) { std::stringstream ss; for (auto name : left_projected_batch_list) { + if (name.first == nullptr || name.second.empty()) { + throw std::runtime_error("uninitialized value found"); + } ss << "std::vector> " << name.second << ";" << std::endl; } @@ -477,7 +480,8 @@ class ConditionedProbeArraysKernel::Impl { std::vector, std::string>> name_list) { std::stringstream ss; int i = 0; - for (auto name : name_list) { + for (int idx = 0; idx < name_list.size(); idx++) { + auto name = name_list[idx]; ss << name.second << " = std::make_shared<" << GetTypeString(name.first, "Array") << ">(projected_batch[" << i++ << "]);" << std::endl; } @@ -497,7 +501,8 @@ class ConditionedProbeArraysKernel::Impl { std::string GetResultIteratorProjectedSet( std::vector, std::string>> name_list) { std::stringstream ss; - for (auto name : name_list) { + for (int i = 0; i < name_list.size(); i++) { + auto name = name_list[i]; ss << name.second << " = " << name.second.substr(0, name.second.size() - 1) << ";" << std::endl; } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index 98f89ab2b..d9752aec4 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -84,7 +84,12 @@ using is_number_bool_date = /////////////// SortArraysToIndices //////////////// class SortArraysToIndicesKernel::Impl { public: - Impl() {} + Impl() { + ctx_ = nullptr; + col_num_ = 0; + NaN_check_ = false; + } + Impl(arrow::compute::ExecContext* ctx, std::shared_ptr result_schema, std::shared_ptr key_projector, std::vector> projected_types, @@ -160,8 +165,16 @@ class SortArraysToIndicesKernel::Impl { // process auto codes = ProduceCodes(); // compile codes - RETURN_NOT_OK(CompileCodes(codes, signature_)); - RETURN_NOT_OK(LoadLibrary(signature_, ctx_, &sorter_)); + const arrow::Status& status1 = CompileCodes(codes, signature_); + if (!status1.ok()) { + FileSpinUnLock(file_lock); + return status1; + } + const arrow::Status& status2 = LoadLibrary(signature_, ctx_, &sorter_); + if (!status2.ok()) { + FileSpinUnLock(file_lock); + return status1; + } } FileSpinUnLock(file_lock); return arrow::Status::OK(); @@ -235,7 +248,7 @@ class SortArraysToIndicesKernel::Impl { // true for nulls_first, false for nulls_last std::vector nulls_order_; bool NaN_check_; - int col_num_; + int col_num_ = 0; class TypedSorterCodeGenImpl { public: @@ -1607,7 +1620,7 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl { return false; } - auto Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) { + void Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) { int keys_num = sort_directions_.size(); auto comp = [this, &keys_num](const ArrayItemIndexS& x, const ArrayItemIndexS& y) { return compareRow(x.array_id, x.id, y.array_id, y.id, keys_num); diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc index e036405b5..9b336cbd0 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc @@ -95,7 +95,7 @@ class WholeStageCodeGenKernel::Impl { auto function_node = std::dynamic_pointer_cast(node); auto func_name = function_node->descriptor()->name(); if (func_name.compare(0, 22, "conditionedProbeArrays") == 0) { - int join_type; + int join_type = 0; gandiva::NodeVector left_schema_list; RETURN_NOT_OK(GetArguments(function_node, 0, &left_schema_list)); gandiva::NodeVector right_schema_list; @@ -132,7 +132,7 @@ class WholeStageCodeGenKernel::Impl { out)); } else if (func_name.compare(0, 20, "conditionedMergeJoin") == 0) { - int join_type; + int join_type = 0; gandiva::NodeVector left_schema_list; RETURN_NOT_OK(GetArguments(function_node, 0, &left_schema_list)); gandiva::NodeVector right_schema_list; @@ -277,8 +277,8 @@ class WholeStageCodeGenKernel::Impl { // process try { // compile codes - RETURN_NOT_OK(CompileCodes(codes, signature_)); - RETURN_NOT_OK(LoadLibrary(signature_, ctx_, out)); + arrow::Status s = CompileCodes(codes, signature_); + s = LoadLibrary(signature_, ctx_, out); } catch (const std::runtime_error& error) { FileSpinUnLock(file_lock); throw error; @@ -310,7 +310,9 @@ class WholeStageCodeGenKernel::Impl { gandiva_projector_list_.push_back(codegen_ctx->gandiva_projector); } for (auto header : headers) { - codes_ss << header << std::endl; + if (!header.empty()) { + codes_ss << header << std::endl; + } } if (is_aggr_) { @@ -424,7 +426,8 @@ class TypedWholeStageCodeGenImpl : public CodeGenBase { for (int i = 0; i < length; i++) { )" << std::endl; } else { - codes_ss << "while (!should_stop_ && out_length < 10000) {" << std::endl; + codes_ss << "while (!should_stop_ && out_length < " << GetBatchSize() << ") {" + << std::endl; } // input preparation for (int i = 0; i < input_field_list.size(); i++) { @@ -561,7 +564,9 @@ extern "C" void MakeCodeGen(arrow::compute::ExecContext *ctx, std::string GetProcessMaterializeCodes(std::shared_ptr codegen_ctx) { std::stringstream codes_ss; int i = 0; - for (auto pair : codegen_ctx->output_list) { + auto out_list = codegen_ctx->output_list; + for (int j = 0; j < out_list.size(); j++) { + auto pair = out_list[j]; auto name = pair.first.first; auto type = pair.second; auto validity = name + "_validity"; @@ -651,6 +656,7 @@ WholeStageCodeGenKernel::WholeStageCodeGenKernel( const std::vector>& output_field_list) { impl_.reset(new Impl(ctx, input_field_list, root_node, output_field_list)); kernel_name_ = "WholeStageCodeGenKernel"; + ctx_ = nullptr; } arrow::Status WholeStageCodeGenKernel::MakeResultIterator( @@ -664,4 +670,4 @@ std::string WholeStageCodeGenKernel::GetSignature() { return impl_->GetSignature } // namespace extra } // namespace arrowcompute } // namespace codegen -} // namespace sparkcolumnarplugin \ No newline at end of file +} // namespace sparkcolumnarplugin diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc index bbd02e4ff..7b321f62d 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc @@ -99,16 +99,17 @@ arrow::Status WindowAggregateFunctionKernel::Evaluate(const ArrayList& in) { int32_t max_group_id = 0; std::shared_ptr group_id_array = in[1]; auto group_ids = std::dynamic_pointer_cast(group_id_array); - accumulated_group_ids_.push_back(group_ids); - for (int i = 0; i < group_ids->length(); i++) { - if (group_ids->IsNull(i)) { - continue; - } - if (group_ids->GetView(i) > max_group_id) { - max_group_id = group_ids->GetView(i); + if (group_ids) { + accumulated_group_ids_.push_back(group_ids); + for (int i = 0; i < group_ids->length(); i++) { + if (group_ids->IsNull(i)) { + continue; + } + if (group_ids->GetView(i) > max_group_id) { + max_group_id = group_ids->GetView(i); + } } } - ArrayList action_input_data; action_input_data.push_back(in[0]); std::function func; @@ -116,15 +117,16 @@ arrow::Status WindowAggregateFunctionKernel::Evaluate(const ArrayList& in) { RETURN_NOT_OK( action_->Get()->Submit(action_input_data, max_group_id, &func, &null_func)); - for (int row_id = 0; row_id < group_id_array->length(); row_id++) { - if (group_ids->IsNull(row_id)) { - RETURN_NOT_OK(null_func()); - continue; + if (group_ids) { + for (int row_id = 0; row_id < group_id_array->length(); row_id++) { + if (group_ids->IsNull(row_id)) { + RETURN_NOT_OK(null_func()); + continue; + } + auto group_id = group_ids->GetView(row_id); + RETURN_NOT_OK(func(group_id)); } - auto group_id = group_ids->GetView(row_id); - RETURN_NOT_OK(func(group_id)); } - return arrow::Status::OK(); } @@ -376,7 +378,7 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) { std::shared_ptr last_index = sorted_partition.at(j - 1); bool same = true; for (int column_id = 0; column_id < type_list_.size(); column_id++) { - bool s; + bool s = false; std::shared_ptr type = type_list_.at(column_id); switch (type->id()) { #define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ @@ -396,7 +398,7 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) { break; } } - if (same) { + if (same && rank_array[index->array_id] && rank_array[last_index->array_id]) { rank_array[index->array_id][index->id] = rank_array[last_index->array_id][last_index->id]; continue; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h index e1ecbed4e..aa6d0ed87 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h @@ -93,11 +93,16 @@ class WindowSortKernel::Impl { auto file_lock = FileSpinLock(); auto status = LoadLibrary(signature_, ctx_, &sorter); if (!status.ok()) { - // process - auto codes = ProduceCodes(result_schema); - // compile codes - RETURN_NOT_OK(CompileCodes(codes, signature_)); - RETURN_NOT_OK(LoadLibrary(signature_, ctx_, &sorter)); + try { + // process + auto codes = ProduceCodes(result_schema); + // compile codes + auto s = CompileCodes(codes, signature_); + s = LoadLibrary(signature_, ctx_, &sorter); + } catch (const std::runtime_error& error) { + FileSpinUnLock(file_lock); + throw error; + } } FileSpinUnLock(file_lock); return arrow::Status::OK(); @@ -121,7 +126,7 @@ class WindowSortKernel::Impl { protected: std::shared_ptr sorter; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; std::string signature_; bool nulls_first_; bool asc_; @@ -636,7 +641,7 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { // using ArrayType_key = arrow::UInt32Array; std::vector> cached_key_; std::vector cached_; - arrow::compute::ExecContext* ctx_; + arrow::compute::ExecContext* ctx_ = nullptr; std::shared_ptr result_schema_; bool nulls_first_; bool asc_; diff --git a/native-sql-engine/cpp/src/codegen/expr_visitor.h b/native-sql-engine/cpp/src/codegen/expr_visitor.h index 8cd6d4ed3..568404ceb 100644 --- a/native-sql-engine/cpp/src/codegen/expr_visitor.h +++ b/native-sql-engine/cpp/src/codegen/expr_visitor.h @@ -51,7 +51,7 @@ class ExprVisitor : public VisitorBase { // "encodeArray"}; std::vector gdv{"add", "substract", "multiply", "divide"}; std::vector ce{}; - int codegen_type; + int codegen_type = 0; arrow::Status Visit(const gandiva::FunctionNode& node); }; } // namespace codegen diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 1a4dbbdc4..6638ab183 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -636,6 +636,10 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeSetDependency( JNIEXPORT jboolean JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeHasNext( JNIEnv* env, jobject obj, jlong id) { auto iter = GetBatchIterator(env, id); + if (iter == nullptr) { + std::string error_message = "faked to get batch iterator"; + env->ThrowNew(io_exception_class, error_message.c_str()); + } return iter->HasNext(); } @@ -854,7 +858,9 @@ Java_com_intel_oap_vectorized_BatchIterator_nativeProcessAndCacheOne( } auto iter = GetBatchIterator(env, id); - status = iter->ProcessAndCacheOne(in); + if (iter) { + status = iter->ProcessAndCacheOne(in); + } if (!status.ok()) { std::string error_message = diff --git a/native-sql-engine/cpp/src/precompile/array.cc b/native-sql-engine/cpp/src/precompile/array.cc index eb7072216..2da5f0b95 100644 --- a/native-sql-engine/cpp/src/precompile/array.cc +++ b/native-sql-engine/cpp/src/precompile/array.cc @@ -44,7 +44,11 @@ BooleanArray::BooleanArray(const std::shared_ptr& in) : cache_(in) null_count_ = in->null_count(); \ null_bitmap_data_ = null_count_ == 0 ? NULLPTR : in->null_bitmap_data(); \ auto typed_in = std::dynamic_pointer_cast(in); \ - raw_value_ = typed_in->raw_values(); \ + if (typed_in) { \ + raw_value_ = typed_in->raw_values(); \ + } else { \ + raw_value_ = NULLPTR; \ + } \ } TYPED_NUMERIC_ARRAY_IMPL(Int8Array, int8_t) diff --git a/native-sql-engine/cpp/src/precompile/gandiva.h b/native-sql-engine/cpp/src/precompile/gandiva.h index a97b654b6..f2e6eedaf 100644 --- a/native-sql-engine/cpp/src/precompile/gandiva.h +++ b/native-sql-engine/cpp/src/precompile/gandiva.h @@ -30,7 +30,8 @@ int64_t castDATE64(int32_t in) { return castDATE_date32(in); } int64_t extractYear(int64_t millis) { return extractYear_timestamp(millis); } template T round2(T val, int precision = 2) { - int charsNeeded = 1 + snprintf(NULL, 0, "%.*f", (int)precision, val); + double dVal = (double)val; + int charsNeeded = 1 + snprintf(NULL, 0, "%.*f", (int)precision, dVal); char* buffer = reinterpret_cast(malloc(charsNeeded)); snprintf(buffer, charsNeeded, "%.*f", (int)precision, nextafter(val, val + 0.5)); double result = atof(buffer); @@ -39,9 +40,10 @@ T round2(T val, int precision = 2) { } arrow::Decimal128 castDECIMAL(double val, int32_t precision, int32_t scale) { - int charsNeeded = 1 + snprintf(NULL, 0, "%.*f", (int)scale, val); + double dVal = (double)val; + int charsNeeded = 1 + snprintf(NULL, 0, "%.*f", (int)scale, dVal); char* buffer = reinterpret_cast(malloc(charsNeeded)); - snprintf(buffer, charsNeeded, "%.*f", (int)scale, nextafter(val, val + 0.5)); + snprintf(buffer, sizeof(buffer), "%.*f", (int)scale, nextafter(val, val + 0.5)); auto decimal_str = std::string(buffer); free(buffer); return arrow::Decimal128::FromString(decimal_str).ValueOrDie(); diff --git a/native-sql-engine/cpp/src/precompile/hash_arrays_kernel.cc b/native-sql-engine/cpp/src/precompile/hash_arrays_kernel.cc index 6f41ca337..5f05c4a29 100644 --- a/native-sql-engine/cpp/src/precompile/hash_arrays_kernel.cc +++ b/native-sql-engine/cpp/src/precompile/hash_arrays_kernel.cc @@ -47,6 +47,7 @@ class HashArraysKernel::Impl { } index++; } + assert(func_node_list.size() > 0); auto expr = gandiva::TreeExprBuilder::MakeExpression( func_node_list[0], arrow::field("projection_key", arrow::int64())); schema_ = arrow::schema(field_list); diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index b8bc0eba7..2eebb9ae4 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -501,6 +501,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n switch (column_type_id_[i]) { case Type::SHUFFLE_BINARY: { auto builder = std::make_shared(options_.memory_pool); + assert(builder != nullptr); RETURN_NOT_OK(builder->Reserve(new_size)); RETURN_NOT_OK( builder->ReserveData(binary_array_empirical_size_[binary_idx] * new_size)); @@ -510,6 +511,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n } case Type::SHUFFLE_LARGE_BINARY: { auto builder = std::make_shared(options_.memory_pool); + assert(builder != nullptr); RETURN_NOT_OK(builder->Reserve(new_size)); RETURN_NOT_OK(builder->ReserveData( large_binary_array_empirical_size_[large_binary_idx] * new_size)); @@ -1171,6 +1173,9 @@ arrow::Status HashSplitter::ComputeAndCountPartitionId(const arrow::RecordBatch& std::to_string(outputs.size())); } auto pid_arr = std::dynamic_pointer_cast(outputs.at(0)); + if (pid_arr == nullptr) { + return arrow::Status::Invalid("failed to cast outputs.at(0)"); + } for (auto i = 0; i < num_rows; ++i) { // positive mod auto pid = pid_arr->Value(i) % num_partitions_;