FilePartition and PartitionedFile for scanning flexibility #932

Merged · 9 commits · Aug 30, 2021
33 changes: 27 additions & 6 deletions ballista/rust/core/proto/ballista.proto
@@ -271,12 +271,28 @@ message CsvTableScanNode {
repeated LogicalExprNode filters = 8;
}

message Statistics {
int64 num_rows = 1;
int64 total_byte_size = 2;
repeated ColumnStats column_stats = 3;
}

message PartitionedFile {
string path = 1;
Statistics statistics = 2;
}

message TableDescriptor {
string path = 1;
repeated PartitionedFile partition_files = 2;
Schema schema = 3;
}

message ParquetTableScanNode {
string table_name = 1;
string path = 2;
TableDescriptor table_desc = 2;
ProjectionColumns projection = 3;
Schema schema = 4;
repeated LogicalExprNode filters = 5;
repeated LogicalExprNode filters = 4;
}

message ProjectionNode {
@@ -567,10 +583,15 @@ message FilterExecNode {
PhysicalExprNode expr = 2;
}

message ParquetPartition {
uint32 index = 1;
repeated PartitionedFile files = 2;
}

message ParquetScanExecNode {
repeated string filename = 1;
repeated uint32 projection = 2;
uint32 num_partitions = 3;
repeated ParquetPartition partitions = 1;
Schema schema = 2;
repeated uint32 projection = 3;
uint32 batch_size = 4;
}

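The new messages are plain data carriers: with prost, each message becomes a Rust struct in which nested messages are wrapped in `Option` and repeated fields become `Vec`. A minimal sketch of populating them, written as if inside ballista's core crate (field names follow the .proto above; the literal paths and values are illustrative, not taken from the PR):

```rust
// Sketch only: prost-generated types live in crate::serde::protobuf,
// matching the imports used by the serde modules in this PR.
use crate::serde::protobuf;

fn sample_table_descriptor() -> protobuf::TableDescriptor {
    let file = protobuf::PartitionedFile {
        path: "/data/part-0.parquet".to_owned(),
        // prost wraps nested message fields in Option
        statistics: Some(protobuf::Statistics {
            num_rows: 1_024,
            total_byte_size: 65_536,
            column_stats: vec![],
        }),
    };
    protobuf::TableDescriptor {
        path: "/data".to_owned(),
        partition_files: vec![file],
        schema: None, // a real descriptor would carry the serialized Arrow schema
    }
}
```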
78 changes: 74 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -21,6 +21,8 @@ use crate::error::BallistaError;
use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use crate::{convert_box_required, convert_required};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
use datafusion::datasource::{PartitionedFile, TableDescriptor};
use datafusion::logical_plan::window_frames::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
@@ -134,10 +136,11 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.map_err(|e| e.into())
}
LogicalPlanType::ParquetScan(scan) => {
let descriptor: TableDescriptor = convert_required!(scan.table_desc)?;
let projection = match scan.projection.as_ref() {
None => None,
Some(columns) => {
let schema: Schema = convert_required!(scan.schema)?;
let schema = descriptor.schema.clone();
let r: Result<Vec<usize>, _> = columns
.columns
.iter()
@@ -154,11 +157,22 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
Some(r?)
}
};
LogicalPlanBuilder::scan_parquet_with_name(
&scan.path,
projection,

let filters: Vec<Expr> = scan
.filters
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<Expr>, _>>()?;
let parquet_table = ParquetTable::try_new_with_desc(
Arc::new(ParquetTableDescriptor { descriptor }),
24,
true,
)?;
LogicalPlanBuilder::scan(
&scan.table_name,
Arc::new(parquet_table),
projection,
Some(filters),
)? //TODO remove hard-coded max_partitions
.build()
.map_err(|e| e.into())
@@ -301,6 +315,60 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
}

impl TryInto<TableDescriptor> for &protobuf::TableDescriptor {
type Error = BallistaError;

fn try_into(self) -> Result<TableDescriptor, Self::Error> {
let partition_files = self
.partition_files
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<PartitionedFile>, _>>()?;
let schema = convert_required!(self.schema)?;
Ok(TableDescriptor {
path: self.path.to_owned(),
partition_files,
schema: Arc::new(schema),
})
}
}

impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
type Error = BallistaError;

fn try_into(self) -> Result<PartitionedFile, Self::Error> {
let statistics = convert_required!(self.statistics)?;
Ok(PartitionedFile {
file_path: self.path.clone(),
statistics,
})
}
}

impl From<&protobuf::ColumnStats> for ColumnStatistics {
fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
ColumnStatistics {
null_count: Some(cs.null_count as usize),
max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
distinct_count: Some(cs.distinct_count as usize),
}
}
}

impl TryInto<Statistics> for &protobuf::Statistics {
type Error = BallistaError;

fn try_into(self) -> Result<Statistics, Self::Error> {
let column_statistics = self.column_stats.iter().map(|s| s.into()).collect();
Ok(Statistics {
num_rows: Some(self.num_rows as usize),
total_byte_size: Some(self.total_byte_size as usize),
column_statistics: Some(column_statistics),
})
}
}

impl From<&protobuf::Column> for Column {
fn from(c: &protobuf::Column) -> Column {
let c = c.clone();
@@ -1114,6 +1182,8 @@ impl TryInto<Field> for &protobuf::Field {
}
}

use crate::serde::protobuf::ColumnStats;
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
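The `TryInto` impls above rebuild the DataFusion-side `TableDescriptor` and `PartitionedFile` from their protobuf counterparts, and the `Statistics` conversion restores row counts and per-column statistics. A hedged sketch of the `Statistics` direction, using crate-internal paths as in the file above (the values are made up):

```rust
// Sketch only: decode a protobuf Statistics into the DataFusion type via the
// TryInto impl defined in from_proto.rs.
use std::convert::TryInto;

use crate::error::BallistaError;
use crate::serde::protobuf;
use datafusion::datasource::datasource::Statistics;

fn decode_stats() -> Result<Statistics, BallistaError> {
    let proto = protobuf::Statistics {
        num_rows: 1_000,
        total_byte_size: 4_096,
        column_stats: vec![], // column-level stats omitted for brevity
    };
    (&proto).try_into()
}
```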
77 changes: 73 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -22,8 +22,11 @@
use super::super::proto_error;
use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
use datafusion::datasource::CsvFile;
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
};
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, Expr, JoinConstraint, JoinType, LogicalPlan,
@@ -253,6 +256,58 @@ impl TryInto<DataType> for &protobuf::ArrowType {
}
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
protobuf::ColumnStats {
min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
null_count: cs.null_count.map(|n| n as u32).unwrap_or(0),
distinct_count: cs.distinct_count.map(|n| n as u32).unwrap_or(0),
}
}
}

impl From<&Statistics> for protobuf::Statistics {
fn from(s: &Statistics) -> protobuf::Statistics {
let none_value = -1_i64;
let column_stats = match &s.column_statistics {
None => vec![],
Some(column_stats) => column_stats.iter().map(|s| s.into()).collect(),
};
protobuf::Statistics {
num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value),
total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value),
column_stats,
}
}
}

impl From<&PartitionedFile> for protobuf::PartitionedFile {
fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile {
protobuf::PartitionedFile {
path: pf.file_path.clone(),
statistics: Some((&pf.statistics).into()),
}
}
}

impl TryFrom<TableDescriptor> for protobuf::TableDescriptor {
type Error = BallistaError;

fn try_from(desc: TableDescriptor) -> Result<protobuf::TableDescriptor, Self::Error> {
let partition_files: Vec<protobuf::PartitionedFile> =
desc.partition_files.iter().map(|pf| pf.into()).collect();

let schema: protobuf::Schema = desc.schema.into();

Ok(protobuf::TableDescriptor {
path: desc.path,
partition_files,
schema: Some(schema),
})
}
}

impl TryInto<DataType> for &Box<protobuf::List> {
type Error = BallistaError;
fn try_into(self) -> Result<DataType, Self::Error> {
@@ -706,13 +761,14 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
.collect::<Result<Vec<_>, _>>()?;

if let Some(parquet) = source.downcast_ref::<ParquetTable>() {
let table_desc: protobuf::TableDescriptor =
parquet.desc.descriptor.clone().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ParquetScan(
protobuf::ParquetTableScanNode {
table_name: table_name.to_owned(),
path: parquet.path().to_owned(),
table_desc: Some(table_desc),
projection,
schema: Some(schema),
filters,
},
)),
@@ -1262,6 +1318,19 @@ impl Into<protobuf::Schema> for &Schema {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::Schema> for SchemaRef {
fn into(self) -> protobuf::Schema {
protobuf::Schema {
columns: self
.fields()
.iter()
.map(protobuf::Field::from)
.collect::<Vec<_>>(),
}
}
}

impl From<&datafusion::logical_plan::DFField> for protobuf::DfField {
fn from(f: &datafusion::logical_plan::DFField) -> protobuf::DfField {
protobuf::DfField {
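Paired with the from_proto conversions, these impls let table statistics travel with the serialized plan. Note that, as written, absent `num_rows`/`total_byte_size` are encoded with a -1 sentinel and the decode side rebuilds them unconditionally as `Some(..)`, so `None` does not round-trip exactly. A hedged round-trip sketch (crate-internal paths assumed, as above):

```rust
// Sketch only: a trip through the protobuf form, using the From impl in
// to_proto.rs and the TryInto impl in from_proto.rs.
use std::convert::TryInto;

use crate::error::BallistaError;
use crate::serde::protobuf;
use datafusion::datasource::datasource::Statistics;

fn roundtrip_stats(stats: &Statistics) -> Result<Statistics, BallistaError> {
    let proto: protobuf::Statistics = stats.into();
    (&proto).try_into()
}
```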
40 changes: 34 additions & 6 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -34,6 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::datasource::datasource::Statistics;
use datafusion::datasource::FilePartition;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
@@ -44,6 +46,8 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::parquet::ParquetPartition;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
@@ -129,17 +133,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)?))
}
PhysicalPlanType::ParquetScan(scan) => {
let partitions = scan
.partitions
.iter()
.map(|p| p.try_into())
.collect::<Result<Vec<ParquetPartition>, _>>()?;
let schema = Arc::new(convert_required!(scan.schema)?);
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let filenames: Vec<&str> =
scan.filename.iter().map(|s| s.as_str()).collect();
Ok(Arc::new(ParquetExec::try_from_files(
&filenames,
Ok(Arc::new(ParquetExec::new(
partitions,
schema,
Some(projection),
Statistics::default(),
ExecutionPlanMetricsSet::new(),
None,
scan.batch_size as usize,
scan.num_partitions as usize,
None,
)?))
)))
}
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
@@ -470,6 +480,23 @@
}
}

impl TryInto<ParquetPartition> for &protobuf::ParquetPartition {
type Error = BallistaError;

fn try_into(self) -> Result<ParquetPartition, Self::Error> {
let files = self
.files
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<_>, _>>()?;
Ok(ParquetPartition::new(
files,
self.index as usize,
ExecutionPlanMetricsSet::new(),
))
}
}

impl From<&protobuf::PhysicalColumn> for Column {
fn from(c: &protobuf::PhysicalColumn) -> Column {
Column::new(&c.name, c.index as usize)
@@ -620,6 +647,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;

let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
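On the physical side, each serialized `ParquetPartition` is rebuilt before `ParquetExec::new` reassembles the scan. A hedged sketch of that conversion in isolation, using the `TryInto` impl added above (path, index, and statistics values are illustrative; the statistics field is treated as required by the `PartitionedFile` decoder):

```rust
// Sketch only: decode one ParquetPartition from its protobuf form.
use std::convert::TryInto;

use crate::error::BallistaError;
use crate::serde::protobuf;
use datafusion::physical_plan::parquet::ParquetPartition;

fn decode_partition() -> Result<ParquetPartition, BallistaError> {
    let proto = protobuf::ParquetPartition {
        index: 0,
        files: vec![protobuf::PartitionedFile {
            path: "/data/part-0.parquet".to_owned(),
            // convert_required! errors out if statistics is missing
            statistics: Some(protobuf::Statistics {
                num_rows: 1_024,
                total_byte_size: 65_536,
                column_stats: vec![],
            }),
        }],
    };
    // a fresh ExecutionPlanMetricsSet is created inside the conversion
    (&proto).try_into()
}
```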