sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -25,8 +25,8 @@ package org.apache.spark.sql.catalyst
 package object analysis {

   /**
-   * Responsible for resolving which identifiers refer to the same entity. For example, by using
-   * case insensitive equality.
+   * A Resolver should return true if the first string refers to the same entity as the second
+   * string, for example by using case-insensitive equality.
    */
   type Resolver = (String, String) => Boolean

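To make the contract concrete, here is a minimal sketch of two conforming resolvers (the values below are illustrative, not part of this patch):

```scala
// Two resolvers satisfying the (String, String) => Boolean contract.
val caseInsensitive: (String, String) => Boolean = _ equalsIgnoreCase _
val caseSensitive: (String, String) => Boolean = _ == _

assert(caseInsensitive("coluMN", "column"))   // same entity, ignoring case
assert(!caseSensitive("coluMN", "column"))
```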
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -40,6 +40,17 @@ abstract class NamedExpression extends Expression {

   def name: String
   def exprId: ExprId

+  /**
+   * All possible qualifiers for the expression.
+   *
+   * For now, since we do not allow using the original table name to qualify a column once the
+   * table is aliased, this can only be:
+   *
+   * 1. Empty Seq: when an attribute doesn't have a qualifier,
+   *    e.g. top-level attributes aliased in the SELECT clause, or a column from a LocalRelation.
+   * 2. Single element: either the table name or the alias name of the table.
+   */
+  def qualifiers: Seq[String]

   def toAttribute: Attribute
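A hedged sketch of how the two documented cases behave; the helper and the table names below are illustrative, not from the patch:

```scala
// How a qualifier list is matched against the first part of a dotted name.
def qualifierMatches(
    qualifiers: Seq[String],
    firstPart: String,
    resolver: (String, String) => Boolean): Boolean =
  qualifiers.exists(resolver(_, firstPart))

// Case 2: a column of `FROM t AS x` carries the single qualifier "x".
assert(qualifierMatches(Seq("x"), "x", _ equalsIgnoreCase _))
// The original table name no longer qualifies once the table is aliased:
assert(!qualifierMatches(Seq("x"), "t", _ equalsIgnoreCase _))
// Case 1: a top-level SELECT alias has no qualifiers, so nothing matches.
assert(!qualifierMatches(Seq.empty, "x", _ equalsIgnoreCase _))
```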
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -152,6 +152,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
   /** Prints out the schema in the tree format */
   def printSchema(): Unit = println(schemaString)

+  /**
+   * A prefix string used when printing the plan.
+   *
+   * We use "!" to indicate an invalid plan, and "'" to indicate an unresolved plan.
+   */
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""

   override def simpleString = statePrefix + super.simpleString
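Illustration only: a standalone re-implementation of the prefixing rule (Spark's real classes differ; in the actual code, unresolved logical plans override `statePrefix` to produce the leading apostrophe):

```scala
// "!" marks an invalid plan: one that references input attributes its
// children no longer produce.
trait MiniPlan {
  def nodeName: String
  def missingInput: Seq[String]
  def children: Seq[MiniPlan]
  protected def statePrefix: String =
    if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
  def simpleString: String = statePrefix + nodeName
}

case class Node(
    nodeName: String,
    children: Seq[MiniPlan],
    missingInput: Seq[String]) extends MiniPlan

// A node referencing a column its child no longer produces prints with "!":
assert(Node("Project", Seq(Node("Rel", Nil, Nil)), Seq("missingCol")).simpleString == "!Project")
```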
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -18,41 +18,29 @@
 package org.apache.spark.sql.catalyst.plans.logical

 import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.catalyst.trees

-/**
- * Estimates of various statistics. The default estimation logic simply lazily multiplies the
- * corresponding statistic produced by the children. To override this behavior, override
- * `statistics` and assign it an overridden version of `Statistics`.
- *
- * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
- * performance of the implementations. The reason is that estimations might get triggered in
- * performance-critical processes, such as query plan planning.
- *
- * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
- * cardinality estimation (e.g. cartesian joins).
- *
- * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
- *                    defaults to the product of children's `sizeInBytes`.
- */
-private[sql] case class Statistics(sizeInBytes: BigInt)
-
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   self: Product =>

+  /**
+   * Computes [[Statistics]] for this plan. The default implementation assumes the output
+   * cardinality is the product of all child plans' cardinalities, i.e. it applies in the case
+   * of cartesian joins.
+   *
+   * [[LeafNode]]s must override this.
+   */
   def statistics: Statistics = {
     if (children.size == 0) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }

-    Statistics(
-      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+    Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }

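A quick arithmetic check of the two claims in the new doc comment, namely why `sizeInBytes` is a `BigInt` and what the default product rule computes (values are illustrative):

```scala
// Two ~4 GiB children already overflow a signed 64-bit size estimate when
// multiplied, which is why Statistics uses BigInt.
val fourGiB = BigInt(4L << 30)
assert(fourGiB * fourGiB > BigInt(Long.MaxValue))

// Default rule from the code above: the product of the children's estimates.
val childSizes = Seq(BigInt(10), BigInt(20))
assert(childSizes.product == BigInt(200))
```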
@@ -128,26 +116,41 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def resolve(name: String, resolver: Resolver): Option[NamedExpression] =
     resolve(name, output, resolver)

-  def resolveAsTableColumn(
+  /**
+   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
+   *
+   * This assumes `name` has multiple parts, where the first part is a qualifier
+   * (i.e. table name, alias, or subquery alias).
+   * See the comment above the `candidates` variable in resolve() for the semantics of the returned data.
+   */
+  private def resolveAsTableColumn(
       nameParts: Array[String],
       resolver: Resolver,
-      attribute: Attribute): List[(Attribute, List[String])] = {
-    if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty && nameParts.size > 1) {
-      val remainingParts = nameParts.drop(1)
+      attribute: Attribute): Option[(Attribute, List[String])] = {
+    assert(nameParts.length > 1)
+    if (attribute.qualifiers.exists(resolver(_, nameParts.head))) {
+      // At least one qualifier matches. See if the remaining parts match.
+      val remainingParts = nameParts.tail
       resolveAsColumn(remainingParts, resolver, attribute)
     } else {
-      Nil
+      None
     }
   }

-  def resolveAsColumn(
+  /**
+   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
+   *
+   * Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
+   * See the comment above the `candidates` variable in resolve() for the semantics of the returned data.
+   */
+  private def resolveAsColumn(
      nameParts: Array[String],
      resolver: Resolver,
-      attribute: Attribute): List[(Attribute, List[String])] = {
+      attribute: Attribute): Option[(Attribute, List[String])] = {
     if (resolver(attribute.name, nameParts.head)) {
-      (attribute.withName(nameParts.head), nameParts.tail.toList) :: Nil
+      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
     } else {
-      Nil
+      None
     }
   }
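The 0-or-1 contract the two helpers now share, sketched standalone with `String` standing in for Catalyst's `Attribute` (an assumed simplification, not the patch itself):

```scala
// Returns None, or the matched attribute paired with the name parts that
// still need to be resolved (e.g. nested struct fields).
def resolveAsColumnSketch(
    nameParts: Array[String],
    resolver: (String, String) => Boolean,
    attributeName: String): Option[(String, List[String])] =
  if (resolver(attributeName, nameParts.head)) {
    Some((attributeName, nameParts.tail.toList))
  } else {
    None
  }

// "b.c" matched against attribute "b": one match, with List("c") left over.
assert(resolveAsColumnSketch(Array("b", "c"), _ equalsIgnoreCase _, "b") ==
  Some(("b", List("c"))))
```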

@@ -159,25 +162,44 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

     val parts = name.split("\\.")

-    // We will try to resolve this name as `table.column` pattern first.
-    var options = input.flatMap { option =>
-      resolveAsTableColumn(parts, resolver, option)
+    // A sequence of possible candidate matches.
+    // Each candidate is a tuple. The first element is a resolved attribute, and the second is
+    // the list of name parts that still need to be resolved.
+    // For example, suppose "a" is the table name, "b" is the column name, and "c" is the
+    // struct field name, i.e. the name is "a.b.c". The attribute will then be "a.b",
+    // and the second element will be List("c").
+    var candidates: Seq[(Attribute, List[String])] = {
+      // If the name has 2 or more parts, try to resolve it as `table.column` first.
+      if (parts.length > 1) {
+        input.flatMap { option =>
+          resolveAsTableColumn(parts, resolver, option)
+        }
+      } else {
+        Seq.empty
+      }
     }

     // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    if(options.isEmpty) {
-      options = input.flatMap { option =>
-        resolveAsColumn(parts, resolver, option)
+    if (candidates.isEmpty) {
+      candidates = input.flatMap { candidate =>
+        resolveAsColumn(parts, resolver, candidate)
       }
     }

-    options.distinct match {
+    candidates.distinct match {
       // One match, no nested fields, use it.
       case Seq((a, Nil)) => Some(a)

       // One match, but we also need to extract the requested nested field.
       case Seq((a, nestedFields)) =>
-        Some(Alias(nestedFields.foldLeft(a: Expression)(UnresolvedGetField), nestedFields.last)())
+        // The foldLeft adds an UnresolvedGetField for each remaining part of the name,
+        // and the result is aliased with the last part of the name.
+        // For example, consider the name "a.b.c", where "a" is resolved to an existing attribute.
+        // This adds UnresolvedGetField("b") and then UnresolvedGetField("c"), and aliases
+        // the final expression as "c".
+        val fieldExprs = nestedFields.foldLeft(a: Expression)(UnresolvedGetField)
+        val aliasName = nestedFields.last
+        Some(Alias(fieldExprs, aliasName)())

       // No matches.
       case Seq() =>
@@ -186,8 +208,8 @@

       // More than one match.
       case ambiguousReferences =>
-        throw new TreeNodeException(
-          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+        throw new AnalysisException(
+          s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
     }
   }
 }
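The foldLeft in the one-match case is easiest to see on a toy expression tree; the case classes below are hypothetical stand-ins for Catalyst's, with `GetField` playing the role of `UnresolvedGetField`:

```scala
sealed trait Expr
case class Attr(name: String) extends Expr
case class GetField(child: Expr, fieldName: String) extends Expr

// Resolving "a.b.c" after attribute "a" matched: fold the remaining parts,
// nesting one field extraction per part, innermost first.
val nestedFields = List("b", "c")
val folded = nestedFields.foldLeft(Attr("a"): Expr)(GetField)
assert(folded == GetField(GetField(Attr("a"), "b"), "c"))
// resolve() then aliases this expression as "c", the last name part.
```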
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala (new file)
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+/**
+ * Estimates of various statistics. The default estimation logic simply lazily multiplies the
+ * corresponding statistic produced by the children. To override this behavior, override
+ * `statistics` and assign it an overridden version of `Statistics`.
+ *
+ * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
+ * performance of the implementations. The reason is that estimations might get triggered in
+ * performance-critical processes, such as query plan planning.
+ *
+ * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
+ * cardinality estimation (e.g. cartesian joins).
+ *
+ * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
+ *                    defaults to the product of children's `sizeInBytes`.
+ */
+private[sql] case class Statistics(sizeInBytes: BigInt)
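Finally, a hedged, self-contained sketch of the contract this class documents, using toy types rather than Spark's:

```scala
case class Stats(sizeInBytes: BigInt)

trait MiniNode {
  def children: Seq[MiniNode]
  // Default: the product of the children's sizes. An empty product is 1,
  // which is why leaf operators "default to 1" unless they override.
  def statistics: Stats =
    Stats(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
}

case class MiniLeaf(size: BigInt) extends MiniNode {
  val children: Seq[MiniNode] = Nil
  override def statistics: Stats = Stats(size)  // leaves supply a real size
}

case class MiniJoin(left: MiniNode, right: MiniNode) extends MiniNode {
  val children: Seq[MiniNode] = Seq(left, right)
}

// Two 1 KiB inputs joined: the default estimate is 1024 * 1024 bytes.
assert(MiniJoin(MiniLeaf(1024), MiniLeaf(1024)).statistics.sizeInBytes == BigInt(1048576))
```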