Conversation

@sarutak
Member

@sarutak sarutak commented Aug 19, 2016

What changes were proposed in this pull request?

When we join two DataFrames that originate from the same DataFrame, operations on the joined DataFrame can fail.

One reproducible example is as follows.

val df = Seq(
  (1, "a", "A"),
  (2, "b", "B"),
  (3, "c", "C"),
  (4, "d", "D"),
  (5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val joined = filtered.join(df, filtered("col1") === df("col1"), "inner")
val selected1 = joined.select(df("col3"))

In this case, an AnalysisException is thrown.

Another example is as follows.

val df = Seq(
  (1, "a", "A"),
  (2, "b", "B"),
  (3, "c", "C"),
  (4, "d", "D"),
  (5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val rightOuterJoined = filtered.join(df, filtered("col1") === df("col1"), "right")
val selected2 = rightOuterJoined.select(df("col1"))
selected2.show

In this case, we expect to get the following answer.

1
2
3
4
5

But the actual result is as follows.

1
2
null
4
5

The cause of the problems in these examples is that the logical plan for the right-side DataFrame, together with the expressions in its output, is re-created by the analyzer (in the ResolveReferences rule) when the two sides of a join contain expressions that share the same exprId.
The re-created expressions are equal to the original ones except for their exprIds.
This happens when we do a self-join or a similar pattern of operations.

In the first example, df("col3") returns a Column that wraps an expression, and that expression has an exprId (say id1).
After the join, the expression held by the right-side DataFrame (df) is re-created; the old and new expressions are equal except that the exprId is renewed (say id2).
Because of the mismatch between those exprIds, an AnalysisException is thrown.
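
The renewal can be observed directly in a spark-shell session. This is a sketch assuming the df and joined values from the first example; Column.expr and queryExecution are developer-facing APIs, so the exact rendering of exprIds (the #N suffixes) may vary across Spark versions.

println(df("col3").expr)
// e.g. col3#3 -- the Column still carries the original exprId (id1)
println(joined.queryExecution.analyzed.output.mkString(", "))
// the right side's attributes appear with renewed exprIds (id2), so the
// subsequent joined.select(df("col3")) finds no match and fails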

In the second example, df("col1") returns a Column whose expression is assigned an exprId (say id3).
On the other hand, the Column returned by filtered("col1") contains an expression with the same exprId (id3).
After the join, the expressions in the right-side DataFrame are re-created, so the expression assigned id3 is no longer present on the right side, but it is still present on the left side.
So, when we refer to df("col1") on the joined DataFrame, we actually get col1 of the left side, which contains a null.
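
For comparison, a common workaround (separate from the fix proposed in this PR) is to alias both sides and refer to columns by qualified name. This is a sketch for a spark-shell session with spark.implicits._ imported, using hypothetical names left/right/joined2.

val left = df.filter("col1 != 3").select("col1", "col2").as("l")
val right = df.as("r")
val joined2 = left.join(right, $"l.col1" === $"r.col1", "right")
joined2.select($"r.col1").show()  // 1, 2, 3, 4, 5 as expected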

To resolve this issue, I have introduced LazilyDeterminedAttribute.
It is returned when we refer to a column like df("expr"), and it lazily determines which expression df("expr") should point to.
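
As a rough illustration of the idea only (the actual case class in this PR shares just the name reported by SparkQA below), the deferred binding can be modeled as capturing the originating plan instead of an eager exprId; everything here is hypothetical.

// Toy model, not Spark internals: the attribute records which plan it came
// from and resolves to a concrete exprId only when it is bound.
final case class PlanId(id: Long)

case class LazilyDeterminedAttribute(colName: String, origin: PlanId) {
  // attrsByOrigin maps (originating plan, column name) -> current exprId,
  // which may have been renewed for one side of a self-join.
  def bind(attrsByOrigin: Map[(PlanId, String), Long]): Long =
    attrsByOrigin.getOrElse(
      (origin, colName),
      throw new IllegalArgumentException(s"cannot resolve $colName"))
}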

How was this patch tested?

I added some test cases.

@sarutak sarutak force-pushed the SPARK-17154 branch 2 times, most recently from b3e887c to 05872b7 on August 19, 2016 17:01
@SparkQA

SparkQA commented Aug 19, 2016

Test build #64081 has finished for PR 14719 at commit b3e887c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class LazilyDeterminedAttribute(

@SparkQA

SparkQA commented Aug 19, 2016

Test build #64082 has finished for PR 14719 at commit 05872b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class LazilyDeterminedAttribute(

@gatorsmile
Member

See a related PR by @cloud-fan: #11632

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64116 has finished for PR 14719 at commit 91cb915.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64126 has finished for PR 14719 at commit dd0ddbc.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Aug 20, 2016

@gatorsmile Thanks for the information. I'll check it.

@sarutak
Member Author

sarutak commented Aug 20, 2016

retest this please.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64137 has finished for PR 14719 at commit dd0ddbc.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64145 has finished for PR 14719 at commit dd0ddbc.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64144 has finished for PR 14719 at commit 8241d3b.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64146 has finished for PR 14719 at commit 74eb4aa.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64148 has finished for PR 14719 at commit 48a0775.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64155 has finished for PR 14719 at commit 9ddc9d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

It's a really hard problem, and we have discussed it many times without reaching a consensus.

Do you mind sending a design doc first so that it's easier for other people to review and discuss? Thanks!

@sarutak
Member Author

sarutak commented Aug 21, 2016

@cloud-fan Of course. I'll write a design doc soon.

@SparkQA

SparkQA commented Aug 27, 2016

Test build #64538 has finished for PR 14719 at commit b09c0d7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 27, 2016

Test build #64537 has finished for PR 14719 at commit 021977f.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedAttribute(

@sarutak
Member Author

sarutak commented Sep 28, 2016

In the current commit (b778b5d), I tried changing the code to prohibit direct self-joins.

@SparkQA

SparkQA commented Sep 28, 2016

Test build #66034 has finished for PR 14719 at commit b778b5d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Sep 28, 2016

I noticed that HiveDataFrameJoinSuite expects self-joins like the following to be supported.

    checkAnswer(
      df.join(df, df("key") === df("Key")),
      Row(1, "1", 1, "1") :: Row(2, "2", 2, "2") :: Nil)

    checkAnswer(
      df.join(df.filter($"value" === "2"), df("key") === df("Key")),
      Row(2, "2", 2, "2") :: Nil)

So, on second thought, I won't change the code to prohibit self-joins.

@nsyca
Contributor

nsyca commented Sep 28, 2016

@sarutak, on the surface the problem looks like it is in the optimization code, but in fact the root cause is that the column/ExprId C2#77 from T2 is indistinguishable between the two streams referencing the relation T2: one in the right table of the LEFT JOIN and the other in the IN subquery. This further makes the optimization rule PushPredicateThroughJoin think that the expression c2#77 + 1 (from the projection of the LEFT JOIN) = c2#77 (from the IN subquery converted to a semi-join) is a local predicate over the LEFT JOIN, and hence it pushes the predicate down below the LEFT JOIN.

My comments in SPARK-17337 on 31/Aug/16 at 14:42 and 14:43 explain this in more detail.

@nsyca
Contributor

nsyca commented Sep 28, 2016

The test case @sarutak raised here is what I consider to be the problem with the current code.

df.join(df, df("key") === df("key"))

How can we conclude that the left operand df("key") references the first relation df and the right operand the second relation df? We can't. Arguably, we could treat this predicate as a local predicate in which both operands reference one of the two relations; that is, it could mean any of the three cases below:

  1. df.join(df.as("df2"), df("key") === df2("key"))
  2. df.join(df.as("df2"), df("key") === df("key"))
  3. df.join(df.as("df2"), df2("key") === df2("key"))

Hence this type of statement should yield an ambiguous reference error.
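
For reference, each of the three intended meanings can already be written unambiguously with explicit aliases; this is a sketch assuming spark.implicits._ is imported and hypothetical names d1/d2.

val d1 = df.as("d1")
val d2 = df.as("d2")
d1.join(d2, $"d1.key" === $"d2.key")  // case 1: one key from each side
d1.join(d2, $"d1.key" === $"d1.key")  // case 2: a local predicate on the left side
d1.join(d2, $"d2.key" === $"d2.key")  // case 3: a local predicate on the right side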

@SparkQA

SparkQA commented Sep 28, 2016

Test build #66040 has finished for PR 14719 at commit 437ac99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 11, 2016

Test build #66747 has finished for PR 14719 at commit 9ad2c85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 28, 2016

Test build #67692 has finished for PR 14719 at commit 929f2a8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 28, 2016

Test build #67693 has finished for PR 14719 at commit 15bf529.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nsyca
Contributor

nsyca commented Nov 10, 2016

@cloud-fan, I was studying the ResolveSubquery code for my work on SPARK-17348. I was at first puzzled by the code in def rewriteSubQuery:

  // Make sure the inner and the outer query attributes do not collide.
  val outputSet = outer.map(_.outputSet).reduce(_ ++ _)
  val duplicates = basePlan.outputSet.intersect(outputSet)
  val (plan, deDuplicatedConditions) = if (duplicates.nonEmpty) {
    val aliasMap = AttributeMap(duplicates.map { dup =>
      dup -> Alias(dup, dup.toString)()
    }.toSeq)
    val aliasedExpressions = basePlan.output.map { ref =>
      aliasMap.getOrElse(ref, ref)
    }      
    val aliasedProjection = Project(aliasedExpressions, basePlan)
    val aliasedConditions = baseConditions.map(_.transform { 
      case ref: Attribute => aliasMap.getOrElse(ref, ref).toAttribute
    })     
    (aliasedProjection, aliasedConditions)
  } else {
    (basePlan, baseConditions)
  }      
  // Remove outer references from the correlated predicates. We wait with extracting
  // these until collisions between the inner and outer query attributes have been
  // solved.
  val conditions = deDuplicatedConditions.map(_.transform {
    case OuterReference(ref) => ref
  })
  (plan, conditions)
}

It was not until I debugged a SQL statement that referenced the same table both as the outer table and as the table in the subquery that I realized I had run into an issue similar to the one we are trying to fix here. I think my proposal of generating a new ExprId for each column would make this piece of code unnecessary.
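
A minimal way to exercise that de-duplication path, as a sketch (using the df from the examples at the top of this thread and a hypothetical temp view name t):

// The same relation appears in the outer query and in the subquery, so the
// subquery's attributes collide with the outer attributes and get aliased.
df.createOrReplaceTempView("t")
spark.sql("SELECT * FROM t WHERE col1 IN (SELECT col1 FROM t WHERE col2 > 'a')")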

@SparkQA

SparkQA commented Nov 17, 2016

Test build #68750 has finished for PR 14719 at commit e91a24e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 17, 2016

Test build #68753 has started for PR 14719 at commit 5d1ff3e.

@sarutak
Member Author

sarutak commented Nov 17, 2016

retest this please.

@SparkQA

SparkQA commented Nov 17, 2016

Test build #68760 has finished for PR 14719 at commit 5d1ff3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 5, 2017

Test build #70911 has finished for PR 14719 at commit a3f32c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nsyca
Contributor

nsyca commented Jan 12, 2017

@sarutak, would your code be able to resolve this ambiguity of df("a") in the join condition?

val df = Seq((1,0), (2,1)).toDF("a","b")
val df1 = df.filter(df("b") > 0)
val result = df.filter(df("a") > 0).join(df1, df("a") === df1("a"), "left").select(df1("a"))

Here is my understanding.

In the first function call

df.filter(df("a") > 0) ... [1]

Spark implicitly creates a new Dataset. So when it tries to resolve the column df("a") in the argument of the join, the Dataset named df is not the caller of the join; the Dataset df is actually embedded in the new unnamed Dataset [1]. So what we need here is a scheme to record where Datasets are embedded within other Datasets.

Taking the above example, we can draw a tree resembling the embedded structure of the Dataset result.

      (result)
       select
         |
        [2]
        join
       /   \
     [1]  (df1)
   filter filter
      |     |
     (df)  (df)

where [1] is the unnamed Dataset mentioned above and [2] is another unnamed Dataset.

df.filter(df("a") > 0).join(df1, df("a") === df1("a"), "left") ... [2]

When we try to resolve df("a") from [1], there is only one df under [1], so the resolution is not ambiguous. The problem is that, when we try to resolve df("a") in the argument of the join operator, even with the embedded tree structure, how do we distinguish the df("a") under [1] from the one under df1?

Your breadth-first-search walk may hit the correct df first, but it should continue searching for a second df at the same level, and if it finds one, it should raise an exception.
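
To make that concrete, here is a toy sketch (plain Scala, not Spark internals) of a breadth-first resolution that raises an error instead of taking the first hit when two candidates sit at the same level:

// Toy embedded-Dataset tree: two hits at the same (shallowest) level are
// treated as an ambiguous reference.
case class Node(name: String, children: List[Node] = Nil)

def resolve(root: Node, target: String): Node = {
  var level = List(root)
  while (level.nonEmpty) {
    val hits = level.filter(_.name == target)
    if (hits.size > 1) sys.error(s"ambiguous reference: $target")
    if (hits.size == 1) return hits.head
    level = level.flatMap(_.children)
  }
  sys.error(s"unresolved reference: $target")
}

// Mirroring the tree above: resolving "df" from [2] finds two df leaves at
// the same depth, so it must be reported as ambiguous.
val tree = Node("[2]", List(Node("[1]", List(Node("df"))), Node("df1", List(Node("df")))))
// resolve(tree, "df")  // throws: ambiguous reference: df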

An interesting test scenario to verify this would be the one below:

val result = df1.join(df.filter(df("a") > 0), df("a") === df1("a"), "right").select(df1("a"))

@HyukjinKwon
Member

Hi @sarutak, what do you think about ^?

@sarutak
Member Author

sarutak commented May 12, 2017

@HyukjinKwon Thanks for pinging me! I still think this issue should be fixed, but I hadn't noticed @nsyca's last comment. I'll consider the problem he mentioned soon.

@gatorsmile
Member

ping @sarutak

@sarutak
Member Author

sarutak commented Jun 27, 2017

I found that this solution can't resolve the issue in some corner cases. I'll close this PR for now and revise it later.

@sarutak sarutak closed this Jun 27, 2017
@sarutak sarutak deleted the SPARK-17154 branch June 4, 2021 20:45