ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory #7180
fsaintjacques wants to merge 7 commits into apache:master from fsaintjacques/ARROW-8062-parquet-dataset-metadata
Conversation
force-pushed from ec0a3c2 to 99c51ca
force-pushed from aea5db3 to 2bb5686
jorisvandenbossche left a comment:
Looking great! I tested on some dummy datasets created by dask, and that seems to work nicely (although it is of course hard to be sure there is actually no IO going on during the factory step).
I will also check further against the existing parquet tests that use `_metadata` files.
Some non-inline comments:
- Still need a `partitioning` keyword for `parquet_dataset`?
- We might want to detect that, when given a directory name, this directory includes a `_metadata` file? (in some API; maybe this can be in the `pyarrow.parquet` code)
- For a follow-up: handling of `_common_metadata` (just for inspecting the common schema)
- Regarding the statistics stored as an expression:
  - Do we want to expose this in Python as well?
  - Currently, the statistics are only available when the fragments were constructed from a `_metadata` file (or after querying once), I think? Do we want to allow populating them on demand?
  - Statistics are only attached to a RowGroupInfo, and not to a fragment? Not needed for this PR, to be clear, but thinking more generally: we might want to enable creating a Fragment with custom statistics (e.g. in Kartothek? @xhochy)
- There are some tests needed for the new `write_metadata` capabilities (I can write/push some if you want). And the same for `parquet_dataset` / `ParquetDatasetFactory`. A minimal writing recipe is sketched right after this list.
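For reference, a minimal sketch of that `_metadata` writing recipe (assuming the `pyarrow.parquet` API this PR targets; the path and column name are made up for illustration):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical toy table; "total_amount" mirrors the benchmark column below.
table = pa.table({"total_amount": [10.0, 2000.0, 35.5]})

# Collect the FileMetaData of every written file...
metadata_collector = []
pq.write_to_dataset(table, "dataset_root", metadata_collector=metadata_collector)

# ...then combine them into a single _metadata file at the dataset root.
pq.write_metadata(table.schema, "dataset_root/_metadata",
                  metadata_collector=metadata_collector)
```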
I ran into a segfault if one of the files is not present (writing a reproducer / test case right now)
Does this mean that using SplitByRowGroup currently always invokes IO? (In principle it could be filtered/split based on the already-read RowGroupInfo objects?)
Correct. It could be filtered, but only if the dataset was generated via the _metadata file (or any explicit RowGroupInfo).
I think it would be good to at least try to avoid IO if the statistics are already available in the RowGroupInfos (at least, from my understanding of how this is used in RAPIDS; I can check with them), but it is certainly not critical to do in this PR.
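To illustrate the point, a hedged sketch of driving `split_by_row_group` from the attached statistics (assuming the Python bindings added in this PR; the `dataset_root` path reuses the hypothetical example above):

```python
import pyarrow.dataset as ds

# Opening through the _metadata file attaches RowGroupInfo (with statistics)
# to every fragment up front, so no per-file footer reads are needed.
dataset = ds.parquet_dataset("dataset_root/_metadata")

for fragment in dataset.get_fragments():
    # One fragment per surviving row group; row groups whose statistics
    # contradict the filter should be dropped without touching the file.
    for rg_fragment in fragment.split_by_row_group(ds.field("total_amount") > 1000.0):
        print(rg_fragment.path, [rg.id for rg in rg_fragment.row_groups])
```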
Review comment on python/pyarrow/dataset.py (outdated):
Maybe we should not expose this publicly? (is there a reason you would want to use this directly?)
That's required for `ParquetFileFragment.row_groups`. I could change it to only return a list of integers.
I don't get a segfault for the test you added, just a wrong exception being thrown:

```
>       raise IOError(errno, message)
E       FileNotFoundError: [Errno 2] Failed to open local file '/tmp/pytest-of-fsaintjacques/pytest-44/test_parquet_dataset_factory_i0/test_parquet_dataset/43bd0bd1002048e0b9bbc730f7614d18.parquet'. Detail: [errno 2] No such file or directory

pyarrow/error.pxi:98: FileNotFoundError
```
A FileNotFoundError sounds good (the ValueError I added in the tests was just a bit random). Will rebuild locally to see if I still get this.
I'm curious about the exception/segfault. If you can reproduce, feel free to share.
It seems this failure doesn't happen all the time for me. Running it a few times, I also see the FileNotFoundError, but only in about 1 out of 2 cases. Now, when running it on an actual example in the interactive terminal (from a small dataset where I actually deleted one of the files), I consistently see the segfault. I get a different stacktrace when running the tests, something about "unlink", so it might be that the way I remove the file in the test is not very robust / is not similar to deleting a file in the file browser.
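For reference, a rough reproducer along the lines discussed (a sketch assuming pytest's `tmp_path` fixture and the `parquet_dataset` API from this PR; names are illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest


def test_parquet_dataset_factory_missing_file(tmp_path):
    # Write a small dataset plus its _metadata file.
    table = pa.table({"x": list(range(10))})
    metadata_collector = []
    pq.write_to_dataset(table, str(tmp_path), metadata_collector=metadata_collector)
    pq.write_metadata(table.schema, str(tmp_path / "_metadata"),
                      metadata_collector=metadata_collector)

    dataset = ds.parquet_dataset(str(tmp_path / "_metadata"))

    # Delete a data file referenced by _metadata, then scan: this should
    # raise FileNotFoundError rather than segfault.
    next(tmp_path.glob("*.parquet")).unlink()
    with pytest.raises(FileNotFoundError):
        dataset.to_table()
```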
force-pushed from 3fa0657 to fd5a4a3
- Implement ParquetDatasetFactory
- Replace ParquetFileFormat::GetRowGroupFragments with ParquetFileFragment::SplitByRowGroup (and the corresponding bindings).
- Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
- Consolidate RowGroupSkipper logic in ParquetFileFragment::GetRowGroupFragments.
- Ensure FileMetaData::AppendRowGroups checks for schema equality.
- Implement dataset._parquet_dataset
force-pushed from fd5a4a3 to 8b301d3
force-pushed from 8b301d3 to 3b85adc
bkietz left a comment:
This looks great, thanks for doing this!
A few comments:
Nothing here can fail, so we can just make the constructor public.
For debugging purposes, this is extremely useful for introspecting the object.
Review comment on python/pyarrow/_dataset.pyx (outdated):
```diff
- Split the fragment in multiple fragments.
+ Split the fragment into multiple fragments.
```
Could you replicate this comment in C++?
force-pushed from dcf2a68 to 100a7b0
force-pushed from 100a7b0 to cca91b3
force-pushed from 8df15e3 to 29f44d9
This patch adds the option to create a dataset of parquet files via `ParquetDatasetFactory`. It reads a single `_metadata` parquet file created by systems like Dask and Spark, extracts the metadata of all fragments from that file, and populates each fragment with extra statistics for each column. The `_metadata` file can be created via `pyarrow.parquet.write_metadata`.
When the Scan operation is materialised, the row groups of the ParquetFileFragment are pruned using the statistics _before_ reading the original file metadata. If no RowGroups from a file match the predicate of the Scan, the file will not be read at all (not even the metadata footer), thus avoiding expensive IO calls. The optimisation benefits are inversely proportional to the predicate's selectivity.
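For context, a hedged sketch of the intended end-to-end usage (the `nyc_tlc` paths below are hypothetical, mirroring the benchmark that follows):

```python
import pyarrow.dataset as ds

# Plain FileSystemDataset: discovery opens every file's footer.
fs_dataset = ds.dataset("nyc_tlc/", format="parquet")

# ParquetDatasetFactory: one _metadata file describes all fragments and
# their row-group statistics, so discovery needs no per-file footer reads.
parquet_dataset = ds.parquet_dataset("nyc_tlc/_metadata")

# Highly selective filter: files whose row groups cannot match are skipped.
table = parquet_dataset.to_table(filter=ds.field("total_amount") > 1000.0)
```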
```python
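# (assumption: `da` below is an alias for the pyarrow.dataset module)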
# With the plain FileSystemDataset
%timeit t = nyc_tlc_fs_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
1.55 s ± 26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# With ParquetDatasetFactory
%timeit t = nyc_tlc_parquet_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
336 ms ± 17.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
- Implement ParquetDatasetFactory
- Replace ParquetFileFormat::GetRowGroupFragments with
ParquetFileFragment::SplitByRowGroup (and the corresponding bindings).
- Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
- Consolidate RowGroupSkipper logic in ParquetFileFragment::ScanFile.
- Ensure FileMetaData::AppendRowGroups checks for schema equality.
- Implement dataset._parquet_dataset
Closes apache#7180 from fsaintjacques/ARROW-8062-parquet-dataset-metadata
Lead-authored-by: François Saint-Jacques <fsaintjacques@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
Follow-up on ARROW-8062 (#7180)
Closes #7345 from jorisvandenbossche/ARROW-8946-metadata-write
Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>