From 5cb4d63631325d1d876c408c9b11da8b23bbd0ec Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 29 Jul 2021 19:26:55 +0800 Subject: [PATCH 1/7] FilePartition and partitionedFile for scanning flexibility --- .../src/serde/physical_plan/from_proto.rs | 8 +- ballista/rust/scheduler/src/lib.rs | 26 +- datafusion/src/datasource/mod.rs | 252 +++++++++ datafusion/src/datasource/parquet.rs | 314 ++++++++++-- .../src/physical_optimizer/repartition.rs | 11 +- datafusion/src/physical_plan/csv.rs | 2 +- datafusion/src/physical_plan/parquet.rs | 482 +++--------------- 7 files changed, 634 insertions(+), 461 deletions(-) diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 46815db056a1..2cc7cda6ff48 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -130,10 +130,9 @@ impl TryInto> for &protobuf::PhysicalPlanNode { } PhysicalPlanType::ParquetScan(scan) => { let projection = scan.projection.iter().map(|i| *i as usize).collect(); - let filenames: Vec<&str> = - scan.filename.iter().map(|s| s.as_str()).collect(); - Ok(Arc::new(ParquetExec::try_from_files( - &filenames, + let path: &str = scan.filename[0].as_str(); + Ok(Arc::new(ParquetExec::try_from_path( + path, Some(projection), None, scan.batch_size as usize, @@ -621,6 +620,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; + let ctx_state = ExecutionContextState { catalog_list, scalar_functions: Default::default(), diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index b476b77f36a1..8d3bd341b8d7 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use datafusion::physical_plan::parquet::ParquetExec; +use datafusion::datasource::parquet::ParquetRootDesc; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -282,24 +282,18 @@ impl SchedulerGrpc for SchedulerServer { match file_type { FileType::Parquet => { - let parquet_exec = - ParquetExec::try_from_path(&path, None, None, 1024, 1, None) - .map_err(|e| { - let msg = format!("Error opening parquet files: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; + let parquet_desc = ParquetRootDesc::new(&path).map_err(|e| { + let msg = format!("Error opening parquet files: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; //TODO include statistics and any other info needed to reconstruct ParquetExec Ok(Response::new(GetFileMetadataResult { - schema: Some(parquet_exec.schema().as_ref().into()), - partitions: parquet_exec - .partitions() - .iter() - .map(|part| FilePartitionMetadata { - filename: part.filenames().to_vec(), - }) - .collect(), + schema: Some(parquet_desc.schema().as_ref().into()), + partitions: vec![FilePartitionMetadata { + filename: vec![path], + }], })) } //TODO implement for CSV diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 9699a997caa1..b4c44d9f1137 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -27,6 +27,13 @@ pub mod parquet; pub use self::csv::{CsvFile, CsvReadOptions}; 
pub use self::datasource::{TableProvider, TableType}; pub use self::memory::MemTable; +use crate::arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::common::build_file_list; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; +use crate::physical_plan::Accumulator; +use std::sync::Arc; /// Source for table input data pub(crate) enum Source> { @@ -36,3 +43,248 @@ pub(crate) enum Source> { /// Read data from a reader Reader(std::sync::Mutex>), } + +#[derive(Debug, Clone)] +/// A single file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub file_path: String, + /// Schema of the file + pub schema: Schema, + /// Statistics of the file + pub statistics: Statistics, + // Values of partition columns to be appended to each row + // pub partition_value: Option>, + // Schema of partition columns + // pub partition_schema: Option, + // We may include row group range here for a more fine-grained parallel execution +} + +impl From for PartitionedFile { + fn from(file_path: String) -> Self { + Self { + file_path, + schema: Schema::empty(), + statistics: Default::default(), + } + } +} + +impl std::fmt::Display for PartitionedFile { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.file_path) + } +} + +#[derive(Debug, Clone)] +/// A collection of files that should be read in a single task +pub struct FilePartition { + /// The index of the partition among all partitions + pub index: usize, + /// The contained files of the partition + pub files: Vec, +} + +impl std::fmt::Display for FilePartition { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let files: Vec = self.files.iter().map(|f| f.to_string()).collect(); + write!(f, "{}", files.join(", ")) + } +} + +#[derive(Debug, Clone)] +/// All source files with same schema exists in a path +pub struct SourceRootDescriptor { + /// All source files in the path + pub partition_files: Vec, + /// The schema of the files + pub schema: SchemaRef, +} + +/// Builder for ['SourceRootDescriptor'] inside given path +pub trait SourceRootDescBuilder { + /// Construct a ['SourceRootDescriptor'] from the provided path + fn get_source_desc( + path: &str, + ext: &str, + provided_schema: Option, + collect_statistics: bool, + ) -> Result { + let filenames = build_file_list(path, ext)?; + if filenames.is_empty() { + return Err(DataFusionError::Plan(format!( + "No file (with .{} extension) found at path {}", + ext, path + ))); + } + + // build a list of Parquet partitions with statistics and gather all unique schemas + // used in this data set + let mut schemas: Vec = vec![]; + let mut contains_file = false; + + let partitioned_files = filenames + .iter() + .map(|file_path| { + contains_file = true; + let result = if collect_statistics { + let pf = Self::get_file_meta(file_path)?; + let schema = pf.schema.clone(); + if schemas.is_empty() { + schemas.push(schema); + } else if schema != schemas[0] { + // we currently get the schema information from the first file rather than do + // schema merging and this is a limitation. 
+ // See https://issues.apache.org/jira/browse/ARROW-11017 + return Err(DataFusionError::Plan(format!( + "The file {} have different schema from the first file and DataFusion does \ + not yet support schema merging", + file_path + ))); + } + pf + } else { + PartitionedFile { + file_path: file_path.to_owned(), + schema: provided_schema.clone().unwrap(), + statistics: Statistics::default(), + } + }; + + Ok(result) + }).collect::>>(); + + if !contains_file { + return Err(DataFusionError::Plan(format!( + "No file (with .{} extension) found at path {}", + ext, path + ))); + } + + let result_schema = provided_schema.unwrap_or_else(|| schemas.pop().unwrap()); + + Ok(SourceRootDescriptor { + partition_files: partitioned_files?, + schema: Arc::new(result_schema), + }) + } + + /// Get all metadata for a source file, including schema, statistics, partitions, etc. + fn get_file_meta(file_path: &str) -> Result; +} + +/// Get all files as well as the summary statistics when a limit is provided +pub fn get_statistics_with_limit( + source_desc: &SourceRootDescriptor, + limit: Option, +) -> (Vec, Statistics) { + let mut all_files = source_desc.partition_files.clone(); + let schema = source_desc.schema.clone(); + + let mut total_byte_size = 0; + let mut null_counts = vec![0; schema.fields().len()]; + let mut has_statistics = false; + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + let mut num_rows = 0; + let mut num_files = 0; + for file in &all_files { + num_files += 1; + let file_stats = &file.statistics; + num_rows += file_stats.num_rows.unwrap_or(0); + total_byte_size += file_stats.total_byte_size.unwrap_or(0); + if let Some(vec) = &file_stats.column_statistics { + has_statistics = true; + for (i, cs) in vec.iter().enumerate() { + null_counts[i] += cs.null_count.unwrap_or(0); + + if let Some(max_value) = &mut max_values[i] { + if let Some(file_max) = cs.max_value.clone() { + match max_value.update(&[file_max]) { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + } + + if let Some(min_value) = &mut min_values[i] { + if let Some(file_min) = cs.min_value.clone() { + match min_value.update(&[file_min]) { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + if num_rows > limit.unwrap_or(usize::MAX) { + break; + } + } + all_files.truncate(num_files); + + let column_stats = if has_statistics { + Some(get_col_stats( + &*schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + (all_files, statistics) +} + +fn create_max_min_accs( + schema: &Schema, +) -> (Vec>, Vec>) { + let max_values: Vec> = schema + .fields() + .iter() + .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + let min_values: Vec> = schema + .fields() + .iter() + .map(|field| MinAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + (max_values, min_values) +} + +fn get_col_stats( + schema: &Schema, + null_counts: Vec, + max_values: &mut Vec>, + min_values: &mut Vec>, +) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match &max_values[i] { + Some(max_value) => max_value.evaluate().ok(), + None => None, + }; + let min_value = match &min_values[i] { + Some(min_value) => min_value.evaluate().ok(), + None => None, + }; + ColumnStatistics { + null_count: Some(null_counts[i] as usize), + max_value, + min_value, + distinct_count: None, + 
} + }) + .collect() +} diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index d312c7724563..e22f0c46d16c 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -18,25 +18,32 @@ //! Parquet data source use std::any::Any; -use std::string::String; +use std::fs::File; use std::sync::Arc; -use arrow::datatypes::*; +use parquet::arrow::ArrowReader; +use parquet::arrow::ParquetFileArrowReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics as ParquetStatistics; +use super::datasource::TableProviderFilterPushDown; +use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use crate::datasource::datasource::Statistics; -use crate::datasource::TableProvider; +use crate::datasource::{ + create_max_min_accs, get_col_stats, get_statistics_with_limit, PartitionedFile, + SourceRootDescBuilder, SourceRootDescriptor, TableProvider, +}; use crate::error::Result; use crate::logical_plan::{combine_filters, Expr}; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::parquet::ParquetExec; -use crate::physical_plan::ExecutionPlan; - -use super::datasource::TableProviderFilterPushDown; +use crate::physical_plan::{Accumulator, ExecutionPlan}; +use crate::scalar::ScalarValue; /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { path: String, - schema: SchemaRef, - statistics: Statistics, + desc: Arc, max_concurrency: usize, enable_pruning: bool, } @@ -45,12 +52,10 @@ impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path. pub fn try_new(path: impl Into, max_concurrency: usize) -> Result { let path = path.into(); - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - let schema = parquet_exec.schema(); + let root_desc = ParquetRootDesc::new(path.as_str()); Ok(Self { path, - schema, - statistics: parquet_exec.statistics().to_owned(), + desc: Arc::new(root_desc?), max_concurrency, enable_pruning: true, }) @@ -65,24 +70,17 @@ impl ParquetTable { collect_statistics: bool, ) -> Result { let path = path.into(); - if collect_statistics { - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - Ok(Self { - path, - schema: Arc::new(schema), - statistics: parquet_exec.statistics().to_owned(), - max_concurrency, - enable_pruning: true, - }) - } else { - Ok(Self { - path, - schema: Arc::new(schema), - statistics: Statistics::default(), - max_concurrency, - enable_pruning: true, - }) - } + let root_desc = ParquetRootDesc::new_with_schema( + path.as_str(), + Some(schema), + collect_statistics, + ); + Ok(Self { + path, + desc: Arc::new(root_desc?), + max_concurrency, + enable_pruning: true, + }) } /// Get the path for the Parquet file(s) represented by this ParquetTable instance @@ -109,7 +107,7 @@ impl TableProvider for ParquetTable { /// Get the schema for this parquet file. 
fn schema(&self) -> SchemaRef { - self.schema.clone() + self.desc.schema() } fn supports_filter_pushdown( @@ -136,8 +134,8 @@ impl TableProvider for ParquetTable { } else { None }; - Ok(Arc::new(ParquetExec::try_from_path( - &self.path, + Ok(Arc::new(ParquetExec::try_new( + self.desc.clone(), projection.clone(), predicate, limit @@ -149,7 +147,7 @@ impl TableProvider for ParquetTable { } fn statistics(&self) -> Statistics { - self.statistics.clone() + self.desc.statistics() } fn has_exact_statistics(&self) -> bool { @@ -157,6 +155,254 @@ impl TableProvider for ParquetTable { } } +#[derive(Debug)] +/// Descriptor for a parquet root path +pub struct ParquetRootDesc { + /// metadata for files inside the root path + pub descriptor: SourceRootDescriptor, +} + +impl ParquetRootDesc { + /// Construct a new parquet descriptor for a root path + pub fn new(root_path: &str) -> Result { + let root_desc = Self::get_source_desc(root_path, "parquet", None, true); + Ok(Self { + descriptor: root_desc?, + }) + } + + /// Construct a new parquet descriptor for a root path with known schema + pub fn new_with_schema( + root_path: &str, + schema: Option, + collect_statistics: bool, + ) -> Result { + let root_desc = + Self::get_source_desc(root_path, "parquet", schema, collect_statistics); + Ok(Self { + descriptor: root_desc?, + }) + } + + /// Get file schema for all parquet files + pub fn schema(&self) -> SchemaRef { + self.descriptor.schema.clone() + } + + /// Get the summary statistics for all parquet files + pub fn statistics(&self) -> Statistics { + get_statistics_with_limit(&self.descriptor, None).1 + } + + fn summarize_min_max( + max_values: &mut Vec>, + min_values: &mut Vec>, + fields: &[Field], + i: usize, + stat: &ParquetStatistics, + ) { + match stat { + ParquetStatistics::Boolean(s) => { + if let DataType::Boolean = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Boolean(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Boolean(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int32(s) => { + if let DataType::Int32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int64(s) => { + if let DataType::Int64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Float(s) => { + if let DataType::Float32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let 
Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Double(s) => { + if let DataType::Float64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + _ => {} + } + } +} + +impl SourceRootDescBuilder for ParquetRootDesc { + fn get_file_meta(file_path: &str) -> Result { + let file = File::open(file_path)?; + let file_reader = Arc::new(SerializedFileReader::new(file)?); + let mut arrow_reader = ParquetFileArrowReader::new(file_reader); + let file_path = file_path.to_string(); + let schema = arrow_reader.get_schema()?; + let num_fields = schema.fields().len(); + let fields = schema.fields().to_vec(); + let meta_data = arrow_reader.get_metadata(); + + let mut num_rows = 0; + let mut total_byte_size = 0; + let mut null_counts = vec![0; num_fields]; + let mut has_statistics = false; + + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + for row_group_meta in meta_data.row_groups() { + num_rows += row_group_meta.num_rows(); + total_byte_size += row_group_meta.total_byte_size(); + + let columns_null_counts = row_group_meta + .columns() + .iter() + .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + + for (i, cnt) in columns_null_counts.enumerate() { + null_counts[i] += cnt as usize + } + + for (i, column) in row_group_meta.columns().iter().enumerate() { + if let Some(stat) = column.statistics() { + has_statistics = true; + ParquetRootDesc::summarize_min_max( + &mut max_values, + &mut min_values, + &fields, + i, + stat, + ) + } + } + } + + let column_stats = if has_statistics { + Some(get_col_stats( + &schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + + Ok(PartitionedFile { + file_path, + schema, + statistics, + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 4504c81daa06..6d6c8cfd0dc0 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -110,6 +110,7 @@ mod tests { use super::*; use crate::datasource::datasource::Statistics; + use crate::datasource::PartitionedFile; use crate::physical_plan::parquet::{ ParquetExec, ParquetExecMetrics, ParquetPartition, }; @@ -122,11 +123,12 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, )], schema, None, + Statistics::default(), ParquetExecMetrics::new(), None, 2048, @@ -160,11 +162,12 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, )], schema, None, + Statistics::default(), ParquetExecMetrics::new(), None, 2048, diff --git 
a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs index 544f98cba0c6..853ba7400484 100644 --- a/datafusion/src/physical_plan/csv.rs +++ b/datafusion/src/physical_plan/csv.rs @@ -141,7 +141,7 @@ impl CsvExec { ) -> Result { let file_extension = String::from(options.file_extension); - let filenames = common::build_file_list(path, file_extension.as_str())?; + let filenames = common::build_file_list(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Execution(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index ff8bb5b32678..c871d128e8c0 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -18,7 +18,6 @@ //! Execution plan for reading Parquet files use std::fmt; -use std::fs::File; use std::sync::Arc; use std::{any::Any, convert::TryInto}; @@ -27,14 +26,15 @@ use crate::{ logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ - common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, }, scalar::ScalarValue, }; use arrow::{ array::ArrayRef, - datatypes::{DataType, Schema, SchemaRef}, + datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; @@ -54,13 +54,14 @@ use tokio::{ task, }; -use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::datasource::datasource::Statistics; use async_trait::async_trait; use super::stream::RecordBatchReceiverStream; use super::SQLMetric; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::Accumulator; +use crate::datasource::parquet::ParquetRootDesc; +use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile}; +use std::fs::File; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] @@ -95,9 +96,7 @@ pub struct ParquetExec { #[derive(Debug, Clone)] pub struct ParquetPartition { /// The Parquet filename for this partition - pub filenames: Vec, - /// Statistics for this partition - pub statistics: Statistics, + pub file_partition: FilePartition, /// Execution metrics metrics: ParquetPartitionMetrics, } @@ -131,285 +130,38 @@ impl ParquetExec { ) -> Result { // build a list of filenames from the specified path, which could be a single file or // a directory containing one or more parquet files - let filenames = common::build_file_list(path, ".parquet")?; - if filenames.is_empty() { - Err(DataFusionError::Plan(format!( - "No Parquet files (with .parquet extension) found at path {}", - path - ))) - } else { - let filenames = filenames - .iter() - .map(|filename| filename.as_str()) - .collect::>(); - Self::try_from_files( - &filenames, - projection, - predicate, - batch_size, - max_concurrency, - limit, - ) - } + let root_desc = ParquetRootDesc::new(path)?; + Self::try_new( + Arc::new(root_desc), + projection, + predicate, + batch_size, + max_concurrency, + limit, + ) } - /// Create a new Parquet reader execution plan based on the specified list of Parquet - /// files - pub fn try_from_files( - filenames: &[&str], + /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema + pub fn try_new( + desc: Arc, projection: Option>, predicate: Option, 
batch_size: usize, max_concurrency: usize, limit: Option, ) -> Result { - debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", - filenames, projection, predicate, limit); - // build a list of Parquet partitions with statistics and gather all unique schemas - // used in this data set - let mut schemas: Vec = vec![]; - let mut partitions = Vec::with_capacity(max_concurrency); - let filenames: Vec = filenames.iter().map(|s| s.to_string()).collect(); - let chunks = split_files(&filenames, max_concurrency); - let mut num_rows = 0; - let mut num_fields = 0; - let mut fields = Vec::new(); - let mut total_byte_size = 0; - let mut null_counts = Vec::new(); - let mut max_values: Vec> = Vec::new(); - let mut min_values: Vec> = Vec::new(); - let mut limit_exhausted = false; - for chunk in chunks { - let mut filenames: Vec = - chunk.iter().map(|x| x.to_string()).collect(); - let mut total_files = 0; - for filename in &filenames { - total_files += 1; - let file = File::open(filename)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let meta_data = arrow_reader.get_metadata(); - // collect all the unique schemas in this data set - let schema = arrow_reader.get_schema()?; - if schemas.is_empty() || schema != schemas[0] { - fields = schema.fields().to_vec(); - num_fields = schema.fields().len(); - null_counts = vec![0; num_fields]; - max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - schemas.push(schema); - } - - for row_group_meta in meta_data.row_groups() { - num_rows += row_group_meta.num_rows(); - total_byte_size += row_group_meta.total_byte_size(); - - // Currently assumes every Parquet file has same schema - // https://issues.apache.org/jira/browse/ARROW-11017 - let columns_null_counts = row_group_meta - .columns() - .iter() - .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", + desc, projection, predicate, limit); - for (i, cnt) in columns_null_counts.enumerate() { - null_counts[i] += cnt - } - - for (i, column) in row_group_meta.columns().iter().enumerate() { - if let Some(stat) = column.statistics() { - match stat { - ParquetStatistics::Boolean(s) => { - if let DataType::Boolean = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Boolean(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Boolean(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int32(s) => { - if let DataType::Int32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int64(s) => { - if let DataType::Int64 = 
fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Float(s) => { - if let DataType::Float32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Double(s) => { - if let DataType::Float64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - _ => {} - } - } - } + let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit); + let schema = desc.schema(); - if limit.map(|x| num_rows >= x as i64).unwrap_or(false) { - limit_exhausted = true; - break; - } - } - } - let column_stats = (0..num_fields) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(); - - let statistics = Statistics { - num_rows: Some(num_rows as usize), - total_byte_size: Some(total_byte_size as usize), - column_statistics: Some(column_stats), - }; - // remove files that are not needed in case of limit - filenames.truncate(total_files); - partitions.push(ParquetPartition::new(filenames, statistics)); - if limit_exhausted { - break; - } + let mut partitions = Vec::with_capacity(max_concurrency); + let chunked_files = split_files(&all_files, max_concurrency); + for (index, group) in chunked_files.iter().enumerate() { + partitions.push(ParquetPartition::new(Vec::from(*group), index)); } - // we currently get the schema information from the first file rather than do - // schema merging and this is a limitation. 
- // See https://issues.apache.org/jira/browse/ARROW-11017 - if schemas.len() > 1 { - return Err(DataFusionError::Plan(format!( - "The Parquet files have {} different schemas and DataFusion does \ - not yet support schema merging", - schemas.len() - ))); - } - let schema = Arc::new(schemas.pop().unwrap()); let metrics = ParquetExecMetrics::new(); let predicate_builder = predicate.and_then(|predicate_expr| { @@ -430,6 +182,7 @@ impl ParquetExec { partitions, schema, projection, + statistics, metrics, predicate_builder, batch_size, @@ -438,10 +191,12 @@ impl ParquetExec { } /// Create a new Parquet reader execution plan with provided partitions and schema + #[allow(clippy::too_many_arguments)] pub fn new( partitions: Vec, schema: SchemaRef, projection: Option>, + statistics: Statistics, metrics: ParquetExecMetrics, predicate_builder: Option, batch_size: usize, @@ -459,94 +214,20 @@ impl ParquetExec { .collect(), ); - // sum the statistics - let mut num_rows: Option = None; - let mut total_byte_size: Option = None; - let mut null_counts: Vec = vec![0; schema.fields().len()]; - let mut has_statistics = false; - let mut max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - let mut min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - for part in &partitions { - if let Some(n) = part.statistics.num_rows { - num_rows = Some(num_rows.unwrap_or(0) + n) + let new_column_statistics = statistics.column_statistics.map(|stats| { + let mut projected_stats = Vec::with_capacity(projection.len()); + for proj in &projection { + projected_stats.push(stats[*proj].clone()); } - if let Some(n) = part.statistics.total_byte_size { - total_byte_size = Some(total_byte_size.unwrap_or(0) + n) - } - if let Some(x) = &part.statistics.column_statistics { - let part_nulls: Vec> = - x.iter().map(|c| c.null_count).collect(); - has_statistics = true; - - let part_max_values: Vec> = - x.iter().map(|c| c.max_value.clone()).collect(); - let part_min_values: Vec> = - x.iter().map(|c| c.min_value.clone()).collect(); - - for &i in projection.iter() { - null_counts[i] = part_nulls[i].unwrap_or(0); - if let Some(part_max_value) = part_max_values[i].clone() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[part_max_value]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - } - if let Some(part_min_value) = part_min_values[i].clone() { - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[part_min_value]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - } - - let column_stats = if has_statistics { - Some( - (0..schema.fields().len()) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(), - ) - } else { - None - }; + projected_stats + }); let statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: column_stats, + num_rows: statistics.num_rows, + total_byte_size: statistics.total_byte_size, + column_statistics: new_column_statistics, }; + Self { partitions, schema: Arc::new(projected_schema), @@ -582,22 +263,20 @@ impl ParquetExec { impl ParquetPartition { /// Create a 
new parquet partition - pub fn new(filenames: Vec, statistics: Statistics) -> Self { + pub fn new(files: Vec, index: usize) -> Self { Self { - filenames, - statistics, + file_partition: FilePartition { index, files }, metrics: ParquetPartitionMetrics::new(), } } /// The Parquet filename for this partition - pub fn filenames(&self) -> &[String] { - &self.filenames - } - - /// Statistics for this partition - pub fn statistics(&self) -> &Statistics { - &self.statistics + pub fn filenames(&self) -> Vec { + self.file_partition + .files + .iter() + .map(|f| f.file_path.clone()) + .collect() } } @@ -663,8 +342,7 @@ impl ExecutionPlan for ParquetExec { Receiver>, ) = channel(2); - let partition = &self.partitions[partition]; - let filenames = partition.filenames.clone(); + let partition = self.partitions[partition].clone(); let metrics = partition.metrics.clone(); let projection = self.projection.clone(); let predicate_builder = self.predicate_builder.clone(); @@ -672,8 +350,8 @@ impl ExecutionPlan for ParquetExec { let limit = self.limit; task::spawn_blocking(move || { - if let Err(e) = read_files( - &filenames, + if let Err(e) = read_partition( + partition, metrics, &projection, &predicate_builder, @@ -698,9 +376,7 @@ impl ExecutionPlan for ParquetExec { let files: Vec<_> = self .partitions .iter() - .map(|pp| pp.filenames.iter()) - .flatten() - .map(|s| s.as_str()) + .map(|pp| format!("{}", pp.file_partition)) .collect(); write!( @@ -720,14 +396,11 @@ impl ExecutionPlan for ParquetExec { .flat_map(|p| { vec![ ( - format!( - "numPredicateEvaluationErrors for {}", - p.filenames.join(",") - ), + format!("numPredicateEvaluationErrors for {}", p.file_partition), p.metrics.predicate_evaluation_errors.as_ref().clone(), ), ( - format!("numRowGroupsPruned for {}", p.filenames.join(",")), + format!("numRowGroupsPruned for {}", p.file_partition), p.metrics.row_groups_pruned.as_ref().clone(), ), ] @@ -851,7 +524,7 @@ fn build_row_group_predicate( match predicate_values { Ok(values) => { // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !v).count(); + let num_pruned = values.iter().filter(|&v| !*v).count(); metrics.row_groups_pruned.add(num_pruned); Box::new(move |_, i| values[i]) } @@ -865,8 +538,9 @@ fn build_row_group_predicate( } } -fn read_files( - filenames: &[String], +#[allow(clippy::too_many_arguments)] +fn read_partition( + partition: ParquetPartition, metrics: ParquetPartitionMetrics, projection: &[usize], predicate_builder: &Option, @@ -875,8 +549,9 @@ fn read_files( limit: Option, ) -> Result<()> { let mut total_rows = 0; - 'outer: for filename in filenames { - let file = File::open(&filename)?; + let all_files = partition.file_partition.files; + 'outer: for partitioned_file in all_files { + let file = File::open(partitioned_file.file_path.as_str())?; let mut file_reader = SerializedFileReader::new(file)?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( @@ -904,7 +579,7 @@ fn read_files( Some(Err(e)) => { let err_msg = format!( "Error reading batch from {}: {}", - filename, + partitioned_file, e.to_string() ); // send error to operator @@ -924,12 +599,15 @@ fn read_files( Ok(()) } -fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> { - let mut chunk_size = filenames.len() / n; - if filenames.len() % n > 0 { +fn split_files( + partitioned_files: &[PartitionedFile], + n: usize, +) -> Vec<&[PartitionedFile]> { + let mut chunk_size = partitioned_files.len() / n; + if partitioned_files.len() % 
n > 0 { chunk_size += 1; } - filenames.chunks(chunk_size).collect() + partitioned_files.chunks(chunk_size).collect() } #[cfg(test)] @@ -945,24 +623,24 @@ mod tests { #[test] fn test_split_files() { - let filenames = vec![ - "a".to_string(), - "b".to_string(), - "c".to_string(), - "d".to_string(), - "e".to_string(), + let files = vec![ + PartitionedFile::from("a".to_string()), + PartitionedFile::from("b".to_string()), + PartitionedFile::from("c".to_string()), + PartitionedFile::from("d".to_string()), + PartitionedFile::from("e".to_string()), ]; - let chunks = split_files(&filenames, 1); + let chunks = split_files(&files, 1); assert_eq!(1, chunks.len()); assert_eq!(5, chunks[0].len()); - let chunks = split_files(&filenames, 2); + let chunks = split_files(&files, 2); assert_eq!(2, chunks.len()); assert_eq!(3, chunks[0].len()); assert_eq!(2, chunks[1].len()); - let chunks = split_files(&filenames, 5); + let chunks = split_files(&files, 5); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); @@ -970,7 +648,7 @@ mod tests { assert_eq!(1, chunks[3].len()); assert_eq!(1, chunks[4].len()); - let chunks = split_files(&filenames, 123); + let chunks = split_files(&files, 123); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); From 794a28dc492707567513ea90bea05a0ee6ded718 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 23 Aug 2021 22:26:42 +0800 Subject: [PATCH 2/7] clippy --- ballista/rust/core/src/serde/physical_plan/to_proto.rs | 2 +- datafusion/src/physical_plan/csv.rs | 2 +- datafusion/src/physical_plan/parquet.rs | 5 ++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 8d8f917461a9..438f871b346a 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -271,7 +271,7 @@ impl TryInto for Arc { let filenames = exec .partitions() .iter() - .flat_map(|part| part.filenames().to_owned()) + .flat_map(|part| part.filenames()) .collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs index 853ba7400484..544f98cba0c6 100644 --- a/datafusion/src/physical_plan/csv.rs +++ b/datafusion/src/physical_plan/csv.rs @@ -141,7 +141,7 @@ impl CsvExec { ) -> Result { let file_extension = String::from(options.file_extension); - let filenames = common::build_file_list(path, options.file_extension)?; + let filenames = common::build_file_list(path, file_extension.as_str())?; if filenames.is_empty() { return Err(DataFusionError::Execution(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index c871d128e8c0..9d0bc909b495 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -18,6 +18,7 @@ //! 
Execution plan for reading Parquet files use std::fmt; +use std::fs::File; use std::sync::Arc; use std::{any::Any, convert::TryInto}; @@ -26,7 +27,7 @@ use crate::{ logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }, scalar::ScalarValue, @@ -61,7 +62,6 @@ use super::stream::RecordBatchReceiverStream; use super::SQLMetric; use crate::datasource::parquet::ParquetRootDesc; use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile}; -use std::fs::File; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] @@ -538,7 +538,6 @@ fn build_row_group_predicate( } } -#[allow(clippy::too_many_arguments)] fn read_partition( partition: ParquetPartition, metrics: ParquetPartitionMetrics, From f50b1a38206ce970438926f0c658f876cb6f5c6f Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 24 Aug 2021 10:29:52 +0800 Subject: [PATCH 3/7] remove schema from partitioned file --- datafusion/src/datasource/mod.rs | 14 ++++++-------- datafusion/src/datasource/parquet.rs | 16 ++++++---------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index b4c44d9f1137..6fb1a9bb66ea 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -50,8 +50,6 @@ pub(crate) enum Source> { pub struct PartitionedFile { /// Path for the file (e.g. URL, filesystem path, etc) pub file_path: String, - /// Schema of the file - pub schema: Schema, /// Statistics of the file pub statistics: Statistics, // Values of partition columns to be appended to each row @@ -65,7 +63,6 @@ impl From for PartitionedFile { fn from(file_path: String) -> Self { Self { file_path, - schema: Schema::empty(), statistics: Default::default(), } } @@ -102,10 +99,13 @@ pub struct SourceRootDescriptor { pub schema: SchemaRef, } +/// Returned partitioned file with its schema +pub type FileAndSchema = (PartitionedFile, Schema); + /// Builder for ['SourceRootDescriptor'] inside given path pub trait SourceRootDescBuilder { /// Construct a ['SourceRootDescriptor'] from the provided path - fn get_source_desc( + fn build_source_desc( path: &str, ext: &str, provided_schema: Option, @@ -129,8 +129,7 @@ pub trait SourceRootDescBuilder { .map(|file_path| { contains_file = true; let result = if collect_statistics { - let pf = Self::get_file_meta(file_path)?; - let schema = pf.schema.clone(); + let (pf, schema) = Self::file_meta(file_path)?; if schemas.is_empty() { schemas.push(schema); } else if schema != schemas[0] { @@ -147,7 +146,6 @@ pub trait SourceRootDescBuilder { } else { PartitionedFile { file_path: file_path.to_owned(), - schema: provided_schema.clone().unwrap(), statistics: Statistics::default(), } }; @@ -171,7 +169,7 @@ pub trait SourceRootDescBuilder { } /// Get all metadata for a source file, including schema, statistics, partitions, etc. 
- fn get_file_meta(file_path: &str) -> Result; + fn file_meta(file_path: &str) -> Result; } /// Get all files as well as the summary statistics when a limit is provided diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index e22f0c46d16c..4313a3e1a600 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -29,10 +29,7 @@ use parquet::file::statistics::Statistics as ParquetStatistics; use super::datasource::TableProviderFilterPushDown; use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use crate::datasource::datasource::Statistics; -use crate::datasource::{ - create_max_min_accs, get_col_stats, get_statistics_with_limit, PartitionedFile, - SourceRootDescBuilder, SourceRootDescriptor, TableProvider, -}; +use crate::datasource::{create_max_min_accs, get_col_stats, get_statistics_with_limit, PartitionedFile, SourceRootDescBuilder, SourceRootDescriptor, TableProvider, FileAndSchema}; use crate::error::Result; use crate::logical_plan::{combine_filters, Expr}; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; @@ -165,7 +162,7 @@ pub struct ParquetRootDesc { impl ParquetRootDesc { /// Construct a new parquet descriptor for a root path pub fn new(root_path: &str) -> Result { - let root_desc = Self::get_source_desc(root_path, "parquet", None, true); + let root_desc = Self::build_source_desc(root_path, "parquet", None, true); Ok(Self { descriptor: root_desc?, }) @@ -178,7 +175,7 @@ impl ParquetRootDesc { collect_statistics: bool, ) -> Result { let root_desc = - Self::get_source_desc(root_path, "parquet", schema, collect_statistics); + Self::build_source_desc(root_path, "parquet", schema, collect_statistics); Ok(Self { descriptor: root_desc?, }) @@ -334,7 +331,7 @@ impl ParquetRootDesc { } impl SourceRootDescBuilder for ParquetRootDesc { - fn get_file_meta(file_path: &str) -> Result { + fn file_meta(file_path: &str) -> Result { let file = File::open(file_path)?; let file_reader = Arc::new(SerializedFileReader::new(file)?); let mut arrow_reader = ParquetFileArrowReader::new(file_reader); @@ -395,11 +392,10 @@ impl SourceRootDescBuilder for ParquetRootDesc { column_statistics: column_stats, }; - Ok(PartitionedFile { + Ok((PartitionedFile { file_path, - schema, statistics, - }) + }, schema)) } } From ab71fa6ffba40796883326ee73fb750c295e3faa Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 25 Aug 2021 18:04:58 +0800 Subject: [PATCH 4/7] ballista logical parquet table --- ballista/rust/core/proto/ballista.proto | 22 +++++- .../core/src/serde/logical_plan/from_proto.rs | 65 ++++++++++++++++-- .../core/src/serde/logical_plan/to_proto.rs | 63 +++++++++++++++-- ballista/rust/scheduler/src/lib.rs | 4 +- datafusion/src/datasource/mod.rs | 25 +++---- datafusion/src/datasource/parquet.rs | 67 ++++++++++++------- datafusion/src/execution/context.rs | 3 +- datafusion/src/logical_plan/builder.rs | 11 +-- datafusion/src/physical_plan/parquet.rs | 11 ++- datafusion/src/sql/planner.rs | 1 + datafusion/tests/parquet_pruning.rs | 13 ++-- 11 files changed, 217 insertions(+), 68 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index c184de3d1329..0794be462ce5 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -271,12 +271,28 @@ message CsvTableScanNode { repeated LogicalExprNode filters = 8; } +message Statistics { + int64 num_rows = 1; + int64 total_byte_size = 2; + repeated ColumnStats 
column_stats = 3; +} + +message PartitionedFile { + string path = 1; + Statistics statistics = 2; +} + +message TableDescriptor { + string path = 1; + repeated PartitionedFile partition_files = 2; + Schema schema = 3; +} + message ParquetTableScanNode { string table_name = 1; - string path = 2; + TableDescriptor table_desc = 2; ProjectionColumns projection = 3; - Schema schema = 4; - repeated LogicalExprNode filters = 5; + repeated LogicalExprNode filters = 4; } message ProjectionNode { diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index ade2cb40adb7..86c1c643c9e0 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -21,6 +21,8 @@ use crate::error::BallistaError; use crate::serde::{from_proto_binary_op, proto_error, protobuf}; use crate::{convert_box_required, convert_required}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor}; +use datafusion::datasource::{PartitionedFile, TableDescriptor}; use datafusion::logical_plan::window_frames::{ WindowFrame, WindowFrameBound, WindowFrameUnits, }; @@ -134,10 +136,11 @@ impl TryInto for &protobuf::LogicalPlanNode { .map_err(|e| e.into()) } LogicalPlanType::ParquetScan(scan) => { + let descriptor: TableDescriptor = convert_required!(scan.table_desc)?; let projection = match scan.projection.as_ref() { None => None, Some(columns) => { - let schema: Schema = convert_required!(scan.schema)?; + let schema = descriptor.schema.clone(); let r: Result, _> = columns .columns .iter() @@ -154,11 +157,22 @@ impl TryInto for &protobuf::LogicalPlanNode { Some(r?) } }; - LogicalPlanBuilder::scan_parquet_with_name( - &scan.path, - projection, + + let filters: Vec = scan + .filters + .iter() + .map(|expr| expr.try_into()) + .collect::, _>>()?; + let parquet_table = ParquetTable::try_new_with_desc( + Arc::new(ParquetTableDescriptor { descriptor }), 24, + true, + )?; + LogicalPlanBuilder::scan( &scan.table_name, + Arc::new(parquet_table), + projection, + Some(filters), )? 
//TODO concurrency .build() .map_err(|e| e.into()) @@ -301,6 +315,48 @@ impl TryInto for &protobuf::LogicalPlanNode { } } +impl TryInto for &protobuf::TableDescriptor { + type Error = BallistaError; + + fn try_into(self) -> Result { + let partition_files = self + .partition_files + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + let schema = convert_required!(self.schema)?; + Ok(TableDescriptor { + path: self.path.to_owned(), + partition_files, + schema: Arc::new(schema), + }) + } +} + +impl TryInto for &protobuf::PartitionedFile { + type Error = BallistaError; + + fn try_into(self) -> Result { + let statistics = convert_required!(self.statistics)?; + Ok(PartitionedFile { + file_path: self.path.clone(), + statistics, + }) + } +} + +impl TryInto for &protobuf::Statistics { + type Error = BallistaError; + + fn try_into(self) -> Result { + Ok(Statistics { + num_rows: Some(self.num_rows as usize), + total_byte_size: Some(self.total_byte_size as usize), + column_statistics: None, + }) + } +} + impl From<&protobuf::Column> for Column { fn from(c: &protobuf::Column) -> Column { let c = c.clone(); @@ -1114,6 +1170,7 @@ impl TryInto for &protobuf::Field { } } +use datafusion::datasource::datasource::Statistics; use datafusion::physical_plan::{aggregates, windows}; use datafusion::prelude::{ array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256, diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 5877ced5f561..c20de8c81c6f 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -22,8 +22,11 @@ use super::super::proto_error; use crate::datasource::DfTableAdapter; use crate::serde::{protobuf, BallistaError}; -use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; -use datafusion::datasource::CsvFile; +use datafusion::arrow::datatypes::{ + DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, +}; +use datafusion::datasource::datasource::Statistics; +use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor}; use datafusion::logical_plan::{ window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, Column, Expr, JoinConstraint, JoinType, LogicalPlan, @@ -253,6 +256,44 @@ impl TryInto for &protobuf::ArrowType { } } +#[allow(clippy::from_over_into)] +impl Into for Statistics { + fn into(self) -> protobuf::Statistics { + let none_value = -1_i64; + protobuf::Statistics { + num_rows: self.num_rows.map(|n| n as i64).unwrap_or(none_value), + total_byte_size: self.total_byte_size.map(|n| n as i64).unwrap_or(none_value), + column_stats: vec![], + } + } +} + +impl From<&PartitionedFile> for protobuf::PartitionedFile { + fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile { + protobuf::PartitionedFile { + path: pf.file_path.clone(), + statistics: Some(pf.statistics.clone().into()), + } + } +} + +impl TryFrom for protobuf::TableDescriptor { + type Error = BallistaError; + + fn try_from(desc: TableDescriptor) -> Result { + let partition_files: Vec = + desc.partition_files.iter().map(|pf| pf.into()).collect(); + + let schema: protobuf::Schema = desc.schema.into(); + + Ok(protobuf::TableDescriptor { + path: desc.path, + partition_files, + schema: Some(schema), + }) + } +} + impl TryInto for &Box { type Error = BallistaError; fn try_into(self) -> Result { @@ -706,13 +747,14 @@ impl TryInto for &LogicalPlan { .collect::, _>>()?; if let Some(parquet) = source.downcast_ref::() { 
+ let table_desc: protobuf::TableDescriptor = + parquet.desc.descriptor.clone().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::ParquetScan( protobuf::ParquetTableScanNode { table_name: table_name.to_owned(), - path: parquet.path().to_owned(), + table_desc: Some(table_desc), projection, - schema: Some(schema), filters, }, )), @@ -1262,6 +1304,19 @@ impl Into for &Schema { } } +#[allow(clippy::from_over_into)] +impl Into for SchemaRef { + fn into(self) -> protobuf::Schema { + protobuf::Schema { + columns: self + .fields() + .iter() + .map(protobuf::Field::from) + .collect::>(), + } + } +} + impl From<&datafusion::logical_plan::DFField> for protobuf::DfField { fn from(f: &datafusion::logical_plan::DFField) -> protobuf::DfField { protobuf::DfField { diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 8d3bd341b8d7..33ac03574caa 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use datafusion::datasource::parquet::ParquetRootDesc; +use datafusion::datasource::parquet::ParquetTableDescriptor; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -282,7 +282,7 @@ impl SchedulerGrpc for SchedulerServer { match file_type { FileType::Parquet => { - let parquet_desc = ParquetRootDesc::new(&path).map_err(|e| { + let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| { let msg = format!("Error opening parquet files: {}", e); error!("{}", msg); tonic::Status::internal(msg) diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 6fb1a9bb66ea..394da783ad32 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -54,8 +54,6 @@ pub struct PartitionedFile { pub statistics: Statistics, // Values of partition columns to be appended to each row // pub partition_value: Option>, - // Schema of partition columns - // pub partition_schema: Option, // We may include row group range here for a more fine-grained parallel execution } @@ -92,7 +90,9 @@ impl std::fmt::Display for FilePartition { #[derive(Debug, Clone)] /// All source files with same schema exists in a path -pub struct SourceRootDescriptor { +pub struct TableDescriptor { + /// root path of the table + pub path: String, /// All source files in the path pub partition_files: Vec, /// The schema of the files @@ -102,15 +102,15 @@ pub struct SourceRootDescriptor { /// Returned partitioned file with its schema pub type FileAndSchema = (PartitionedFile, Schema); -/// Builder for ['SourceRootDescriptor'] inside given path -pub trait SourceRootDescBuilder { - /// Construct a ['SourceRootDescriptor'] from the provided path - fn build_source_desc( +/// Builder for ['TableDescriptor'] inside given path +pub trait TableDescriptorBuilder { + /// Construct a ['TableDescriptor'] from the provided path + fn build_table_desc( path: &str, ext: &str, provided_schema: Option, collect_statistics: bool, - ) -> Result { + ) -> Result { let filenames = build_file_list(path, ext)?; if filenames.is_empty() { return Err(DataFusionError::Plan(format!( @@ -162,7 +162,8 @@ pub trait SourceRootDescBuilder { let result_schema = provided_schema.unwrap_or_else(|| schemas.pop().unwrap()); - 
Ok(SourceRootDescriptor { + Ok(TableDescriptor { + path: path.to_string(), partition_files: partitioned_files?, schema: Arc::new(result_schema), }) @@ -174,11 +175,11 @@ pub trait SourceRootDescBuilder { /// Get all files as well as the summary statistics when a limit is provided pub fn get_statistics_with_limit( - source_desc: &SourceRootDescriptor, + table_desc: &TableDescriptor, limit: Option, ) -> (Vec, Statistics) { - let mut all_files = source_desc.partition_files.clone(); - let schema = source_desc.schema.clone(); + let mut all_files = table_desc.partition_files.clone(); + let schema = table_desc.schema.clone(); let mut total_byte_size = 0; let mut null_counts = vec![0; schema.fields().len()]; diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index 4313a3e1a600..5f4d1eeb3b50 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -29,7 +29,10 @@ use parquet::file::statistics::Statistics as ParquetStatistics; use super::datasource::TableProviderFilterPushDown; use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use crate::datasource::datasource::Statistics; -use crate::datasource::{create_max_min_accs, get_col_stats, get_statistics_with_limit, PartitionedFile, SourceRootDescBuilder, SourceRootDescriptor, TableProvider, FileAndSchema}; +use crate::datasource::{ + create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema, + PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider, +}; use crate::error::Result; use crate::logical_plan::{combine_filters, Expr}; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; @@ -39,8 +42,8 @@ use crate::scalar::ScalarValue; /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { - path: String, - desc: Arc, + /// Descriptor of the table, including schema, files, etc. + pub desc: Arc, max_concurrency: usize, enable_pruning: bool, } @@ -49,10 +52,9 @@ impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path. pub fn try_new(path: impl Into, max_concurrency: usize) -> Result { let path = path.into(); - let root_desc = ParquetRootDesc::new(path.as_str()); + let table_desc = ParquetTableDescriptor::new(path.as_str()); Ok(Self { - path, - desc: Arc::new(root_desc?), + desc: Arc::new(table_desc?), max_concurrency, enable_pruning: true, }) @@ -67,22 +69,34 @@ impl ParquetTable { collect_statistics: bool, ) -> Result { let path = path.into(); - let root_desc = ParquetRootDesc::new_with_schema( + let table_desc = ParquetTableDescriptor::new_with_schema( path.as_str(), Some(schema), collect_statistics, ); Ok(Self { - path, - desc: Arc::new(root_desc?), + desc: Arc::new(table_desc?), max_concurrency, enable_pruning: true, }) } + /// Attempt to initialize a new `ParquetTable` from a table descriptor. 
+ pub fn try_new_with_desc( + desc: Arc, + max_concurrency: usize, + enable_pruning: bool, + ) -> Result { + Ok(Self { + desc, + max_concurrency, + enable_pruning, + }) + } + /// Get the path for the Parquet file(s) represented by this ParquetTable instance pub fn path(&self) -> &str { - &self.path + &self.desc.descriptor.path } /// Get parquet pruning option @@ -152,19 +166,19 @@ impl TableProvider for ParquetTable { } } -#[derive(Debug)] +#[derive(Debug, Clone)] /// Descriptor for a parquet root path -pub struct ParquetRootDesc { +pub struct ParquetTableDescriptor { /// metadata for files inside the root path - pub descriptor: SourceRootDescriptor, + pub descriptor: TableDescriptor, } -impl ParquetRootDesc { +impl ParquetTableDescriptor { /// Construct a new parquet descriptor for a root path pub fn new(root_path: &str) -> Result { - let root_desc = Self::build_source_desc(root_path, "parquet", None, true); + let table_desc = Self::build_table_desc(root_path, "parquet", None, true); Ok(Self { - descriptor: root_desc?, + descriptor: table_desc?, }) } @@ -174,10 +188,10 @@ impl ParquetRootDesc { schema: Option, collect_statistics: bool, ) -> Result { - let root_desc = - Self::build_source_desc(root_path, "parquet", schema, collect_statistics); + let table_desc = + Self::build_table_desc(root_path, "parquet", schema, collect_statistics); Ok(Self { - descriptor: root_desc?, + descriptor: table_desc?, }) } @@ -330,7 +344,7 @@ impl ParquetRootDesc { } } -impl SourceRootDescBuilder for ParquetRootDesc { +impl TableDescriptorBuilder for ParquetTableDescriptor { fn file_meta(file_path: &str) -> Result { let file = File::open(file_path)?; let file_reader = Arc::new(SerializedFileReader::new(file)?); @@ -364,7 +378,7 @@ impl SourceRootDescBuilder for ParquetRootDesc { for (i, column) in row_group_meta.columns().iter().enumerate() { if let Some(stat) = column.statistics() { has_statistics = true; - ParquetRootDesc::summarize_min_max( + ParquetTableDescriptor::summarize_min_max( &mut max_values, &mut min_values, &fields, @@ -392,10 +406,13 @@ impl SourceRootDescBuilder for ParquetRootDesc { column_statistics: column_stats, }; - Ok((PartitionedFile { - file_path, - statistics, - }, schema)) + Ok(( + PartitionedFile { + file_path, + statistics, + }, + schema, + )) } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 7a54beac6d86..5c2a0429a0e4 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -304,7 +304,7 @@ impl ExecutionContext { ) -> Result> { Ok(Arc::new(DataFrameImpl::new( self.state.clone(), - &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?, + &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None, None)?.build()?, ))) } @@ -413,6 +413,7 @@ impl ExecutionContext { table_ref.table(), Arc::clone(provider), None, + None, )? 
.build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index d9afe2e01f38..2e26ab212f2b 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -109,7 +109,7 @@ impl LogicalPlanBuilder { projection: Option>, ) -> Result { let provider = Arc::new(MemTable::try_new(schema, partitions)?); - Self::scan(UNNAMED_TABLE, provider, projection) + Self::scan(UNNAMED_TABLE, provider, projection, None) } /// Scan a CSV data source @@ -130,7 +130,7 @@ impl LogicalPlanBuilder { table_name: impl Into, ) -> Result { let provider = Arc::new(CsvFile::try_new(path, options)?); - Self::scan(table_name, provider, projection) + Self::scan(table_name, provider, projection, None) } /// Scan a Parquet data source @@ -151,7 +151,7 @@ impl LogicalPlanBuilder { table_name: impl Into, ) -> Result { let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?); - Self::scan(table_name, provider, projection) + Self::scan(table_name, provider, projection, None) } /// Scan an empty data source, mainly used in tests @@ -162,7 +162,7 @@ impl LogicalPlanBuilder { ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); - Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) + Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection, None) } /// Convert a table provider into a builder with a TableScan @@ -170,6 +170,7 @@ impl LogicalPlanBuilder { table_name: impl Into, provider: Arc, projection: Option>, + filters: Option>, ) -> Result { let table_name = table_name.into(); @@ -201,7 +202,7 @@ impl LogicalPlanBuilder { source: provider, projected_schema: Arc::new(projected_schema), projection, - filters: vec![], + filters: filters.unwrap_or_default(), limit: None, }; diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 9d0bc909b495..f16065790c4a 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -27,8 +27,7 @@ use crate::{ logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, - SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }, scalar::ScalarValue, }; @@ -60,7 +59,7 @@ use async_trait::async_trait; use super::stream::RecordBatchReceiverStream; use super::SQLMetric; -use crate::datasource::parquet::ParquetRootDesc; +use crate::datasource::parquet::ParquetTableDescriptor; use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile}; /// Execution plan for scanning one or more Parquet partitions @@ -130,9 +129,9 @@ impl ParquetExec { ) -> Result { // build a list of filenames from the specified path, which could be a single file or // a directory containing one or more parquet files - let root_desc = ParquetRootDesc::new(path)?; + let table_desc = ParquetTableDescriptor::new(path)?; Self::try_new( - Arc::new(root_desc), + Arc::new(table_desc), projection, predicate, batch_size, @@ -143,7 +142,7 @@ impl ParquetExec { /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema pub fn try_new( - desc: Arc, + desc: Arc, projection: Option>, predicate: Option, batch_size: usize, diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 
29204f4a6ded..0ee2c2e0cf5e 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -519,6 +519,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .unwrap_or(&table_name), provider, None, + None, )? .build(), (None, None) => Err(DataFusionError::Plan(format!( diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index 789f0810c983..14c600cfc6e5 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -499,12 +499,13 @@ impl ContextWithParquet { /// the number of output rows and normalized execution metrics async fn query_with_expr(&mut self, expr: Expr) -> TestOutput { let sql = format!("EXPR only: {:?}", expr); - let logical_plan = LogicalPlanBuilder::scan("t", self.provider.clone(), None) - .unwrap() - .filter(expr) - .unwrap() - .build() - .unwrap(); + let logical_plan = + LogicalPlanBuilder::scan("t", self.provider.clone(), None, None) + .unwrap() + .filter(expr) + .unwrap() + .build() + .unwrap(); self.run_test(logical_plan, sql).await } From fd2a0b06c296ac77265a32781c57a84df8db8a24 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 25 Aug 2021 20:34:04 +0800 Subject: [PATCH 5/7] ballista physical parquet exec --- ballista/rust/core/proto/ballista.proto | 11 +++++-- .../src/serde/physical_plan/from_proto.rs | 33 ++++++++++++++++--- .../core/src/serde/physical_plan/to_proto.rs | 23 ++++++++----- datafusion/src/physical_plan/parquet.rs | 4 +-- 4 files changed, 53 insertions(+), 18 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 0794be462ce5..45ff6c5984ca 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -583,10 +583,15 @@ message FilterExecNode { PhysicalExprNode expr = 2; } +message ParquetPartition { + uint32 index = 1; + repeated PartitionedFile files = 2; +} + message ParquetScanExecNode { - repeated string filename = 1; - repeated uint32 projection = 2; - uint32 num_partitions = 3; + repeated ParquetPartition partitions = 1; + Schema schema = 2; + repeated uint32 projection = 3; uint32 batch_size = 4; } diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 2cc7cda6ff48..9d758285f1db 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -34,6 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; +use datafusion::datasource::datasource::Statistics; +use datafusion::datasource::FilePartition; use datafusion::execution::context::{ ExecutionConfig, ExecutionContextState, ExecutionProps, }; @@ -44,6 +46,7 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::PartitionMode; +use datafusion::physical_plan::parquet::{ParquetExecMetrics, ParquetPartition}; use datafusion::physical_plan::planner::DefaultPhysicalPlanner; use datafusion::physical_plan::window_functions::{ BuiltInWindowFunction, WindowFunction, @@ -129,16 +132,23 @@ impl TryInto> for &protobuf::PhysicalPlanNode { )?)) } PhysicalPlanType::ParquetScan(scan) => { + let partitions = scan + .partitions + 
.iter() + .map(|p| p.try_into()) + .collect::, _>>()?; + let schema = Arc::new(convert_required!(scan.schema)?); let projection = scan.projection.iter().map(|i| *i as usize).collect(); - let path: &str = scan.filename[0].as_str(); - Ok(Arc::new(ParquetExec::try_from_path( - path, + Ok(Arc::new(ParquetExec::new( + partitions, + schema, Some(projection), + Statistics::default(), + ParquetExecMetrics::new(), None, scan.batch_size as usize, - scan.num_partitions as usize, None, - )?)) + ))) } PhysicalPlanType::CoalesceBatches(coalesce_batches) => { let input: Arc = @@ -470,6 +480,19 @@ impl TryInto> for &protobuf::PhysicalPlanNode { } } +impl TryInto for &protobuf::ParquetPartition { + type Error = BallistaError; + + fn try_into(self) -> Result { + let files = self + .files + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + Ok(ParquetPartition::new(files, self.index as usize)) + } +} + impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { Column::new(&c.name, c.index as usize) diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 438f871b346a..e7d4ac652874 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -38,7 +38,7 @@ use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::AggregateMode; use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion::physical_plan::parquet::ParquetExec; +use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition}; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sort::SortExec; use datafusion::{ @@ -268,22 +268,19 @@ impl TryInto for Arc { )), }) } else if let Some(exec) = plan.downcast_ref::() { - let filenames = exec - .partitions() - .iter() - .flat_map(|part| part.filenames()) - .collect(); + let partitions = exec.partitions().iter().map(|p| p.into()).collect(); + Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( protobuf::ParquetScanExecNode { - filename: filenames, + partitions, + schema: Some(exec.schema.as_ref().into()), projection: exec .projection() .as_ref() .iter() .map(|n| *n as u32) .collect(), - num_partitions: exec.partitions().len() as u32, batch_size: exec.batch_size() as u32, }, )), @@ -621,6 +618,16 @@ impl TryFrom> for protobuf::PhysicalExprNode { } } +impl From<&ParquetPartition> for protobuf::ParquetPartition { + fn from(p: &ParquetPartition) -> protobuf::ParquetPartition { + let files = p.file_partition.files.iter().map(|f| f.into()).collect(); + protobuf::ParquetPartition { + index: p.file_partition.index as u32, + files, + } + } +} + fn try_parse_when_then_expr( when_expr: &Arc, then_expr: &Arc, diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index f16065790c4a..6ade53b84566 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -66,9 +66,9 @@ use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFil #[derive(Debug, Clone)] pub struct ParquetExec { /// Parquet partitions to read - partitions: Vec, + pub partitions: Vec, /// Schema after projection is applied - schema: SchemaRef, + pub schema: SchemaRef, /// Projection for which columns to load projection: Vec, /// Batch size From 
030fb55efb90f5c067b7bc33d37ff70fb20895dd Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 29 Aug 2021 11:46:07 +0800 Subject: [PATCH 6/7] resolve comments --- .../core/src/serde/logical_plan/from_proto.rs | 17 +++++++++-- .../core/src/serde/logical_plan/to_proto.rs | 30 ++++++++++++++----- ballista/rust/scheduler/src/lib.rs | 13 ++++++-- 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 035dfb72a492..97a442f4347c 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -345,14 +345,26 @@ impl TryInto for &protobuf::PartitionedFile { } } +impl From<&protobuf::ColumnStats> for ColumnStatistics { + fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics { + ColumnStatistics { + null_count: Some(cs.null_count as usize), + max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()), + min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()), + distinct_count: Some(cs.distinct_count as usize), + } + } +} + impl TryInto for &protobuf::Statistics { type Error = BallistaError; fn try_into(self) -> Result { + let column_statistics = self.column_stats.iter().map(|s| s.into()).collect(); Ok(Statistics { num_rows: Some(self.num_rows as usize), total_byte_size: Some(self.total_byte_size as usize), - column_statistics: None, + column_statistics: Some(column_statistics), }) } } @@ -1170,7 +1182,8 @@ impl TryInto for &protobuf::Field { } } -use datafusion::datasource::datasource::Statistics; +use crate::serde::protobuf::ColumnStats; +use datafusion::datasource::datasource::{ColumnStatistics, Statistics}; use datafusion::physical_plan::{aggregates, windows}; use datafusion::prelude::{ array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256, diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index c20de8c81c6f..3cc9d0a11984 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -25,7 +25,7 @@ use crate::serde::{protobuf, BallistaError}; use datafusion::arrow::datatypes::{ DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, }; -use datafusion::datasource::datasource::Statistics; +use datafusion::datasource::datasource::{ColumnStatistics, Statistics}; use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor}; use datafusion::logical_plan::{ window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -256,14 +256,28 @@ impl TryInto for &protobuf::ArrowType { } } -#[allow(clippy::from_over_into)] -impl Into for Statistics { - fn into(self) -> protobuf::Statistics { +impl From<&ColumnStatistics> for protobuf::ColumnStats { + fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats { + protobuf::ColumnStats { + min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()), + max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()), + null_count: cs.null_count.map(|n| n as u32).unwrap_or(0), + distinct_count: cs.distinct_count.map(|n| n as u32).unwrap_or(0), + } + } +} + +impl From<&Statistics> for protobuf::Statistics { + fn from(s: &Statistics) -> protobuf::Statistics { let none_value = -1_i64; + let column_stats = match &s.column_statistics { + None => vec![], + Some(column_stats) => column_stats.iter().map(|s| s.into()).collect(), + }; protobuf::Statistics { - num_rows: 
self.num_rows.map(|n| n as i64).unwrap_or(none_value), - total_byte_size: self.total_byte_size.map(|n| n as i64).unwrap_or(none_value), - column_stats: vec![], + num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value), + total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value), + column_stats, } } } @@ -272,7 +286,7 @@ impl From<&PartitionedFile> for protobuf::PartitionedFile { fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile { protobuf::PartitionedFile { path: pf.file_path.clone(), - statistics: Some(pf.statistics.clone().into()), + statistics: Some((&pf.statistics).into()), } } } diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 5fa2c827d291..411d0131a803 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -288,12 +288,19 @@ impl SchedulerGrpc for SchedulerServer { tonic::Status::internal(msg) })?; + let partitions = parquet_desc + .descriptor + .partition_files + .iter() + .map(|pf| FilePartitionMetadata { + filename: vec![pf.file_path.clone()], + }) + .collect(); + //TODO include statistics and any other info needed to reconstruct ParquetExec Ok(Response::new(GetFileMetadataResult { schema: Some(parquet_desc.schema().as_ref().into()), - partitions: vec![FilePartitionMetadata { - filename: vec![path], - }], + partitions, })) } //TODO implement for CSV From 288db83f416053636fc86a8864cd010b7d1811b3 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 29 Aug 2021 21:08:25 +0800 Subject: [PATCH 7/7] resolve comments --- .../core/src/serde/logical_plan/from_proto.rs | 8 +----- .../core/src/serde/logical_plan/to_proto.rs | 2 +- ballista/rust/scheduler/src/lib.rs | 2 +- datafusion/src/datasource/mod.rs | 27 +++++++++++-------- datafusion/src/datasource/parquet.rs | 15 +++++------ datafusion/src/execution/context.rs | 3 +-- datafusion/src/logical_plan/builder.rs | 11 ++++---- datafusion/src/physical_plan/parquet.rs | 9 +++---- datafusion/src/sql/planner.rs | 1 - datafusion/tests/parquet_pruning.rs | 13 +++++---- 10 files changed, 40 insertions(+), 51 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 97a442f4347c..fc4ac2c9076c 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -158,11 +158,6 @@ impl TryInto for &protobuf::LogicalPlanNode { } }; - let filters: Vec = scan - .filters - .iter() - .map(|expr| expr.try_into()) - .collect::, _>>()?; let parquet_table = ParquetTable::try_new_with_desc( Arc::new(ParquetTableDescriptor { descriptor }), 24, @@ -172,7 +167,6 @@ impl TryInto for &protobuf::LogicalPlanNode { &scan.table_name, Arc::new(parquet_table), projection, - Some(filters), )? 
//TODO remove hard-coded max_partitions .build() .map_err(|e| e.into()) @@ -339,7 +333,7 @@ impl TryInto for &protobuf::PartitionedFile { fn try_into(self) -> Result { let statistics = convert_required!(self.statistics)?; Ok(PartitionedFile { - file_path: self.path.clone(), + path: self.path.clone(), statistics, }) } diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 3cc9d0a11984..aa7a973dd340 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -285,7 +285,7 @@ impl From<&Statistics> for protobuf::Statistics { impl From<&PartitionedFile> for protobuf::PartitionedFile { fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile { protobuf::PartitionedFile { - path: pf.file_path.clone(), + path: pf.path.clone(), statistics: Some((&pf.statistics).into()), } } diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 411d0131a803..f03d08b1b0ed 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -293,7 +293,7 @@ impl SchedulerGrpc for SchedulerServer { .partition_files .iter() .map(|pf| FilePartitionMetadata { - filename: vec![pf.file_path.clone()], + filename: vec![pf.path.clone()], }) .collect(); diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 394da783ad32..d5e29522f6ba 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -49,7 +49,7 @@ pub(crate) enum Source> { /// and partition column values that need to be appended to each row. pub struct PartitionedFile { /// Path for the file (e.g. URL, filesystem path, etc) - pub file_path: String, + pub path: String, /// Statistics of the file pub statistics: Statistics, // Values of partition columns to be appended to each row @@ -58,9 +58,9 @@ pub struct PartitionedFile { } impl From for PartitionedFile { - fn from(file_path: String) -> Self { + fn from(path: String) -> Self { Self { - file_path, + path, statistics: Default::default(), } } @@ -68,7 +68,7 @@ impl From for PartitionedFile { impl std::fmt::Display for PartitionedFile { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.file_path) + write!(f, "{}", self.path) } } @@ -100,7 +100,10 @@ pub struct TableDescriptor { } /// Returned partitioned file with its schema -pub type FileAndSchema = (PartitionedFile, Schema); +pub struct FileAndSchema { + file: PartitionedFile, + schema: Schema, +} /// Builder for ['TableDescriptor'] inside given path pub trait TableDescriptorBuilder { @@ -119,7 +122,7 @@ pub trait TableDescriptorBuilder { ))); } - // build a list of Parquet partitions with statistics and gather all unique schemas + // build a list of partitions with statistics and gather all unique schemas // used in this data set let mut schemas: Vec = vec![]; let mut contains_file = false; @@ -129,7 +132,7 @@ pub trait TableDescriptorBuilder { .map(|file_path| { contains_file = true; let result = if collect_statistics { - let (pf, schema) = Self::file_meta(file_path)?; + let FileAndSchema {file, schema} = Self::file_meta(file_path)?; if schemas.is_empty() { schemas.push(schema); } else if schema != schemas[0] { @@ -142,10 +145,10 @@ pub trait TableDescriptorBuilder { file_path ))); } - pf + file } else { PartitionedFile { - file_path: file_path.to_owned(), + path: file_path.to_owned(), statistics: Statistics::default(), } }; @@ -170,10 +173,12 @@ pub trait 
TableDescriptorBuilder { } /// Get all metadata for a source file, including schema, statistics, partitions, etc. - fn file_meta(file_path: &str) -> Result; + fn file_meta(path: &str) -> Result; } -/// Get all files as well as the summary statistics when a limit is provided +/// Get all files as well as the summary statistic +/// if the optional `limit` is provided, includes only sufficient files +/// needed to read up to `limit` number of rows pub fn get_statistics_with_limit( table_desc: &TableDescriptor, limit: Option, diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index c81880b17e6b..c11aadea9a64 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -345,11 +345,11 @@ impl ParquetTableDescriptor { } impl TableDescriptorBuilder for ParquetTableDescriptor { - fn file_meta(file_path: &str) -> Result { - let file = File::open(file_path)?; + fn file_meta(path: &str) -> Result { + let file = File::open(path)?; let file_reader = Arc::new(SerializedFileReader::new(file)?); let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let file_path = file_path.to_string(); + let path = path.to_string(); let schema = arrow_reader.get_schema()?; let num_fields = schema.fields().len(); let fields = schema.fields().to_vec(); @@ -406,13 +406,10 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { column_statistics: column_stats, }; - Ok(( - PartitionedFile { - file_path, - statistics, - }, + Ok(FileAndSchema { + file: PartitionedFile { path, statistics }, schema, - )) + }) } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index bebfe9db67a1..2e6a7a4f7012 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -304,7 +304,7 @@ impl ExecutionContext { ) -> Result> { Ok(Arc::new(DataFrameImpl::new( self.state.clone(), - &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None, None)?.build()?, + &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?, ))) } @@ -413,7 +413,6 @@ impl ExecutionContext { table_ref.table(), Arc::clone(provider), None, - None, )? 
.build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index a0ce8bedeb4f..f31dd3732883 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -109,7 +109,7 @@ impl LogicalPlanBuilder { projection: Option>, ) -> Result { let provider = Arc::new(MemTable::try_new(schema, partitions)?); - Self::scan(UNNAMED_TABLE, provider, projection, None) + Self::scan(UNNAMED_TABLE, provider, projection) } /// Scan a CSV data source @@ -130,7 +130,7 @@ impl LogicalPlanBuilder { table_name: impl Into, ) -> Result { let provider = Arc::new(CsvFile::try_new(path, options)?); - Self::scan(table_name, provider, projection, None) + Self::scan(table_name, provider, projection) } /// Scan a Parquet data source @@ -151,7 +151,7 @@ impl LogicalPlanBuilder { table_name: impl Into, ) -> Result { let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?); - Self::scan(table_name, provider, projection, None) + Self::scan(table_name, provider, projection) } /// Scan an empty data source, mainly used in tests @@ -162,7 +162,7 @@ impl LogicalPlanBuilder { ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); - Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection, None) + Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) } /// Convert a table provider into a builder with a TableScan @@ -170,7 +170,6 @@ impl LogicalPlanBuilder { table_name: impl Into, provider: Arc, projection: Option>, - filters: Option>, ) -> Result { let table_name = table_name.into(); @@ -202,7 +201,7 @@ impl LogicalPlanBuilder { source: provider, projected_schema: Arc::new(projected_schema), projection, - filters: filters.unwrap_or_default(), + filters: vec![], limit: None, }; diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index dac4e8175473..eb8f927fc2ad 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -531,12 +531,9 @@ fn read_partition( let mut total_rows = 0; let all_files = partition.file_partition.files; 'outer: for partitioned_file in all_files { - let file_metrics = ParquetFileMetrics::new( - partition_index, - &*partitioned_file.file_path, - &metrics, - ); - let file = File::open(partitioned_file.file_path.as_str())?; + let file_metrics = + ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics); + let file = File::open(partitioned_file.path.as_str())?; let mut file_reader = SerializedFileReader::new(file)?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index a7cd035e5844..44fc0b2eba6c 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -519,7 +519,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .unwrap_or(&table_name), provider, None, - None, )? 
            .build(),
            (None, None) => Err(DataFusionError::Plan(format!(
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 3e86d43aded1..14f5dd20f470 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -497,13 +497,12 @@ impl ContextWithParquet {
     /// the number of output rows and normalized execution metrics
     async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
         let sql = format!("EXPR only: {:?}", expr);
-        let logical_plan =
-            LogicalPlanBuilder::scan("t", self.provider.clone(), None, None)
-                .unwrap()
-                .filter(expr)
-                .unwrap()
-                .build()
-                .unwrap();
+        let logical_plan = LogicalPlanBuilder::scan("t", self.provider.clone(), None)
+            .unwrap()
+            .filter(expr)
+            .unwrap()
+            .build()
+            .unwrap();
         self.run_test(logical_plan, sql).await
     }

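Taken together, the series replaces the old filename-list plumbing with descriptor-based metadata: a TableDescriptor is built once per root path, carries per-file PartitionedFile statistics, and is what both ParquetTable and the Ballista scheduler now pass around. Below is a minimal sketch of the resulting scan path, assuming only the APIs visible in the hunks above; the path argument, the table name "t", the max_concurrency of 24 and the `true` pruning flag are illustrative choices, not values mandated by the series.

    use std::sync::Arc;

    use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
    use datafusion::error::Result;
    use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};

    fn scan_parquet_dir(path: &str) -> Result<LogicalPlan> {
        // One directory walk collects the merged schema and per-file statistics.
        let desc = Arc::new(ParquetTableDescriptor::new(path)?);

        // Reuse the descriptor instead of re-listing the path; pruning stays enabled.
        let table = ParquetTable::try_new_with_desc(desc, 24, true)?;

        // After patch 7 the builder signature is back to (table_name, provider, projection).
        LogicalPlanBuilder::scan("t", Arc::new(table), None)?.build()
    }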
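The limit-aware helper in datasource/mod.rs is what lets a plan with LIMIT n avoid opening every file: per its doc comment it returns just enough of the descriptor's files, based on their per-file row counts, to cover the limit, together with the summarized Statistics. A small usage sketch, again assuming the public items exactly as spelled in the hunks above; the 1_000 row limit is an arbitrary example value.

    use datafusion::datasource::get_statistics_with_limit;
    use datafusion::datasource::parquet::ParquetTableDescriptor;
    use datafusion::error::Result;

    fn files_for_limit(path: &str) -> Result<()> {
        let desc = ParquetTableDescriptor::new(path)?;

        // Only as many files as the per-file statistics say are needed for LIMIT 1000.
        let (files, summary) = get_statistics_with_limit(&desc.descriptor, Some(1_000));

        println!(
            "reading {} of {} files (~{:?} rows)",
            files.len(),
            desc.descriptor.partition_files.len(),
            summary.num_rows,
        );
        Ok(())
    }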