From 72a5fe82a11354a155a05bacb626f3ae422c2104 Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 23 May 2022 17:29:37 +0800 Subject: [PATCH] Add scala udf support and a unit test --- .../ColumnarExpressionConverter.scala | 18 +++++++++++++++--- .../{UDF.scala => ColumnarUDF.scala} | 14 +++++++++++++- .../spark/sql/DataFrameFunctionsSuite.scala | 10 ++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) rename native-sql-engine/core/src/main/scala/com/intel/oap/expression/{UDF.scala => ColumnarUDF.scala} (89%) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala index b809634e0..00007f4c8 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala @@ -399,14 +399,24 @@ object ColumnarExpressionConverter extends Logging { attributeSeq, convertBoundRefToAttrRef = convertBoundRefToAttrRef), expr) - case expr if (UDF.isSupportedUDF(expr.prettyName)) => + // Scala UDF. + case expr: ScalaUDF if ColumnarUDF.isSupportedUDF(expr.udfName.get) => val children = expr.children.map { expr => replaceWithColumnarExpression( expr, attributeSeq, convertBoundRefToAttrRef = convertBoundRefToAttrRef) } - UDF.create(children, expr) + ColumnarUDF.create(children, expr) + // Hive UDF. 
+ case expr if (ColumnarUDF.isSupportedUDF(expr.prettyName)) => + val children = expr.children.map { expr => + replaceWithColumnarExpression( + expr, + attributeSeq, + convertBoundRefToAttrRef = convertBoundRefToAttrRef) + } + ColumnarUDF.create(children, expr) case expr => throw new UnsupportedOperationException( s" --> ${expr.getClass} | ${expr} is not currently supported.") @@ -468,7 +478,9 @@ object ColumnarExpressionConverter extends Logging { containsSubquery(sr.srcExpr) || containsSubquery(sr.searchExpr) || containsSubquery(sr.replaceExpr) - case expr if (UDF.isSupportedUDF(expr.prettyName)) => + case expr: ScalaUDF if ColumnarUDF.isSupportedUDF(expr.udfName.get) => + expr.children.map(containsSubquery).exists(_ == true) + case expr if (ColumnarUDF.isSupportedUDF(expr.prettyName)) => expr.children.map(containsSubquery).exists(_ == true) case expr => throw new UnsupportedOperationException( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/UDF.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUDF.scala similarity index 89% rename from native-sql-engine/core/src/main/scala/com/intel/oap/expression/UDF.scala rename to native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUDF.scala index 4c013e420..4709ee6eb 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/UDF.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUDF.scala @@ -83,20 +83,32 @@ case class ColumnarURLDecoder(input: Expression) extends Expression with Columna } } -object UDF { +object ColumnarUDF { // Keep the supported UDF name. 
The name is specified in registering the // row based function in spark, e.g., // CREATE TEMPORARY FUNCTION UrlDecoder AS 'com.intel.test.URLDecoderNew'; val supportList = {"UrlDecoder"} def isSupportedUDF(name: String): Boolean = { + if (name == null) { + return false; + } return supportList.contains(name) } def create(children: Seq[Expression], original: Expression): Expression = { original.prettyName match { + // Hive UDF. case "UrlDecoder" => ColumnarURLDecoder(children.head) + // Scala UDF. + case "scalaudf" => + original.asInstanceOf[ScalaUDF].udfName.get match { + case "UrlDecoder" => + ColumnarURLDecoder(children.head) + case other => + throw new UnsupportedOperationException(s"not currently supported: $other.") + } case other => throw new UnsupportedOperationException(s"not currently supported: $other.") } diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index aa1678e4f..cfe36616e 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -3629,6 +3629,16 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { df.select(map(map_entries($"m"), lit(1))), Row(Map(Seq(Row(1, "a")) -> 1))) } + + test("Columnar UDF") { + // Register a scala UDF. The scala UDF code will not be actually used. It + // will be replaced by columnar UDF at runtime. + spark.udf.register("UrlDecoder", (s : String) => s) + checkAnswer( + sql("select UrlDecoder('AaBb%23'), UrlDecoder(null)"), + Seq(Row("AaBb#", null)) + ) + } } object DataFrameFunctionsSuite {