From dd9cd15d840dfafdf85a36bbf58008a12d86d1af Mon Sep 17 00:00:00 2001
From: carlos
Date: Fri, 24 Sep 2021 23:27:42 +0800
Subject: [PATCH] remove hard coded value (#1044)

---
 ballista/rust/core/proto/ballista.proto       |  1 +
 .../core/src/serde/logical_plan/from_proto.rs |  4 ++--
 .../core/src/serde/logical_plan/to_proto.rs   |  1 +
 benchmarks/src/bin/tpch.rs                    |  4 ++--
 datafusion/src/datasource/parquet.rs          | 21 +++++++++++++--------
 datafusion/src/logical_plan/builder.rs        |  8 ++++----
 datafusion/src/physical_plan/parquet.rs       | 10 +++++-----
 7 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 3fc291e3a83f..a3ed18ab034c 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -296,6 +296,7 @@ message ParquetTableScanNode {
   TableDescriptor table_desc = 2;
   ProjectionColumns projection = 3;
   repeated LogicalExprNode filters = 4;
+  uint32 target_partitions = 5;
 }
 
 message AvroTableScanNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 8ffdb650aa21..9c7658c940bd 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -161,14 +161,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
 
                 let parquet_table = ParquetTable::try_new_with_desc(
                     Arc::new(ParquetTableDescriptor { descriptor }),
-                    24,
+                    scan.target_partitions as usize,
                     true,
                 )?;
                 LogicalPlanBuilder::scan(
                     &scan.table_name,
                     Arc::new(parquet_table),
                     projection,
-                )? //TODO remove hard-coded max_partitions
+                )?
                 .build()
                 .map_err(|e| e.into())
             }
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index b74f663b0645..f5c24144490b 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -758,6 +758,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                         table_desc: Some(table_desc),
                         projection,
                         filters,
+                        target_partitions: parquet.get_target_partitions() as u32,
                     },
                 )),
             })
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 2e9b2ff3df8f..203c186e1ec3 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -443,7 +443,7 @@ fn get_table(
     path: &str,
     table: &str,
     table_format: &str,
-    max_partitions: usize,
+    target_partitions: usize,
 ) -> Result<Arc<dyn TableProvider>> {
     match table_format {
         // dbgen creates .tbl ('|' delimited) files without header
@@ -471,7 +471,7 @@ fn get_table(
             Ok(Arc::new(ParquetTable::try_new_with_schema(
                 &path,
                 schema,
-                max_partitions,
+                target_partitions,
                 false,
             )?))
         }
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 65c90897cb12..d044ed94d59d 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -44,18 +44,18 @@ use crate::scalar::ScalarValue;
 pub struct ParquetTable {
     /// Descriptor of the table, including schema, files, etc.
     pub desc: Arc<ParquetTableDescriptor>,
-    max_partitions: usize,
+    target_partitions: usize,
     enable_pruning: bool,
 }
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: impl Into<String>, max_partitions: usize) -> Result<Self> {
+    pub fn try_new(path: impl Into<String>, target_partitions: usize) -> Result<Self> {
         let path = path.into();
         let table_desc = ParquetTableDescriptor::new(path.as_str());
         Ok(Self {
             desc: Arc::new(table_desc?),
-            max_partitions,
+            target_partitions,
             enable_pruning: true,
         })
     }
@@ -65,7 +65,7 @@ impl ParquetTable {
     pub fn try_new_with_schema(
         path: impl Into<String>,
         schema: Schema,
-        max_partitions: usize,
+        target_partitions: usize,
         collect_statistics: bool,
     ) -> Result<Self> {
         let path = path.into();
@@ -76,7 +76,7 @@ impl ParquetTable {
         );
         Ok(Self {
             desc: Arc::new(table_desc?),
-            max_partitions,
+            target_partitions,
             enable_pruning: true,
         })
     }
@@ -84,12 +84,12 @@ impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a table descriptor.
     pub fn try_new_with_desc(
         desc: Arc<ParquetTableDescriptor>,
-        max_partitions: usize,
+        target_partitions: usize,
         enable_pruning: bool,
     ) -> Result<Self> {
         Ok(Self {
             desc,
-            max_partitions,
+            target_partitions,
             enable_pruning,
         })
     }
@@ -109,6 +109,11 @@ impl ParquetTable {
         self.enable_pruning = enable_pruning;
         self
     }
+
+    /// Get Target partitions
+    pub fn get_target_partitions(&self) -> usize {
+        self.target_partitions
+    }
 }
 
 #[async_trait]
@@ -153,7 +158,7 @@ impl TableProvider for ParquetTable {
             limit
                 .map(|l| std::cmp::min(l, batch_size))
                 .unwrap_or(batch_size),
-            self.max_partitions,
+            self.target_partitions,
             limit,
         )?))
     }
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index c0322739b0e2..d902d6f7f2b4 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -139,20 +139,20 @@ impl LogicalPlanBuilder {
     pub fn scan_parquet(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_partitions: usize,
+        target_partitions: usize,
     ) -> Result<Self> {
         let path = path.into();
-        Self::scan_parquet_with_name(path.clone(), projection, max_partitions, path)
+        Self::scan_parquet_with_name(path.clone(), projection, target_partitions, path)
     }
 
     /// Scan a Parquet data source and register it with a given table name
     pub fn scan_parquet_with_name(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_partitions: usize,
+        target_partitions: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
+        let provider = Arc::new(ParquetTable::try_new(path, target_partitions)?);
         Self::scan(table_name, provider, projection)
     }
 
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index feed181ca83d..f4ac4c8fddaf 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -117,7 +117,7 @@ impl ParquetExec {
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_partitions: usize,
+        target_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         // build a list of filenames from the specified path, which could be a single file or
@@ -128,7 +128,7 @@
             projection,
             predicate,
             batch_size,
-            max_partitions,
+            target_partitions,
             limit,
         )
     }
@@ -139,7 +139,7 @@
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_partitions: usize,
+        target_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
@@ -149,8 +149,8 @@
         let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit);
         let schema = desc.schema();
 
-        let mut partitions = Vec::with_capacity(max_partitions);
-        let chunked_files = split_files(&all_files, max_partitions);
+        let mut partitions = Vec::with_capacity(target_partitions);
+        let chunked_files = split_files(&all_files, target_partitions);
         for (index, group) in chunked_files.iter().enumerate() {
             partitions.push(ParquetPartition::new(
                 Vec::from(*group),
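
Note (not part of the commit): a minimal sketch of the caller-facing API after this patch, assuming a DataFusion build containing this change. The path below is hypothetical, and `ParquetTable::try_new` lists files eagerly, so it must point at real Parquet data for the snippet to run. The point is that the partition count is now an explicit caller choice rather than a hard-coded 24, and the new `get_target_partitions` getter is what the Ballista serde layer (to_proto.rs above) reads when encoding `ParquetTableScanNode.target_partitions`.

use std::sync::Arc;

use datafusion::datasource::parquet::ParquetTable;
use datafusion::error::Result;
use datafusion::logical_plan::LogicalPlanBuilder;

fn main() -> Result<()> {
    // Hypothetical path; point this at any existing Parquet file or directory.
    let path = "/tmp/data/lineitem.parquet";

    // Callers now choose the partition count; nothing defaults to 24 anymore.
    let table = ParquetTable::try_new(path, 8)?;

    // The getter added in this patch; Ballista uses it to serialize the value
    // into ParquetTableScanNode instead of re-hardcoding it on deserialization.
    assert_eq!(table.get_target_partitions(), 8);

    // The same value flows through scan() into ParquetExec partitioning.
    let plan = LogicalPlanBuilder::scan("lineitem", Arc::new(table), None)?.build()?;
    println!("{:?}", plan);
    Ok(())
}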