Is your feature request related to a problem or challenge?
When using "high latency" storage (e.g. remote object stores such as AWS S3), listing objects and collecting object metadata can take a non-trivial amount of time. This was already noted in #7618, where a simple cache was implemented and partially integrated. Unfortunately, partitioned tables cannot hit that cache path in the current ListingTable implementation. A discrepancy between the performance of flat and partitioned tables was previously noted in #9654, but that issue did not provide much detail on the timing differences, nor an easy way to reproduce the behavior.
To better illustrate this issue, I have added some basic timing statements to ListingTable, which can be reproduced using the diff below:
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a93654cdf..8d1ddc909 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1342,6 +1342,7 @@ impl ListingTable {
return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
};
// list files (with partitions)
+ let start = datafusion_common::instant::Instant::now();
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
pruned_partition_list(
ctx,
@@ -1353,6 +1354,10 @@ impl ListingTable {
)
}))
.await?;
+ println!(
+ "file_list duration: {}ms",
+ 1000. * start.elapsed().as_secs_f32()
+ );
let meta_fetch_concurrency =
ctx.config_options().execution.meta_fetch_concurrency;
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
@@ -1370,8 +1375,17 @@ impl ListingTable {
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
+ let group_start = datafusion_common::instant::Instant::now();
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;
+ println!(
+ "file_group duration: {}ms",
+ 1000. * group_start.elapsed().as_secs_f32()
+ );
+ println!(
+ "Total duration: {}ms",
+ 1000. * start.elapsed().as_secs_f32()
+ );
let file_groups = file_group.split_files(self.options.target_partitions);
let (mut file_groups, mut stats) = compute_all_files_statistics(
(Note that reproducing these results currently relies on the resolution of /pull/17050.)
The discrepancies between the timings of these operations can be seen below. Even though the queries ultimately operate on exactly the same subset of files, queries issued against the partitioned dataset consistently take longer to complete. In all cases shown here, the cumulative time required to build the file_groups dominates the query time. Each query is issued twice so that DataFusion's default caching mechanisms can populate their caches on the first run and operate on the cached data in the second.
DataFusion CLI v49.0.0
> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> CREATE EXTERNAL TABLE overture_maps
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 7.724 seconds.
> select count(*) from overture_maps where type='address';
file_list duration: 317.549ms
file_group duration: 240.83353ms
Total duration: 558.39996ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.563 seconds.
> select count(*) from overture_maps where type='address';
file_list duration: 371.57742ms
file_group duration: 0.034617003ms
Total duration: 371.62094ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.376 seconds.
> CREATE EXTERNAL TABLE overture_maps_flat
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/type=address/';
0 row(s) fetched.
Elapsed 1.478 seconds.
> select count(*) from overture_maps_flat;
file_list duration: 0.004915ms
file_group duration: 421.62354ms
Total duration: 421.64505ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.423 seconds.
> select count(*) from overture_maps_flat;
file_list duration: 0.004658ms
file_group duration: 42.788586ms
Total duration: 42.806194ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.044 seconds.
The basic layout of the underlying dataset can be seen below. Note that the subset of files accessed by the queries against the partitioned table is exactly the same set of files as the flat dataset.
$ aws s3 ls s3://overturemaps-us-west-2/release/2025-07-23.0/ --recursive
2025-07-22 14:42:50 1073537247 release/2025-07-23.0/theme=addresses/type=address/part-00000-57f746d8-98a1-4faa-94c2-4f084343ecac-c000.zstd.parquet
2025-07-22 14:43:06 1013189887 release/2025-07-23.0/theme=addresses/type=address/part-00001-57f746d8-98a1-4faa-94c2-4f084343ecac-c000.zstd.parquet
. . .
$ aws s3 ls s3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/
PRE type=address/
The DefaultListFilesCache can be forced into the mix using the following diff:
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index 37f1baa17..1e3faad15 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -80,6 +80,9 @@ impl CacheManager {
}
if let Some(lc) = &config.list_files_cache {
manager.list_files_cache = Some(Arc::clone(lc))
+ } else {
+ manager.list_files_cache =
+ Some(Arc::new(super::cache_unit::DefaultListFilesCache::default()));
}
if let Some(mc) = &config.file_metadata_cache {
manager.file_metadata_cache = Some(Arc::clone(mc));
When the cache is inserted, repeat queries against the flat table show significant improvements; however, the partitioned table continues to suffer from latencies dominated by listing files.
DataFusion CLI v49.0.0
> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> CREATE EXTERNAL TABLE overture_maps
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 9.056 seconds.
> select count(*) from overture_maps where type='address';
file_list duration: 412.9362ms
file_group duration: 272.7048ms
Total duration: 685.6636ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.690 seconds.
> select count(*) from overture_maps where type='address';
file_list duration: 136.47644ms
file_group duration: 0.034272ms
Total duration: 136.51955ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.141 seconds.
> CREATE EXTERNAL TABLE overture_maps_flat
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/type=address/';
0 row(s) fetched.
Elapsed 0.727 seconds.
> select count(*) from overture_maps_flat;
file_list duration: 0.0063920002ms
file_group duration: 357.8303ms
Total duration: 357.86032ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.359 seconds.
> select count(*) from overture_maps_flat;
file_list duration: 0.0066520004ms
file_group duration: 0.030025ms
Total duration: 0.047092ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.001 seconds.
Describe the solution you'd like
Ultimately I'd like partitioned datasets to perform similarly to flat datasets, with caching mechanisms available to both. Based on the structure of the existing code, I believe the code path that ListingTable executes when discovering files and collecting initial metadata should be normalized between partitioned and flat datasets.
During a table scan, both table types first call list_files_for_scan, followed by pruned_partition_list:
async fn list_files_for_scan<'a>(
pub async fn pruned_partition_list<'a>(
In pruned_partition_list, flat tables end up executing list_all_files, which is where the ListFilesCache is currently leveraged:
datafusion/datafusion/catalog-listing/src/helpers.rs
Lines 417 to 432 in f3941b2
// if no partition col => simply list all the files
if partition_cols.is_empty() {
    if !filters.is_empty() {
        return internal_err!(
            "Got partition filters for unpartitioned table {}",
            table_path
        );
    }
    return Ok(Box::pin(
        table_path
            .list_all_files(ctx, store, file_extension)
            .await?
            .try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
            .map_ok(|object_meta| object_meta.into()),
    ));
}
Partitioned datasets, however, end up making calls that at least partially, and possibly fully, rediscover the partition and file structure by calling list_partitions:
datafusion/datafusion/catalog-listing/src/helpers.rs
Lines 435 to 436 in f3941b2
let partitions =
    list_partitions(store, table_path, partition_cols.len(), partition_prefix)
Note that list_partitions executes a breadth-first search of the (potentially prefixed, depending on query filter evaluation) partition structure, so the per-query latency penalty it adds scales linearly with the depth of the partition tree: each additional partition level requires at least one more sequential round of list requests before any data files are discovered.
pub async fn list_partitions(
My Opinion
Executing a breadth-first search over the full dataset, or a prefixed subset of it, is likely less performant than simply listing the same scope recursively. The partition values of any given object can be derived directly from the object's path, so pruning of objects can continue to work exactly as it does today. Additionally, any caching of the listing would then apply equally to partitioned and flat tables. I suspect there are table structures where the current approach is better, such as a table with a very large number of objects and a shallow partition structure. However, most remote object storage services return many hundreds to thousands of object paths in a single list request, so such a table would need to be quite large in terms of the number of objects.
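To make this concrete, below is a rough sketch (not the existing implementation) of what a single recursive listing plus path-derived partition values could look like. It assumes the object_store crate's ObjectStore::list API; partition_values is a hypothetical helper written purely for illustration:

```rust
use futures::TryStreamExt;
use object_store::{path::Path, ObjectMeta, ObjectStore};

/// Hypothetical helper: derive partition column values from an object's path
/// by matching `key=value` segments against the declared partition columns.
/// Returns None if any declared partition column is missing from the path.
fn partition_values(meta: &ObjectMeta, partition_cols: &[String]) -> Option<Vec<String>> {
    let mut values: Vec<Option<String>> = vec![None; partition_cols.len()];
    for part in meta.location.parts() {
        let segment: &str = part.as_ref();
        if let Some((key, value)) = segment.split_once('=') {
            if let Some(idx) = partition_cols.iter().position(|c| c == key) {
                values[idx] = Some(value.to_string());
            }
        }
    }
    values.into_iter().collect()
}

/// Sketch: one recursive listing of the table prefix (which a ListFilesCache
/// keyed by the table root could store and reuse for every query), with
/// partition values derived from each object's path afterwards.
async fn list_and_derive_partitions(
    store: &dyn ObjectStore,
    table_prefix: &Path,
    partition_cols: &[String],
) -> object_store::Result<Vec<(ObjectMeta, Vec<String>)>> {
    let all_objects: Vec<ObjectMeta> =
        store.list(Some(table_prefix)).try_collect().await?;

    Ok(all_objects
        .into_iter()
        // skip zero-length objects, as the existing list_all_files path does
        .filter(|meta| meta.size > 0)
        .filter_map(|meta| partition_values(&meta, partition_cols).map(|vals| (meta, vals)))
        // a real implementation would evaluate the query's partition filters
        // against the derived values here, as pruned_partition_list does today
        .collect())
}
```

The point is that the listing itself is filter-independent, so it is cacheable once per table, while pruning continues to happen on the derived partition values.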
Describe alternatives you've considered
It would probably be possible to apply a ListFilesCache prior to the point where the flat and partitioned code paths diverge. In that scenario, I think care would need to be taken when loading objects into the cache, because listings for partitioned datasets don't necessarily return all potential objects due to pruning. This would also mean that repeat queries against partitioned datasets could continue to see a high percentage of cache misses, depending on the user-supplied filters.
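For example, if such a cache were keyed by the prefix that was actually listed (a purely hypothetical design, shown only to illustrate the miss pattern), each distinct set of partition filters produces a distinct key, and no partial entry can safely serve a broader query:

```rust
use std::collections::HashMap;

use object_store::{path::Path, ObjectMeta};

/// Hypothetical prefix-keyed listing cache: each entry only covers the objects
/// under the (possibly filter-derived) prefix that was actually listed.
#[derive(Default)]
struct PrefixKeyedListCache {
    entries: HashMap<Path, Vec<ObjectMeta>>,
}

impl PrefixKeyedListCache {
    fn get(&self, listed_prefix: &Path) -> Option<&Vec<ObjectMeta>> {
        // Only an exact match is safe to reuse: an entry for
        // "release/.../theme=addresses/type=address" says nothing about objects
        // under other themes, nor about the full, unfiltered table.
        self.entries.get(listed_prefix)
    }

    fn put(&mut self, listed_prefix: Path, objects: Vec<ObjectMeta>) {
        self.entries.insert(listed_prefix, objects);
    }
}
```

A query filtering on type='address' would populate only the entry for that prefix; a later query on a different type value, or an unfiltered scan, would still miss, whereas a single entry keyed by the table root and filled from a full recursive listing could serve all of them.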
Additional context
Existing issues mentioned:
- [Cache] Support cache ListFiles result cache in session level #7618
- Partitioned object store lists all files on every query when using hive-partitioned parquet files #9654
Since this relates to the ListingTable and remote datasets: