-
Notifications
You must be signed in to change notification settings - Fork 908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement cudf-polars
chunked parquet reading
#16944
Implement cudf-polars
chunked parquet reading
#16944
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor tweaks, overall, I think the implementation side looks in good shape.
There's an open question about whether we want to read all the chunks and do a concatenate at the end, but I will wait for benchmarking on that.
We need to put some documentation about the chunked reading somewhere. As a minimum, can you add a new |
Co-authored-by: Lawrence Mitchell <wence@gmx.li>
In addition to the slicing issue, switching to chunked reading by default seems to shake out another chunked parquet reader bug with import pylibcudf as plc
import pyarrow as pa
import pyarrow.parquet as pq
data = {
"a": [1, 2, 3, None, 4, 5],
"b": ["ẅ", "x", "y", "z", "123", "abcd"],
"c": [None, None, 4, 5, -1, 0],
}
path = "./test.parquet"
pq.write_table(pa.Table.from_pydict(data), path)
reader = plc.io.parquet.ChunkedParquetReader(
plc.io.SourceInfo([path]),
columns=['a', 'b', 'c'],
nrows=2,
skip_rows=0,
chunk_read_limit=0,
pass_read_limit=17179869184 # 16 GiB
)
chk = reader.read_chunk()
tbl = chk.tbl
names = chk.column_names()
concatenated_columns = tbl.columns()
while reader.has_next():
tbl = reader.read_chunk().tbl
for i in range(tbl.num_columns()):
concatenated_columns[i] = plc.concatenate.concatenate(
[concatenated_columns[i], tbl._columns[i]]
)
# Drop residual columns to save memory
tbl._columns[i] = None
gpu_result = plc.interop.to_arrow(tbl)
cpu_result = pq.read_table(path)[:2]
print(cpu_result.column(1).to_pylist())
print(gpu_result.column(1).to_pylist()) this yields
I suppose this is a separate issue from #17158 since the trunk includes this fix now I believe. |
if self.typ == "csv" and self.skip_rows != 0: # pragma: no cover | ||
if self.typ in {"csv", "parquet"} and self.skip_rows != 0: # pragma: no cover | ||
# This comes from slice pushdown, but that | ||
# optimization doesn't happen right now | ||
raise NotImplementedError("skipping rows in CSV reader") | ||
raise NotImplementedError("skipping rows in CSV or Parquet reader") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this, because it turns off yet more routes into running a query on device. Particularly, since parquet ingest is the primary way we recommend people do things, we need it to work in basically all cases.
Things we can do:
- turn off chunked reading is skip_rows != 0
- fail if skip_rows != 0 (as here)
- Manually handle skip_rows != 0 in the chunked reader by reading full chunks and slicing them away if skip_rows != 0
I think I like the third option best.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I pushed an implementation of option 3.
Rather than falling back to CPU for chunked read + skip_rows, just read chunks and skip manually after the fact. Simplify the parquet scan tests a bit and add better coverage of both chunked and unchunked behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some suggestions to improve the docs and a couple of minor questions on implementation, but nothing blocking. I think that this is good to merge when you are happy.
Co-authored-by: Vyas Ramasubramani <vyas.ramasubramani@gmail.com>
/merge |
@@ -208,8 +214,9 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: | |||
translation phase should fail earlier. | |||
""" | |||
return self.do_evaluate( | |||
config, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wence- - Just a note. Pretty sure this means we will need to pass in a config object to every single task in the task graph for multi-gpu.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Argh, ok, painful. Let's try and figure something out (especially because the config object can contain a memory resource).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense for config
to be a required IR constructor argument, and not require it as an argument to do_evaluate
(unless necessary)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, we could pass the config options we need into the Scan
node during translate
, and then it never needs to be in do_evaluate
at all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possible revision proposed here: #17339
This PR provides access to the libcudf chunked parquet reader through the
cudf-polars
gpu engine, inspired by the cuDF python implementation.Closes #16818