Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class Analyzer(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
Expand Down Expand Up @@ -1038,6 +1040,25 @@ class Analyzer(
}
}

/**
 * Checks whether the function identifier referenced by each [[UnresolvedFunction]] is defined
 * in the function registry. This rule does not try to resolve the [[UnresolvedFunction]]; it
 * only performs a simple existence check on the function identifier, so that undefined
 * functions are reported quickly without triggering relation resolution, which may incur a
 * potentially expensive partition/schema discovery process in some cases.
 *
 * Throws a [[NoSuchFunctionException]], positioned at the offending expression, for the first
 * [[UnresolvedFunction]] whose identifier the catalog does not know about.
 *
 * @see [[ResolveFunctions]]
 * @see https://issues.apache.org/jira/browse/SPARK-19737
 */
object LookupFunctions extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case f: UnresolvedFunction if !catalog.functionExists(f.name) =>
      withPosition(f) {
        // When the identifier carries no database qualifier, report the session's current
        // database rather than a hard-coded "default", so the error names the database the
        // user is actually working in.
        val db = f.name.database.getOrElse(catalog.getCurrentDatabase)
        throw new NoSuchFunctionException(db, f.name.funcName)
      }
  }
}

/**
* Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
Expand Down Expand Up @@ -1196,4 +1196,25 @@ class SessionCatalogSuite extends PlanTest {
catalog.listFunctions("unknown_db", "func*")
}
}

test("SPARK-19737: detect undefined functions without triggering relation resolution") {
  import org.apache.spark.sql.catalyst.dsl.plans._

  // Exercise the analyzer under both case-sensitivity settings.
  for (caseSensitive <- Seq(true, false)) {
    val catalystConf = SimpleCatalystConf(caseSensitive)
    val sessionCatalog =
      new SessionCatalog(newBasicCatalog(), new SimpleFunctionRegistry, catalystConf)
    val analyzer = new Analyzer(sessionCatalog, catalystConf)

    // A plan in which both the relation and the projected function are undefined.
    val plan = UnresolvedRelation(TableIdentifier("undefined_table"))
      .select(UnresolvedFunction("undefined_fn", Nil, isDistinct = false))

    // The undefined function must be reported before the undefined table.
    val error = intercept[AnalysisException] {
      analyzer.execute(plan)
    }

    assert(error.getMessage.contains("Undefined function: 'undefined_fn'"))
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ private[sql] class HiveSessionCatalog(
}
}

/**
 * Returns true when the parent catalog reports that `name` exists, or when the function's
 * unqualified name appears in `hiveFunctions` (the functions passed over to Hive).
 */
// TODO: Remove this override once Spark implements "histogram_numeric" natively.
override def functionExists(name: FunctionIdentifier): Boolean = {
  val knownToParent = super.functionExists(name)
  // NOTE(review): `contains` compares the raw function name exactly (case-sensitive);
  // confirm that matches how names in `hiveFunctions` are stored and looked up elsewhere.
  knownToParent || hiveFunctions.contains(name.funcName)
}

/** List of functions we pass over to Hive. Note that over time this list should go to 0. */
// We have a list of Hive built-in functions that we do not support. So, we will check
// Hive's function registry and lazily load needed functions into our own function registry.
Expand Down