Skip to content

Conversation

@HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Oct 3, 2018

What changes were proposed in this pull request?

This PR proposes to register Grouped aggregate UDF Vectorized UDFs for SQL Statement, for instance:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("integer", PandasUDFType.GROUPED_AGG)
def sum_udf(v):
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
q = "SELECT v2, sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
spark.sql(q).show()
+---+-----------+
| v2|sum_udf(v1)|
+---+-----------+
|  1|          1|
|  0|          5|
+---+-----------+

How was this patch tested?

Manual test and unit test.

@HyukjinKwon
Copy link
Member Author

HyukjinKwon commented Oct 3, 2018

@HyukjinKwon
Copy link
Member Author

ok to test

@HyukjinKwon
Copy link
Member Author

retest this please

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the "_ =" thing here?

Copy link
Member Author

@HyukjinKwon HyukjinKwon Oct 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hides the output like ...

>>> spark.udf.register("sum_udf", sum_udf)
<function sum_udf at 0x103ff18c0>

in the doctest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha. I see..

@icexelloss
Copy link
Contributor

LGTM

@SparkQA
Copy link

SparkQA commented Oct 3, 2018

Test build #96896 has finished for PR 22620 at commit 06a7bd0.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about SQL_WINDOW_AGG_PANDAS_UDF?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it here:

Users specify GROUPED_AGG only. GROUPED_AGG is turned to WINDOW_AGG eval type in WindowInPandasExec.

Admittedly, there is a bit confusion here we can improve. We just haven't got a user specified udf type that maps to multiple evalType before WINDOW_AGG.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need to be clearly defined in Apache Spark 3.0 release; otherwise, it might be confusing to both developers and end users. :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened https://issues.apache.org/jira/browse/SPARK-25640 to track this.

To be clear, this is transparent to end users, but I agree it can be confusing to developers.

Copy link
Member

@BryanCutler BryanCutler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM pending tests. Looks like a test expected a specific error msg
AssertionError: "f must be either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF" does not match "Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF or SQL_GROUPED_AGG_PANDAS_UDF"

@SparkQA
Copy link

SparkQA commented Oct 4, 2018

Test build #96916 has finished for PR 22620 at commit 97d0377.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 4, 2018

Test build #96917 has finished for PR 22620 at commit f36bc03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented Oct 4, 2018

LGTM

@HyukjinKwon
Copy link
Member Author

Thank you @icexelloss, @gatorsmile, @BryanCutler and @viirya.

@HyukjinKwon
Copy link
Member Author

Merged to master and branch-2.4.

asfgit pushed a commit that referenced this pull request Oct 4, 2018
…for SQL Statement

## What changes were proposed in this pull request?

This PR proposes to register Grouped aggregate UDF Vectorized UDFs for SQL Statement, for instance:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf("integer", PandasUDFType.GROUPED_AGG)
def sum_udf(v):
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
q = "SELECT v2, sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
spark.sql(q).show()
```

```
+---+-----------+
| v2|sum_udf(v1)|
+---+-----------+
|  1|          1|
|  0|          5|
+---+-----------+
```

## How was this patch tested?

Manual test and unit test.

Closes #22620 from HyukjinKwon/SPARK-25601.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
@asfgit asfgit closed this in 79dd4c9 Oct 4, 2018
asfgit pushed a commit that referenced this pull request Oct 4, 2018
…for SQL Statement

## What changes were proposed in this pull request?

This PR proposes to register Grouped aggregate UDF Vectorized UDFs for SQL Statement, for instance:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf("integer", PandasUDFType.GROUPED_AGG)
def sum_udf(v):
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
q = "SELECT v2, sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
spark.sql(q).show()
```

```
+---+-----------+
| v2|sum_udf(v1)|
+---+-----------+
|  1|          1|
|  0|          5|
+---+-----------+
```

## How was this patch tested?

Manual test and unit test.

Closes #22620 from HyukjinKwon/SPARK-25601.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon deleted the SPARK-25601 branch October 16, 2018 12:43
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…for SQL Statement

## What changes were proposed in this pull request?

This PR proposes to register Grouped aggregate UDF Vectorized UDFs for SQL Statement, for instance:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf("integer", PandasUDFType.GROUPED_AGG)
def sum_udf(v):
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
q = "SELECT v2, sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
spark.sql(q).show()
```

```
+---+-----------+
| v2|sum_udf(v1)|
+---+-----------+
|  1|          1|
|  0|          5|
+---+-----------+
```

## How was this patch tested?

Manual test and unit test.

Closes apache#22620 from HyukjinKwon/SPARK-25601.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
…for SQL Statement

## What changes were proposed in this pull request?

This PR proposes to register Grouped aggregate UDF Vectorized UDFs for SQL Statement, for instance:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

pandas_udf("integer", PandasUDFType.GROUPED_AGG)
def sum_udf(v):
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
q = "SELECT v2, sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
spark.sql(q).show()
```

```
+---+-----------+
| v2|sum_udf(v1)|
+---+-----------+
|  1|          1|
|  0|          5|
+---+-----------+
```

## How was this patch tested?

Manual test and unit test.

Closes apache#22620 from HyukjinKwon/SPARK-25601.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants