diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index d23b12469e385..002869529eb1e 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -581,3 +581,119 @@ impl TableFunctionImpl for MetadataCacheFunc {
         Ok(Arc::new(metadata_cache))
     }
 }
+
+/// STATISTICS_CACHE table function
+#[derive(Debug)]
+struct StatisticsCacheTable {
+    schema: SchemaRef,
+    batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for StatisticsCacheTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> datafusion::logical_expr::TableType {
+        datafusion::logical_expr::TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![self.batch.clone()]],
+            TableProvider::schema(self),
+            projection.cloned(),
+        )?)
+    }
+}
+
+#[derive(Debug)]
+pub struct StatisticsCacheFunc {
+    cache_manager: Arc<CacheManager>,
+}
+
+impl StatisticsCacheFunc {
+    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+        Self { cache_manager }
+    }
+}
+
+impl TableFunctionImpl for StatisticsCacheFunc {
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        if !exprs.is_empty() {
+            return plan_err!("statistics_cache should have no arguments");
+        }
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("path", DataType::Utf8, false),
+            Field::new(
+                "file_modified",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("file_size_bytes", DataType::UInt64, false),
+            Field::new("e_tag", DataType::Utf8, true),
+            Field::new("version", DataType::Utf8, true),
+            Field::new("num_rows", DataType::Utf8, false),
+            Field::new("num_columns", DataType::UInt64, false),
+            Field::new("table_size_bytes", DataType::Utf8, false),
+            Field::new("statistics_size_bytes", DataType::UInt64, false),
+        ]));
+
+        // construct record batch from metadata
+        let mut path_arr = vec![];
+        let mut file_modified_arr = vec![];
+        let mut file_size_bytes_arr = vec![];
+        let mut e_tag_arr = vec![];
+        let mut version_arr = vec![];
+        let mut num_rows_arr = vec![];
+        let mut num_columns_arr = vec![];
+        let mut table_size_bytes_arr = vec![];
+        let mut statistics_size_bytes_arr = vec![];
+
+        if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
+        {
+            for (path, entry) in file_statistics_cache.list_entries() {
+                path_arr.push(path.to_string());
+                file_modified_arr
+                    .push(Some(entry.object_meta.last_modified.timestamp_millis()));
+                file_size_bytes_arr.push(entry.object_meta.size);
+                e_tag_arr.push(entry.object_meta.e_tag);
+                version_arr.push(entry.object_meta.version);
+                num_rows_arr.push(entry.num_rows.to_string());
+                num_columns_arr.push(entry.num_columns as u64);
+                table_size_bytes_arr.push(entry.table_size_bytes.to_string());
+                statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
+            }
+        }
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(path_arr)),
+                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+                Arc::new(UInt64Array::from(file_size_bytes_arr)),
+                Arc::new(StringArray::from(e_tag_arr)),
+                Arc::new(StringArray::from(version_arr)),
+                Arc::new(StringArray::from(num_rows_arr)),
+                Arc::new(UInt64Array::from(num_columns_arr)),
+                Arc::new(StringArray::from(table_size_bytes_arr)),
+                Arc::new(UInt64Array::from(statistics_size_bytes_arr)),
+            ],
+        )?;
+
+        let statistics_cache = StatisticsCacheTable { schema, batch };
+        Ok(Arc::new(statistics_cache))
+    }
+}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index de666fced7e65..daf4871294239 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -31,7 +31,9 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::logical_expr::ExplainFormat;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
-use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
+use datafusion_cli::functions::{
+    MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
+};
 use datafusion_cli::object_storage::instrumented::{
     InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
 };
@@ -244,6 +246,14 @@ async fn main_inner() -> Result<()> {
         )),
     );
 
+    // register `statistics_cache` table function to get the contents of the file statistics cache
+    ctx.register_udtf(
+        "statistics_cache",
+        Arc::new(StatisticsCacheFunc::new(
+            ctx.task_ctx().runtime_env().cache_manager.clone(),
+        )),
+    );
+
     let mut print_options = PrintOptions {
         format: args.format,
         quiet: args.quiet,
@@ -423,7 +433,13 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::{common::test_util::batches_to_string, prelude::ParquetReadOptions};
+    use datafusion::{
+        common::test_util::batches_to_string,
+        execution::cache::{
+            cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
+        },
+        prelude::ParquetReadOptions,
+    };
     use insta::assert_snapshot;
 
     fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -631,4 +647,102 @@
 
         Ok(())
     }
+
+    /// Shows that the statistics cache is not enabled by default yet
+    /// See https://github.com/apache/datafusion/issues/19217
+    #[tokio::test]
+    async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
+        let ctx = SessionContext::new();
+
+        ctx.register_udtf(
+            "statistics_cache",
+            Arc::new(StatisticsCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        for filename in [
+            "alltypes_plain",
+            "alltypes_tiny_pages",
+            "lz4_raw_compressed_larger",
+        ] {
+            ctx.sql(
+                format!(
+                    "create external table {filename}
+                    stored as parquet
+                    location '../parquet-testing/data/{filename}.parquet'",
+                )
+                .as_str(),
+            )
+            .await?
+            .collect()
+            .await?;
+        }
+
+        // When the cache manager creates a StatisticsCache by default,
+        // the contents will show up here
+        let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
+        let df = ctx.sql(sql).await?;
+        let rbs = df.collect().await?;
+        assert_snapshot!(batches_to_string(&rbs),@r"
+        ++
+        ++
+        ");
+
+        Ok(())
+    }
+
+    // Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
+    #[tokio::test]
+    async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
+        // Install a specific StatisticsCache implementation
+        let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
+        let cache_config = CacheManagerConfig::default()
+            .with_files_statistics_cache(Some(file_statistics_cache.clone()));
+        let runtime = RuntimeEnvBuilder::new()
+            .with_cache_manager(cache_config)
+            .build()?;
+        let config = SessionConfig::new().with_collect_statistics(true);
+        let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
+
+        ctx.register_udtf(
+            "statistics_cache",
+            Arc::new(StatisticsCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        for filename in [
+            "alltypes_plain",
+            "alltypes_tiny_pages",
+            "lz4_raw_compressed_larger",
+        ] {
+            ctx.sql(
+                format!(
+                    "create external table {filename}
+                    stored as parquet
+                    location '../parquet-testing/data/{filename}.parquet'",
+                )
+                .as_str(),
+            )
+            .await?
+            .collect()
+            .await?;
+        }
+
+        let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
+        let df = ctx.sql(sql).await?;
+        let rbs = df.collect().await?;
+        assert_snapshot!(batches_to_string(&rbs),@r"
+        +-----------------------------------+-----------------+--------------+-------------+------------------+
+        | filename                          | file_size_bytes | num_rows     | num_columns | table_size_bytes |
+        +-----------------------------------+-----------------+--------------+-------------+------------------+
+        | alltypes_plain.parquet            | 1851            | Exact(8)     | 11          | Absent           |
+        | alltypes_tiny_pages.parquet       | 454233          | Exact(7300)  | 13          | Absent           |
+        | lz4_raw_compressed_larger.parquet | 380836          | Exact(10000) | 1           | Absent           |
+        +-----------------------------------+-----------------+--------------+-------------+------------------+
+        ");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index c25e9ff141bbd..1cdd97c89db15 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -178,7 +178,7 @@ pub struct ListingTable {
     /// The SQL definition for this table, if any
     definition: Option<String>,
     /// Cache for collected file statistics
-    collected_statistics: FileStatisticsCache,
+    collected_statistics: Arc<dyn FileStatisticsCache>,
     /// Constraints applied to this table
     constraints: Constraints,
     /// Column default expressions for columns that are not physically present in the data files
@@ -255,7 +255,7 @@ impl ListingTable {
     /// multiple times in the same session.
     ///
     /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
-    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
+    pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
         self.collected_statistics =
             cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
         self
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index ab3f2ea4bfe03..ad92e06e7c3da 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use crate::cache::CacheAccessor;
-use crate::cache::DefaultFilesMetadataCache;
+use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use datafusion_common::stats::Precision;
 use datafusion_common::{Result, Statistics};
 use object_store::ObjectMeta;
 use object_store::path::Path;
@@ -35,8 +36,27 @@ use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
 /// session lifetime.
 ///
 /// See [`crate::runtime_env::RuntimeEnv`] for more details
-pub type FileStatisticsCache =
-    Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
+pub trait FileStatisticsCache:
+    CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
+{
+    /// Retrieves the information about the entries currently cached.
+    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
+}
+
+/// Represents information about a cached statistics entry.
+/// This is used to expose the statistics cache contents to outside modules.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct FileStatisticsCacheEntry {
+    pub object_meta: ObjectMeta,
+    /// Number of table rows.
+    pub num_rows: Precision<usize>,
+    /// Number of table columns.
+    pub num_columns: usize,
+    /// Total table size, in bytes.
+    pub table_size_bytes: Precision<usize>,
+    /// Size of the statistics entry, in bytes.
+    pub statistics_size_bytes: usize,
+}
 
 /// Cache for storing the [`ObjectMeta`]s that result from listing a path
 ///
@@ -116,7 +136,7 @@ pub struct FileMetadataCacheEntry {
     pub extra: HashMap<String, String>,
 }
 
-impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
+impl Debug for dyn FileStatisticsCache {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(f, "Cache name: {} with length: {}", self.name(), self.len())
     }
@@ -143,7 +163,7 @@ impl Debug for dyn FileMetadataCache {
 /// See [`CacheManagerConfig`] for configuration options.
 #[derive(Debug)]
 pub struct CacheManager {
-    file_statistic_cache: Option<FileStatisticsCache>,
+    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
     list_files_cache: Option<Arc<dyn ListFilesCache>>,
     file_metadata_cache: Arc<dyn FileMetadataCache>,
 }
@@ -174,7 +194,7 @@ impl CacheManager {
     }
 
     /// Get the cache of listing files statistics.
-    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
+    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
         self.file_statistic_cache.clone()
     }
 
@@ -213,7 +233,7 @@ pub struct CacheManagerConfig {
     /// Enable caching of file statistics when listing files.
     /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
     /// Default is disabled. Currently only Parquet files are supported.
-    pub table_files_statistics_cache: Option<FileStatisticsCache>,
+    pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
     /// Enable caching of file metadata when listing files.
     /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
     /// expensive in certain situations (e.g. remote object storage), for objects under paths that
@@ -255,7 +275,7 @@ impl CacheManagerConfig {
     /// Default is `None` (disabled).
     pub fn with_files_statistics_cache(
         mut self,
-        cache: Option<FileStatisticsCache>,
+        cache: Option<Arc<dyn FileStatisticsCache>>,
     ) -> Self {
         self.table_files_statistics_cache = cache;
         self
diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs
index 5662b3766f230..5351df449a7c1 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::cache::CacheAccessor;
+use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
 
 use datafusion_common::Statistics;
@@ -39,6 +41,29 @@ pub struct DefaultFileStatisticsCache {
     statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
 }
 
+impl FileStatisticsCache for DefaultFileStatisticsCache {
+    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
+        let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
+
+        for entry in &self.statistics {
+            let path = entry.key();
+            let (object_meta, stats) = entry.value();
+            entries.insert(
+                path.clone(),
+                FileStatisticsCacheEntry {
+                    object_meta: object_meta.clone(),
+                    num_rows: stats.num_rows,
+                    num_columns: stats.column_statistics.len(),
+                    table_size_bytes: stats.total_byte_size,
+                    statistics_size_bytes: 0, // TODO: set to the real size in the future
+                },
+            );
+        }
+
+        entries
+    }
+}
+
 impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
     type Extra = ObjectMeta;
 
@@ -106,11 +131,14 @@ impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::cache::CacheAccessor;
+    use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
     use crate::cache::cache_unit::DefaultFileStatisticsCache;
     use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use chrono::DateTime;
     use datafusion_common::Statistics;
+    use datafusion_common::stats::Precision;
     use object_store::ObjectMeta;
     use object_store::path::Path;
@@ -153,8 +181,24 @@
         assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
 
         // different file
-        let mut meta2 = meta;
+        let mut meta2 = meta.clone();
         meta2.location = Path::from("test2");
         assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
+
+        // test the list_entries method
+        let entries = cache.list_entries();
+        assert_eq!(
+            entries,
+            HashMap::from([(
+                Path::from("test"),
+                FileStatisticsCacheEntry {
+                    object_meta: meta.clone(),
+                    num_rows: Precision::Absent,
+                    num_columns: 1,
+                    table_size_bytes: Precision::Absent,
+                    statistics_size_bytes: 0,
+                }
+            )])
+        );
     }
 }
diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md
index 305b53c16f65e..f3b0163534c41 100644
--- a/docs/source/user-guide/cli/functions.md
+++ b/docs/source/user-guide/cli/functions.md
@@ -138,5 +138,37 @@ The columns of the returned table are:
 | hits        | UInt64    | Number of times the cached metadata has been accessed                                     |
 | extra       | Utf8      | Extra information about the cached metadata (e.g., if page index information is included) |
 
+## `statistics_cache`
+
+Similar to the `metadata_cache` function, the `statistics_cache` function shows information
+about the File Statistics Cache used by the [`ListingTable`] implementation in DataFusion.
+For statistics to be collected, the `datafusion.execution.collect_statistics` configuration
+setting must be enabled.
+
+You can inspect the statistics cache by querying the `statistics_cache` function. For example:
+
+```sql
+> select * from statistics_cache();
++------------------+---------------------+-----------------+------------------------+---------+-----------------+-------------+--------------------+-----------------------+
+| path             | file_modified       | file_size_bytes | e_tag                  | version | num_rows        | num_columns | table_size_bytes   | statistics_size_bytes |
++------------------+---------------------+-----------------+------------------------+---------+-----------------+-------------+--------------------+-----------------------+
+| .../hits.parquet | 2022-06-25T22:22:22 | 14779976446     | 0-5e24d1ee16380-370f48 | NULL    | Exact(99997497) | 105         | Exact(36445943240) | 0                     |
++------------------+---------------------+-----------------+------------------------+---------+-----------------+-------------+--------------------+-----------------------+
+```
+
+The columns of the returned table are:
+
+| column_name           | data_type | Description                                                                    |
+| --------------------- | --------- | ------------------------------------------------------------------------------ |
+| path                  | Utf8      | File path relative to the object store / filesystem root                      |
+| file_modified         | Timestamp | Last modified time of the file                                                 |
+| file_size_bytes       | UInt64    | Size of the file in bytes                                                      |
+| e_tag                 | Utf8      | [Entity Tag] (ETag) of the file, if available                                  |
+| version               | Utf8      | Version of the file, if available (for object stores that support versioning)  |
+| num_rows              | Utf8      | Number of rows in the table, as a precision value (e.g. `Exact(8)`)            |
+| num_columns           | UInt64    | Number of columns in the table                                                 |
+| table_size_bytes      | Utf8      | Size of the table in bytes, as a precision value                               |
+| statistics_size_bytes | UInt64    | Size of the cached statistics entry in memory                                  |
+
 [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
 [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
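
Note for downstream implementors: because this patch turns `FileStatisticsCache` from a type alias into a trait, a custom statistics cache now has to implement `list_entries` in addition to `CacheAccessor`. The following is a minimal sketch of such an implementation. It is not part of the patch above: the `MyStatisticsCache` name and its `DashMap`-based storage are illustrative only, and the code assumes the `CacheAccessor` trait shape from `datafusion/execution/src/cache/mod.rs` plus the `FileStatisticsCache` trait and `FileStatisticsCacheEntry` struct introduced in this patch.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use dashmap::DashMap;
use datafusion_common::Statistics;
use datafusion_execution::cache::CacheAccessor;
use datafusion_execution::cache::cache_manager::{
    FileStatisticsCache, FileStatisticsCacheEntry,
};
use object_store::ObjectMeta;
use object_store::path::Path;

/// Hypothetical custom cache; the name and storage strategy are illustrative.
#[derive(Debug, Default)]
pub struct MyStatisticsCache {
    // Keep the `ObjectMeta` seen at insertion time next to the statistics,
    // mirroring `DefaultFileStatisticsCache`, so stale entries can be detected.
    entries: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
}

impl CacheAccessor<Path, Arc<Statistics>> for MyStatisticsCache {
    type Extra = ObjectMeta;

    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
        self.entries.get(k).map(|e| Arc::clone(&e.value().1))
    }

    fn get_with_extra(&self, k: &Path, e: &ObjectMeta) -> Option<Arc<Statistics>> {
        // Treat the cached statistics as stale if the file has been modified
        self.entries
            .get(k)
            .filter(|entry| entry.value().0.last_modified == e.last_modified)
            .map(|entry| Arc::clone(&entry.value().1))
    }

    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
        // Like the default cache, statistics cannot be stored without the
        // accompanying `ObjectMeta`
        panic!("MyStatisticsCache requires put_with_extra")
    }

    fn put_with_extra(
        &self,
        key: &Path,
        value: Arc<Statistics>,
        e: &ObjectMeta,
    ) -> Option<Arc<Statistics>> {
        self.entries
            .insert(key.clone(), (e.clone(), value))
            .map(|(_, stats)| stats)
    }

    fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
        self.entries.remove(k).map(|(_, (_, stats))| stats)
    }

    fn contains_key(&self, k: &Path) -> bool {
        self.entries.contains_key(k)
    }

    fn len(&self) -> usize {
        self.entries.len()
    }

    fn clear(&self) {
        self.entries.clear()
    }

    fn name(&self) -> String {
        "MyStatisticsCache".to_string()
    }
}

impl FileStatisticsCache for MyStatisticsCache {
    // Snapshot the cache contents so `statistics_cache()` can display them
    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
        self.entries
            .iter()
            .map(|entry| {
                let (meta, stats) = entry.value();
                (
                    entry.key().clone(),
                    FileStatisticsCacheEntry {
                        object_meta: meta.clone(),
                        num_rows: stats.num_rows.clone(),
                        num_columns: stats.column_statistics.len(),
                        table_size_bytes: stats.total_byte_size.clone(),
                        // Real memory accounting is left out of this sketch
                        statistics_size_bytes: 0,
                    },
                )
            })
            .collect()
    }
}
```

A cache like this can then be installed through `CacheManagerConfig::with_files_statistics_cache`, exactly as `test_statistics_cache_override` does with `DefaultFileStatisticsCache`.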