From b4c8cdb3942965f158bc3445a6ebe207c7c405be Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Wed, 6 Jul 2016 19:48:13 -0400
Subject: [PATCH 1/8] Add AttributeResolver

---
 .../catalyst/plans/logical/LogicalPlan.scala  | 136 +++++++++---------
 1 file changed, 64 insertions(+), 72 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4984f235b412..40100a432d27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -137,6 +137,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     }
   }
 
+  private[this] lazy val childAttributeResolver = new AttributeResolver(children.flatMap(_.output))
+
+  private[this] lazy val outputAttributeResolver = new AttributeResolver(output)
+
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
@@ -145,7 +149,7 @@
   def resolveChildren(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, children.flatMap(_.output), resolver)
+    childAttributeResolver.resolve(nameParts, resolver)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
@@ -155,7 +159,7 @@
   def resolve(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, output, resolver)
+    outputAttributeResolver.resolve(nameParts, resolver)
 
   /**
    * Given an attribute name, split it to name parts by dot, but
@@ -165,104 +169,92 @@
   def resolveQuoted(
       name: String,
       resolver: Resolver): Option[NamedExpression] = {
-    resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver)
+    outputAttributeResolver.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
   }
+}
 
-  /**
-   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
-   *
-   * This assumes `name` has multiple parts, where the 1st part is a qualifier
-   * (i.e. table name, alias, or subquery alias).
-   * See the comment above `candidates` variable in resolve() for semantics the returned data.
-   */
-  private def resolveAsTableColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    assert(nameParts.length > 1)
-    if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
-      // At least one qualifier matches. See if remaining parts match.
-      val remainingParts = nameParts.tail
-      resolveAsColumn(remainingParts, resolver, attribute)
-    } else {
-      None
-    }
+/**
+ * Helper class for (LogicalPlan) attribute resolution. This class indexes attributes by their
+ * case-insensitive name, and checks potential candidates using the given Resolver. Both qualified
+ * and direct resolution are supported.
+ */
+private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Logging {
+  private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
+    m.mapValues(_.distinct).map(identity)
   }
 
-  /**
-   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
-   *
-   * Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
-   * See the comment above `candidates` variable in resolve() for semantics the returned data.
-   */
-  private def resolveAsColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    if (!attribute.isGenerated && resolver(attribute.name, nameParts.head)) {
-      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
-    } else {
-      None
+  /** Map to use for direct case insensitive attribute lookups. */
+  private val direct: Map[String, Seq[Attribute]] = {
+    unique(attributes.groupBy(_.name.toLowerCase))
+  }
+
+  /** Map to use for qualified case insensitive attribute lookups. */
+  private val qualified: Map[(String, String), Seq[Attribute]] = {
+    val grouped = attributes.filter(_.qualifier.isDefined).groupBy { a =>
+      (a.qualifier.get.toLowerCase, a.name.toLowerCase)
     }
+    unique(grouped)
   }
 
-  /** Performs attribute resolution given a name and a sequence of possible attributes. */
-  protected def resolve(
-      nameParts: Seq[String],
-      input: Seq[Attribute],
-      resolver: Resolver): Option[NamedExpression] = {
+  /** Perform attribute resolution given a name and a resolver. */
+  def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+    // Check if the attribute is a match for the given name.
+    def isMatch(name: String, a: Attribute): Boolean = !a.isGenerated && resolver(a.name, name)
 
-    // A sequence of possible candidate matches.
-    // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
-    // of parts that are to be resolved.
+    // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
+    // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
+    // matched attributes and a list of parts that are to be resolved.
+    //
     // For example, consider an example where "a" is the table name, "b" is the column name,
     // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
     // and the second element will be List("c").
-    var candidates: Seq[(Attribute, List[String])] = {
-      // If the name has 2 or more parts, try to resolve it as `table.column` first.
-      if (nameParts.length > 1) {
-        input.flatMap { option =>
-          resolveAsTableColumn(nameParts, resolver, option)
-        }
-      } else {
-        Seq.empty
-      }
+    val matches = nameParts match {
+      case qualifier :: name :: nestedFields =>
+        val key = (qualifier.toLowerCase, name.toLowerCase)
+        val attributes = qualified.get(key).toSeq.flatMap(_.filter { a =>
+          resolver(qualifier, a.qualifier.get) && isMatch(name, a)
+        })
+        (attributes, nestedFields)
+      case all =>
+        (Nil, all)
     }
 
     // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    if (candidates.isEmpty) {
-      candidates = input.flatMap { candidate =>
-        resolveAsColumn(nameParts, resolver, candidate)
-      }
+    val (candidates, nestedFields) = matches match {
+      case (Nil, _) =>
+        val name = nameParts.head
+        val attributes = direct.get(name.toLowerCase).toSeq.flatMap(_.filter(isMatch(name, _)))
+        (attributes, nameParts.tail)
+      case _ => matches
     }
 
     def name = UnresolvedAttribute(nameParts).name
-
-    candidates.distinct match {
-      // One match, no nested fields, use it.
-      case Seq((a, Nil)) => Some(a)
-
-      // One match, but we also need to extract the requested nested field.
-      case Seq((a, nestedFields)) =>
+    candidates match {
+      case Seq(a) if nestedFields.nonEmpty =>
+        // One match, but we also need to extract the requested nested field.
         // The foldLeft adds ExtractValues for every remaining parts of the identifier,
         // and aliased it with the last part of the name.
        // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
        // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
        // expression as "c".
-        val fieldExprs = nestedFields.foldLeft(a: Expression)((expr, fieldName) =>
-          ExtractValue(expr, Literal(fieldName), resolver))
+        val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
+          ExtractValue(e, Literal(name), resolver)
+        }
         Some(Alias(fieldExprs, nestedFields.last)())
 
-      // No matches.
+      case Seq(a) =>
+        // One match, no nested fields, use it.
+        Some(a)
+
       case Seq() =>
-        logTrace(s"Could not find $name in ${input.mkString(", ")}")
+        // No matches.
+        logTrace(s"Could not find $name in ${attributes.mkString(", ")}")
         None
 
-      // More than one match.
       case ambiguousReferences =>
-        val referenceNames = ambiguousReferences.map(_._1).mkString(", ")
-        throw new AnalysisException(
-          s"Reference '$name' is ambiguous, could be: $referenceNames.")
+        // More than one match.
+        val referenceNames = ambiguousReferences.mkString(", ")
+        throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
     }
   }
 }

From 3e397202878108f9db4cf84f0ff61faee2370798 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Wed, 6 Jul 2016 20:30:02 -0400
Subject: [PATCH 2/8] Move refresh back

---
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8dbfbb539c1f..7c7e2e6fb6d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -171,6 +171,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
       resolver: Resolver): Option[NamedExpression] = {
     outputAttributeResolver.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
   }
+
+  /**
+   * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
+   */
+  def refresh(): Unit = children.foreach(_.refresh())
 }
 
 /**
@@ -257,11 +262,6 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
       throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
     }
   }
-
-  /**
-   * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
-   */
-  def refresh(): Unit = children.foreach(_.refresh())
 }
 
 /**

From c75ae8d892ec46a18342235c39c7002402740b7d Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Thu, 7 Jul 2016 15:21:44 -0400
Subject: [PATCH 3/8] Use Seq-based instead of List-based pattern matching

---
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7c7e2e6fb6d0..4d97da13c8e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -189,7 +189,7 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
   }
 
   /** Map to use for direct case insensitive attribute lookups. */
-  private val direct: Map[String, Seq[Attribute]] = {
+  private lazy val direct: Map[String, Seq[Attribute]] = {
     unique(attributes.groupBy(_.name.toLowerCase))
   }
 
@@ -214,7 +214,7 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
     // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
     // and the second element will be List("c").
     val matches = nameParts match {
-      case qualifier :: name :: nestedFields =>
+      case qualifier +: name +: nestedFields =>
         val key = (qualifier.toLowerCase, name.toLowerCase)
         val attributes = qualified.get(key).toSeq.flatMap(_.filter { a =>
           resolver(qualifier, a.qualifier.get) && isMatch(name, a)
@@ -226,7 +226,7 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
 
     // If none of attributes match `table.column` pattern, we try to resolve it as a column.
     val (candidates, nestedFields) = matches match {
-      case (Nil, _) =>
+      case (Seq(), _) =>
         val name = nameParts.head
         val attributes = direct.get(name.toLowerCase).toSeq.flatMap(_.filter(isMatch(name, _)))
         (attributes, nameParts.tail)

From a5d1a4aa7c49efa24e1c3f553958509630fecfcd Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Thu, 7 Jul 2016 23:45:33 -0400
Subject: [PATCH 4/8] Rename attribute after resolution.

---
 .../catalyst/plans/logical/LogicalPlan.scala  | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4d97da13c8e2..0151def0e117 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -180,8 +180,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
 /**
  * Helper class for (LogicalPlan) attribute resolution. This class indexes attributes by their
- * case-insensitive name, and checks potential candidates using the given Resolver. Both qualified
- * and direct resolution are supported.
+ * case-insensitive name, and checks potential candidates using the [[Resolver]] passed to the
+ * resolve(...) function. Both qualified and direct resolution are supported.
  */
 private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Logging {
   private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
@@ -203,8 +203,12 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
 
   /** Perform attribute resolution given a name and a resolver. */
   def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
-    // Check if the attribute is a match for the given name.
-    def isMatch(name: String, a: Attribute): Boolean = !a.isGenerated && resolver(a.name, name)
+    // Collect matching attributes given a name and a lookup.
+    def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+      candidates.toSeq.flatMap(_.collect {
+        case a if !a.isGenerated && resolver(a.name, name) => a.withName(name)
+      })
+    }
 
     // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
     // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
@@ -216,9 +220,9 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
     val matches = nameParts match {
       case qualifier +: name +: nestedFields =>
         val key = (qualifier.toLowerCase, name.toLowerCase)
-        val attributes = qualified.get(key).toSeq.flatMap(_.filter { a =>
-          resolver(qualifier, a.qualifier.get) && isMatch(name, a)
-        })
+        val attributes = collectMatches(name, qualified.get(key)).filter { a =>
+          resolver(qualifier, a.qualifier.get)
+        }
         (attributes, nestedFields)
       case all =>
         (Nil, all)
@@ -228,7 +232,7 @@ private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Lo
     // If none of attributes match `table.column` pattern, we try to resolve it as a column.
     val (candidates, nestedFields) = matches match {
       case (Seq(), _) =>
         val name = nameParts.head
-        val attributes = direct.get(name.toLowerCase).toSeq.flatMap(_.filter(isMatch(name, _)))
+        val attributes = collectMatches(name, direct.get(name.toLowerCase))
         (attributes, nameParts.tail)
       case _ => matches
     }

From d64346bb8a9068ad92924dbd8da5d495f6c3b169 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sun, 11 Sep 2016 21:42:41 +0200
Subject: [PATCH 5/8] Move resolution into AttributeSeq

---
 .../sql/catalyst/expressions/package.scala    |  84 +++++++++++++++
 .../catalyst/plans/logical/LogicalPlan.scala  | 100 +-----------------
 2 files changed, 89 insertions(+), 95 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index a6125c61e508..83d3794ed8a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst
 
 import com.google.common.collect.Maps
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -123,6 +125,88 @@ package object expressions {
     def indexOf(exprId: ExprId): Int = {
       Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
     }
+
+    private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
+      m.mapValues(_.distinct).map(identity)
+    }
+
+    /** Map to use for direct case insensitive attribute lookups. */
+    @transient private lazy val direct: Map[String, Seq[Attribute]] = {
+      unique(attrs.groupBy(_.name.toLowerCase))
+    }
+
+    /** Map to use for qualified case insensitive attribute lookups. */
+    @transient private val qualified: Map[(String, String), Seq[Attribute]] = {
+      val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+        (a.qualifier.get.toLowerCase, a.name.toLowerCase)
+      }
+      unique(grouped)
+    }
+
+    /** Perform attribute resolution given a name and a resolver. */
+    def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+      // Collect matching attributes given a name and a lookup.
+      def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+        candidates.toSeq.flatMap(_.collect {
+          case a if !a.isGenerated && resolver(a.name, name) => a.withName(name)
+        })
+      }
+
+      // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
+      // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
+      // matched attributes and a list of parts that are to be resolved.
+      //
+      // For example, consider an example where "a" is the table name, "b" is the column name,
+      // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
+      // and the second element will be List("c").
+      val matches = nameParts match {
+        case qualifier +: name +: nestedFields =>
+          val key = (qualifier.toLowerCase, name.toLowerCase)
+          val attributes = collectMatches(name, qualified.get(key)).filter { a =>
+            resolver(qualifier, a.qualifier.get)
+          }
+          (attributes, nestedFields)
+        case all =>
+          (Nil, all)
+      }
+
+      // If none of attributes match `table.column` pattern, we try to resolve it as a column.
+      val (candidates, nestedFields) = matches match {
+        case (Seq(), _) =>
+          val name = nameParts.head
+          val attributes = collectMatches(name, direct.get(name.toLowerCase))
+          (attributes, nameParts.tail)
+        case _ => matches
+      }
+
+      def name = UnresolvedAttribute(nameParts).name
+      candidates match {
+        case Seq(a) if nestedFields.nonEmpty =>
+          // One match, but we also need to extract the requested nested field.
+          // The foldLeft adds ExtractValues for every remaining part of the identifier,
+          // and aliases it with the last part of the name.
+          // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
+          // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
+          // expression as "c".
+          val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
+            ExtractValue(e, Literal(name), resolver)
+          }
+          Some(Alias(fieldExprs, nestedFields.last)())
+
+        case Seq(a) =>
+          // One match, no nested fields, use it.
+          Some(a)
+
+        case Seq() =>
+          // No matches.
+          None
+
+        case ambiguousReferences =>
+          // More than one match.
+          val referenceNames = ambiguousReferences.mkString(", ")
+          throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
+      }
+    }
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0151def0e117..853b5f255f15 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -137,9 +137,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     }
   }
 
-  private[this] lazy val childAttributeResolver = new AttributeResolver(children.flatMap(_.output))
+  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
 
-  private[this] lazy val outputAttributeResolver = new AttributeResolver(output)
+  private[this] lazy val outputAttributes = AttributeSeq(output)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
@@ -149,7 +149,7 @@
   def resolveChildren(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    childAttributeResolver.resolve(nameParts, resolver)
+    childAttributes.resolve(nameParts, resolver)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
@@ -159,7 +159,7 @@
   def resolve(
       nameParts: Seq[String],
      resolver: Resolver): Option[NamedExpression] =
-    outputAttributeResolver.resolve(nameParts, resolver)
+    outputAttributes.resolve(nameParts, resolver)
 
   /**
    * Given an attribute name, split it to name parts by dot, but
@@ -169,7 +169,7 @@
   def resolveQuoted(
       name: String,
       resolver: Resolver): Option[NamedExpression] = {
-    outputAttributeResolver.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
+    outputAttributes.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
   }
 
   /**
@@ -178,96 +178,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def refresh(): Unit = children.foreach(_.refresh())
 }
 
-/**
- * Helper class for (LogicalPlan) attribute resolution. This class indexes attributes by their
- * case-insensitive name, and checks potential candidates using the [[Resolver]] passed to the
- * resolve(...) function. Both qualified and direct resolution are supported.
- */
-private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Logging {
-  private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
-    m.mapValues(_.distinct).map(identity)
-  }
-
-  /** Map to use for direct case insensitive attribute lookups. */
-  private lazy val direct: Map[String, Seq[Attribute]] = {
-    unique(attributes.groupBy(_.name.toLowerCase))
-  }
-
-  /** Map to use for qualified case insensitive attribute lookups. */
-  private val qualified: Map[(String, String), Seq[Attribute]] = {
-    val grouped = attributes.filter(_.qualifier.isDefined).groupBy { a =>
-      (a.qualifier.get.toLowerCase, a.name.toLowerCase)
-    }
-    unique(grouped)
-  }
-
-  /** Perform attribute resolution given a name and a resolver. */
-  def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
-    // Collect matching attributes given a name and a lookup.
-    def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
-      candidates.toSeq.flatMap(_.collect {
-        case a if !a.isGenerated && resolver(a.name, name) => a.withName(name)
-      })
-    }
-
-    // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
-    // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
-    // matched attributes and a list of parts that are to be resolved.
-    //
-    // For example, consider an example where "a" is the table name, "b" is the column name,
-    // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
-    // and the second element will be List("c").
-    val matches = nameParts match {
-      case qualifier +: name +: nestedFields =>
-        val key = (qualifier.toLowerCase, name.toLowerCase)
-        val attributes = collectMatches(name, qualified.get(key)).filter { a =>
-          resolver(qualifier, a.qualifier.get)
-        }
-        (attributes, nestedFields)
-      case all =>
-        (Nil, all)
-    }
-
-    // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    val (candidates, nestedFields) = matches match {
-      case (Seq(), _) =>
-        val name = nameParts.head
-        val attributes = collectMatches(name, direct.get(name.toLowerCase))
-        (attributes, nameParts.tail)
-      case _ => matches
-    }
-
-    def name = UnresolvedAttribute(nameParts).name
-    candidates match {
-      case Seq(a) if nestedFields.nonEmpty =>
-        // One match, but we also need to extract the requested nested field.
-        // The foldLeft adds ExtractValues for every remaining parts of the identifier,
-        // and aliased it with the last part of the name.
-        // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
-        // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
-        // expression as "c".
-        val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
-          ExtractValue(e, Literal(name), resolver)
-        }
-        Some(Alias(fieldExprs, nestedFields.last)())
-
-      case Seq(a) =>
-        // One match, no nested fields, use it.
-        Some(a)
-
-      case Seq() =>
-        // No matches.
-        logTrace(s"Could not find $name in ${attributes.mkString(", ")}")
-        None
-
-      case ambiguousReferences =>
-        // More than one match.
-        val referenceNames = ambiguousReferences.mkString(", ")
-        throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
-    }
-  }
-}
-
 /**
  * A logical plan node with no children.
  */

From eb6dd8033c532f7bcf103ee6096b9dedd91176fd Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 20 Apr 2018 17:18:57 +0200
Subject: [PATCH 6/8] Fix compilation

---
 .../org/apache/spark/sql/catalyst/expressions/package.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index f83087499134..ab2d517931c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -163,7 +163,7 @@ package object expressions {
       // Collect matching attributes given a name and a lookup.
       def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
         candidates.toSeq.flatMap(_.collect {
-          case a if !a.isGenerated && resolver(a.name, name) => a.withName(name)
+          case a if resolver(a.name, name) => a.withName(name)
         })
       }
 

From ac4abcd00b4f94d720c82d555f6703385983c500 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 5 May 2018 15:54:22 +0200
Subject: [PATCH 7/8] Fix UT

---
 .../spark/sql/catalyst/expressions/package.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index ab2d517931c8..78f8eb7085db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.catalyst
 
-import com.google.common.collect.Maps
+import java.util.Locale
 
+import com.google.common.collect.Maps
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
@@ -147,13 +148,13 @@ package object expressions {
 
     /** Map to use for direct case insensitive attribute lookups. */
     @transient private lazy val direct: Map[String, Seq[Attribute]] = {
-      unique(attrs.groupBy(_.name.toLowerCase))
+      unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT)))
     }
 
     /** Map to use for qualified case insensitive attribute lookups. */
     @transient private val qualified: Map[(String, String), Seq[Attribute]] = {
       val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
-        (a.qualifier.get.toLowerCase, a.name.toLowerCase)
+        (a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT))
       }
       unique(grouped)
     }
@@ -176,7 +177,7 @@ package object expressions {
       val matches = nameParts match {
         case qualifier +: name +: nestedFields =>
-          val key = (qualifier.toLowerCase, name.toLowerCase)
+          val key = (qualifier.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT))
           val attributes = collectMatches(name, qualified.get(key)).filter { a =>
             resolver(qualifier, a.qualifier.get)
           }
@@ -189,7 +190,7 @@ package object expressions {
       val (candidates, nestedFields) = matches match {
         case (Seq(), _) =>
           val name = nameParts.head
-          val attributes = collectMatches(name, direct.get(name.toLowerCase))
+          val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
           (attributes, nameParts.tail)
         case _ => matches
       }
@@ -218,7 +219,7 @@ package object expressions {
         case ambiguousReferences =>
           // More than one match.
-          val referenceNames = ambiguousReferences.mkString(", ")
+          val referenceNames = ambiguousReferences.map(_.qualifiedName).mkString(", ")
           throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
       }
     }

From cbc164db9d05c8c4fc90573710776c592f95ed8a Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 5 May 2018 16:05:00 +0200
Subject: [PATCH 8/8] Appease scalastyle

---
 .../org/apache/spark/sql/catalyst/expressions/package.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 78f8eb7085db..8a06daa37132 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst
 import java.util.Locale
 
 import com.google.common.collect.Maps
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
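
The net effect of the series: attribute lookup no longer scans every input attribute on each resolution call. Instead, each attribute sequence builds two hash maps once — "direct", keyed on the lower-cased attribute name, and "qualified", keyed on the (lower-cased qualifier, lower-cased name) pair — and the supplied Resolver only checks the narrowed candidate set. Below is a minimal, self-contained sketch of that two-map scheme; Attr, AttributeIndex, and caseInsensitive are simplified stand-ins assumed for illustration, not Catalyst's actual Attribute, AttributeSeq, or Resolver, and nested-field extraction via ExtractValue is omitted.

import java.util.Locale

object ResolutionSketch {
  // Illustration-only stand-ins for Catalyst's Attribute and Resolver types.
  final case class Attr(name: String, qualifier: Option[String])
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  final class AttributeIndex(attrs: Seq[Attr]) {
    // Direct lookup: lower-cased name -> distinct candidate attributes.
    private val direct: Map[String, Seq[Attr]] =
      attrs.groupBy(_.name.toLowerCase(Locale.ROOT)).map { case (k, v) => k -> v.distinct }

    // Qualified lookup: (lower-cased qualifier, lower-cased name) -> distinct candidates.
    private val qualified: Map[(String, String), Seq[Attr]] =
      attrs.filter(_.qualifier.isDefined)
        .groupBy(a => (a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT)))
        .map { case (k, v) => k -> v.distinct }

    // Resolve nameParts, trying a `qualifier.name` lookup before a direct `name`
    // lookup, in the same order as the patched resolve(...).
    def resolve(nameParts: Seq[String], resolver: Resolver): Either[String, Attr] = {
      // Narrow via the map first, then let the resolver make the final decision.
      def collect(name: String, candidates: Option[Seq[Attr]]): Seq[Attr] =
        candidates.toSeq.flatten.filter(a => resolver(a.name, name))

      val viaQualifier = nameParts match {
        case Seq(q, n, _*) =>
          collect(n, qualified.get((q.toLowerCase(Locale.ROOT), n.toLowerCase(Locale.ROOT))))
            .filter(a => resolver(q, a.qualifier.get))
        case _ => Nil
      }
      val candidates =
        if (viaQualifier.nonEmpty) viaQualifier
        else collect(nameParts.head, direct.get(nameParts.head.toLowerCase(Locale.ROOT)))

      candidates match {
        case Seq(a) => Right(a)
        case Seq()  => Left(s"could not find ${nameParts.mkString(".")}")
        case many   => Left(s"'${nameParts.mkString(".")}' is ambiguous, could be: ${many.mkString(", ")}")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val index = new AttributeIndex(Seq(
      Attr("id", Some("t1")), Attr("id", Some("t2")), Attr("value", Some("t1"))))
    println(index.resolve(Seq("t1", "id"), caseInsensitive)) // Right(Attr(id,Some(t1)))
    println(index.resolve(Seq("VALUE"), caseInsensitive))    // Right(Attr(value,Some(t1)))
    println(index.resolve(Seq("id"), caseInsensitive))       // Left('id' is ambiguous, ...)
  }
}

Keying the maps on lower-cased names (pinned to Locale.ROOT, as patch 7 does) keeps the index itself resolver-agnostic: the maps only narrow the candidate set cheaply, while the resolver passed at call time makes the final equality decision, so case-sensitive and case-insensitive analysis can share the same index.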