diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index c530d4023..2cc1a05d2 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -243,6 +243,12 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { val enableUDFKey: String = "spark.oap.sql.columnar.enable.udf" val enableUDF: Boolean = conf.getConfString(enableUDFKey, "false").toBoolean + + // Enables codegen for projection even when the plan contains no join. + // This config is intended for test use only. + val enableProjectionCodegenKey: String = "spark.oap.sql.columnar.projection.codegen" + val enableProjectionCodegen: Boolean = + conf.getConfString(enableProjectionCodegenKey, "false").toBoolean } object GazellePluginConfig { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala index b7b911767..04598f28e 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala @@ -55,9 +55,13 @@ trait ColumnarExpression { "castDATE", "castDECIMAL", "castDECIMALNullOnOverflow", + "castINT", "castINTOrNull", + "castBIGINT", "castBIGINTOrNull", + "castFLOAT4", "castFLOAT4OrNull", + "castFLOAT8", "castFLOAT8OrNull", "rescaleDECIMAL", "extractYear", diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala index 02f99365c..738012f10 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala +++ 
b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala @@ -423,7 +423,7 @@ class ColumnarLower(child: Expression, original: Expression) val gName = "lower" override def supportColumnarCodegen(args: java.lang.Object): Boolean = { - codegenFuncList.contains(gName) && + codegenFuncList.contains(gName) && child.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) } @@ -527,7 +527,30 @@ class ColumnarCast( val gName = "Cast" override def supportColumnarCodegen(args: java.lang.Object): Boolean = { - true && + // Casting data to TimestampType/BinaryType is not supported in codegen. + if (dataType.isInstanceOf[TimestampType] || dataType == BinaryType) { + return false + } + if (dataType == DateType) { + child.dataType match { + case TimestampType => + return false + case StringType => + return false + case _ => + } + } + if (dataType == StringType) { + child.dataType match { + case TimestampType => + return false + case DateType => + return false + case _ => + } + } + + true && child.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala index e358672d3..ad1349585 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import java.util.concurrent.atomic.AtomicInteger +import com.intel.oap.GazellePluginConfig import com.intel.oap.execution._ import com.intel.oap.expression.ColumnarExpressionConverter import org.apache.spark._ @@ -118,6 +119,8 @@ case class ColumnarCollapseCodegenStages( codegenStageCounter: AtomicInteger = new AtomicInteger(0)) extends Rule[SparkPlan] { + 
val enableProjectionCodegen = GazellePluginConfig.getSessionConf.enableProjectionCodegen + private def supportCodegen(plan: SparkPlan): Boolean = plan match { case plan: ColumnarCodegenSupport => plan.supportColumnarCodegen @@ -315,6 +318,10 @@ case class ColumnarCollapseCodegenStages( case s: ColumnarSortExec => /*If ColumnarSort is not ahead of ColumnarSMJ, we should not do wscg for it*/ s.withNewChildren(s.children.map(insertWholeStageCodegen)) + // For testing projection codegen. + case plan: ColumnarCodegenSupport if enableProjectionCodegen && supportCodegen(plan) => + ColumnarWholeStageCodegenExec(insertInputAdapter(plan))( + codegenStageCounter.incrementAndGet()) case plan: ColumnarCodegenSupport if supportCodegen(plan) && existsJoins(plan) => ColumnarWholeStageCodegenExec(insertInputAdapter(plan))( codegenStageCounter.incrementAndGet()) diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/codegen/ProjectionCodegenSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/codegen/ProjectionCodegenSuite.scala new file mode 100644 index 000000000..5f08ec0bb --- /dev/null +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/codegen/ProjectionCodegenSuite.scala @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.codegen + +import java.sql.Timestamp + +import com.intel.oap.GazellePluginConfig +import com.intel.oap.execution.ColumnarWholeStageCodegenExec + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.test.SharedSparkSession + +class ProjectionCodegenSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + test("less than function in codegen") { + val inputData = Seq((7, 3), (10, 15)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a < b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(false), Row(true)) + ) + } + } + + test("less than or equal to function in codegen") { + val inputData = Seq((7, 7), (10, 15), (10, 9)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a <= b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(true), Row(true), Row(false)) + ) + } + } + + test("greater than function in codegen") { + val inputData = Seq((7, 3), (10, 15)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a > b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(true), Row(false)) + ) + } + } + + test("greater than or equal to function in codegen") { + val inputData = Seq((7, 7), (10, 15), (10, 9)).toDF("a", "b") + 
withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a >= b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(true), Row(false), Row(true)) + ) + } + } + + test("equal function in codegen") { + val inputData = Seq((7, 3), (10, 10)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a = b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(false), Row(true)) + ) + } + } + + test("translate function in codegen") { + val inputData = Seq(("AaBbCc", "abc", "123")).toDF("a", "b", "c") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("translate(a, b, c)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row("A1B2C3")) + ) + } + } + + test("substr function in codegen") { + val inputData = Seq(("Spark SQL", 5)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("substr(a, b)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row("k SQL")) + ) + } + } + + test("instr function in codegen") { + val inputData = Seq(("SparkSQL", "SQL")).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("instr(a, b)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == 
true) + checkAnswer( + df, Seq(Row(6)) + ) + } + } + + test("btrim/ltrim/rtrim function in codegen") { + val inputData = Seq((" SparkSQL ")).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + var df = inputData.selectExpr("trim(a)") + var executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row("SparkSQL")) + ) + + df = inputData.selectExpr("ltrim(a)") + executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row("SparkSQL ")) + ) + + df = inputData.selectExpr("rtrim(a)") + executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(" SparkSQL")) + ) + } + } + + test("upper/lower function in codegen") { + val inputData = Seq(("SparkSQL")).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + var df = inputData.selectExpr("upper(a)") + var executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row("SPARKSQL")) + ) + + df = inputData.selectExpr("lower(a)") + executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row("sparksql")) + ) + } + } + + test("cast INT/BIGINT function in codegen") { + val inputData = Seq(("123")).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + var df = inputData.selectExpr("cast(a as INT)") + var executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(123)) + ) + + df = 
inputData.selectExpr("cast(a as BIGINT)") + executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(123L)) + ) + } + } + + test("cast FLOAT function in codegen") { + val inputData = Seq(("123.456")).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + var df = inputData.selectExpr("cast(a as float)") + var executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(123.456f)) + ) + } + } + + test("round function in codegen") { + val inputData = Seq((2.4), (2.5)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("round(a)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(2), Row(3)) + ) + } + } + + test("abs function in codegen") { + val inputData = Seq((2.0), (-3.0)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("abs(a)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(2.0), Row(3.0)) + ) + } + } + + test("add function in codegen") { + val inputData = Seq((2.4), (2.5)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a + 1") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(3.4), Row(3.5)) + ) + } + } + + test("subtract function in codegen") { + val inputData = Seq((2.4), (2.5)).toDF("a") + 
withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a - 1") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(1.4), Row(1.5)) + ) + } + } + + test("multiply function in codegen") { + val inputData = Seq((2), (3)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a * 2") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(4), Row(6)) + ) + } + } + + test("divide function in codegen") { + val inputData = Seq((2), (4)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a / 2") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(1), Row(2)) + ) + } + } + + test("shift left function in codegen") { + val inputData = Seq((2), (4)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("shiftleft(a, 1)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(4), Row(8)) + ) + } + } + + test("shift right function in codegen") { + val inputData = Seq((2), (4)).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("shiftright(a, 1)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(1), Row(2)) + ) + } + } + + test("bitwise and 
function in codegen") { + val inputData = Seq((3, 5)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a & b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(1)) + ) + } + } + + test("bitwise or function in codegen") { + val inputData = Seq((3, 5)).toDF("a", "b") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("a | b") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer( + df, Seq(Row(7)) + ) + } + } + + test("timestamp_micros function in codegen") { + val inputData = Seq(1000L).toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("timestamp_micros(a)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == true) + checkAnswer(df, Row(new Timestamp(1L))) + } + } + + // cast string to timestamp is not supported in codegen. + test("block casting string to timestamp in codegen") { + val inputData = Seq("1970-01-01 00:00:00.000").toDF("a") + withSQLConf(GazellePluginConfig.getSessionConf.enableProjectionCodegenKey -> "true") { + val df = inputData.selectExpr("cast(a as timestamp)") + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.children(0).isInstanceOf[ColumnarWholeStageCodegenExec] == false) + // non-codegen result. 
+ checkAnswer(df, Row(new Timestamp(0L))) + } + } + +} diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc index 265dba5e1..9207dc63d 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc @@ -335,8 +335,9 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::FunctionNode& node) prepare_ss << "bool " << validity << " = " << child_visitor_list[0]->GetPreCheck() << ";" << std::endl; prepare_ss << "if (" << validity << ") {" << std::endl; - prepare_ss << "auto ind = " << child_visitor_list[0]->GetResult() << ".find(" - << child_visitor_list[1]->GetResult() << ");" << std::endl; + // The 1st arg is the string we are searching for inside the 2nd arg. + prepare_ss << "auto ind = " << child_visitor_list[1]->GetResult() << ".find(" + << child_visitor_list[0]->GetResult() << ");" << std::endl; prepare_ss << "if (ind == std::string::npos) {" << std::endl; prepare_ss << codes_str_ << " = 0;" << std::endl; prepare_ss << "}" << std::endl; @@ -344,7 +345,7 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::FunctionNode& node) prepare_ss << codes_str_ << " = ind + 1;" << std::endl; prepare_ss << "}" << std::endl; prepare_ss << "}" << std::endl; - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 2; i++) { prepare_str_ += child_visitor_list[i]->GetPrepare(); } prepare_str_ += prepare_ss.str(); @@ -491,7 +492,7 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::FunctionNode& node) auto childNode = node.children().at(0); if (childNode->return_type()->id() != arrow::Type::DECIMAL) { - // if not casting form Decimal + // For numeric types, except decimal type. std::stringstream fix_ss; if (node.return_type()->id() == arrow::Type::STRING) { prepare_ss << codes_str_ << " = std::to_string("