From 5ab4846852723d1c3505223e18c41dbf7bc40fa0 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 4 Sep 2015 17:43:35 +0800
Subject: [PATCH 1/4] Support specifying the join type when calling join with
 usingColumns.

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 791c10c3d7ce..5f0b0c6977ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -480,10 +480,11 @@ class DataFrame private[sql](
    *
    * @param right Right side of the join operation.
    * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType One of: (default) `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
    * @group dfops
    * @since 1.4.0
    */
-  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+  def join(right: DataFrame, usingColumns: Seq[String], joinType: String = "inner"): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branches.
     val joined = sqlContext.executePlan(
@@ -502,7 +503,7 @@ class DataFrame private[sql](
       Join(
         joined.left,
         joined.right,
-        joinType = Inner,
+        joinType = JoinType(joinType),
         condition)
     )
   }

From 8ff97ede7250e032c88cf20a4e95f3e1e1cd416f Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 4 Sep 2015 18:06:35 +0800
Subject: [PATCH 2/4] Add unit test.

---
 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index e2716d7841d8..56ad71ea4f48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -42,6 +42,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
   }
 
+  test("join - join using multiple columns and specifying join type") {
+    val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+    val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
+
+    checkAnswer(
+      df.join(df2, Seq("int", "str"), "left"),
+      Row(1, 2, "1", null) :: Row(2, 3, "2", null) :: Row(3, 4, "3", null) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Seq("int", "str"), "right"),
+      Row(null, null, null, 2) :: Row(null, null, null, 3) :: Row(null, null, null, 4) :: Nil)
+  }
+
   test("join - join using self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
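Patches 1 and 2 together let Scala callers pick the join type for a USING-style join. A minimal sketch of the resulting call sites, assuming a 1.5-era session with `import sqlContext.implicits._` available (the DataFrames and column names below are illustrative, not taken from the patch):

  import sqlContext.implicits._

  val left = Seq((1, "a"), (2, "b")).toDF("id", "tag")
  val right = Seq((2, "b"), (3, "c")).toDF("id", "tag")

  // The new third argument selects the join type; unmatched left rows
  // are kept, with nulls filled in for the right side's columns.
  left.join(right, Seq("id", "tag"), "left_outer").show()

  // With patch 1's default parameter, omitting the type still means an inner join.
  left.join(right, Seq("id", "tag")).show()
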
From efe069aabfb3b06f2a9884153bb035022265652f Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 4 Sep 2015 22:36:34 +0800
Subject: [PATCH 3/4] Fix the Python test.

---
 python/pyspark/sql/dataframe.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e269ef4304f3..7c4cb15586c0 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -567,7 +567,11 @@ def join(self, other, on=None, how=None):
         if on is None or len(on) == 0:
             jdf = self._jdf.join(other._jdf)
         elif isinstance(on[0], basestring):
-            jdf = self._jdf.join(other._jdf, self._jseq(on))
+            if how is None:
+                jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
+            else:
+                assert isinstance(how, basestring), "how should be basestring"
+                jdf = self._jdf.join(other._jdf, self._jseq(on), how)
         else:
             assert isinstance(on[0], Column), "on should be Column or list of Column"
             if len(on) > 1:

From e298dad6d7d62a79e001c7d5f73687f33fd093a4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 5 Sep 2015 20:39:08 +0800
Subject: [PATCH 4/4] Instead of a default parameter, add an extra method.

---
 .../org/apache/spark/sql/DataFrame.scala | 23 +++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5f0b0c6977ef..474ff9204082 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -480,11 +480,30 @@ class DataFrame private[sql](
    *
    * @param right Right side of the join operation.
    * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
-   * @param joinType One of: (default) `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
    * @group dfops
    * @since 1.4.0
    */
-  def join(right: DataFrame, usingColumns: Seq[String], joinType: String = "inner"): DataFrame = {
+  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+    join(right, usingColumns, "inner")
+  }
+
+  /**
+   * Equi-join with another [[DataFrame]] using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @group dfops
+   * @since 1.6.0
+   */
+  def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branches.
     val joined = sqlContext.executePlan(
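
Patch 4 swaps the default parameter for a separate overload, presumably because adding a defaulted third parameter changes the bytecode signature of the existing two-argument `join` (breaking binary compatibility for code compiled against 1.4/1.5) and because Scala default arguments cannot be used from Java. A sketch of the final pair of call sites, with illustrative DataFrames as before:

  import sqlContext.implicits._

  val employees = Seq((1, "eng"), (2, "ops")).toDF("id", "dept")
  val salaries = Seq((1, 100), (3, 300)).toDF("id", "salary")

  // The original two-argument method is unchanged and still does an inner join.
  employees.join(salaries, Seq("id")).show()

  // The new three-argument overload (since 1.6.0) accepts any string that
  // JoinType(...) understands, e.g. "outer", "left_outer", "leftsemi".
  employees.join(salaries, Seq("id"), "outer").show()
  employees.join(salaries, Seq("id"), "leftsemi").show()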