Unable to register UDAFs using SessionContext's register_udaf #874

Closed
emanueledomingo opened this issue Sep 19, 2024 · 2 comments · Fixed by #878
@emanueledomingo

Describe the bug
While upgrading DataFusion from 39 to 41, my script broke because register_udaf crashes with the following error:

in SessionContext.register_udaf(self, udaf)
    829 def register_udaf(self, udaf: AggregateUDF) -> None:
    830     """Register a user-defined aggregation function (UDAF) with the context."""
--> 831     self.ctx.register_udaf(udaf._udaf)

AttributeError: 'AggregateUDF' object has no attribute '_udaf'
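
For readers unfamiliar with this kind of traceback: one common way such an AttributeError arises is when a wrapper class stores its inner object under one attribute name while the caller reads a different one. The sketch below only illustrates that general pattern. The names InnerHandle, Wrapper, _handle and register are hypothetical and are not taken from the datafusion source, and the actual cause in the library may differ.

```python
class InnerHandle:
    """Hypothetical stand-in for a native object held by a Python wrapper."""


class Wrapper:
    """Hypothetical wrapper; NOT the real datafusion AggregateUDF class."""

    def __init__(self) -> None:
        # The inner object is stored under the name `_handle` ...
        self._handle = InnerHandle()


def register(wrapper: Wrapper) -> InnerHandle:
    # ... but the caller reads `_udaf`, which was never set on the wrapper.
    return wrapper._udaf


try:
    register(Wrapper())
    message = ""
except AttributeError as exc:
    # Capture the error text so it can be inspected without a traceback.
    message = str(exc)

print(message)
```

If this pattern is what happened, the fix is to make the two names agree on either the wrapper side or the caller side.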

To Reproduce
Steps to reproduce the behavior:

import datafusion as df
import pyarrow
import pyarrow.compute as pc
from typing import List

class AverageAccumulator(df.Accumulator):
    def __init__(self):
        self._sum = 0
        self._count = 0

    def update(self, values: pyarrow.Array) -> None:
        self._sum += pc.sum(values).as_py()
        self._count += len(values)

    def merge(self, other) -> None:
        self._sum += other._sum
        self._count += other._count

    def state(self) -> pyarrow.Array:
        return pyarrow.array([self._sum, self._count])

    def evaluate(self) -> pyarrow.Scalar:
        return pyarrow.scalar(self._sum / self._count)

average_udaf = df.udaf(
    AverageAccumulator,
    pyarrow.float64(),
    pyarrow.float64(),
    [pyarrow.float64()],
    'stable'
)

ctx = df.SessionContext()

ctx.register_udaf(average_udaf)
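
To make clear what the accumulator above is meant to compute, here is a minimal plain-Python sketch of the same update/state/merge/evaluate life cycle with no DataFusion or pyarrow dependency. The class name AverageState and the choice to have merge take a list of exported (sum, count) states are assumptions made for this sketch, not the library's interface.

```python
from typing import List, Sequence, Tuple


class AverageState:
    """Plain-Python stand-in for the averaging accumulator (illustrative only)."""

    def __init__(self) -> None:
        self._sum = 0.0
        self._count = 0

    def update(self, values: Sequence[float]) -> None:
        # Fold one batch of input values into the running totals.
        self._sum += sum(values)
        self._count += len(values)

    def state(self) -> Tuple[float, int]:
        # Export the partial result so another accumulator can absorb it.
        return (self._sum, self._count)

    def merge(self, states: List[Tuple[float, int]]) -> None:
        # Combine partial results produced by other accumulators.
        for partial_sum, partial_count in states:
            self._sum += partial_sum
            self._count += partial_count

    def evaluate(self) -> float:
        # Final answer: total sum divided by total count.
        return self._sum / self._count


# Simulate two partitions that are aggregated separately, then merged.
left, right = AverageState(), AverageState()
left.update([0, 1, 2, 3])
right.update([4, 5, 6])

final = AverageState()
final.merge([left.state(), right.state()])
result = final.evaluate()
print(result)
```

Keeping the sum and the count as separate pieces of state is what allows partial results to be merged correctly. Averaging the per-partition averages would weight the partitions incorrectly when they differ in size.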

Additional context
The bug was introduced in datafusion 40.1.0

@emanueledomingo emanueledomingo added the bug Something isn't working label Sep 19, 2024
@timsaucer
Contributor

Verified. I have a fix I will push up tonight or first thing in the morning. Thank you for the bug report!

In the meantime, are you able to use the UDAF via the DataFrame APIs? I tested the code below without the fix, following your code:

import pyarrow as pa
from datafusion import col
batch = pa.RecordBatch.from_arrays(
    [
        pa.array([0, 1, 2, 3, 4, 5, 6]),
    ],
    names=["a"],
)

df = ctx.create_dataframe([[batch]])

df.aggregate([], average_udaf(col("a")).alias("udaf")).show()

returns

DataFrame()
+------+
| udaf |
+------+
| 3.0  |
+------+

@timsaucer timsaucer self-assigned this Sep 19, 2024
@emanueledomingo
Author

By executing your code:

...
df.aggregate([], [average_udaf(col("a")).alias("udaf")]).show()

I correctly see the same result as you!

DataFrame()
+------+
| udaf |
+------+
| 3.0  |
+------+

timsaucer added a commit to timsaucer/datafusion-python that referenced this issue Sep 20, 2024
timsaucer added a commit that referenced this issue Sep 21, 2024
* Test no longer hangs, and updated error string to match latest

* Add unit tests for registering udf and udaf

* Resolve error on registering udaf #874

* remove stale comment

* Update unit test text to match in multiple versions of python

* Regex for exception that is compatible with python 3.10 and 3.12