
Conversation


@shehabgamin shehabgamin commented Aug 3, 2025

Which issue does this PR close?

Rationale for this change

Faster queries and improved usability for downstream crates.

What changes are included in this PR?

  1. Make Parquet reader public so downstream crates can use it.
  2. Use cached metadata for ListingTable statistics for faster queries.

Are these changes tested?

Yes.

Are there any user-facing changes?

No.

@github-actions github-actions bot added core Core DataFusion crate datasource Changes to the datasource crate labels Aug 3, 2025
@shehabgamin shehabgamin changed the title Parquet metadata feat: Use Cached Metadata for ListingTable Statistics Aug 3, 2025
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Aug 3, 2025

shehabgamin commented Aug 3, 2025

EDIT:
I had collect_statistics turned off when running these benchmarks, so disregard the results. Striking out the results below to avoid confusion.

Not seeing any benefit when testing with S3 data cc @alamb

Tested on Derived TPC-H (100 GB) querying S3 from EC2.

EC2 Tab 1:

env RUSTFLAGS="-C target-cpu=native" cargo build -r -p sail-cli --bins --target-dir target/parquet-metadata-cache

env \
  RUST_LOG=info \
  SAIL_PARQUET__FILE_METADATA_CACHE=true \
  target/parquet-metadata-cache/release/sail spark server

EC2 Tab 2:

python python/pysail/examples/spark/tpch.py \
  --data-path s3://BUCKET-PATH HERE \
  --query-path python/pysail/data/tpch/queries \
  --query-all \
  --num-runs 3

Run 1 Total time for all queries: 180.1174988746643 seconds.
Run 2 Total time for all queries: 184.3733410835266 seconds.
Run 3 Total time for all queries: 176.67709589004517 seconds.


shehabgamin commented Aug 3, 2025

EDIT:
I had collect_statistics turned off when running these benchmarks, so disregard the results. Striking out the results below to avoid confusion.

Retesting after c20b142 and lakehq/sail@5580d8b.

Will update this message when testing is done.

UPDATE:
We can try with "10G" set and also with nothing set and compare.

env \
  RUST_LOG=info \
  SAIL_PARQUET__FILE_METADATA_CACHE=true \
  SAIL_PARQUET__FILE_METADATA_CACHE_LIMIT="10G" \
  target/parquet-metadata-cache/release/sail spark server

python python/pysail/examples/spark/tpch.py \
  --data-path s3://PATH \
  --query-path python/pysail/data/tpch/queries \
  --query-all \
  --num-runs 5

Run 1 Total time for all queries: 203.74799275398254 seconds.
Run 2 Total time for all queries: 184.0539104938507 seconds.
Run 3 Total time for all queries: 178.30218935012817 seconds.
Run 4 Total time for all queries: 172.49501848220825 seconds.
Run 5 Total time for all queries: 174.7171494960785 seconds.

env \
  RUST_LOG=info \
  SAIL_PARQUET__FILE_METADATA_CACHE=true \
  target/parquet-metadata-cache/release/sail spark server

python python/pysail/examples/spark/tpch.py \
  --data-path s3://PATH \
  --query-path python/pysail/data/tpch/queries \
  --query-all \
  --num-runs 5

Run 1 Total time for all queries: 182.70136904716492 seconds.
Run 2 Total time for all queries: 192.44108653068542 seconds.
Run 3 Total time for all queries: 175.50793719291687 seconds.
Run 4 Total time for all queries: 175.66266465187073 seconds.
Run 5 Total time for all queries: 175.70708632469177 seconds.


alamb commented Aug 4, 2025

Very exciting -- I have planned time this week to help review and get these various caching things completed

@shehabgamin shehabgamin marked this pull request as ready for review August 4, 2025 22:19
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Aug 4, 2025
@alamb alamb left a comment

Thanks @shehabgamin -- the code in this PR looks great. I think we just need to add some tests to make sure we don't break this feature in the future

Maybe we can add a test based on the existing ones that shows a second retrieval of statistics doesn't refetch the same footer 🤔
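The suggested test could be sketched with a request-counting mock store. Everything below (CountingStore, MetadataCache, fetch_footer) is a hypothetical stand-in, not DataFusion's actual API; the point is just the assertion shape — the second retrieval must not increase the store's request count:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical object store that counts how many footer reads it serves.
struct CountingStore {
    requests: AtomicUsize,
}

impl CountingStore {
    fn fetch_footer(&self, _path: &str) -> Vec<u8> {
        self.requests.fetch_add(1, Ordering::SeqCst);
        vec![0u8; 8] // stand-in for Parquet footer bytes
    }
    fn request_count(&self) -> usize {
        self.requests.load(Ordering::SeqCst)
    }
}

/// Minimal metadata cache keyed by file path.
struct MetadataCache {
    entries: Mutex<HashMap<String, Arc<Vec<u8>>>>,
}

impl MetadataCache {
    fn get_or_fetch(&self, store: &CountingStore, path: &str) -> Arc<Vec<u8>> {
        let mut entries = self.entries.lock().unwrap();
        entries
            .entry(path.to_string())
            .or_insert_with(|| Arc::new(store.fetch_footer(path)))
            .clone()
    }
}

fn main() {
    let store = CountingStore { requests: AtomicUsize::new(0) };
    let cache = MetadataCache { entries: Mutex::new(HashMap::new()) };

    cache.get_or_fetch(&store, "data/part-0.parquet"); // first read hits the store
    cache.get_or_fetch(&store, "data/part-0.parquet"); // second read is served from cache

    assert_eq!(store.request_count(), 1); // footer fetched exactly once
}
```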

@shehabgamin

> Thanks @shehabgamin -- the code in this PR looks great. I think we just need to add some tests to make sure we don't break this feature in the future
>
> Maybe we can add a test based on the existing ones that shows a second retrieval of statistics doesn't refetch the same footer 🤔

For sure! Will knock that out when I have some downtime today or tonight

@shehabgamin

> Thanks @shehabgamin -- the code in this PR looks great. I think we just need to add some tests to make sure we don't break this feature in the future
>
> Maybe we can add a test based on the existing ones that shows a second retrieval of statistics doesn't refetch the same footer 🤔
>
> For sure! Will knock that out when I have some downtime today or tonight

@alamb Done!

@xudong963 xudong963 self-requested a review August 6, 2025 05:30
@alamb alamb left a comment

Thanks @shehabgamin -- this code and tests look great to me

I also tried it locally and it is 👨‍🍳 👌. Note that the count(*), which is based on statistics, returns immediately:

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ ./target/release/datafusion-cli
DataFusion CLI v49.0.0
> create external table hits stored as parquet location 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 3.298 seconds.

> select min("WatchID"), max("WatchID") from hits;
+---------------------+---------------------+
| min(hits.WatchID)   | max(hits.WatchID)   |
+---------------------+---------------------+
| 4611686071420045196 | 9223372033328793741 |
+---------------------+---------------------+
1 row(s) fetched.
Elapsed 0.273 seconds.

> select min("WatchID"), max("WatchID") from hits;
+---------------------+---------------------+
| min(hits.WatchID)   | max(hits.WatchID)   |
+---------------------+---------------------+
| 4611686071420045196 | 9223372033328793741 |
+---------------------+---------------------+
1 row(s) fetched.
Elapsed 0.241 seconds.

For comparison, with released DataFusion 49.0.0:

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ ~/Software/datafusion-cli/datafusion-cli-49.0.0
DataFusion CLI v49.0.0
> create external table hits stored as parquet location 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 3.313 seconds.

> select min("WatchID"), max("WatchID") from hits;
+---------------------+---------------------+
| min(hits.WatchID)   | max(hits.WatchID)   |
+---------------------+---------------------+
| 4611686071420045196 | 9223372033328793741 |
+---------------------+---------------------+
1 row(s) fetched.
Elapsed 1.380 seconds.

However, I tried merging up from main locally and this PR didn't compile. It seems to have a logical conflict with #17062

The good news is that since we have removed that field, I think this PR can get significantly simpler. Sorry I can't push the changes directly to this PR, as I don't have write access to the lakehq fork

error[E0609]: no field `cache_metadata` on type `datafusion_common::config::ParquetOptions`
   --> datafusion/core/src/datasource/file_format/parquet.rs:204:37
    |
204 |             format.options().global.cache_metadata,
    |                                     ^^^^^^^^^^^^^^ unknown field

.expect("error reading metadata with hint");
assert_eq!(store.request_count(), 4);

// Increase by 2 because `cache_metadata` is false
Contributor

👍

 mod opener;
 mod page_filter;
-mod reader;
+pub mod reader;
Contributor

why does this need to be made pub? I reverted the change and things still seem to compile just fine

Contributor Author

So downstream crates can use it. The reader module looked like it could be generally useful for downstream crates, which is why I made the entire reader public.

Currently in Sail we build our own cache for each cache type in CacheManagerConfig. I was unable to access CachedParquetMetaData to do something like the following unless I made CachedParquetMetaData public (full code):

if let Some(parquet_metadata) =
    value.1.as_any().downcast_ref::<CachedParquetMetaData>()
{
    parquet_metadata
        .parquet_metadata()
        .memory_size()
        .min(u32::MAX as usize)
} 
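The downcast pattern in that snippet can be illustrated with a self-contained sketch. The CacheEntry trait and the struct below are simplified stand-ins for DataFusion's actual trait object and CachedParquetMetaData; only the as_any/downcast_ref shape matches the real code:

```rust
use std::any::Any;
use std::sync::Arc;

/// Stand-in for the cached-entry trait object; the real crate exposes
/// its own trait with an `as_any` accessor.
trait CacheEntry: Any {
    fn as_any(&self) -> &dyn Any;
}

/// Simplified stand-in for DataFusion's CachedParquetMetaData.
struct CachedParquetMetaData {
    memory_size: usize,
}

impl CacheEntry for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Compute a bounded size for a cache entry, mirroring the snippet above:
/// downcast the trait object, then clamp the reported size to u32::MAX.
fn entry_size(entry: &Arc<dyn CacheEntry>) -> usize {
    if let Some(md) = entry.as_any().downcast_ref::<CachedParquetMetaData>() {
        md.memory_size.min(u32::MAX as usize)
    } else {
        0
    }
}

fn main() {
    let entry: Arc<dyn CacheEntry> = Arc::new(CachedParquetMetaData { memory_size: 1024 });
    assert_eq!(entry_size(&entry), 1024);
}
```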

Contributor

I see -- my concern is that this change might be easy to accidentally undo / break in the future. Maybe to make it more deliberate, you could leave mod reader private and then pub use both CachedParquetMetaData and CachedParquetFileReader 🤔
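A minimal sketch of that re-export approach, assuming the two types in question are CachedParquetMetaData and CachedParquetFileReader (the empty structs here are placeholders for the real ones):

```rust
// Keep the module private but deliberately re-export the two public types,
// so accidentally reverting a `pub mod reader` can't silently hide them
// from downstream crates.
mod reader {
    pub struct CachedParquetMetaData;
    pub struct CachedParquetFileReader;
}

// Explicit re-exports: the public surface is these two names, not the module.
pub use reader::{CachedParquetFileReader, CachedParquetMetaData};

fn main() {
    // Downstream code refers to the re-exported paths directly.
    let _md = CachedParquetMetaData;
    let _rd = CachedParquetFileReader;
}
```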

Contributor Author

Works for me!

 /// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
 /// updates the cache.
-pub(crate) struct CachedParquetFileReader {
+pub struct CachedParquetFileReader {
Contributor

Likewise, I locally reverted these changes to visibility and everything seems to have compiled just fine


     self
 }

+pub fn with_cache_metadata(mut self, cache_metadata: bool) -> Self {
Contributor

I think this option was removed in #17062 so it is no longer needed


alamb commented Aug 7, 2025

@nuno-faria and @jonathanc-n -- do you have time to review this PR as well?

@jonathanc-n

Some changes might be needed after the removal of cache_metadata in #17062. They should be small; we just no longer need the if check on cache_metadata.

@shehabgamin shehabgamin requested a review from alamb August 8, 2025 09:11
Comment on lines 996 to 1008
let metadata = Arc::new(
    reader
        .load_and_finish(fetch, file_size)
        .await
        .map_err(DataFusionError::from)?,
);

if let Some(cache) = file_metadata_cache {
    cache.put(
        meta,
        Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
    );
}
Contributor

I think there is an issue with the fetch_parquet_metadata function. When this function is initially called to retrieve the schema (in fetch_schema), it will read the metadata and update the cache. When the CachedParquetFileReader later tries to get the metadata, it finds it in the cache. However, the cached metadata does not contain the page index, as fetch_parquet_metadata does not retrieve it, meaning the page index will have to be read in every query.

So fetch_parquet_metadata needs to retrieve the entire metadata for the caching to be effective. On the other hand, after this we will have two different places where the entire metadata is read and cached (CachedParquetFileReader and fetch_parquet_metadata), so creating a utility function retrieve_full_parquet_metadata -> Arc<ParquetMetaData> might be useful to avoid duplicate modifications in the future.
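A rough sketch of what such a shared helper could look like. A plain HashMap stands in for the real FileMetadataCache, and the ParquetMetaData type here is a dummy carrying only a has_page_index flag; the point is the shape: check the cache, otherwise read the *entire* metadata (footer plus page index) once and cache it:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Dummy stand-in for parquet's ParquetMetaData, with the page index loaded.
struct ParquetMetaData {
    has_page_index: bool,
}

type Cache = Mutex<HashMap<String, Arc<ParquetMetaData>>>;

/// Sketch of the proposed retrieve_full_parquet_metadata helper: every
/// cached entry carries the complete metadata, so later readers never
/// have to go back to object storage for the page index.
fn retrieve_full_parquet_metadata(cache: &Cache, path: &str) -> Arc<ParquetMetaData> {
    // Fast path: already cached.
    {
        let entries = cache.lock().unwrap();
        if let Some(found) = entries.get(path) {
            return Arc::clone(found);
        }
    }
    // In real code this would read footer + page index from the object
    // store (outside the lock, since the read is slow).
    let full = Arc::new(ParquetMetaData { has_page_index: true });
    cache.lock().unwrap().insert(path.to_string(), Arc::clone(&full));
    full
}

fn main() {
    let cache: Cache = Mutex::new(HashMap::new());
    let md = retrieve_full_parquet_metadata(&cache, "part-0.parquet");
    assert!(md.has_page_index); // cached entries always carry the page index
    let again = retrieve_full_parquet_metadata(&cache, "part-0.parquet");
    assert!(Arc::ptr_eq(&md, &again)); // second call returns the cached Arc
}
```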

Contributor Author

Ahh nice catch, will do!

@shehabgamin shehabgamin requested a review from nuno-faria August 9, 2025 04:24
@nuno-faria nuno-faria left a comment

Thanks @shehabgamin, overall LGTM.

-pub async fn fetch_parquet_metadata(
-    store: &dyn ObjectStore,
-    meta: &ObjectMeta,
+pub async fn fetch_parquet_metadata<F: MetadataFetch>(
Contributor

I wonder now if this function should go to a separate module (a utils.rs or similar?). This is because the file_format refers to reader and reader refers to file_format. cc @alamb

Contributor

I agree -- I am thinking the code that handles fetching metadata and schema is also getting enough options that it can probably be its own file / structure too. Maybe something like

pub struct ParquetMetadataFetcher {
...
}

So we could use it like

let fetcher = ParquetMetadataFetcher::new(object_store, path) 
  .with_hint(...);

fetcher.fetch_metadata().await?

🤔

Contributor

The more I looked the more it seemed like what would be helpful would be a whole new module statistics.rs or something

Contributor

I am hacking up a prototype of what this might look like

Contributor

I bashed out a PR that pulled the metadata handling out into its own module and I think it looks quite a bit nicer. Thank you for the suggestion @nuno-faria


alamb commented Aug 11, 2025

> create external table hits stored as parquet location 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 3.539 seconds.

> select min("WatchID"), max("WatchID") from hits;
+---------------------+---------------------+
| min(hits.WatchID)   | max(hits.WatchID)   |
+---------------------+---------------------+
| 4611686071420045196 | 9223372033328793741 |
+---------------------+---------------------+
1 row(s) fetched.
Elapsed 0.164 seconds.

The 0.16 seconds is pretty sweet!

On main, the same query takes 1.3sec

> select min("WatchID"), max("WatchID") from hits;
+---------------------+---------------------+
| min(hits.WatchID)   | max(hits.WatchID)   |
+---------------------+---------------------+
| 4611686071420045196 | 9223372033328793741 |
+---------------------+---------------------+
1 row(s) fetched.
Elapsed 1.317 seconds.

@alamb alamb left a comment

I took another look at this - and it looks great to me. Thank you @shehabgamin and @nuno-faria and @jonathanc-n

-pub async fn fetch_parquet_metadata(
-    store: &dyn ObjectStore,
-    meta: &ObjectMeta,
+pub async fn fetch_parquet_metadata<F: MetadataFetch>(
Contributor

I agree -- I am thinking the code that handles fetching metadata and schema is also getting enough options that it can probably be its own file / structure too. Maybe something like

pub struct ParquetMetadataFetcher {
...
}

So we could use it like

let fetcher = ParquetMetadataFetcher::new(object_store, path) 
  .with_hint(...);

fetcher.fetch_metadata().await?

🤔

file: &ObjectMeta,
metadata_size_hint: Option<usize>,
coerce_int96: Option<TimeUnit>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
Contributor

I re-reviewed these changes and, since I think we now always have a file_metadata_cache, we could probably make this non-optional. However, many tests would need to change

-pub async fn fetch_parquet_metadata(
-    store: &dyn ObjectStore,
-    meta: &ObjectMeta,
+pub async fn fetch_parquet_metadata<F: MetadataFetch>(
Contributor

The more I looked the more it seemed like what would be helpful would be a whole new module statistics.rs or something

@alamb alamb merged commit 14fb4a3 into apache:main Aug 11, 2025
27 checks passed
@alamb alamb deleted the parquet-metadata branch August 11, 2025 17:03

Labels

core Core DataFusion crate datasource Changes to the datasource crate

Development

Successfully merging this pull request may close these issues.

[Parquet Metadata Cache] Use the cached metadata for ListingTable statistics

4 participants