From 21b0eebed24130f5db854ad833612b63fa5606a5 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 24 Sep 2019 17:38:14 +0100 Subject: [PATCH 1/9] code review fixes --- python/pyspark/sql/__init__.py | 3 ++- python/pyspark/sql/cogroup.py | 8 ++++---- .../sql/tests/test_pandas_udf_cogrouped_map.py | 12 +++--------- .../pyspark/sql/tests/test_pandas_udf_grouped_map.py | 11 +++-------- .../apache/spark/sql/RelationalGroupedDataset.scala | 4 ++-- 5 files changed, 14 insertions(+), 24 deletions(-) diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 9c760e3527be4..ba4c4feec75c8 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -51,11 +51,12 @@ from pyspark.sql.group import GroupedData from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter from pyspark.sql.window import Window, WindowSpec +from pyspark.sql.cogroup import CoGroupedData __all__ = [ 'SparkSession', 'SQLContext', 'UDFRegistration', 'DataFrame', 'GroupedData', 'Column', 'Catalog', 'Row', 'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec', - 'DataFrameReader', 'DataFrameWriter' + 'DataFrameReader', 'DataFrameWriter', 'CoGroupedData' ] diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index 9b725e4bafe79..fba83af56adb2 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -61,11 +61,11 @@ def apply(self, udf): >>> from pyspark.sql.functions import pandas_udf, PandasUDFType >>> df1 = spark.createDataFrame( - ... [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], - ... ("time", "id", "v1")) + ... [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], + ... ("time", "id", "v1")) >>> df2 = spark.createDataFrame( - ... [(20000101, 1, "x"), (20000101, 2, "y")], - ... ("time", "id", "v2")) + ... [(20000101, 1, "x"), (20000101, 2, "y")], + ... ("time", "id", "v2")) >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) ... def asof_join(l, r): ... return pd.merge_asof(l, r, on="time", by="id") diff --git a/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py index 7f3f7fa3168a7..17bafe35fdea0 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py @@ -32,15 +32,9 @@ import pyarrow as pa -""" -Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column names -from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check -""" -if sys.version < '3': - _check_column_type = False -else: - _check_column_type = True - +# Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column names +# From kwargs w/ Python 2, so need to set check_column_type=False and avoid this check +_check_column_type = sys.version >= '3' @unittest.skipIf( not have_pandas or not have_pyarrow, diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py index adbe2d103ade0..8918d5ac0cdda 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py @@ -37,14 +37,9 @@ import pyarrow as pa -""" -Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column names -from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check -""" -if sys.version < '3': - _check_column_type = False -else: - _check_column_type = True +# Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column names +# from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check +_check_column_type = sys.version >= '3' @unittest.skipIf( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index f6d13be0e89be..6e909156bd52c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -47,8 +47,8 @@ import org.apache.spark.sql.types.{NumericType, StructType} */ @Stable class RelationalGroupedDataset protected[sql]( - val df: DataFrame, - val groupingExprs: Seq[Expression], + private [sql] val df: DataFrame, + private [sql] val groupingExprs: Seq[Expression], groupType: RelationalGroupedDataset.GroupType) { private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = { From 93af701e3b9ee3eb56e20afe14272d0cde5551b9 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 24 Sep 2019 20:43:03 +0100 Subject: [PATCH 2/9] added doc for cogroup function including key --- python/pyspark/sql/cogroup.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index fba83af56adb2..f469f7fc0c24b 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -43,7 +43,11 @@ def apply(self, udf): as a `DataFrame`. The user-defined function should take two `pandas.DataFrame` and return another - `pandas.DataFrame`. For each side of the cogroup, all columns are passed together + `pandas.DataFrame`. Alternatively, a user-defined function which additionally takes + a Tuple can be provided, in which case the cogroup key will be passed in as the Tuple + parameter. + + For each side of the cogroup, all columns are passed together as a `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as a :class:`DataFrame`. @@ -78,6 +82,19 @@ def apply(self, udf): |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+ + >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) + ... def asof_join(k, l, r): + ... if k == (1,): + ... return pd.merge_asof(l, r, on="time", by="id") + ... else: + ... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2']) + >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() + +--------+---+---+---+ + | time| id| v1| v2| + +--------+---+---+---+ + |20000101| 1|1.0| x| + |20000102| 1|3.0| x| + +--------+---+---+---+ .. seealso:: :meth:`pyspark.sql.functions.pandas_udf` From d1a60ce2e6936033a12dbb65cd109ef21415701d Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 24 Sep 2019 21:00:11 +0100 Subject: [PATCH 3/9] added doctest for cogroup --- python/pyspark/sql/cogroup.py | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index f469f7fc0c24b..f24d325a38996 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import sys from pyspark import since from pyspark.rdd import PythonEvalType @@ -70,10 +71,10 @@ def apply(self, udf): >>> df2 = spark.createDataFrame( ... [(20000101, 1, "x"), (20000101, 2, "y")], ... ("time", "id", "v2")) - >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) + >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) # doctest: +SKIP ... def asof_join(l, r): ... return pd.merge_asof(l, r, on="time", by="id") - >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() + >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP +--------+---+---+---+ | time| id| v1| v2| +--------+---+---+---+ @@ -82,13 +83,13 @@ def apply(self, udf): |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+ - >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) + >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) # doctest: +SKIP ... def asof_join(k, l, r): ... if k == (1,): ... return pd.merge_asof(l, r, on="time", by="id") ... else: ... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2']) - >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() + >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP +--------+---+---+---+ | time| id| v1| v2| +--------+---+---+---+ @@ -113,3 +114,27 @@ def apply(self, udf): def _extract_cols(gd): df = gd._df return [df[col] for col in df.columns] + + +def _test(): + import doctest + from pyspark.sql import SparkSession + import pyspark.sql.cogroup + globs = pyspark.sql.cogroup.__dict__.copy() + spark = SparkSession.builder\ + .master("local[4]")\ + .appName("sql.cogroup tests")\ + .getOrCreate() + sc = spark.sparkContext + globs['sc'] = sc + globs['spark'] = spark + (failure_count, test_count) = doctest.testmod( + pyspark.sql.cogroup, globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) + spark.stop() + if failure_count: + sys.exit(-1) + + +if __name__ == "__main__": + _test() From becf4ac1fdb03fbbd8169fc943a0283e6214b183 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 26 Sep 2019 09:26:02 +0100 Subject: [PATCH 4/9] corrected line wrapping --- python/pyspark/sql/cogroup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index f24d325a38996..b9fc23bebf83e 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -48,9 +48,9 @@ def apply(self, udf): a Tuple can be provided, in which case the cogroup key will be passed in as the Tuple parameter. - For each side of the cogroup, all columns are passed together - as a `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` - are combined as a :class:`DataFrame`. + For each side of the cogroup, all columns are passed together as a `pandas.DataFrame` + to the user-function and the returned `pandas.DataFrame` are combined as a + :class:`DataFrame`. The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the returnType of the pandas udf. From 81e5aedf4c057969eb94dcecc2764be5d08fe1c3 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 26 Sep 2019 09:52:42 +0100 Subject: [PATCH 5/9] fixed python line length --- python/pyspark/sql/cogroup.py | 12 +++++++----- .../apache/spark/sql/RelationalGroupedDataset.scala | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index b9fc23bebf83e..9a4942b6c8fce 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -71,7 +71,8 @@ def apply(self, udf): >>> df2 = spark.createDataFrame( ... [(20000101, 1, "x"), (20000101, 2, "y")], ... ("time", "id", "v2")) - >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) # doctest: +SKIP + >>> @pandas_udf("time int, id int, v1 double, v2 string", + ... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP ... def asof_join(l, r): ... return pd.merge_asof(l, r, on="time", by="id") >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP @@ -83,12 +84,13 @@ def apply(self, udf): |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+ - >>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP) # doctest: +SKIP + >>> @pandas_udf("time int, id int, v1 double, v2 string", + ... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP ... def asof_join(k, l, r): - ... if k == (1,): + ... if k == (1,): ... return pd.merge_asof(l, r, on="time", by="id") - ... else: - ... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2']) + ... else: + ... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2']) >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP +--------+---+---+---+ | time| id| v1| v2| diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 6e909156bd52c..4d4731870700c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -47,8 +47,8 @@ import org.apache.spark.sql.types.{NumericType, StructType} */ @Stable class RelationalGroupedDataset protected[sql]( - private [sql] val df: DataFrame, - private [sql] val groupingExprs: Seq[Expression], + private[sql] val df: DataFrame, + private[sql] val groupingExprs: Seq[Expression], groupType: RelationalGroupedDataset.GroupType) { private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = { From 7405e3a60824bf7a6eed2f010f9bc03b41b18068 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 26 Sep 2019 15:21:37 +0100 Subject: [PATCH 6/9] fix pycodestyle failure --- python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py index 17bafe35fdea0..bc2265fc5fe19 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py @@ -36,6 +36,7 @@ # From kwargs w/ Python 2, so need to set check_column_type=False and avoid this check _check_column_type = sys.version >= '3' + @unittest.skipIf( not have_pandas or not have_pyarrow, pandas_requirement_message or pyarrow_requirement_message) From 7e02ea33cc3fb178a8be1b949d983030d7548df6 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 26 Sep 2019 22:06:05 +0100 Subject: [PATCH 7/9] improved cogroup doc --- python/pyspark/sql/cogroup.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index 9a4942b6c8fce..a92c4a02cdec7 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -44,13 +44,9 @@ def apply(self, udf): as a `DataFrame`. The user-defined function should take two `pandas.DataFrame` and return another - `pandas.DataFrame`. Alternatively, a user-defined function which additionally takes - a Tuple can be provided, in which case the cogroup key will be passed in as the Tuple - parameter. - - For each side of the cogroup, all columns are passed together as a `pandas.DataFrame` - to the user-function and the returned `pandas.DataFrame` are combined as a - :class:`DataFrame`. + `pandas.DataFrame`. For each side of the cogroup, all columns are passed together as a + `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as + a :class:`DataFrame`. The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the returnType of the pandas udf. @@ -84,6 +80,13 @@ def apply(self, udf): |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+ + + Alternatively, the user can define a function that takes three arguments. In this case, + the grouping key(s) will be passed as the first argument and the data will be passed as the + second and third arguments. The grouping key(s) will be passed as a tuple of numpy data + types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two + `pandas.DataFrame`s containing all columns from the original Spark DataFrames. + >>> @pandas_udf("time int, id int, v1 double, v2 string", ... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP ... def asof_join(k, l, r): From 84dd277d943b37fb0daeded68b6088bbadb9c610 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 26 Sep 2019 22:31:38 +0100 Subject: [PATCH 8/9] fix sphinx error --- python/pyspark/sql/cogroup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index a92c4a02cdec7..bb4e45fff9fec 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -85,7 +85,7 @@ def apply(self, udf): the grouping key(s) will be passed as the first argument and the data will be passed as the second and third arguments. The grouping key(s) will be passed as a tuple of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two - `pandas.DataFrame`s containing all columns from the original Spark DataFrames. + `pandas.DataFrame` containing all columns from the original Spark DataFrames. >>> @pandas_udf("time int, id int, v1 double, v2 string", ... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP From 4cdb2fa27d51f56808d5d929ce98310451f73550 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 29 Sep 2019 21:32:21 +0100 Subject: [PATCH 9/9] removed unnecessary stuff from doctest --- python/pyspark/sql/cogroup.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyspark/sql/cogroup.py b/python/pyspark/sql/cogroup.py index bb4e45fff9fec..ef87e703bce14 100644 --- a/python/pyspark/sql/cogroup.py +++ b/python/pyspark/sql/cogroup.py @@ -83,7 +83,7 @@ def apply(self, udf): Alternatively, the user can define a function that takes three arguments. In this case, the grouping key(s) will be passed as the first argument and the data will be passed as the - second and third arguments. The grouping key(s) will be passed as a tuple of numpy data + second and third arguments. The grouping key(s) will be passed as a tuple of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two `pandas.DataFrame` containing all columns from the original Spark DataFrames. @@ -130,8 +130,6 @@ def _test(): .master("local[4]")\ .appName("sql.cogroup tests")\ .getOrCreate() - sc = spark.sparkContext - globs['sc'] = sc globs['spark'] = spark (failure_count, test_count) = doctest.testmod( pyspark.sql.cogroup, globs=globs,