Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dont use take with arrow #2336

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

Ben-Epstein
Copy link
Contributor

@Ben-Epstein Ben-Epstein commented Feb 9, 2023

This directly addresses #2335
And is directly the fix for https://issues.apache.org/jira/browse/ARROW-9773 (which is now apache/arrow#33049)

I believe fixing all of the .takes to .slice would also fix #2334 because .take uses memory, but .slice is zero-copy. The memory is exploding going to hdf5 because we keep .takeing

You can see huggingface datasets does the same thing: https://github.com/huggingface/datasets/pull/645/files

That being said, there are a number of other places that vaex uses .take which should be fixed. But because of the lack of typing in the vaex repo, it's hard for me to know which ones are pyarrow arrays, which are pyarrow tables, and which are numpy arrays. I'm happy to help move the rest over, but I would need some guidance.

Here are all of the places .take is used

./packages/vaex-core/vaex/functions.py:    values = choices.take(indices)
./packages/vaex-core/vaex/dataframe.py:                    used_keys = keys.take(codes)
./packages/vaex-core/vaex/dataframe.py:                        keys = keys.take(indices)
./packages/vaex-core/vaex/dataframe.py:    def take(self, indices, filtered=True, dropfilter=True):
./packages/vaex-core/vaex/dataframe.py:        >>> df.take([0,2])
./packages/vaex-core/vaex/dataframe.py:        df.dataset = df.dataset.take(indices)
./packages/vaex-core/vaex/dataframe.py:        return self.take(indices)
./packages/vaex-core/vaex/dataframe.py:        return self.take(indices).split(into)
./packages/vaex-core/vaex/dataframe.py:        return self.take(indices)
./packages/vaex-core/vaex/cpu.py:                keys = keys.take(indices)
./packages/vaex-core/vaex/cpu.py:        keys = keys.take(order)
./packages/vaex-core/vaex/join.py:            left = left.concat(left.take(lookup_left))
./packages/vaex-core/vaex/join.py:            left = left.take(left_indices_matched, filtered=False, dropfilter=False)
./packages/vaex-core/vaex/join.py:            right_dataset = right_dataset.take(lookup, masked=any(lookup_masked))
./packages/vaex-core/vaex/dataset.py:    def take(self, indices, masked=False):
./packages/vaex-core/vaex/dataset.py:        return DatasetTake(self, indices, masked=masked)
./packages/vaex-core/vaex/dataset.py:class DatasetTake(DatasetDecorator):
./packages/vaex-core/vaex/hash.py:        keys = pa.compute.take(keys, indices)
./packages/vaex-core/vaex/array_types.py:def take(ar, indices):
./packages/vaex-core/vaex/array_types.py:    return ar.take(indices)
./packages/vaex-core/vaex/groupby.py:                        self.bin_values = pa.compute.take(self.bin_values, self.sort_indices)
./packages/vaex-core/vaex/groupby.py:                        bin_values[field.name] = ar.take(indices)
./packages/vaex-core/vaex/groupby.py:                    bin_values[parent.label] = parent.bin_values.take(indices)
./packages/vaex-core/vaex/groupby.py:            self.bin_values = pa.compute.take(self.bin_values, sort_indices)
./packages/vaex-core/vaex/groupby.py:            values = pa.compute.take(values, indices)
./packages/vaex-core/vaex/groupby.py:                        ar = np.take(ar, sort_indices, axis=i)
./packages/vaex-core/vaex/groupby.py:                            columns[by.label] = vaex.array_types.take(by.bin_values, index)
./packages/vaex-core/vaex/groupby.py:                        columns[by.label] = by.bin_values.take(indices)
./packages/vaex-core/vaex/column.py:            ar = ar_unfiltered.take(vaex.array_types.to_arrow(take_indices))
./packages/vaex-core/vaex/column.py:        x = x.dictionary.take(x.indices)  # equivalent to PyArrow 5.0.0's dictionary_decode() but backwards compatible

@maartenbreddels
Copy link
Member

The memory is exploding going to hdf5 because we keep .takeing

Yeah, when using sliced arrays, that seems to be the case. It will try to concatenate them first, which will explode the memory use!

This is quite a bad arrow situation... :/ digesting this

Copy link
Member

@maartenbreddels maartenbreddels left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this Ben, please take a look at my comment.

Comment on lines 385 to 395
# Don't use .take in arrow anymore
# https://issues.apache.org/jira/browse/ARROW-9773
# slice is zero-copy
if isinstance(ar_unfiltered, pa.Array):
ar = pa.concat_arrays(
[ar_unfiltered.slice(i, 1) for i in take_indices]
)
elif isinstance(ar_unfiltered, pa.ChunkedArray):
ar = pa.concat_arrays(
[ar_unfiltered.slice(i, 1).combine_chunks() for i in take_indices]
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Don't use .take in arrow anymore
# https://issues.apache.org/jira/browse/ARROW-9773
# slice is zero-copy
if isinstance(ar_unfiltered, pa.Array):
ar = pa.concat_arrays(
[ar_unfiltered.slice(i, 1) for i in take_indices]
)
elif isinstance(ar_unfiltered, pa.ChunkedArray):
ar = pa.concat_arrays(
[ar_unfiltered.slice(i, 1).combine_chunks() for i in take_indices]
)
if isinstance(ar_unfiltered, pa.Array):
ar = ar_unfiltered.take(vaex.array_types.to_arrow(take_indices))
elif isinstance(ar_unfiltered, pa.ChunkedArray):
# Don't use .take in arrow for chunked arrays
# https://issues.apache.org/jira/browse/ARROW-9773
# slice is zero-copy
ar = pa.concat_arrays(
[ar_unfiltered.slice(i, 1).combine_chunks() for i in take_indices]
)

it is only an issue for chunked arrays right?
Maybe the best place to put this is vaex.array_types.take ?
There is already a take function there, and if all places in vaex uses that function, we have a nice central place where we can solve this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maartenbreddels i do not think this is only an issue for chunked_arrays, I think it's an issue with take broadly. I definitely like the idea of moving it down to a centralized place, but I think no matter the array type, you need to replace take with slice

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://issues.apache.org/jira/browse/ARROW-9773 only talks about chunked arrays, and I also think it's only a problem for that (since it's more complex).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay i'll take your word on this one, you certainly know more here. I'll move this into the vaex array_types.take

@Ben-Epstein
Copy link
Contributor Author

@maartenbreddels i updated the function to use the array_types take function, but i wasn't able to update the others. take is used in a bunch of places and they aren't always from a pyarrow array. Without typing, it's not clear to me when it's on a dataframe, or a dataset, or something else.

I tried to dig into the dataframe take which led to the dataset take and then this DatasetTake class but I can't really understand what it's doing. Would you mind helping me out with that part?

@maartenbreddels
Copy link
Member

I'll scan over it tomorrow!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG-REPORT] Potential memory leak when exporting large strings to hdf5
3 participants