From 9dbde5b8c8b28a61c0702bb128e2d2b5cfb51977 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 27 Jun 2016 13:05:12 -0700 Subject: [PATCH 1/3] [SPARK-16228][SQL] Support a fallback lookup for external functions with `double`-type parameter only. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 13 +++++++++++-- .../spark/sql/hive/execution/HiveUDFSuite.scala | 7 +++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 96f2e38946f1..a0e2e53f6570 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogRelation, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -881,7 +881,16 @@ class Analyzer( } case u @ UnresolvedFunction(funcId, children, isDistinct) => withPosition(u) { - catalog.lookupFunction(funcId, children) match { + val func = try { + catalog.lookupFunction(funcId, children) + } catch { + // SPARK-16228 ExternalCatalog may recognize `double`-type only. + case _: Exception => + val newChildren = children.map(x => + if (x.dataType.isInstanceOf[DecimalType]) Cast(x, DoubleType) else x) + catalog.lookupFunction(funcId, newChildren) + } + func match { // DISTINCT is not meaningful for a Max or a Min. case max: Max if isDistinct => AggregateExpression(max, Complete, isDistinct = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 0f56b2c0d1f4..def4601cf615 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -142,6 +142,13 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq) } + test("SPARK-16228 Percentile needs explicit cast to double") { + sql("select percentile(value, cast(0.5 as double)) from values 1,2,3 T(value)") + sql("select percentile_approx(value, cast(0.5 as double)) from values 1.0,2.0,3.0 T(value)") + sql("select percentile(value, 0.5) from values 1,2,3 T(value)") + sql("select percentile_approx(value, 0.5) from values 1.0,2.0,3.0 T(value)") + } + test("Generic UDAF aggregates") { checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999D)) FROM src LIMIT 1"), sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) From b8df0284aa7bd4328ff7f8e1ebdce55272e549d2 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 27 Jun 2016 14:21:34 -0700 Subject: [PATCH 2/3] Move fallback logic into HiveSessionCatalog. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 13 ++----------- .../spark/sql/hive/HiveSessionCatalog.scala | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a0e2e53f6570..96f2e38946f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogRelation, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -881,16 +881,7 @@ class Analyzer( } case u @ UnresolvedFunction(funcId, children, isDistinct) => withPosition(u) { - val func = try { - catalog.lookupFunction(funcId, children) - } catch { - // SPARK-16228 ExternalCatalog may recognize `double`-type only. - case _: Exception => - val newChildren = children.map(x => - if (x.dataType.isInstanceOf[DecimalType]) Cast(x, DoubleType) else x) - catalog.lookupFunction(funcId, newChildren) - } - func match { + catalog.lookupFunction(funcId, children) match { // DISTINCT is not meaningful for a Max or a Min. case max: Max if isDistinct => AggregateExpression(max, Complete, isDistinct = false) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 8a47dcf90803..3a7c04bcd7ca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -30,12 +30,13 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DecimalType, DoubleType} import org.apache.spark.util.Utils @@ -163,6 +164,18 @@ private[sql] class HiveSessionCatalog( } override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { + try { + subLookupFunction(name, children) + } catch { + case _: Exception => + // SPARK-16228 ExternalCatalog may recognize `double`-type only. + val newChildren = children.map(x => + if (x.dataType.isInstanceOf[DecimalType]) Cast(x, DoubleType) else x) + subLookupFunction(name, newChildren) + } + } + + private def subLookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to // if (super.functionExists(name)) { // super.lookupFunction(name, children) From 80cea2e10581a986e64d13f7abff3f5001f276b1 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 29 Jun 2016 13:28:31 -0700 Subject: [PATCH 3/3] Address comments. --- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 3a7c04bcd7ca..2589b9d4a028 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -165,17 +165,18 @@ private[sql] class HiveSessionCatalog( override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { try { - subLookupFunction(name, children) + lookupFunction0(name, children) } catch { - case _: Exception => + case NonFatal(_) => // SPARK-16228 ExternalCatalog may recognize `double`-type only. - val newChildren = children.map(x => - if (x.dataType.isInstanceOf[DecimalType]) Cast(x, DoubleType) else x) - subLookupFunction(name, newChildren) + val newChildren = children.map { child => + if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child + } + lookupFunction0(name, newChildren) } } - private def subLookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { + private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = { // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to // if (super.functionExists(name)) { // super.lookupFunction(name, children)