Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
def resolve(name: String, resolver: Resolver): Option[NamedExpression] =
resolve(name, output, resolver)

def resolveAsTableColumn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to merge this, can you add javadoc for these two methods to explain what they do, and what the differences are?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also they should be private or at least protected.

nameParts: Array[String],
resolver: Resolver,
attribute: Attribute): List[(Attribute, List[String])] = {
if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty && nameParts.size > 1) {
val remainingParts = nameParts.drop(1)
resolveAsColumn(remainingParts, resolver, attribute)
} else {
Nil
}
}

def resolveAsColumn(
nameParts: Array[String],
resolver: Resolver,
attribute: Attribute): List[(Attribute, List[String])] = {
if (resolver(attribute.name, nameParts.head)) {
(attribute.withName(nameParts.head), nameParts.tail.toList) :: Nil
} else {
Nil
}
}

/** Performs attribute resolution given a name and a sequence of possible attributes. */
protected def resolve(
name: String,
Expand All @@ -136,24 +159,15 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

val parts = name.split("\\.")

// Collect all attributes that are output by this nodes children where either the first part
// matches the name or where the first part matches the scope and the second part matches the
// name. Return these matches along with any remaining parts, which represent dotted access to
// struct fields.
val options = input.flatMap { option =>
// If the first part of the desired name matches a qualifier for this possible match, drop it.
val remainingParts =
if (option.qualifiers.find(resolver(_, parts.head)).nonEmpty && parts.size > 1) {
parts.drop(1)
} else {
parts
}

if (resolver(option.name, remainingParts.head)) {
// Preserve the case of the user's attribute reference.
(option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil
} else {
Nil
// We will try to resolve this name as `table.column` pattern first.
var options = input.flatMap { option =>
resolveAsTableColumn(parts, resolver, option)
}

// If none of attributes match `table.column` pattern, we try to resolve it as a column.
if(options.isEmpty) {
options = input.flatMap { option =>
resolveAsColumn(parts, resolver, option)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ class HiveResolutionSuite extends HiveComparisonTest {
assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
}

createQueryTest("test ambiguousReferences resolved as hive",
"""
|CREATE TABLE t1(x INT);
|CREATE TABLE t2(a STRUCT<x: INT>, k INT);
|INSERT OVERWRITE TABLE t1 SELECT 1 FROM src LIMIT 1;
|INSERT OVERWRITE TABLE t2 SELECT named_struct("x",1),1 FROM src LIMIT 1;
|SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k;
""".stripMargin)

/**
* Negative examples. Currently only left here for documentation purposes.
* TODO(marmbrus): Test that catalyst fails on these queries.
Expand Down