
Commit

remove hard coded value (#1044)
xudong963 authored Sep 24, 2021
1 parent 0a15e1d commit dd9cd15
Showing 7 changed files with 28 additions and 21 deletions.
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -296,6 +296,7 @@ message ParquetTableScanNode {
   TableDescriptor table_desc = 2;
   ProjectionColumns projection = 3;
   repeated LogicalExprNode filters = 4;
+  uint32 target_partitions = 5;
 }

 message AvroTableScanNode {
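
For context, prost generates a matching Rust field for the new protobuf entry. A hedged sketch of filling the generated struct on the serialization side; everything not visible in this diff (the table_name field, which sits above the hunk shown here, the values, and the exact Option/Vec mapping) is an assumption:

    // Hypothetical construction of the generated ParquetTableScanNode.
    let scan = protobuf::ParquetTableScanNode {
        table_name: "lineitem".to_string(), // field not shown in this hunk
        table_desc: Some(table_desc),       // message field -> Option<T>
        projection: None,                   // ProjectionColumns -> Option<T>
        filters: vec![],                    // repeated -> Vec<T>
        target_partitions: 8,               // the new uint32 field 5
    };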
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -161,14 +161,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {

                 let parquet_table = ParquetTable::try_new_with_desc(
                     Arc::new(ParquetTableDescriptor { descriptor }),
-                    24,
+                    scan.target_partitions as usize,
                     true,
                 )?;
                 LogicalPlanBuilder::scan(
                     &scan.table_name,
                     Arc::new(parquet_table),
                     projection,
-                )? //TODO remove hard-coded max_partitions
+                )?
                 .build()
                 .map_err(|e| e.into())
             }
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -758,6 +758,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                         table_desc: Some(table_desc),
                         projection,
                         filters,
+                        target_partitions: parquet.get_target_partitions() as u32,
                     },
                 )),
             })
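
Taken together, the two serde changes make the partition count survive a protobuf round trip: to_proto writes it from the table, and from_proto reads it back instead of substituting 24. A minimal round-trip sketch, assuming the two TryInto impls shown above and eliding error-type details:

    use std::convert::TryInto;

    // Hypothetical check: `plan` wraps a ParquetTable built with
    // target_partitions = 8. Before this commit the decoded plan
    // would have come back with 24 regardless of the original value.
    let node: protobuf::LogicalPlanNode = (&plan).try_into()?;
    let decoded: LogicalPlan = (&node).try_into()?;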
4 changes: 2 additions & 2 deletions benchmarks/src/bin/tpch.rs
@@ -443,7 +443,7 @@ fn get_table(
     path: &str,
     table: &str,
     table_format: &str,
-    max_partitions: usize,
+    target_partitions: usize,
 ) -> Result<Arc<dyn TableProvider>> {
     match table_format {
         // dbgen creates .tbl ('|' delimited) files without header
@@ -471,7 +471,7 @@
             Ok(Arc::new(ParquetTable::try_new_with_schema(
                 &path,
                 schema,
-                max_partitions,
+                target_partitions,
                 false,
             )?))
         }
21 changes: 13 additions & 8 deletions datafusion/src/datasource/parquet.rs
@@ -44,18 +44,18 @@ use crate::scalar::ScalarValue;
 pub struct ParquetTable {
     /// Descriptor of the table, including schema, files, etc.
     pub desc: Arc<ParquetTableDescriptor>,
-    max_partitions: usize,
+    target_partitions: usize,
     enable_pruning: bool,
 }

 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: impl Into<String>, max_partitions: usize) -> Result<Self> {
+    pub fn try_new(path: impl Into<String>, target_partitions: usize) -> Result<Self> {
         let path = path.into();
         let table_desc = ParquetTableDescriptor::new(path.as_str());
         Ok(Self {
             desc: Arc::new(table_desc?),
-            max_partitions,
+            target_partitions,
             enable_pruning: true,
         })
     }
@@ -65,7 +65,7 @@ impl ParquetTable {
     pub fn try_new_with_schema(
         path: impl Into<String>,
         schema: Schema,
-        max_partitions: usize,
+        target_partitions: usize,
         collect_statistics: bool,
     ) -> Result<Self> {
         let path = path.into();
@@ -76,20 +76,20 @@
         );
         Ok(Self {
             desc: Arc::new(table_desc?),
-            max_partitions,
+            target_partitions,
             enable_pruning: true,
         })
     }

     /// Attempt to initialize a new `ParquetTable` from a table descriptor.
     pub fn try_new_with_desc(
         desc: Arc<ParquetTableDescriptor>,
-        max_partitions: usize,
+        target_partitions: usize,
         enable_pruning: bool,
     ) -> Result<Self> {
         Ok(Self {
             desc,
-            max_partitions,
+            target_partitions,
             enable_pruning,
         })
     }
@@ -109,6 +109,11 @@ impl ParquetTable {
         self.enable_pruning = enable_pruning;
         self
     }
+
+    /// Get Target partitions
+    pub fn get_target_partitions(&self) -> usize {
+        self.target_partitions
+    }
 }

 #[async_trait]
@@ -153,7 +158,7 @@ impl TableProvider for ParquetTable {
             limit
                 .map(|l| std::cmp::min(l, batch_size))
                 .unwrap_or(batch_size),
-            self.max_partitions,
+            self.target_partitions,
             limit,
         )?))
     }
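
A short usage sketch of the renamed field and the new getter; the path and count are illustrative, and the import path assumes DataFusion's module layout at the time of this commit:

    use datafusion::datasource::parquet::ParquetTable;

    // Build a table asking for 8 target partitions, then read the value back.
    let table = ParquetTable::try_new("/path/to/data.parquet", 8)?;
    assert_eq!(table.get_target_partitions(), 8);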
8 changes: 4 additions & 4 deletions datafusion/src/logical_plan/builder.rs
@@ -139,20 +139,20 @@ impl LogicalPlanBuilder {
     pub fn scan_parquet(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_partitions: usize,
+        target_partitions: usize,
     ) -> Result<Self> {
         let path = path.into();
-        Self::scan_parquet_with_name(path.clone(), projection, max_partitions, path)
+        Self::scan_parquet_with_name(path.clone(), projection, target_partitions, path)
     }

     /// Scan a Parquet data source and register it with a given table name
     pub fn scan_parquet_with_name(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_partitions: usize,
+        target_partitions: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
+        let provider = Arc::new(ParquetTable::try_new(path, target_partitions)?);
         Self::scan(table_name, provider, projection)
     }

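
For builder callers the signature is unchanged apart from the parameter name, which now reflects that the count is a goal the planner aims for rather than a cap. A hedged example (path and count are illustrative):

    use datafusion::logical_plan::LogicalPlanBuilder;

    // Aim for 16 scan partitions; with fewer files the scan may produce fewer.
    let plan = LogicalPlanBuilder::scan_parquet("/path/to/data.parquet", None, 16)?
        .build()?;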
10 changes: 5 additions & 5 deletions datafusion/src/physical_plan/parquet.rs
@@ -117,7 +117,7 @@ impl ParquetExec {
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_partitions: usize,
+        target_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         // build a list of filenames from the specified path, which could be a single file or
@@ -128,7 +128,7 @@
             projection,
             predicate,
             batch_size,
-            max_partitions,
+            target_partitions,
             limit,
         )
     }
@@ -139,7 +139,7 @@
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_partitions: usize,
+        target_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
@@ -149,8 +149,8 @@
         let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit);
         let schema = desc.schema();

-        let mut partitions = Vec::with_capacity(max_partitions);
-        let chunked_files = split_files(&all_files, max_partitions);
+        let mut partitions = Vec::with_capacity(target_partitions);
+        let chunked_files = split_files(&all_files, target_partitions);
         for (index, group) in chunked_files.iter().enumerate() {
             partitions.push(ParquetPartition::new(
                 Vec::from(*group),
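
split_files is what gives target_partitions its "target" semantics: the file list is divided into at most that many contiguous groups, and each group becomes one ParquetPartition. A standalone sketch of that kind of chunking, offered as an assumption about the helper's behavior rather than its actual code:

    /// Divide `files` into at most `n` contiguous groups (assumed behavior).
    /// With fewer files than `n` you get fewer groups, which is why the
    /// count is a target rather than a guarantee.
    fn split_into_groups<T>(files: &[T], n: usize) -> Vec<&[T]> {
        let n = n.max(1);
        let chunk_size = (files.len() + n - 1) / n; // ceiling division
        files.chunks(chunk_size.max(1)).collect()
    }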
