From 77d0b90dbddaafecf0d0e3011c7a900fed6badb7 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 9 Jul 2024 09:51:42 +0800 Subject: [PATCH 1/8] fix --- python/pyspark/sql/functions/builtin.py | 58 +++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index c24b9e4378a6..65da716433cb 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -1286,6 +1286,11 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: A column object representing the value from `col` that is associated with the maximum value from `ord`. + Notes + ----- + The function is non-deterministic when the maximum value from `ord` is accociated + with multiple values from `col`. + Examples -------- Example 1: Using `max_by` with groupBy @@ -1336,6 +1341,30 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: | Consult| Henry| | Finance| George| +----------+---------------------------+ + + Example 4: Non-deterministic when the the maximum 'v' is accociated with multiple 'id' + >>> import pyspark.sql.functions as sf + >>> df = spark.range(1000).withColumn("v", sf.lit(1)) + >>> df.repartition(9).select(sf.max_by("id", "v")).show() # doctest: +SKIP + +-------------+ + |max_by(id, v)| + +-------------+ + | 933| + +-------------+ + + >>> df.repartition(10).select(sf.max_by("id", "v")).show() # doctest: +SKIP + +-------------+ + |max_by(id, v)| + +-------------+ + | 935| + +-------------+ + + >>> df.repartition(11).select(sf.max_by("id", "v")).show() # doctest: +SKIP + +-------------+ + |max_by(id, v)| + +-------------+ + | 918| + +-------------+ """ return _invoke_function_over_columns("max_by", col, ord) @@ -1367,6 +1396,11 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: Column object that represents the value from `col` associated with the minimum value from `ord`. + Notes + ----- + The function is non-deterministic when the minimum value from `ord` is accociated + with multiple values from `col`. + Examples -------- Example 1: Using `min_by` with groupBy: @@ -1417,6 +1451,30 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: | Consult| Eva| | Finance| Frank| +----------+---------------------------+ + + Example 4: Non-deterministic when the the minimum 'v' is accociated with multiple 'id' + >>> import pyspark.sql.functions as sf + >>> df = spark.range(1000).withColumn("v", sf.lit(1)) + >>> df.repartition(9).select(sf.min_by("id", "v")).show() # doctest: +SKIP + +-------------+ + |min_by(id, v)| + +-------------+ + | 933| + +-------------+ + + >>> df.repartition(10).select(sf.min_by("id", "v")).show() # doctest: +SKIP + +-------------+ + |min_by(id, v)| + +-------------+ + | 935| + +-------------+ + + >>> df.repartition(11).select(sf.min_by("id", "v")).show() # doctest: +SKIP + +-------------+ + |min_by(id, v)| + +-------------+ + | 918| + +-------------+ """ return _invoke_function_over_columns("min_by", col, ord) From 8de0170090b78d4d392d6d2ce3d4ec0443b3da7e Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 9 Jul 2024 09:52:14 +0800 Subject: [PATCH 2/8] fix --- python/pyspark/sql/functions/builtin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 65da716433cb..97593e85ce5b 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -1342,7 +1342,7 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: | Finance| George| +----------+---------------------------+ - Example 4: Non-deterministic when the the maximum 'v' is accociated with multiple 'id' + Example 4: Non-deterministic when the maximum 'v' is accociated with multiple 'id' >>> import pyspark.sql.functions as sf >>> df = spark.range(1000).withColumn("v", sf.lit(1)) >>> df.repartition(9).select(sf.max_by("id", "v")).show() # doctest: +SKIP @@ -1452,7 +1452,7 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: | Finance| Frank| +----------+---------------------------+ - Example 4: Non-deterministic when the the minimum 'v' is accociated with multiple 'id' + Example 4: Non-deterministic when the minimum 'v' is accociated with multiple 'id' >>> import pyspark.sql.functions as sf >>> df = spark.range(1000).withColumn("v", sf.lit(1)) >>> df.repartition(9).select(sf.min_by("id", "v")).show() # doctest: +SKIP From d0daf313666dc38c33448544fae94fc11d97655c Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 9 Jul 2024 12:59:10 +0800 Subject: [PATCH 3/8] fix --- python/pyspark/sql/functions/builtin.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 97593e85ce5b..389db9f975cd 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -1343,6 +1343,7 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: +----------+---------------------------+ Example 4: Non-deterministic when the maximum 'v' is accociated with multiple 'id' + >>> import pyspark.sql.functions as sf >>> df = spark.range(1000).withColumn("v", sf.lit(1)) >>> df.repartition(9).select(sf.max_by("id", "v")).show() # doctest: +SKIP @@ -1453,6 +1454,7 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: +----------+---------------------------+ Example 4: Non-deterministic when the minimum 'v' is accociated with multiple 'id' + >>> import pyspark.sql.functions as sf >>> df = spark.range(1000).withColumn("v", sf.lit(1)) >>> df.repartition(9).select(sf.min_by("id", "v")).show() # doctest: +SKIP From b4634c2d7aaceb3ee2dfdbba9a903ed256c56403 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 10 Jul 2024 15:00:45 +0800 Subject: [PATCH 4/8] address comments --- R/pkg/R/functions.R | 6 ++ .../org/apache/spark/sql/functions.scala | 6 ++ python/pyspark/sql/functions/builtin.py | 70 +++---------------- .../org/apache/spark/sql/functions.scala | 6 ++ 4 files changed, 28 insertions(+), 60 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index a7e337d3f9af..5c0e40298ff0 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -1558,6 +1558,9 @@ setMethod("max", #' @details #' \code{max_by}: Returns the value associated with the maximum value of ord. #' +#' Note: the function is non-deterministic when the maximum value from `y` is associated +#' with multiple values from `x`. +#' #' @rdname column_aggregate_functions #' @aliases max_by max_by,Column-method #' @note max_by since 3.3.0 @@ -1633,6 +1636,9 @@ setMethod("min", #' @details #' \code{min_by}: Returns the value associated with the minimum value of ord. #' +#' Note: the function is non-deterministic when the minimum value from `y` is associated +#' with multiple values from `x`. +#' #' @rdname column_aggregate_functions #' @aliases min_by min_by,Column-method #' @note min_by since 3.3.0 diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index eae239a25589..6f5c61458755 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -884,6 +884,9 @@ object functions { /** * Aggregate function: returns the value associated with the maximum value of ord. * + * @note The function is non-deterministic when the maximum value from `ord` is associated + * with multiple values from `e`. + * * @group agg_funcs * @since 3.4.0 */ @@ -932,6 +935,9 @@ object functions { /** * Aggregate function: returns the value associated with the minimum value of ord. * + * @note The function is non-deterministic when the minimum value from `ord` is associated + * with multiple values from `e`. + * * @group agg_funcs * @since 3.4.0 */ diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 389db9f975cd..b1c7ae7e4aab 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -1271,6 +1271,11 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: .. versionchanged:: 3.4.0 Supports Spark Connect. + Notes + ----- + The function is non-deterministic when the maximum value from `ord` is associated + with multiple values from `col`. + Parameters ---------- col : :class:`~pyspark.sql.Column` or str @@ -1286,11 +1291,6 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: A column object representing the value from `col` that is associated with the maximum value from `ord`. - Notes - ----- - The function is non-deterministic when the maximum value from `ord` is accociated - with multiple values from `col`. - Examples -------- Example 1: Using `max_by` with groupBy @@ -1341,31 +1341,6 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: | Consult| Henry| | Finance| George| +----------+---------------------------+ - - Example 4: Non-deterministic when the maximum 'v' is accociated with multiple 'id' - - >>> import pyspark.sql.functions as sf - >>> df = spark.range(1000).withColumn("v", sf.lit(1)) - >>> df.repartition(9).select(sf.max_by("id", "v")).show() # doctest: +SKIP - +-------------+ - |max_by(id, v)| - +-------------+ - | 933| - +-------------+ - - >>> df.repartition(10).select(sf.max_by("id", "v")).show() # doctest: +SKIP - +-------------+ - |max_by(id, v)| - +-------------+ - | 935| - +-------------+ - - >>> df.repartition(11).select(sf.max_by("id", "v")).show() # doctest: +SKIP - +-------------+ - |max_by(id, v)| - +-------------+ - | 918| - +-------------+ """ return _invoke_function_over_columns("max_by", col, ord) @@ -1382,6 +1357,11 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: .. versionchanged:: 3.4.0 Supports Spark Connect. + Notes + ----- + The function is non-deterministic when the minimum value from `ord` is associated + with multiple values from `col`. + Parameters ---------- col : :class:`~pyspark.sql.Column` or str @@ -1397,11 +1377,6 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: Column object that represents the value from `col` associated with the minimum value from `ord`. - Notes - ----- - The function is non-deterministic when the minimum value from `ord` is accociated - with multiple values from `col`. - Examples -------- Example 1: Using `min_by` with groupBy: @@ -1452,31 +1427,6 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: | Consult| Eva| | Finance| Frank| +----------+---------------------------+ - - Example 4: Non-deterministic when the minimum 'v' is accociated with multiple 'id' - - >>> import pyspark.sql.functions as sf - >>> df = spark.range(1000).withColumn("v", sf.lit(1)) - >>> df.repartition(9).select(sf.min_by("id", "v")).show() # doctest: +SKIP - +-------------+ - |min_by(id, v)| - +-------------+ - | 933| - +-------------+ - - >>> df.repartition(10).select(sf.min_by("id", "v")).show() # doctest: +SKIP - +-------------+ - |min_by(id, v)| - +-------------+ - | 935| - +-------------+ - - >>> df.repartition(11).select(sf.min_by("id", "v")).show() # doctest: +SKIP - +-------------+ - |min_by(id, v)| - +-------------+ - | 918| - +-------------+ """ return _invoke_function_over_columns("min_by", col, ord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 882918eb78c7..fd0a99867585 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -902,6 +902,9 @@ object functions { /** * Aggregate function: returns the value associated with the maximum value of ord. * + * @note The function is non-deterministic when the maximum value from `ord` is associated + * with multiple values from `e`. + * * @group agg_funcs * @since 3.3.0 */ @@ -952,6 +955,9 @@ object functions { /** * Aggregate function: returns the value associated with the minimum value of ord. * + * @note The function is non-deterministic when the minimum value from `ord` is associated + * with multiple values from `e`. + * * @group agg_funcs * @since 3.3.0 */ From 2e802635d2ba667d4908c02a03738b7b034d6253 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 10 Jul 2024 15:03:24 +0800 Subject: [PATCH 5/8] address comments --- .../catalyst/expressions/aggregate/MaxByAndMinBy.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala index 56941c9de451..45f5f8048779 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala @@ -99,6 +99,10 @@ abstract class MaxMinBy extends DeclarativeAggregate with BinaryLike[Expression] > SELECT _FUNC_(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y); b """, + note = """ + The function is non-deterministic when the maximum value from `y` is associated + with multiple values from `x`. + """, group = "agg_funcs", since = "3.0.0") case class MaxBy(valueExpr: Expression, orderingExpr: Expression) extends MaxMinBy { @@ -122,6 +126,10 @@ case class MaxBy(valueExpr: Expression, orderingExpr: Expression) extends MaxMin > SELECT _FUNC_(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y); a """, + note = """ + The function is non-deterministic when the minimum value from `y` is associated + with multiple values from `x`. + """, group = "agg_funcs", since = "3.0.0") case class MinBy(valueExpr: Expression, orderingExpr: Expression) extends MaxMinBy { From 56127665faa97190a4dd6d41729c239530d8e399 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 10 Jul 2024 16:20:03 +0800 Subject: [PATCH 6/8] fix scala lint --- .../main/scala/org/apache/spark/sql/functions.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 6f5c61458755..9b1f19afff86 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -884,8 +884,9 @@ object functions { /** * Aggregate function: returns the value associated with the maximum value of ord. * - * @note The function is non-deterministic when the maximum value from `ord` is associated - * with multiple values from `e`. + * @note + * The function is non-deterministic when the maximum value from `ord` is associated with + * multiple values from `e`. * * @group agg_funcs * @since 3.4.0 @@ -935,8 +936,9 @@ object functions { /** * Aggregate function: returns the value associated with the minimum value of ord. * - * @note The function is non-deterministic when the minimum value from `ord` is associated - * with multiple values from `e`. + * @note + * The function is non-deterministic when the minimum value from `ord` is associated with + * multiple values from `e`. * * @group agg_funcs * @since 3.4.0 From f61f5e2f0f0c2988b5c80b74a1d67455dcc08e3a Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Thu, 11 Jul 2024 12:27:02 +0800 Subject: [PATCH 7/8] address comments --- R/pkg/R/functions.R | 8 ++++---- .../src/main/scala/org/apache/spark/sql/functions.scala | 8 ++++---- python/pyspark/sql/functions/builtin.py | 8 ++++---- .../catalyst/expressions/aggregate/MaxByAndMinBy.scala | 8 ++++---- .../src/main/scala/org/apache/spark/sql/functions.scala | 8 ++++---- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 5c0e40298ff0..b91124f96a6f 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -1558,8 +1558,8 @@ setMethod("max", #' @details #' \code{max_by}: Returns the value associated with the maximum value of ord. #' -#' Note: the function is non-deterministic when the maximum value from `y` is associated -#' with multiple values from `x`. +#' Note: The function is non-deterministic so the output order can be different +#' for those associated the same values of `x`. #' #' @rdname column_aggregate_functions #' @aliases max_by max_by,Column-method @@ -1636,8 +1636,8 @@ setMethod("min", #' @details #' \code{min_by}: Returns the value associated with the minimum value of ord. #' -#' Note: the function is non-deterministic when the minimum value from `y` is associated -#' with multiple values from `x`. +#' Note: The function is non-deterministic so the output order can be different +#' for those associated the same values of `x`. #' #' @rdname column_aggregate_functions #' @aliases min_by min_by,Column-method diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 9b1f19afff86..43fb9055ab95 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -885,8 +885,8 @@ object functions { * Aggregate function: returns the value associated with the maximum value of ord. * * @note - * The function is non-deterministic when the maximum value from `ord` is associated with - * multiple values from `e`. + * The function is non-deterministic so the output order can be different for + * those associated the same values of `e`. * * @group agg_funcs * @since 3.4.0 @@ -937,8 +937,8 @@ object functions { * Aggregate function: returns the value associated with the minimum value of ord. * * @note - * The function is non-deterministic when the minimum value from `ord` is associated with - * multiple values from `e`. + * The function is non-deterministic so the output order can be different for + * those associated the same values of `e`. * * @group agg_funcs * @since 3.4.0 diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index b1c7ae7e4aab..f89c8b2e64a3 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -1273,8 +1273,8 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: Notes ----- - The function is non-deterministic when the maximum value from `ord` is associated - with multiple values from `col`. + The function is non-deterministic so the output order can be different for those + associated the same values of `col`. Parameters ---------- @@ -1359,8 +1359,8 @@ def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: Notes ----- - The function is non-deterministic when the minimum value from `ord` is associated - with multiple values from `col`. + The function is non-deterministic so the output order can be different for those + associated the same values of `col`. Parameters ---------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala index 45f5f8048779..b33142ed29cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala @@ -100,8 +100,8 @@ abstract class MaxMinBy extends DeclarativeAggregate with BinaryLike[Expression] b """, note = """ - The function is non-deterministic when the maximum value from `y` is associated - with multiple values from `x`. + The function is non-deterministic so the output order can be different for + those associated the same values of `x`. """, group = "agg_funcs", since = "3.0.0") @@ -127,8 +127,8 @@ case class MaxBy(valueExpr: Expression, orderingExpr: Expression) extends MaxMin a """, note = """ - The function is non-deterministic when the minimum value from `y` is associated - with multiple values from `x`. + The function is non-deterministic so the output order can be different for + those associated the same values of `x`. """, group = "agg_funcs", since = "3.0.0") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index fd0a99867585..5b4d27fc65d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -902,8 +902,8 @@ object functions { /** * Aggregate function: returns the value associated with the maximum value of ord. * - * @note The function is non-deterministic when the maximum value from `ord` is associated - * with multiple values from `e`. + * @note The function is non-deterministic so the output order can be different for + * those associated the same values of `e`. * * @group agg_funcs * @since 3.3.0 @@ -955,8 +955,8 @@ object functions { /** * Aggregate function: returns the value associated with the minimum value of ord. * - * @note The function is non-deterministic when the minimum value from `ord` is associated - * with multiple values from `e`. + * @note The function is non-deterministic so the output order can be different for + * those associated the same values of `e`. * * @group agg_funcs * @since 3.3.0 From 0f89ff3ca1f071409fcad53224c165f36e943f4a Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Thu, 11 Jul 2024 12:28:49 +0800 Subject: [PATCH 8/8] reformat --- .../src/main/scala/org/apache/spark/sql/functions.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 43fb9055ab95..47f7266f3bf5 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -885,8 +885,8 @@ object functions { * Aggregate function: returns the value associated with the maximum value of ord. * * @note - * The function is non-deterministic so the output order can be different for - * those associated the same values of `e`. + * The function is non-deterministic so the output order can be different for those associated + * the same values of `e`. * * @group agg_funcs * @since 3.4.0 @@ -937,8 +937,8 @@ object functions { * Aggregate function: returns the value associated with the minimum value of ord. * * @note - * The function is non-deterministic so the output order can be different for - * those associated the same values of `e`. + * The function is non-deterministic so the output order can be different for those associated + * the same values of `e`. * * @group agg_funcs * @since 3.4.0