Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,18 @@
],
"sqlState" : "54006"
},
"EXCEPT_NESTED_COLUMN_INVALID_TYPE" : {
"message" : [
"EXCEPT column <columnName> was resolved and expected to be StructType, but found type <dataType>."
],
"sqlState" : "428H2"
},
"EXCEPT_OVERLAPPING_COLUMNS" : {
"message" : [
"Columns in an EXCEPT list must be distinct and non-overlapping, but got (<columns>)."
],
"sqlState" : "42702"
},
"EXPECT_PERMANENT_VIEW_NOT_TEMP" : {
"message" : [
"'<operation>' expects a permanent view but <viewName> is a temp view."
Expand Down
12 changes: 12 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,18 @@ The event time `<eventName>` has the invalid type `<eventType>`, but expected "T

Exceeds char/varchar type length limitation: `<limit>`.

### EXCEPT_NESTED_COLUMN_INVALID_TYPE

[SQLSTATE: 428H2](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

EXCEPT column `<columnName>` was resolved and expected to be StructType, but found type `<dataType>`.

### EXCEPT_OVERLAPPING_COLUMNS

[SQLSTATE: 42702](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Columns in an EXCEPT list must be distinct and non-overlapping, but got (`<columns>`).

### EXPECT_PERMANENT_VIEW_NOT_TEMP

[SQLSTATE: 42809](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@ notMatchedBySourceAction
| UPDATE SET assignmentList
;

// Optional suffix of a star expression (`*` or `qualifiedName.*`): a parenthesized list of
// multipart identifiers, labelled `exceptCols`, naming the columns to leave out.
exceptClause
: EXCEPT LEFT_PAREN exceptCols=multipartIdentifierList RIGHT_PAREN
;

assignmentList
: assignment (COMMA assignment)*
;
Expand Down Expand Up @@ -969,8 +973,8 @@ primaryExpression
| LAST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #last
| POSITION LEFT_PAREN substr=valueExpression IN str=valueExpression RIGHT_PAREN #position
| constant #constantDefault
| ASTERISK #star
| qualifiedName DOT ASTERISK #star
| ASTERISK exceptClause? #star
| qualifiedName DOT ASTERISK exceptClause? #star
| LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor
| LEFT_PAREN query RIGHT_PAREN #subqueryExpression
| functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -358,20 +359,23 @@ abstract class Star extends LeafExpression with NamedExpression {
def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression]
}


/**
* Represents all of the input attributes to a given relational operator, for example in
* "SELECT * FROM ...".
* "SELECT * FROM ..." or "SELECT * EXCEPT(...) FROM ..."
*
* This is also used to expand structs. For example:
* "SELECT record.* from (SELECT struct(a,b,c) as record ...)"
*
* @param target an optional name that should be the target of the expansion. If omitted all
* targets' columns are produced. This can either be a table name or struct name. This
* is a list of identifiers that is the path of the expansion.
*/
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {

*
* This class provides the shared behavior between the classes for SELECT * ([[UnresolvedStar]])
* and SELECT * EXCEPT ([[UnresolvedStarExcept]]). [[UnresolvedStar]] is just a case class of this,
* while [[UnresolvedStarExcept]] adds some additional logic to the expand method.
*/
abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with Unevaluable {
/**
* Returns true if the nameParts is a subset of the last elements of qualifier of the attribute.
*
Expand All @@ -383,7 +387,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
* - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
* qualifier is Seq("ns1", "ns2", "t").
*/
private def matchedQualifier(
protected def matchedQualifier(
attribute: Attribute,
nameParts: Seq[String],
resolver: Resolver): Boolean = {
Expand Down Expand Up @@ -444,6 +448,148 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
override def toString: String = target.map(_.mkString("", ".", ".")).getOrElse("") + "*"
}

/**
 * Represents some of the input attributes to a given relational operator, for example in
 * "SELECT * EXCEPT(a) FROM ...".
 *
 * @param target an optional name that should be the target of the expansion. If omitted all
 *               targets' columns are produced. This is a list of identifiers that is the path of
 *               the expansion; the expansion itself is delegated to [[UnresolvedStarBase]], so
 *               struct expansion (e.g. `SELECT x.* EXCEPT (...)`) goes through the same path.
 * @param excepts a list of (possibly multi-part) names that should be excluded from the
 *                expansion. Names are resolved against the expanded columns, not against the
 *                whole input plan.
 */
case class UnresolvedStarExcept(target: Option[Seq[String]], excepts: Seq[Seq[String]])
  extends UnresolvedStarBase(target) {

  /**
   * We expand the * EXCEPT by the following three steps:
   *  1. use the original .expand() to get top-level column list or struct expansion
   *  2. resolve excepts (with respect to the Seq[NamedExpression] returned from (1))
   *  3. filter the expanded columns with the resolved except list. recursively apply filtering in
   *     case of nested columns in the except list (in order to rewrite structs)
   *
   * Throws an [[AnalysisException]] when an except name cannot be resolved, when a nested except
   * goes through a non-struct extraction (EXCEPT_NESTED_COLUMN_INVALID_TYPE), or when the except
   * list contains duplicate/overlapping entries (EXCEPT_OVERLAPPING_COLUMNS).
   */
  override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
    // Use the UnresolvedStarBase expand method to get a seq of NamedExpressions corresponding to
    // the star expansion. This will yield a list of top-level columns from the logical plan's
    // output, or in the case of struct expansion (e.g. target=`x` for SELECT x.*) it will give
    // a seq of Alias wrapping the struct field extraction.
    val expandedCols = super.expand(input, resolver)

    // Built once: every except entry is resolved against the same set of expanded attributes.
    val expandedAttrs = AttributeSeq(expandedCols.map(_.toAttribute))

    // Resolve the except list with respect to expandedCols. An entry that does not resolve is
    // reported as an unresolved column, with the expanded columns offered as candidates ordered
    // by similarity to the requested name.
    val resolvedExcepts = excepts.map { exceptParts =>
      expandedAttrs.resolve(exceptParts, resolver).getOrElse {
        val exceptName = UnresolvedAttribute(exceptParts).name
        val orderedCandidates = StringUtils.orderSuggestedIdentifiersBySimilarity(
          exceptName, expandedCols.map(a => a.qualifier :+ a.name))
        throw QueryCompilationErrors.unresolvedColumnError(exceptName, orderedCandidates)
      }
    }

    // Convert each resolved except into a pair of (col: Attribute, nestedColumn) representing the
    // top level column in expandedCols that we must 'filter' based on nestedColumn.
    @scala.annotation.tailrec
    def getRootColumn(expr: Expression, nestedColumn: Seq[String] = Nil)
      : (NamedExpression, Seq[String]) = expr match {
      case GetStructField(fieldExpr, _, Some(fieldName)) =>
        getRootColumn(fieldExpr, fieldName +: nestedColumn)
      case e: NamedExpression => e -> nestedColumn
      case other: ExtractValue => throw new AnalysisException(
        errorClass = "EXCEPT_NESTED_COLUMN_INVALID_TYPE",
        messageParameters = Map("columnName" -> other.sql, "dataType" -> other.dataType.toString))
      // Anything else violates the invariant documented below; surface it as an internal error
      // rather than letting the match fail with a bare scala.MatchError.
      case other => throw SparkException.internalError(
        s"Unexpected expression while resolving EXCEPT column: ${other.sql}")
    }

    // An exceptPair represents the column in expandedCols along with the path of a nestedColumn
    // that should be except-ed. Consider two examples:
    //   1. excepting the entire col1 = (col1, Seq())
    //   2. excepting a nested field in col2, col2.a.b = (col2, Seq(a, b))
    // INVARIANT: we rely on the structure of the resolved except being an Alias of GetStructField
    // in the case of nested columns.
    val exceptPairs = resolvedExcepts.map {
      case Alias(exceptExpr, _) => getRootColumn(exceptExpr)
      case except: NamedExpression => except -> Seq.empty
    }

    // Filter columns which correspond to ones listed in the except list and return a new list of
    // columns which exclude the columns in the except list. The 'filtering' manifests as either
    // dropping the column from the list of columns we return, or rewriting the projected column in
    // order to remove excepts that refer to nested columns. For example, with the example above:
    //   excepts = Seq(
    //     (col1, Seq()),     => filter col1 from the output
    //     (col2, Seq(a, b))  => rewrite col2 in the output so that it doesn't include the nested
    //   )                       field corresponding to col2.a.b
    //
    // This occurs in two steps:
    //   1. group the excepts by the column they refer to (groupedExcepts)
    //   2. filter/rewrite input columns based on four cases:
    //     a. column doesn't match any groupedExcepts => column unchanged
    //     b. column exists in groupedExcepts and:
    //       i. none of remainingExcepts are empty => recursively apply filterColumns over the
    //          struct fields in order to rewrite the struct
    //       ii. a remainingExcept is empty, but there are multiple remainingExcepts => we must
    //           have duplicate/overlapping excepts - throw an error
    //       iii. [otherwise] remainingExcept is exactly Seq(Seq()) => this is the base 'filtering'
    //            case. we omit the column from the output (this is a column we would like to
    //            except). NOTE: this case isn't explicitly listed in the `collect` below since we
    //            'collect' columns which match the cases above and omit ones that fall into this
    //            remaining case.
    def filterColumns(columns: Seq[NamedExpression], excepts: Seq[(NamedExpression, Seq[String])])
      : Seq[NamedExpression] = {
      // group the except pairs by the column they refer to. NOTE: no groupMap until scala 2.13
      val groupedExcepts: AttributeMap[Seq[Seq[String]]] =
        AttributeMap(excepts.groupBy(_._1.toAttribute).view.mapValues(v => v.map(_._2)))

      // map input columns while searching for the except entry corresponding to the current column
      columns.map(col => col -> groupedExcepts.get(col.toAttribute)).collect {
        // pass through columns that don't match anything in groupedExcepts
        case (col, None) => col
        // found a match but nestedExcepts has remaining excepts - recurse to rewrite the struct
        case (col, Some(nestedExcepts)) if nestedExcepts.forall(_.nonEmpty) =>
          val fields = col.dataType match {
            case s: StructType => s.fields
            // we shouldn't be here since we throw EXCEPT_NESTED_COLUMN_INVALID_TYPE in
            // getRootColumn for this case - just throw internal error
            case _ => throw SparkException.internalError("Invalid column type")
          }
          val extractedFields = fields.zipWithIndex.map { case (f, i) =>
            Alias(GetStructField(col, i), f.name)()
          }
          val newExcepts = nestedExcepts.map { nestedExcept =>
            // INVARIANT: we cannot have duplicate column names in nested columns, thus the first
            // match is the one and only extractedField corresponding to this path element. The
            // guard above guarantees nestedExcept is non-empty, so head/tail are safe here.
            extractedFields.collectFirst {
              case field if resolver(field.name, nestedExcept.head) =>
                field.toAttribute -> nestedExcept.tail
            }.getOrElse {
              // The except was already resolved against these fields, so a miss means the
              // invariant is broken; report it as an internal error instead of a bare
              // NoSuchElementException.
              throw SparkException.internalError(
                s"Cannot find nested field ${nestedExcept.head} in column ${col.name}")
            }
          }
          Alias(CreateStruct(filterColumns(extractedFields.toSeq, newExcepts)), col.name)()
        // if there are multiple nestedExcepts but one is empty we must have overlapping except
        // columns. throw an error.
        case (col, Some(nestedExcepts)) if nestedExcepts.size > 1 =>
          throw new AnalysisException(
            errorClass = "EXCEPT_OVERLAPPING_COLUMNS",
            messageParameters = Map(
              "columns" -> this.excepts.map(_.mkString(".")).mkString(", ")))
      }
    }

    filterColumns(expandedCols, exceptPairs)
  }
}

/**
 * Represents all of the input attributes to a given relational operator, for example in
 * "SELECT * FROM ...".
 *
 * This is also used to expand structs. For example:
 * "SELECT record.* from (SELECT struct(a,b,c) as record ...)"
 *
 * @param target an optional name that should be the target of the expansion. If omitted all
 * targets' columns are produced. This can either be a table name or struct name. This
 * is a list of identifiers that is the path of the expansion.
 */
case class UnresolvedStar(target: Option[Seq[String]]) extends UnresolvedStarBase(target)

/**
* Represents all of the input attributes to a given relational operator, for example in
* "SELECT `(id)?+.+` FROM ...".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,27 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* Both un-targeted (global) and targeted aliases are supported.
*/
override def visitStar(ctx: StarContext): Expression = withOrigin(ctx) {
  // The qualified name is wrapped in Option so that an unqualified `*` yields no target.
  val target = Option(ctx.qualifiedName()).map(_.identifier.asScala.map(_.getText).toSeq)

  // `* EXCEPT (...)` is built by visitStarExcept; a plain star becomes UnresolvedStar.
  if (ctx.exceptClause != null) {
    visitStarExcept(ctx, target)
  } else {
    UnresolvedStar(target)
  }
}

/**
 * Build the expression for a star that carries an EXCEPT list, i.e. everything the star would
 * select minus the listed columns. Works for both the bare `*` (no target) and the qualified
 * `name.*` form.
 *
 * @param ctx the star parse context; its `exceptClause` is dereferenced directly here
 * @param target the qualifier of the star, if any
 */
def visitStarExcept(ctx: StarContext, target: Option[Seq[String]]): Expression = withOrigin(ctx) {
  // Each entry of the except list is a multipart identifier, visited into its name parts.
  val excludedColumns = ctx.exceptClause.exceptCols.multipartIdentifier.asScala
    .map(typedVisit[Seq[String]])
    .toSeq
  UnresolvedStarExcept(target, excludedColumns)
}

/**
Expand Down
Loading