diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 9caee30cc5dd..3b91d0c77d90 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -69,6 +69,7 @@ smallvec = { version = "1.6", features = ["union"] } rand = "0.8" # dependencies added in the cubestore fork start here. itertools = "0.9.0" +lru = "0.6.5" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" tracing = "0.1.25" diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index 9fb00d8ea680..c0bb26571c1e 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -23,9 +23,9 @@ use std::sync::Arc; use arrow::array::GenericStringArray; use arrow::array::{ ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, StringOffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array, - UInt8Array, Int64Decimal0Array, Int64Decimal1Array, Int64Decimal2Array, Int64Decimal3Array, Int64Decimal4Array, - Int64Decimal5Array, Int64Decimal10Array + Int64Array, Int64Decimal0Array, Int64Decimal10Array, Int64Decimal1Array, + Int64Decimal2Array, Int64Decimal3Array, Int64Decimal4Array, Int64Decimal5Array, + Int8Array, StringOffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow::{datatypes::DataType, record_batch::RecordBatch}; @@ -44,7 +44,7 @@ pub struct InListExpr { } macro_rules! 
make_contains { -($ARRAY:expr, $LIST_VALUES:expr, $NEGATED:expr, Int64Decimal, $ARRAY_TYPE:ident, $SCALE:expr) => {{ + ($ARRAY:expr, $LIST_VALUES:expr, $NEGATED:expr, Int64Decimal, $ARRAY_TYPE:ident, $SCALE:expr) => {{ let array = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap(); let mut contains_null = false; @@ -317,25 +317,74 @@ impl PhysicalExpr for InListExpr { make_contains!(array, list_values, self.negated, UInt8, UInt8Array) } DataType::Int64Decimal(0) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal0Array, 0) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + Int64Decimal0Array, + 0 + ) } DataType::Int64Decimal(1) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal1Array, 1) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + Int64Decimal1Array, + 1 + ) } DataType::Int64Decimal(2) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal2Array, 2) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + Int64Decimal2Array, + 2 + ) } DataType::Int64Decimal(3) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal3Array, 3) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + Int64Decimal3Array, + 3 + ) } DataType::Int64Decimal(4) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal4Array, 4) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + Int64Decimal4Array, + 4 + ) } DataType::Int64Decimal(5) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal5Array, 5) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + Int64Decimal5Array, + 5 + ) } DataType::Int64Decimal(10) => { - make_contains!(array, list_values, self.negated, Int64Decimal, Int64Decimal10Array, 10) + make_contains!( + array, + list_values, + self.negated, + Int64Decimal, + 
Int64Decimal10Array, + 10 + ) } DataType::Boolean => { make_contains!(array, list_values, self.negated, Boolean, BooleanArray) diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index a9ff34f6dced..639d30146ba9 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -19,7 +19,7 @@ use std::fmt; use std::fs::File; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::{any::Any, convert::TryInto}; @@ -44,6 +44,7 @@ use arrow::{ use hashbrown::HashMap; use log::debug; use parquet::file::{ + footer, metadata::RowGroupMetaData, reader::{FileReader, SerializedFileReader}, statistics::Statistics as ParquetStatistics, @@ -58,6 +59,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::datasource::datasource::{ColumnStatistics, Statistics}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; +use parquet::file::metadata::ParquetMetaData; use super::SQLMetric; @@ -80,6 +82,8 @@ pub struct ParquetExec { predicate_builder: Option, /// Optional limit of the number of rows limit: Option, + /// Creates readers for parquet files. + metadata_cache: Arc, } /// Represents one partition of a Parquet data set and this currently means one Parquet file. @@ -117,6 +121,135 @@ struct ParquetPartitionMetrics { pub row_groups_pruned: Arc, } +/// Cache for Parquet Metadata +pub trait ParquetMetadataCache: Debug + Sync + Send { + /// Returns the metadata for the given file, possibly cached on key + fn metadata(&self, key: &str, file: &File) -> Result>; + + /// Creates a FileReader for the given filename + fn file_reader(&self, filename: &str) -> Result> { + let file = File::open(filename)?; + let metadata = self.metadata(filename, &file)?; + Ok(SerializedFileReader::new_with_metadata( + file, + (*metadata).clone(), + )) + } + + /// Returns a copy of the cache stats. 
+ fn stats(&self) -> ParquetMetadataCacheStats; +} + +/// Stats for ParquetMetadataCache. +#[derive(Clone, Debug)] +pub struct ParquetMetadataCacheStats { + hits: u64, + misses: u64, +} + +impl ParquetMetadataCacheStats { + /// Returns a new ParquetMetadataCacheStats + pub fn new() -> Self { + ParquetMetadataCacheStats { hits: 0, misses: 0 } + } + + /// Returns the number of cache reads. + pub fn reads(&self) -> u64 { + self.hits + self.misses + } + + /// Returns the number of cache hits. + pub fn hits(&self) -> u64 { + self.hits + } + + /// Returns the number of cache misses. + pub fn misses(&self) -> u64 { + self.misses + } + + /// Increments the number of cache hits. + pub fn hit(&mut self) { + self.hits += 1; + } + + /// Increments the number of cache misses. + pub fn miss(&mut self) { + self.misses += 1; + } +} + +/// Default MetadataCache, does not cache anything +#[derive(Debug)] +pub struct NoopParquetMetadataCache; + +impl NoopParquetMetadataCache { + /// Creates a new NoopParquetMetadataCache + pub fn new() -> Arc { + Arc::new(NoopParquetMetadataCache {}) + } +} + +impl ParquetMetadataCache for NoopParquetMetadataCache { + fn metadata(&self, _key: &str, file: &File) -> Result> { + Ok(Arc::new(footer::parse_metadata(file)?)) + } + + fn stats(&self) -> ParquetMetadataCacheStats { + ParquetMetadataCacheStats::new() + } +} + +/// LruParquetMetadataCache, caches parquet metadata. 
+#[derive(Debug)] +pub struct LruParquetMetadataCache { + data: Mutex, +} + +#[derive(Debug)] +struct LruParquetMetadataCacheData { + cache: lru::LruCache>, + stats: ParquetMetadataCacheStats, +} + +impl LruParquetMetadataCache { + /// Creates a new LruParquetMetadataCache + pub fn new(metadata_cache_capacity: usize) -> Arc { + Arc::new(LruParquetMetadataCache { + data: Mutex::new(LruParquetMetadataCacheData { + cache: lru::LruCache::new(metadata_cache_capacity), + stats: ParquetMetadataCacheStats::new(), + }), + }) + } +} + +impl ParquetMetadataCache for LruParquetMetadataCache { + fn metadata(&self, key: &str, file: &File) -> Result> { + { + let mut data = self.data.lock().unwrap(); + let metadata = data.cache.get(&key.to_string()); + if let Some(metadata) = metadata { + let result = Ok(metadata.clone()); + data.stats.hit(); + return result; + } else { + data.stats.miss(); + } + } + let metadata = Arc::new(footer::parse_metadata(file)?); + { + let mut data = self.data.lock().unwrap(); + data.cache.put(key.to_string(), metadata.clone()); + } + Ok(metadata) + } + + fn stats(&self) -> ParquetMetadataCacheStats { + self.data.lock().unwrap().stats.clone() + } +} + impl ParquetExec { /// Create a new Parquet reader execution plan based on the specified Parquet filename or /// directory containing Parquet files @@ -127,6 +260,27 @@ impl ParquetExec { batch_size: usize, max_concurrency: usize, limit: Option, + ) -> Result { + ParquetExec::try_from_path_with_cache( + path, + projection, + predicate, + batch_size, + max_concurrency, + limit, + NoopParquetMetadataCache::new(), + ) + } + + /// Same as {try_from_path}, but with a ParquetMetadataCache + pub fn try_from_path_with_cache( + path: &str, + projection: Option>, + predicate: Option, + batch_size: usize, + max_concurrency: usize, + limit: Option, + metadata_cache: Arc, ) -> Result { // build a list of filenames from the specified path, which could be a single file or // a directory containing one or more parquet files @@ 
-141,13 +295,14 @@ impl ParquetExec { .iter() .map(|filename| filename.as_str()) .collect::>(); - Self::try_from_files( + Self::try_from_files_with_cache( &filenames, projection, predicate, batch_size, max_concurrency, limit, + metadata_cache, ) } } @@ -161,6 +316,27 @@ impl ParquetExec { batch_size: usize, max_concurrency: usize, limit: Option, + ) -> Result { + ParquetExec::try_from_files_with_cache( + filenames, + projection, + predicate, + batch_size, + max_concurrency, + limit, + NoopParquetMetadataCache::new(), + ) + } + + /// Same as {try_from_files}, but with a ParquetMetadataCache + pub fn try_from_files_with_cache( + filenames: &[&str], + projection: Option>, + predicate: Option, + batch_size: usize, + max_concurrency: usize, + limit: Option, + metadata_cache: Arc, ) -> Result { debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", filenames, projection, predicate, limit); @@ -180,9 +356,8 @@ impl ParquetExec { let mut total_files = 0; for filename in &filenames { total_files += 1; - let file = File::open(filename)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); + let file_reader = metadata_cache.file_reader(filename.as_str())?; + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); let meta_data = arrow_reader.get_metadata(); // collect all the unique schemas in this data set let schema = arrow_reader.get_schema()?; @@ -262,7 +437,7 @@ impl ParquetExec { } }); - Ok(Self::new( + Ok(Self::new_with_cache( partitions, schema, projection, @@ -270,6 +445,7 @@ impl ParquetExec { predicate_builder, batch_size, limit, + metadata_cache, )) } @@ -282,6 +458,29 @@ impl ParquetExec { predicate_builder: Option, batch_size: usize, limit: Option, + ) -> Self { + ParquetExec::new_with_cache( + partitions, + schema, + projection, + metrics, + predicate_builder, + batch_size, + limit, + NoopParquetMetadataCache::new(), + ) + } 
+ + /// Same as {new}, but with a ParquetMetadataCache + pub fn new_with_cache( + partitions: Vec, + schema: SchemaRef, + projection: Option>, + metrics: ParquetExecMetrics, + predicate_builder: Option, + batch_size: usize, + limit: Option, + metadata_cache: Arc, ) -> Self { let projection = match projection { Some(p) => p, @@ -347,6 +546,7 @@ impl ParquetExec { batch_size, statistics, limit, + metadata_cache, } } @@ -462,6 +662,7 @@ impl ExecutionPlan for ParquetExec { let batch_size = self.batch_size; let limit = self.limit; let tx_unwind = response_tx.clone(); + let metadata_cache = self.metadata_cache.clone(); cube_ext::spawn_blocking_mpsc_with_catch_unwind( move || { @@ -473,6 +674,7 @@ impl ExecutionPlan for ParquetExec { batch_size, response_tx, limit, + metadata_cache, ) { println!("Parquet reader thread terminated due to error: {:?}", e); } @@ -669,11 +871,11 @@ fn read_files( batch_size: usize, response_tx: Sender>, limit: Option, + metadata_cache: Arc, ) -> Result<()> { let mut total_rows = 0; 'outer: for filename in filenames { - let file = File::open(&filename)?; - let mut file_reader = SerializedFileReader::new(file)?; + let mut file_reader = metadata_cache.file_reader(filename)?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( predicate_builder,