Skip to content

Commit 75eb80c

Browse files
alambnuno-fariajonathanc-n
authored
[Parquet Metadata Cache] Document the ListingTable cache (apache#17133)
* [Parquet Metadata Cache] Document the ListingTable cache * Apply suggestions from code review Co-authored-by: Nuno Faria <nunofpfaria@gmail.com> Co-authored-by: Jonathan Chen <chenleejonathan@gmail.com> --------- Co-authored-by: Nuno Faria <nunofpfaria@gmail.com> Co-authored-by: Jonathan Chen <chenleejonathan@gmail.com>
1 parent b250848 commit 75eb80c

File tree

4 files changed

+129
-33
lines changed

4 files changed

+129
-33
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -819,13 +819,26 @@ impl ListingOptions {
819819
}
820820
}
821821

822-
/// Reads data from one or more files as a single table.
822+
/// Built in [`TableProvider`] that reads data from one or more files as a single table.
823823
///
824-
/// Implements [`TableProvider`], a DataFusion data source. The files are read
825-
/// using an [`ObjectStore`] instance, for example from local files or objects
826-
/// from AWS S3.
824+
/// The files are read using an [`ObjectStore`] instance, for example from
825+
/// local files or objects from AWS S3.
826+
///
827+
/// # Features:
828+
/// * Reading multiple files as a single table
829+
/// * Hive style partitioning (e.g., directories named `date=2024-06-01`)
830+
/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`])
831+
/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g.,
832+
/// Parquet)
833+
/// * Statistics collection and pruning based on file metadata
834+
/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`])
835+
/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`])
836+
/// * Statistics caching (see [`FileStatisticsCache`])
837+
///
838+
/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
839+
///
840+
/// # Reading Directories and Hive Style Partitioning
827841
///
828-
/// # Reading Directories
829842
/// For example, given the `table1` directory (or object store prefix)
830843
///
831844
/// ```text
@@ -861,16 +874,23 @@ impl ListingOptions {
861874
/// If the query has a predicate like `WHERE date = '2024-06-01'`
862875
/// only the corresponding directory will be read.
863876
///
864-
/// `ListingTable` also supports limit, filter and projection pushdown for formats that
865-
/// support it as such as Parquet.
866-
///
867877
/// # See Also
868878
///
869879
/// 1. [`ListingTableConfig`]: Configuration options
870880
/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
871881
///
872882
/// [`DataSourceExec`]: crate::datasource::source::DataSourceExec
873883
///
884+
/// # Caching Metadata
885+
///
886+
/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file
887+
/// metadata that is needed to execute but expensive to read, such as row
888+
/// groups and statistics. The cache is scoped to the [`SessionContext`] and can
889+
/// be configured via the [runtime config options].
890+
///
891+
/// [`SessionContext`]: crate::prelude::SessionContext
892+
/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings
893+
///
874894
/// # Example: Read a directory of parquet files using a [`ListingTable`]
875895
///
876896
/// ```no_run
@@ -931,10 +951,16 @@ pub struct ListingTable {
931951
table_schema: SchemaRef,
932952
/// Indicates how the schema was derived (inferred or explicitly specified)
933953
schema_source: SchemaSource,
954+
/// Options used to configure the listing table such as the file format
955+
/// and partitioning information
934956
options: ListingOptions,
957+
/// The SQL definition for this table, if any
935958
definition: Option<String>,
959+
/// Cache for collected file statistics
936960
collected_statistics: FileStatisticsCache,
961+
/// Constraints applied to this table
937962
constraints: Constraints,
963+
/// Column default expressions for columns that are not physically present in the data files
938964
column_defaults: HashMap<String, Expr>,
939965
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
940966
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,

datafusion/core/src/lib.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -313,17 +313,17 @@
313313
//! ```
314314
//!
315315
//! A [`TableProvider`] provides information for planning and
316-
//! an [`ExecutionPlan`] for execution. DataFusion includes [`ListingTable`],
317-
//! a [`TableProvider`] which reads individual files or directories of files
318-
//! ("partitioned datasets") of the same file format. Users can add
319-
//! support for new file formats by implementing the [`TableProvider`]
320-
//! trait.
316+
//! an [`ExecutionPlan`] for execution. DataFusion includes two built-in
317+
//! table providers that support common file formats and require no runtime services,
318+
//! [`ListingTable`] and [`MemTable`]. You can add support for any other data
319+
//! source and/or file formats by implementing the [`TableProvider`] trait.
321320
//!
322321
//! See also:
323322
//!
324323
//! 1. [`ListingTable`]: Reads data from one or more Parquet, JSON, CSV, or AVRO
325-
//! files supporting HIVE style partitioning, optional compression, directly
326-
//! reading from remote object store and more.
324+
//! files in one or more local or remote directories. Supports HIVE style
325+
//! partitioning, optional compression, directly reading from remote
326+
//! object store, file metadata caching, and more.
327327
//!
328328
//! 2. [`MemTable`]: Reads data from in memory [`RecordBatch`]es.
329329
//!

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,34 @@ use std::collections::HashMap;
2525
use std::fmt::{Debug, Formatter};
2626
use std::sync::Arc;
2727

28-
/// The cache of listing files statistics.
29-
/// if set [`CacheManagerConfig::with_files_statistics_cache`]
30-
/// Will avoid infer same file statistics repeatedly during the session lifetime,
31-
/// this cache will store in [`crate::runtime_env::RuntimeEnv`].
28+
/// A cache for [`Statistics`].
29+
///
30+
/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
31+
/// cache avoids inferring the same file statistics repeatedly during the
32+
/// session lifetime.
33+
///
34+
/// See [`crate::runtime_env::RuntimeEnv`] for more details
3235
pub type FileStatisticsCache =
3336
Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
3437

38+
/// Cache for storing the [`ObjectMeta`]s that result from listing a path
39+
///
40+
/// Listing a path means doing an object store "list" operation or `ls`
41+
/// command on the local filesystem. This operation can be expensive,
42+
/// especially when done over remote object stores.
43+
///
44+
/// See [`crate::runtime_env::RuntimeEnv`] for more details
3545
pub type ListFilesCache =
3646
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
3747

38-
/// Represents generic file-embedded metadata.
48+
/// Generic file-embedded metadata used with [`FileMetadataCache`].
49+
///
50+
/// For example, Parquet footers and page metadata can be represented
51+
/// using this trait.
52+
///
53+
/// See [`crate::runtime_env::RuntimeEnv`] for more details
3954
pub trait FileMetadata: Any + Send + Sync {
40-
/// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
55+
/// Returns the file metadata as [`Any`] so that it can be downcast to a specific
4156
/// implementation.
4257
fn as_any(&self) -> &dyn Any;
4358

@@ -48,7 +63,20 @@ pub trait FileMetadata: Any + Send + Sync {
4863
fn extra_info(&self) -> HashMap<String, String>;
4964
}
5065

51-
/// Cache to store file-embedded metadata.
66+
/// Cache for file-embedded metadata.
67+
///
68+
/// This cache stores per-file metadata in the form of [`FileMetadata`],
69+
///
70+
/// For example, the built in [`ListingTable`] uses this cache to avoid parsing
71+
/// Parquet footers multiple times for the same file.
72+
///
73+
/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`],
74+
/// and users can also provide their own implementations to implement custom
75+
/// caching strategies.
76+
///
77+
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
78+
///
79+
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
5280
pub trait FileMetadataCache:
5381
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
5482
{
@@ -93,6 +121,13 @@ impl Debug for dyn FileMetadataCache {
93121
}
94122
}
95123

124+
/// Manages various caches used in DataFusion.
125+
///
126+
/// Following DataFusion design principles, DataFusion provides default cache
127+
/// implementations, while also allowing users to provide their own custom cache
128+
/// implementations by implementing the relevant traits.
129+
///
130+
/// See [`CacheManagerConfig`] for configuration options.
96131
#[derive(Debug)]
97132
pub struct CacheManager {
98133
file_statistic_cache: Option<FileStatisticsCache>,
@@ -130,7 +165,7 @@ impl CacheManager {
130165
self.file_statistic_cache.clone()
131166
}
132167

133-
/// Get the cache of objectMeta under same path.
168+
/// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
134169
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
135170
self.list_files_cache.clone()
136171
}
@@ -181,6 +216,9 @@ impl Default for CacheManagerConfig {
181216
}
182217

183218
impl CacheManagerConfig {
219+
/// Set the cache for files statistics.
220+
///
221+
/// Default is `None` (disabled).
184222
pub fn with_files_statistics_cache(
185223
mut self,
186224
cache: Option<FileStatisticsCache>,
@@ -189,11 +227,17 @@ impl CacheManagerConfig {
189227
self
190228
}
191229

230+
/// Set the cache for listing files.
231+
///
232+
/// Default is `None` (disabled).
192233
pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
193234
self.list_files_cache = cache;
194235
self
195236
}
196237

238+
/// Sets the cache for file-embedded metadata.
239+
///
240+
/// Default is a [`DefaultFilesMetadataCache`].
197241
pub fn with_file_metadata_cache(
198242
mut self,
199243
cache: Option<Arc<dyn FileMetadataCache>>,
@@ -202,6 +246,7 @@ impl CacheManagerConfig {
202246
self
203247
}
204248

249+
/// Sets the limit of the file-embedded metadata cache, in bytes.
205250
pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
206251
self.metadata_cache_limit = limit;
207252
self

datafusion/execution/src/cache/cache_unit.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@ use dashmap::DashMap;
3030
use object_store::path::Path;
3131
use object_store::ObjectMeta;
3232

33-
/// Collected statistics for files
33+
/// Default implementation of [`FileStatisticsCache`]
34+
///
35+
/// Stores collected statistics for files
36+
///
3437
/// Cache is invalided when file size or last modification has changed
38+
///
39+
/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
3540
#[derive(Default)]
3641
pub struct DefaultFileStatisticsCache {
3742
statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
@@ -102,8 +107,13 @@ impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
102107
}
103108
}
104109

110+
/// Default implementation of [`ListFilesCache`]
111+
///
105112
/// Collected files metadata for listing files.
106-
/// Cache will not invalided until user call remove or clear.
113+
///
114+
/// Cache is not invalided until user calls [`Self::remove`] or [`Self::clear`].
115+
///
116+
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
107117
#[derive(Default)]
108118
pub struct DefaultListFilesCache {
109119
statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
@@ -280,21 +290,36 @@ impl DefaultFilesMetadataCacheState {
280290
}
281291
}
282292

293+
/// Default implementation of [`FileMetadataCache`]
294+
///
283295
/// Collected file embedded metadata cache.
284-
/// The metadata for some file is invalided when the file size or last modification time have been
285-
/// changed.
286-
/// The `memory_limit` passed in the constructor controls the maximum size of the cache, which uses
287-
/// a Least Recently Used eviction algorithm.
288-
/// Users should use the `get` and `put` methods. The `get_with_extra` and `put_with_extra` methods
289-
/// simply call `get` and `put`, respectively.
296+
///
297+
/// The metadata for each file is invalidated when the file size or last
298+
/// modification time have been changed.
299+
///
300+
/// # Internal details
301+
///
302+
/// The `memory_limit` controls the maximum size of the cache, which uses a
303+
/// Least Recently Used eviction algorithm. When adding a new entry, if the total
304+
/// size of the cached entries exceeds `memory_limit`, the least recently used entries
305+
/// are evicted until the total size is lower than `memory_limit`.
306+
///
307+
/// # `Extra` Handling
308+
///
309+
/// Users should use the [`Self::get`] and [`Self::put`] methods. The
310+
/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call
311+
/// `get` and `put`, respectively.
290312
pub struct DefaultFilesMetadataCache {
291313
// the state is wrapped in a Mutex to ensure the operations are atomic
292314
state: Mutex<DefaultFilesMetadataCacheState>,
293315
}
294316

295317
impl DefaultFilesMetadataCache {
296-
/// The `memory_limit` parameter controls the maximum size of the cache, in bytes, using a Least
297-
/// Recently Used eviction algorithm.
318+
/// Create a new instance of [`DefaultFilesMetadataCache`].
319+
///
320+
/// # Arguments
321+
/// `memory_limit`: the maximum size of the cache, in bytes
322+
//
298323
pub fn new(memory_limit: usize) -> Self {
299324
Self {
300325
state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),

0 commit comments

Comments
 (0)