116 changes: 116 additions & 0 deletions datafusion-cli/src/functions.rs
@@ -581,3 +581,119 @@ impl TableFunctionImpl for MetadataCacheFunc {
Ok(Arc::new(metadata_cache))
}
}

/// STATISTICS_CACHE table function
#[derive(Debug)]
struct StatisticsCacheTable {
schema: SchemaRef,
batch: RecordBatch,
}

#[async_trait]
impl TableProvider for StatisticsCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
self.schema.clone()
}

fn table_type(&self) -> datafusion::logical_expr::TableType {
datafusion::logical_expr::TableType::Base
}

async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?)
}
}

#[derive(Debug)]
pub struct StatisticsCacheFunc {
cache_manager: Arc<CacheManager>,
}

impl StatisticsCacheFunc {
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
Self { cache_manager }
}
}

impl TableFunctionImpl for StatisticsCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
if !exprs.is_empty() {
return plan_err!("statistics_cache should have no arguments");
}

let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("file_size_bytes", DataType::UInt64, false),
Field::new("e_tag", DataType::Utf8, true),
Field::new("version", DataType::Utf8, true),
Field::new("num_rows", DataType::Utf8, false),
Field::new("num_columns", DataType::UInt64, false),
Field::new("table_size_bytes", DataType::Utf8, false),
Field::new("statistics_size_bytes", DataType::UInt64, false),
]));

        // construct record batch from the statistics cache entries
let mut path_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut e_tag_arr = vec![];
let mut version_arr = vec![];
let mut num_rows_arr = vec![];
let mut num_columns_arr = vec![];
let mut table_size_bytes_arr = vec![];
let mut statistics_size_bytes_arr = vec![];

if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
{
for (path, entry) in file_statistics_cache.list_entries() {
path_arr.push(path.to_string());
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
e_tag_arr.push(entry.object_meta.e_tag);
version_arr.push(entry.object_meta.version);
num_rows_arr.push(entry.num_rows.to_string());
num_columns_arr.push(entry.num_columns as u64);
table_size_bytes_arr.push(entry.table_size_bytes.to_string());
statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
}
}

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(path_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(e_tag_arr)),
Arc::new(StringArray::from(version_arr)),
Arc::new(StringArray::from(num_rows_arr)),
Arc::new(UInt64Array::from(num_columns_arr)),
Arc::new(StringArray::from(table_size_bytes_arr)),
Arc::new(UInt64Array::from(statistics_size_bytes_arr)),
],
)?;

let statistics_cache = StatisticsCacheTable { schema, batch };
Ok(Arc::new(statistics_cache))
}
}
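
Once registered (see the main.rs change below), the UDTF can be queried like any other table. A minimal usage sketch, assuming a SessionContext `ctx` on which `statistics_cache` has already been registered; the column names come from the schema built above:

// Usage sketch: query the statistics cache UDTF from Rust.
// Assumes `ctx` is a SessionContext with `statistics_cache` registered.
let df = ctx
    .sql("SELECT path, num_rows, table_size_bytes FROM statistics_cache()")
    .await?;
df.show().await?;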
118 changes: 116 additions & 2 deletions datafusion-cli/src/main.rs
@@ -31,7 +31,9 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
-use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
+use datafusion_cli::functions::{
+    MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
+};
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
};
@@ -244,6 +246,14 @@ async fn main_inner() -> Result<()> {
)),
);

// register `statistics_cache` table function to get the contents of the file statistics cache
ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
@@ -423,7 +433,13 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
#[cfg(test)]
mod tests {
use super::*;
-    use datafusion::{common::test_util::batches_to_string, prelude::ParquetReadOptions};
+    use datafusion::{
+        common::test_util::batches_to_string,
+        execution::cache::{
+            cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
+        },
+        prelude::ParquetReadOptions,
+    };
use insta::assert_snapshot;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -631,4 +647,102 @@ mod tests {

Ok(())
}

    /// Shows that the statistics cache is not enabled by default yet.
    /// See https://github.com/apache/datafusion/issues/19217
#[tokio::test]
async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();

ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}

// When the cache manager creates a StatisticsCache by default,
// the contents will show up here
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
++
++
");

Ok(())
}

    // Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
#[tokio::test]
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
// Install a specific StatisticsCache implementation
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
[Contributor comment] This will not be necessary after #19217.
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()?;
let config = SessionConfig::new().with_collect_statistics(true);
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}

let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+--------------+-------------+------------------+
| filename | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | 1851 | Exact(8) | 11 | Absent |
| alltypes_tiny_pages.parquet | 454233 | Exact(7300) | 13 | Absent |
| lz4_raw_compressed_larger.parquet | 380836 | Exact(10000) | 1 | Absent |
+-----------------------------------+-----------------+--------------+-------------+------------------+
");

Ok(())
}
}
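
The `Exact(8)` and `Absent` values in the snapshot above are the rendered forms of `datafusion_common::stats::Precision`, which the UDTF stores as strings via `to_string()` (see functions.rs above). A small illustrative sketch:

use datafusion_common::stats::Precision;

// Precision records how reliable a statistic is; the UDTF renders it as text.
let rows = Precision::Exact(8usize);
assert_eq!(rows.to_string(), "Exact(8)"); // as in the snapshot above
let size: Precision<usize> = Precision::Absent;
assert_eq!(size.to_string(), "Absent");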
4 changes: 2 additions & 2 deletions datafusion/catalog-listing/src/table.rs
@@ -178,7 +178,7 @@ pub struct ListingTable {
/// The SQL definition for this table, if any
definition: Option<String>,
/// Cache for collected file statistics
-    collected_statistics: FileStatisticsCache,
+    collected_statistics: Arc<dyn FileStatisticsCache>,
/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
@@ -255,7 +255,7 @@ impl ListingTable {
/// multiple times in the same session.
///
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
-    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
+    pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
[Contributor comment] Technically this is an API change, but I think it is a good one and makes the FileStatisticsCache consistent with the rest -- will mark the PR as an API change.

self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self
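
For callers, the signature change is largely transparent: `DefaultFileStatisticsCache` implements the new `FileStatisticsCache` trait (the override test above relies on this), so existing `Arc::new(...)` call sites coerce to `Arc<dyn FileStatisticsCache>`. A minimal sketch, assuming `config` is an already-built `ListingTableConfig`:

// Sketch only; `config` is assumed to be a valid ListingTableConfig.
let table = ListingTable::try_new(config)?
    .with_cache(Some(Arc::new(DefaultFileStatisticsCache::default())));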
36 changes: 28 additions & 8 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -16,7 +16,8 @@
// under the License.

use crate::cache::CacheAccessor;
-use crate::cache::DefaultFilesMetadataCache;
+use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use object_store::ObjectMeta;
use object_store::path::Path;
@@ -35,8 +36,27 @@ use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
/// session lifetime.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
-pub type FileStatisticsCache =
-    Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
+pub trait FileStatisticsCache:
+    CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
+{
+    /// Retrieves information about the entries currently cached.
+    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
+}

/// Represents information about a cached statistics entry.
/// This is used to expose the statistics cache contents to outside modules.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatisticsCacheEntry {
pub object_meta: ObjectMeta,
/// Number of table rows.
pub num_rows: Precision<usize>,
/// Number of table columns.
pub num_columns: usize,
/// Total table size, in bytes.
pub table_size_bytes: Precision<usize>,
/// Size of the statistics entry, in bytes.
pub statistics_size_bytes: usize,
}

/// Cache for storing the [`ObjectMeta`]s that result from listing a path
///
@@ -116,7 +136,7 @@ pub struct FileMetadataCacheEntry {
pub extra: HashMap<String, String>,
}

-impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
+impl Debug for dyn FileStatisticsCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
}
@@ -143,7 +163,7 @@ impl Debug for dyn FileMetadataCache {
/// See [`CacheManagerConfig`] for configuration options.
#[derive(Debug)]
pub struct CacheManager {
-    file_statistic_cache: Option<FileStatisticsCache>,
+    file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
list_files_cache: Option<Arc<dyn ListFilesCache>>,
file_metadata_cache: Arc<dyn FileMetadataCache>,
}
@@ -174,7 +194,7 @@ impl CacheManager {
}

/// Get the cache of listing files statistics.
-    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
+    pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
self.file_statistic_cache.clone()
}

@@ -213,7 +233,7 @@ pub struct CacheManagerConfig {
/// Enable caching of file statistics when listing files.
/// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
/// Default is disabled. Currently only Parquet files are supported.
-    pub table_files_statistics_cache: Option<FileStatisticsCache>,
+    pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
/// Enable caching of file metadata when listing files.
/// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
/// expensive in certain situations (e.g. remote object storage), for objects under paths that
@@ -255,7 +275,7 @@ impl CacheManagerConfig {
/// Default is `None` (disabled).
pub fn with_files_statistics_cache(
mut self,
-        cache: Option<FileStatisticsCache>,
+        cache: Option<Arc<dyn FileStatisticsCache>>,
) -> Self {
self.table_files_statistics_cache = cache;
self
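
`list_entries` is what powers the `statistics_cache()` UDTF above. A minimal sketch of inspecting the cache directly, assuming `runtime` is a `RuntimeEnv` whose `CacheManagerConfig` installed a statistics cache:

// Sketch: read cached statistics without going through SQL.
if let Some(cache) = runtime.cache_manager.get_file_statistic_cache() {
    for (path, entry) in cache.list_entries() {
        println!(
            "{path}: rows={}, file_size={} bytes",
            entry.num_rows,         // Precision<usize>, e.g. Exact(8)
            entry.object_meta.size, // from object_store::ObjectMeta
        );
    }
}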