[SPARK-49552][PYTHON] Add DataFrame API support for new 'randstr' and 'uniform' SQL functions #48143
Conversation
respond to code review comments
cc @HyukjinKwon @MaxGekk here is the DataFrame support for the new `randstr` and `uniform` SQL functions.
Thanks @zhengruifeng for your review! Responded to your comments, please take another look.
+------+
| ceV0P|
+------+
nit: we normally don't include an empty line at the end of the docstring
Sounds good, this is done.
+------+
|     7|
+------+
ditto
Sounds good, this is done.
the python linter fails with:
https://github.com/dtenedor/spark/actions/runs/10962013320/job/30442618094
) -> Column:
    if seed is None:
        return _invoke_function_over_columns(
            "uniform", min, max, lit(random.randint(0, sys.maxsize))
`_invoke_function_over_columns` requires arguments to be columns or column names.
Suggested change:
-            "uniform", min, max, lit(random.randint(0, sys.maxsize))
+            "uniform", lit(min), lit(max), lit(random.randint(0, sys.maxsize))
Thanks, this is done.
            "uniform", min, max, lit(random.randint(0, sys.maxsize))
        )
    else:
        return _invoke_function_over_columns("uniform", min, max, seed)
Suggested change:
-        return _invoke_function_over_columns("uniform", min, max, seed)
+        return _invoke_function_over_columns("uniform", lit(min), lit(max), lit(seed))
Thanks, this is done.
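The point behind the `lit(...)` suggestions above can be illustrated with a small, self-contained sketch. The `Column`, `lit`, and invoker below are hypothetical stand-ins for the internal PySpark helpers, not the real API:

```python
class Column:
    """Hypothetical stand-in for pyspark.sql.Column."""
    def __init__(self, expr):
        self.expr = expr

def lit(value):
    """Wrap a plain Python literal as a Column expression (stand-in for pyspark's lit)."""
    return value if isinstance(value, Column) else Column(repr(value))

def invoke_function_over_columns(name, *cols):
    # Mirrors the review comment: every argument must already be a
    # Column or a column name (str); raw Python ints are rejected.
    for c in cols:
        if not isinstance(c, (Column, str)):
            raise TypeError(f"{name}: expected Column or str, got {type(c).__name__}")
    args = ", ".join(c.expr if isinstance(c, Column) else c for c in cols)
    return Column(f"{name}({args})")

# Passing raw ints fails; wrapping them with lit() succeeds.
try:
    invoke_function_over_columns("uniform", 10, 20)
    raise AssertionError("expected TypeError")
except TypeError:
    pass

expr = invoke_function_over_columns("uniform", lit(10), lit(20))
```

This is why the suggested changes wrap `min`, `max`, and `seed` in `lit(...)` before invoking the function.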
@@ -2578,6 +2594,16 @@ def regexp_like(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
regexp_like.__doc__ = pysparkfuncs.regexp_like.__doc__


def randstr(length: Union[Column, int], seed: Optional[Union[Column, int]] = None) -> Column:
    if seed is None:
        return _invoke_function_over_columns("randstr", length, lit(random.randint(0, sys.maxsize)))
Suggested change:
-        return _invoke_function_over_columns("randstr", length, lit(random.randint(0, sys.maxsize)))
+        return _invoke_function_over_columns("randstr", lit(length), lit(random.randint(0, sys.maxsize)))
Thanks, this is done.
    if seed is None:
        return _invoke_function_over_columns("randstr", length, lit(random.randint(0, sys.maxsize)))
    else:
        return _invoke_function_over_columns("randstr", length, seed)
Suggested change:
-        return _invoke_function_over_columns("randstr", length, seed)
+        return _invoke_function_over_columns("randstr", lit(length), lit(seed))
Thanks, this is done.
+--------------------+
"""
length = _enum_to_value(length)
length = lit(length) if isinstance(length, int) else length
nit: the `lit` function accepts both literals and Columns, so the isinstance check is unnecessary.
Thanks, updated.
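Since `lit` passes an existing `Column` through unchanged, the two-branch pattern collapses to a single call. A minimal sketch with stand-in `Column`/`lit` definitions (assumed behavior for illustration, not the real pyspark implementation):

```python
class Column:
    """Hypothetical stand-in for pyspark.sql.Column."""
    def __init__(self, expr):
        self.expr = expr

def lit(value):
    # Idempotent coercion: an existing Column is returned as-is,
    # anything else is wrapped as a literal expression.
    return value if isinstance(value, Column) else Column(repr(value))

# Instead of:  length = lit(length) if isinstance(length, int) else length
# one unconditional call covers both cases:
assert isinstance(lit(10), Column)   # plain int is wrapped
c = Column("length")
assert lit(c) is c                   # Column passes through untouched
```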
Thanks @zhengruifeng for your reviews! Responded to your comments, hopefully the linter passes now.
thanks, merged to master
… 'uniform' SQL functions

### What changes were proposed in this pull request?

In apache#48004 we added new SQL functions `randstr` and `uniform`. This PR adds DataFrame API support for them. For example, in Scala:

```
sql("create table t(col int not null) using csv")
sql("insert into t values (0)")
val df = sql("select col from t")
df.select(randstr(lit(5), lit(0)).alias("x")).select(length(col("x")))
> 5
df.select(uniform(lit(10), lit(20), lit(0)).alias("x")).selectExpr("x > 5")
> true
```

### Why are the changes needed?

This improves DataFrame parity with the SQL API.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds unit test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48143 from dtenedor/dataframes-uniform-randstr.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…inistic in Scala Client

### What changes were proposed in this pull request?

Make 'randstr' and 'uniform' deterministic in Scala Client.

### Why are the changes needed?

We need to explicitly set the seed in connect clients, to avoid making the output dataframe non-deterministic (see 14ba4fc). When reviewing #48143, I requested the author to set the seed in the Python client. But at that time, I was not aware of the fact that the Spark Connect Scala Client was reusing the same `functions.scala` under `org.apache.spark.sql`. (There were two different files before.) So the two functions may cause non-deterministic issues like:

```
scala> val df = spark.range(10).select(randstr(lit(10)).as("r"))
Using Spark's default log4j profile: org/apache/spark/log4j2-pattern-layout-defaults.properties
df: org.apache.spark.sql.package.DataFrame = [r: string]

scala> df.show()
+----------+
|         r|
+----------+
|5bhIk72PJa|
|tuhC50Di38|
|PxwfWzdT3X|
|sWkmSyWboh|
|uZMS4htmM0|
|YMxMwY5wdQ|
|JDaWSiBwDD|
|C7KQ20WE7t|
|IwSSqWOObg|
|jDF2Ndfy8q|
+----------+

scala> df.show()
+----------+
|         r|
+----------+
|fpnnoLJbOA|
|qerIKpYPif|
|PvliXYIALD|
|xK3fosAvOp|
|WK12kfkPXq|
|2UcdyAEbNm|
|HEkl4rMtV1|
|PCaH4YJuYo|
|JuuXEHSp5i|
|jSLjl8ug8S|
+----------+
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

After this fix:

```
scala> val df = spark.range(10).select(randstr(lit(10)).as("r"))
df: org.apache.spark.sql.package.DataFrame = [r: string]

scala> df.show()
+----------+
|         r|
+----------+
|Gri9B9X8zI|
|gfhpGD8PcV|
|FDaXofTzlN|
|p7ciOScWpu|
|QZiEbF5q7c|
|9IhRoXmTUM|
|TeSEG1EKSN|
|B7nLw5iedL|
|uFZo1WPLPT|
|46E2LVCxxl|
+----------+

scala> df.show()
+----------+
|         r|
+----------+
|Gri9B9X8zI|
|gfhpGD8PcV|
|FDaXofTzlN|
|p7ciOScWpu|
|QZiEbF5q7c|
|9IhRoXmTUM|
|TeSEG1EKSN|
|B7nLw5iedL|
|uFZo1WPLPT|
|46E2LVCxxl|
+----------+
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48558 from zhengruifeng/sql_rand_str_seed.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
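The effect of the determinism fix can be sketched in plain Python (illustrative names, not Spark APIs): draw the seed once when the expression is built, and re-seed a fresh generator on every execution, so repeated `show()` calls replay the same values:

```python
import random

def build_randstr_expr(length, seed=None):
    if seed is None:
        # Seed drawn once, at expression-construction time (the fix):
        # the resulting expression is pinned to a single seed.
        seed = random.randint(0, 2**63 - 1)
    def execute():
        # Each execution re-seeds a fresh generator, so results repeat.
        rng = random.Random(seed)
        alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
        return "".join(rng.choice(alphabet) for _ in range(length))
    return execute

expr = build_randstr_expr(10)
assert expr() == expr()   # analogous to calling df.show() twice and agreeing
assert len(expr()) == 10
```

Without the fix, the seed would effectively be re-drawn per execution, which is what produced the two different tables in the "before" transcript above.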
What changes were proposed in this pull request?
In #48004 we added new SQL functions `randstr` and `uniform`. This PR adds DataFrame API support for them (see the Scala example in the merged commit message above).
Why are the changes needed?
This improves DataFrame parity with the SQL API.
Does this PR introduce any user-facing change?
Yes, see above.
How was this patch tested?
This PR adds unit test coverage.
Was this patch authored or co-authored using generative AI tooling?
No.
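The client-side seed-defaulting logic the PR adds can be sketched as follows; `randstr_args` is an illustrative helper, not the real `pyspark.sql.functions` code:

```python
import random
import sys

def randstr_args(length, seed=None):
    """Return the argument tuple that would be passed to the 'randstr' SQL function."""
    if seed is None:
        # No seed given: draw one eagerly in the client so the resulting
        # expression is fixed (keeps repeated evaluations deterministic).
        seed = random.randint(0, sys.maxsize)
    return ("randstr", length, seed)

name, length, seed = randstr_args(8)
assert name == "randstr" and length == 8 and 0 <= seed <= sys.maxsize
assert randstr_args(8, seed=0) == ("randstr", 8, 0)
```

The `uniform` wrapper in the diff follows the same pattern, with `min` and `max` in place of `length`.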