
[Dataset] ray failed to serialize pyarrow7.0.0 Tables #22310

Closed
1 of 2 tasks
scv119 opened this issue Feb 11, 2022 · 6 comments · Fixed by #29055
Labels: bug (Something that is supposed to be working; but isn't), data (Ray Data-related issues), P1 (Issue that should be fixed within a few weeks), size:medium

Comments

@scv119 (Contributor) commented Feb 11, 2022

Search before asking

  • I searched the issues and found no similar issues.

Ray Component

Ray Core

What happened + What you expected to happen

See #22253 (comment) and #22177 for more context.

TL;DR: when serializing pyarrow 7.0.0 Tables, the serialized data comes out with the wrong size (as large as 200 GB).

Versions / Dependencies

pyarrow 7.0.0

Reproduction script

With pyarrow 7.0.0 installed, run:

python ray/release/nightly_tests/dataset/ray_sgd_runner.py --num-epochs 1 --smoke-test

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
@scv119 added the bug (Something that is supposed to be working; but isn't) and triage (Needs triage) labels on Feb 11, 2022
@scv119 added this to the Datasets GA milestone on Feb 11, 2022
@scv119 added the P1 (Issue that should be fixed within a few weeks) label and removed the triage label on Feb 11, 2022
@scv119 changed the title from "[Dataset] ray serialization failed to seal pyarrow7.0.0 Tables" to "[Dataset] ray failed to serialize pyarrow7.0.0 Tables" on Feb 11, 2022
@clarkzinzow (Contributor) commented:

Hmm, interesting. @scv119 I tried to reproduce this locally and got roughly the same high-water mark for object store utilization and number of bytes consumed by tasks when comparing pyarrow==6.0.1 and pyarrow==7.0.0. 🤔

@ericl (Contributor) commented Feb 23, 2022

Here's a minimal repro. With pyarrow==6.0.1 it works fine. With 7.0, you get a 400GiB plasma allocated block...

import ray

ray.data.range_arrow(100e6, parallelism=1).write_parquet("/tmp/big")
ray.data.read_parquet("/tmp/big").show()

The issue seems to happen when putting an Arrow table read from parquet into plasma.
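One rough way to confirm that the blow-up is in serialization rather than in the read itself is to compare the pickled payload size of the table against its logical size. This is only a sketch: the pickled size is a proxy for the plasma allocation, not the exact number Ray reports, but it covers the same buffers.

import pickle

import pyarrow.parquet as pq

table = pq.read_table("/tmp/big")  # the Parquet data written by the repro above

# 100M int64 rows are roughly 800 MB of actual data; under pyarrow 7.0.0 the
# pickled payload comes out many times larger because each chunk drags the
# whole backing buffer along with it.
print(table.nbytes)
print(len(pickle.dumps(table)))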

@ericl (Contributor) commented Feb 23, 2022

Inserting a copy() of the block seems to resolve the issue. Maybe there's some large sparse array that's allocated in the pyarrow object when read from parquet in pyarrow 7.0.0.

We can insert the copy as a workaround for that version, and file a bug upstream.
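As a rough illustration of the kind of defensive copy being described, here is a hypothetical helper (copy_table is a made-up name, not the actual Datasets code):

import pyarrow as pa


def copy_table(table: pa.Table) -> pa.Table:
    # Concatenating each column's chunks allocates new contiguous buffers,
    # so no chunk is left as a slice-view onto a larger shared buffer.
    columns = [pa.concat_arrays(list(col.chunks)) for col in table.columns]
    return pa.table(dict(zip(table.column_names, columns)))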

@clarkzinzow (Contributor) commented:

Interesting. I tried reading Parquet data with many different operation combinations and wasn't able to reproduce.

> We can insert the copy as a workaround for that version, and file a bug upstream.

I'll time-box an investigation this week and otherwise do as you suggest. Thank you for finding a repro!

@clarkzinzow (Contributor) commented Mar 7, 2022

After some debugging, it looks like this is the same bug around serializing Arrow array slice-views on an underlying larger buffer. The new issue here is that Arrow's Parquet reader is creating chunked arrays whose chunks are each slice-views on an underlying contiguous buffer containing the entire chunked array, so when the chunked array is serialized, serializing the N chunks results in copying the entire buffer N times.

Here is a minimal reproduction without involving Parquet (this is buggy in all Arrow versions):

In [1]: import pickle

In [2]: import pyarrow as pa

In [3]: base = pa.array(list(range(10000000)))

In [4]: chunked = pa.chunked_array([pa.Array.from_buffers(pa.int64(), 1000000, [None, base.buffers()[1]], offset=i * 1000000) for i in range(10)])

In [5]: len(pickle.dumps(base))
Out[5]: 80000148

In [6]: len(pickle.dumps(chunked))
Out[6]: 800000829
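For comparison, copying the slice-view chunks into freshly allocated buffers before pickling brings the size back down to roughly the ~80 MB of base (a quick check continuing the session above; combine_chunks() is just one convenient way to force the copy):

In [7]: combined = chunked.combine_chunks()

In [8]: len(pickle.dumps(combined))  # roughly the same ~80 MB as pickling base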

The change in Arrow 7.0.0 is that all chunks of the chunked array created for a given column when reading Parquet files point at a single contiguous buffer; in past versions, a different backing buffer was created for each chunk.

In [1]: import pyarrow as pa

In [2]: import pyarrow.parquet as pq

In [3]: t = pa.table({"a": list(range(10000000))})

In [4]: pq.write_table(t, "test.parquet")

In [5]: t2 = pq.read_table("test.parquet")

In [6]: for chunk in t2["a"].chunks:
   ...:     print(chunk.buffers()[1].address)
   ...:
140413841182592
140413841182592
140413841182592
140413841182592
140413841182592
140413841182592
140413841182592
140413841182592
140413841182592
140413841182592

Here you can see that Arrow 6.0.1 created a different backing buffer for each chunk:

$ pip install pyarrow==6.0.1
$ ipython

In [1]: import pyarrow.parquet as pq

In [2]: t = pq.read_table("test.parquet")

In [3]: for chunk in t["a"].chunks:
   ...:     print(chunk.buffers()[1].address)
   ...:
140596942995904
140596912532864
140596746389440
140596729610240
140596708642304
140596717033152
140596683476288
140596691865664
140596654113152
140596662507200

Possible Solutions

  1. Detect these slice-view chunks at read time and copy each slice into a new buffer (eager full copy of column).
  2. Register our own serialization hook for Arrow arrays that does the slice copy from (1), but JIT before serialization and generically for all Arrow arrays (this should let us eliminate some defensive copying that we're doing elsewhere in Datasets); a rough sketch of this approach is given after this list.
  3. Help upstream the actual fix to Arrow.

I'd vote for pushing on (2) and (3) in parallel, with (2) being a short-term solution that should transparently apply to all cases, and (3) being the long-term fix.
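A minimal sketch of what option (2) could look like, assuming ray.util.register_serializer as the hook; the helper names and the IPC-based transport are illustrative only, and the eventual fix (#29055) may differ in detail:

import pyarrow as pa
import ray
from ray.util import register_serializer


def _serialize_table(table: pa.Table) -> bytes:
    # combine_chunks() copies each column's slice-view chunks into contiguous,
    # freshly allocated buffers, so the payload only contains the data once.
    compact = table.combine_chunks()
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, compact.schema) as writer:
        writer.write_table(compact)
    return sink.getvalue().to_pybytes()


def _deserialize_table(payload: bytes) -> pa.Table:
    return pa.ipc.open_stream(payload).read_all()


ray.init()

# ChunkedArray / Array would need analogous registrations to cover Arrow
# objects generically.
register_serializer(
    pa.Table, serializer=_serialize_table, deserializer=_deserialize_table
)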

@ericl (Contributor) commented Mar 7, 2022

Great find. For (2), I think that sounds good if the implementation is fairly simple. Otherwise, we can add a smaller fix in the read logic.

For (3), that also sounds good; we should at least ping upstream that this is impacting downstream users.
