18 changes: 18 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -90,6 +90,24 @@ sealed class Metadata private[types] (private[types] val map: Map[String, Any])
/** Gets a Metadata array. */
def getMetadataArray(key: String): Array[Metadata] = get(key)

/** Returns a copy with the given keys removed. */
def withKeysRemoved(keysToRemove: Seq[String]): Metadata = {
if (keysToRemove.isEmpty) {
this
} else {
new Metadata(this.map -- keysToRemove)
}
}

/** Returns a copy with the given key removed. */
def withKeyRemoved(keyToRemove: String): Metadata = {
if (map.contains(keyToRemove)) {
new Metadata(map - keyToRemove)
} else {
this
}
}

/** Converts to its JSON representation. */
def json: String = compact(render(jsonValue))
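
A quick sketch of how the new helpers behave (a minimal sketch built with the existing `MetadataBuilder` API; the key names are illustrative):

```scala
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

val meta = new MetadataBuilder()
  .putString("comment", "user id")
  .putLong("maxLength", 32L)
  .build()

// Removing a present key returns a copy without it.
assert(!meta.withKeyRemoved("maxLength").contains("maxLength"))

// Removing an absent key returns the same instance, not a copy.
assert(meta.withKeyRemoved("no_such_key") eq meta)

// The bulk variant short-circuits on an empty key list
// and drops every listed key otherwise.
assert(meta.withKeysRemoved(Seq.empty) eq meta)
assert(meta.withKeysRemoved(Seq("comment", "maxLength")) == Metadata.empty)
```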

@@ -133,7 +133,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
val canWriteExpr = canWrite(
tableName, valueType, colType, byName = true, conf, addError, colPath)
if (canWriteExpr) {
applyColumnMetadata(checkNullability(value, col, conf, colPath), col)
val nullsHandled = checkNullability(value, col, conf, colPath)
applyColumnMetadata(nullsHandled, col)
} else {
value
}
@@ -222,12 +223,29 @@
val requiredMetadata = CharVarcharUtils.cleanMetadata(column.metadata)

// Make sure that the result has the requiredMetadata and only that.
// If the expr is an Attribute or NamedLambdaVariable with the proper name and metadata,
// it should remain stable, but we do not trust that other NamedExpressions will
// remain stable (namely Alias).
//
// If the expr is a NamedLambdaVariable, it must be from our handling of structured
// array or map fields; the Alias will be added on the outer structured value.
//
// Even an Attribute with the proper name and metadata is not enough to prevent
// source query metadata leaking to the Write after rewrites, e.g.:
// case a: Attribute if a.name == column.name && a.metadata == requiredMetadata => a
//
// The problem is that an Attribute can be replaced by what it refers to, for example:
// Project AttrRef(metadata={}, exprId=2)
// Project Alias(
// cast(AttrRef(metadata={source_field_default_value}, exprId=1) as same_type),
// exprId=2,
// explicitMetadata=None) -- metadata.isEmpty
// gets rewritten to:
// Project Alias(
// AttrRef(metadata={source_field_default_value}, exprId=1),
// exprId=2,
// explicitMetadata=None) -- metadata.nonEmpty !!
//
// So we always add an Alias(expr, name, explicitMetadata = Some(requiredMetadata))
// to prevent expr from leaking the source query metadata into the Write.
expr match {
case a: Attribute if a.name == column.name && a.metadata == requiredMetadata =>
a
case v: NamedLambdaVariable if v.name == column.name && v.metadata == requiredMetadata =>
v
case _ =>
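
The hunk is cut off here, but the rule the comment above establishes boils down to unconditionally wrapping the resolved value in an explicit-metadata Alias. A minimal sketch, not the actual `applyColumnMetadata` body (the helper name and shape are assumptions):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
import org.apache.spark.sql.types.Metadata

// Hypothetical distillation of the always-alias rule described above.
def pinColumnMetadata(expr: Expression, name: String, required: Metadata): NamedExpression =
  // explicitMetadata = Some(...) means Alias.metadata never falls back to
  // deriving metadata from `expr`, so later rewrites that inline what an
  // AttributeReference points to cannot leak source-side metadata.
  Alias(expr, name)(explicitMetadata = Some(required))
```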
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis.resolver
import org.apache.spark.sql.catalyst.analysis.{AliasResolution, UnresolvedAlias}
import org.apache.spark.sql.catalyst.expressions.{
Alias,
AliasHelper,
Expression,
NamedExpression,
OuterReference
Expand All @@ -31,7 +32,8 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
*/
class AliasResolver(expressionResolver: ExpressionResolver)
extends TreeNodeResolver[UnresolvedAlias, Expression]
with ResolvesExpressionChildren {
with ResolvesExpressionChildren
with AliasHelper {
private val scopes = expressionResolver.getNameScopes
private val expressionResolutionContextStack =
expressionResolver.getExpressionResolutionContextStack
@@ -115,30 +117,19 @@ class AliasResolver(expressionResolver: ExpressionResolver)
*
* Project[
* Alias("alias_2")(
* Alias("alias_1")(id)
* )
* Alias("alias_1")(id1)
* )(id2)
* ]( ... )
*
* and after the `collapseAlias` call (removing the bottom one) it would be:
*
* Project[
* Alias("alias_2")(id)
* Alias("alias_2")(id2)
* ]( ... )
*/
private def collapseAlias(alias: Alias): Alias =
alias.child match {
case innerAlias: Alias =>
val metadata = if (alias.metadata.isEmpty) {
None
} else {
Some(alias.metadata)
}
alias.copy(child = innerAlias.child)(
exprId = alias.exprId,
qualifier = alias.qualifier,
explicitMetadata = metadata,
nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
)
case _: Alias => mergeAliases(alias)
case _ => alias
}
}
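
For concreteness, the shape `collapseAlias` now delegates to `mergeAliases` (a sketch with stock Catalyst constructors; `mergeAliases` is a protected helper, so its result is described in comments):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.IntegerType

val id = AttributeReference("id", IntegerType)()
val nested = Alias(Alias(id, "alias_1")(), "alias_2")()

// mergeAliases(nested) is equivalent to:
//   Alias(id, "alias_2")(exprId = nested.exprId, ...)
// i.e. the inner alias disappears while the outer name, exprId and
// (explicit or derived) metadata are preserved.
```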
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.analysis.MultiAlias
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
@@ -68,8 +70,8 @@ trait AliasHelper {
* but keep the name of the outermost attribute.
*/
protected def replaceAliasButKeepName(
expr: NamedExpression,
aliasMap: AttributeMap[Alias]): NamedExpression = {
expr: NamedExpression,
aliasMap: AttributeMap[Alias]): NamedExpression = {
expr match {
// We need to keep the `Alias` if we replace a top-level Attribute, so that it's still a
// `NamedExpression`. We also need to keep the name of the original Attribute.
Expand All @@ -90,29 +92,82 @@ trait AliasHelper {
case a: Alias if a.metadata != Metadata.empty => a
case other => trimAliases(other)
}
case a @ Alias(child, _) => trimAliases(child)
case Alias(child, _) => trimAliases(child)
case MultiAlias(child, _) => trimAliases(child)
case other => other.mapChildren(trimAliases)
}

protected def trimNonTopLevelAliases[T <: Expression](e: T): T = {
val res = CurrentOrigin.withOrigin(e.origin) {
e match {
case a: Alias =>
// Preserve the _effective_ metadata.
a.copy(child = trimAliases(a.child))(
exprId = a.exprId,
qualifier = a.qualifier,
explicitMetadata = Some(a.metadata),
nonInheritableMetadataKeys = Nil)
case a: MultiAlias =>
a.copy(child = trimAliases(a.child))
case a: Alias => mergeAndTrimAliases(a)
case a: MultiAlias => a.copy(child = trimAliases(a.child))
case other => trimAliases(other)
}
}

res.copyTagsFrom(e)

res.asInstanceOf[T]
}

/**
* Merges any stack of aliases under the top-level alias, and then
* drops any aliases deeper in the expression tree.
* So Alias1(Alias2(Alias3(Foo(Alias4(x))))) becomes
* Alias5(Foo(x))
* where Alias5 preserves the metadata of Alias{1,2,3}
* and the name and exprId of Alias1.
* Alias4 is simply removed.
*/
@tailrec
protected final def mergeAndTrimAliases(alias: Alias): Alias = {
alias.child match {
case _: Alias => mergeAndTrimAliases(mergeAliases(alias))
case other => alias.withNewChild(trimAliases(other))
}
}

/**
* Merges an Alias(Alias(x)) into Alias(x), preserving metadata.
*
* If the outer alias has explicit metadata, it is preserved.
* Else if the inner alias has explicit metadata, the result
* carries the outer alias's metadata as explicit metadata.
* Else both aliases derive their metadata, and the result
* also derives its metadata, with the union of their
* non-inheritable metadata keys.
*
* @param alias An Alias with a child Alias, Alias(Alias(x))
* @return The merged alias, Alias(x)
*/
protected final def mergeAliases(alias: Alias): Alias = {
val child = alias.child.asInstanceOf[Alias]
var explicitMetadata = alias.explicitMetadata
var nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys

if (explicitMetadata.isDefined) {
// Outer alias is explicit; we can ignore inner metadata.
// The outer nonInheritableMetadataKeys are irrelevant.
nonInheritableMetadataKeys = Nil
} else if (child.explicitMetadata.isDefined) {
// Inner alias is explicit; remove any outer non-inherits.
// We don't need nonInheritableMetadataKeys anymore.
explicitMetadata = Some(alias.metadata)
nonInheritableMetadataKeys = Nil
} else {
// Both are deriving. Union the nonInheritableMetadataKeys
val nonInheritSet = nonInheritableMetadataKeys.toSet
nonInheritableMetadataKeys = nonInheritableMetadataKeys ++
child.nonInheritableMetadataKeys.filterNot(nonInheritSet)
}
val res = CurrentOrigin.withOrigin(alias.origin) {
alias.copy(child = child.child)(
exprId = alias.exprId,
qualifier = alias.qualifier,
explicitMetadata = explicitMetadata,
nonInheritableMetadataKeys = nonInheritableMetadataKeys)
}
res.copyTagsFrom(alias)
res
}
}
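
To make the three cases concrete, a small sketch (again, `mergeAliases` is protected, so the expected outcomes are stated as comments rather than runnable assertions; I verified them against the branch logic above):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder}

val m = new MetadataBuilder().putString("k", "v").build()
val x = AttributeReference("x", IntegerType, metadata = m)()

// 1. Outer explicit: its metadata wins; inner metadata is ignored.
val outerExplicit = Alias(Alias(x, "a")(), "b")(explicitMetadata = Some(Metadata.empty))
// merged.explicitMetadata == Some(Metadata.empty)

// 2. Inner explicit: the merged alias pins the outer alias's derived
// metadata (here Metadata.empty) as explicit metadata.
val innerExplicit = Alias(Alias(x, "a")(explicitMetadata = Some(Metadata.empty)), "b")()
// merged.explicitMetadata == Some(Metadata.empty)

// 3. Both deriving: the merged alias keeps deriving, unioning the
// nonInheritableMetadataKeys of both aliases.
val deriving = Alias(Alias(x, "a")(nonInheritableMetadataKeys = Seq("k")), "b")()
// merged.explicitMetadata == None
// merged.nonInheritableMetadataKeys == Seq("k")
```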
@@ -255,6 +255,11 @@ case class Alias(child: Expression, name: String)(
s"${child.sql} AS $qualifierPrefix${quoteIfNeeded(name)}"
}

// Returns a copy of this alias with a new child expression.
def withNewChild(newChild: Expression): Alias = {
withNewChildInternal(newChild)
}

override protected def withNewChildInternal(newChild: Expression): Alias =
copy(child = newChild)(exprId, qualifier, explicitMetadata, nonInheritableMetadataKeys)
}
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
@@ -619,11 +620,28 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
// If the alias name is different from attribute name, we can't strip it either, or we
// may accidentally change the output schema name of the root plan.
case a @ Alias(attr: Attribute, name)
if (a.metadata == attr.metadata) &&
name == attr.name &&
!excludeList.contains(attr) &&
!excludeList.contains(a) =>
attr
if !excludeList.contains(attr) &&
!excludeList.contains(a) &&
name == attr.name =>

val metadata = a.metadata
var attrMetadata = attr.metadata
if (metadata == attrMetadata) {
// The alias is truly redundant, remove it.
attr
} else if (attr.metadata.contains(AUTO_GENERATED_ALIAS)) {
attrMetadata = attr.metadata.withKeyRemoved(AUTO_GENERATED_ALIAS)
if (metadata == attrMetadata) {
// AUTO_GENERATED_ALIAS does not propagate to a view, so it is ok to drop it.
// With that key removed, the alias is now redundant; remove it.
attr.withMetadata(metadata)
} else {
a
}
} else {
a
}

case a => a
}
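
A sketch of when the relaxed rule fires. It assumes the usual pattern where the analyzer tags an auto-generated alias with a non-inheritable AUTO_GENERATED_ALIAS key; the "true" marker value is an assumption for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}

val tagged = new MetadataBuilder().putString(AUTO_GENERATED_ALIAS, "true").build()
val attr = AttributeReference("c", IntegerType, metadata = tagged)()
val alias = Alias(attr, "c")(nonInheritableMetadataKeys = Seq(AUTO_GENERATED_ALIAS))

// alias.metadata is empty while attr.metadata carries the marker, so the
// old `a.metadata == attr.metadata` check always kept this alias. Under the
// new rule, attr.metadata.withKeyRemoved(AUTO_GENERATED_ALIAS) equals
// alias.metadata, so the alias is removed and replaced by
// attr.withMetadata(alias.metadata), i.e. the attribute minus the marker.
```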

@@ -332,7 +332,7 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
ArrayType(new StructType().add("y", "int").add("x", "byte")),
hasTransform = true)

withSQLConf("spark.sql.preserveCharVarcharTypeInfo" -> "true") {
withSQLConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO.key -> "true") {
// exact match on VARCHAR does not need transform
assertArrayField(ArrayType(VarcharType(7)), ArrayType(VarcharType(7)), hasTransform = false)
// VARCHAR length increase could avoid transform
@@ -512,7 +512,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
val y = query.output.last

val parsedPlan = byName(table, query)
val expectedPlan = byName(table, Project(Seq(Alias(X, "x")(), y), query))
val expectedPlan = byName(table,
Project(Seq(Alias(X, "x")(), Alias(y, y.name)()), query))

assertNotResolved(parsedPlan)
checkAnalysis(parsedPlan, expectedPlan, caseSensitive = false)
Expand All @@ -529,7 +530,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
val x = query.output.last

val parsedPlan = byName(table, query)
val expectedPlan = byName(table, Project(Seq(x, y), query))
val expectedPlan = byName(table,
Project(Seq(Alias(x, x.name)(), Alias(y, y.name)()), query))

assertNotResolved(parsedPlan)
checkAnalysis(parsedPlan, expectedPlan)
@@ -57,14 +57,15 @@ WITH s AS (SELECT 43 AS col)
INSERT INTO cte_tbl SELECT * FROM S
-- !query analysis
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
+- WithCTE
:- CTERelationDef xxxx, false
: +- SubqueryAlias s
: +- Project [43 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias S
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false
+- Project [col#x AS col#x]
+- WithCTE
:- CTERelationDef xxxx, false
: +- SubqueryAlias s
: +- Project [43 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias S
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand All @@ -79,14 +80,15 @@ Project [col#x]
INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
-- !query analysis
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
+- WithCTE
:- CTERelationDef xxxx, false
: +- SubqueryAlias s
: +- Project [44 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false
+- Project [col#x AS col#x]
+- WithCTE
:- CTERelationDef xxxx, false
: +- SubqueryAlias s
: +- Project [44 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand All @@ -111,15 +113,17 @@ INSERT INTO cte_tbl2 SELECT col
-- !query analysis
Union false, false
:- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
: +- Project [col#x]
: +- SubqueryAlias s
: +- Project [45 AS col#x]
: +- OneRowRelation
: +- Project [col#x AS col#x]
: +- Project [col#x]
: +- SubqueryAlias s
: +- Project [45 AS col#x]
: +- OneRowRelation
+- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col]
+- Project [col#x]
+- SubqueryAlias s
+- Project [45 AS col#x]
+- OneRowRelation
+- Project [col#x AS col#x]
+- Project [col#x]
+- SubqueryAlias s
+- Project [45 AS col#x]
+- OneRowRelation


-- !query