diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 33638fdc50d4..49b65cfd5ed1 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -596,7 +596,7 @@ message FilterExecNode {
   PhysicalExprNode expr = 2;
 }
 
-message FilePartition {
+message FileGroup {
   repeated PartitionedFile files = 1;
 }
 
@@ -606,7 +606,7 @@ message ScanLimit {
 }
 
 message ParquetScanExecNode {
-  repeated FilePartition partitions = 1;
+  repeated FileGroup file_groups = 1;
   Schema schema = 2;
   uint32 batch_size = 4;
   repeated uint32 projection = 6;
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 680e41980c72..75dd91569294 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -37,7 +37,7 @@ use datafusion::catalog::catalog::{
 };
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
-use datafusion::datasource::{FilePartition, PartitionedFile};
+use datafusion::datasource::PartitionedFile;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
@@ -127,8 +127,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     Arc::new(LocalFileSystem {}),
                     scan.files
                         .iter()
-                        .map(|f| f.try_into())
-                        .collect::<Result<Vec<PartitionedFile>, _>>()?,
+                        .map(|f| f.into())
+                        .collect::<Vec<PartitionedFile>>(),
                     statistics,
                     schema,
                     scan.has_header,
@@ -145,13 +145,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
 
                 Ok(Arc::new(ParquetExec::new(
                     Arc::new(LocalFileSystem {}),
-                    scan.partitions
+                    scan.file_groups
                         .iter()
-                        .map(|p| {
-                            let it = p.files.iter().map(|f| f.try_into());
-                            it.collect::<Result<Vec<PartitionedFile>, _>>()
-                        })
-                        .collect::<Result<Vec<Vec<PartitionedFile>>, _>>()?,
+                        .map(|p| p.into())
+                        .collect::<Vec<Vec<PartitionedFile>>>(),
                     statistics,
                     schema,
                     Some(projection),
@@ -170,8 +167,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     Arc::new(LocalFileSystem {}),
                     scan.files
                         .iter()
-                        .map(|f| f.try_into())
-                        .collect::<Result<Vec<PartitionedFile>, _>>()?,
+                        .map(|f| f.into())
+                        .collect::<Vec<PartitionedFile>>(),
                     statistics,
                     schema,
                     Some(projection),
@@ -741,23 +738,27 @@ pub fn parse_protobuf_hash_partitioning(
     }
 }
 
-impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
-    type Error = BallistaError;
-
-    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
-        Ok(PartitionedFile {
+impl From<&protobuf::PartitionedFile> for PartitionedFile {
+    fn from(val: &protobuf::PartitionedFile) -> Self {
+        PartitionedFile {
             file_meta: FileMeta {
                 sized_file: SizedFile {
-                    path: self.path.clone(),
-                    size: self.size,
+                    path: val.path.clone(),
+                    size: val.size,
                 },
-                last_modified: if self.last_modified_ns == 0 {
+                last_modified: if val.last_modified_ns == 0 {
                     None
                 } else {
-                    Some(Utc.timestamp_nanos(self.last_modified_ns as i64))
+                    Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
                 },
             },
-        })
+        }
+    }
+}
+
+impl From<&protobuf::FileGroup> for Vec<PartitionedFile> {
+    fn from(val: &protobuf::FileGroup) -> Self {
+        val.files.iter().map(|f| f.into()).collect()
+    }
 }
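The serde change above swaps fallible `TryInto` conversions for infallible `From` impls, which is what lets the call sites drop the `Result` collection and the trailing `?`. Below is a minimal, runnable sketch of that pattern using stand-in types — `ProtoFile`, `NativeFile`, and `ProtoFileGroup` are invented for illustration and are not the real ballista/datafusion structs:

```rust
// Stand-in for the generated protobuf struct.
#[derive(Debug)]
struct ProtoFile {
    path: String,
    size: u64,
}

// Stand-in for the native in-memory struct.
#[derive(Debug)]
struct NativeFile {
    path: String,
    size: u64,
}

// Stand-in for `protobuf::FileGroup`.
#[derive(Debug)]
struct ProtoFileGroup {
    files: Vec<ProtoFile>,
}

// Every field conversion is infallible, so `From` fits and no
// `type Error` or `Ok(...)` wrapping is needed.
impl From<&ProtoFile> for NativeFile {
    fn from(val: &ProtoFile) -> Self {
        NativeFile {
            path: val.path.clone(),
            size: val.size,
        }
    }
}

// Mirrors `impl From<&protobuf::FileGroup> for Vec<PartitionedFile>`:
// a decoded group is just a Vec of native files.
impl From<&ProtoFileGroup> for Vec<NativeFile> {
    fn from(val: &ProtoFileGroup) -> Self {
        val.files.iter().map(|f| f.into()).collect()
    }
}

fn main() {
    let group = ProtoFileGroup {
        files: vec![ProtoFile {
            path: "a.parquet".into(),
            size: 42,
        }],
    };
    // Call sites now collect plain `Vec`s; compare the
    // `.map(|f| f.into())` lines in the diff above.
    let native: Vec<NativeFile> = (&group).into();
    println!("{:?}", native);
}
```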
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 020b6888d8cf..e5e63476be41 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -275,18 +275,16 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
-            let partitions = exec
-                .partitions()
-                .into_iter()
-                .map(|p| protobuf::FilePartition {
-                    files: p.iter().map(|f| f.into()).collect(),
-                })
+            let file_groups = exec
+                .file_groups()
+                .iter()
+                .map(|p| p.as_slice().into())
                 .collect();
 
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                     protobuf::ParquetScanExecNode {
-                        partitions,
+                        file_groups,
                         statistics: Some((&exec.statistics()).into()),
                         limit: exec
                             .limit()
@@ -688,6 +686,14 @@ impl From<&PartitionedFile> for protobuf::PartitionedFile {
     }
 }
 
+impl From<&[PartitionedFile]> for protobuf::FileGroup {
+    fn from(gr: &[PartitionedFile]) -> protobuf::FileGroup {
+        protobuf::FileGroup {
+            files: gr.iter().map(|f| f.into()).collect(),
+        }
+    }
+}
+
 impl From<&ColumnStatistics> for protobuf::ColumnStats {
     fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
         protobuf::ColumnStats {
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index 918200f5a91c..823b40807e93 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -71,6 +71,9 @@ pub trait TableProvider: Sync + Send {
     }
 
     /// Create an ExecutionPlan that will scan the table.
+    /// The table provider will usually be responsible for grouping
+    /// the source data into partitions that can be efficiently
+    /// parallelized or distributed.
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index b607469bff00..2e5330f16cb7 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -155,22 +155,6 @@ impl std::fmt::Display for PartitionedFile {
     }
 }
 
-#[derive(Debug, Clone)]
-/// A collection of files that should be read in a single task
-pub struct FilePartition {
-    /// The index of the partition among all partitions
-    pub index: usize,
-    /// The contained files of the partition
-    pub files: Vec<PartitionedFile>,
-}
-
-impl std::fmt::Display for FilePartition {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
-        write!(f, "{}", files.join(", "))
-    }
-}
-
 fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index aa9359c30da4..aa4f116482fb 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -26,3 +26,26 @@ pub use self::parquet::ParquetExec;
 pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
+
+use crate::datasource::PartitionedFile;
+use std::fmt::{Display, Formatter, Result};
+
+/// A wrapper to customize partitioned file display
+#[derive(Debug)]
+struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
+
+impl<'a> Display for FileGroupsDisplay<'a> {
+    fn fmt(&self, f: &mut Formatter) -> Result {
+        let parts: Vec<_> = self
+            .0
+            .iter()
+            .map(|pp| {
+                pp.iter()
+                    .map(|pf| pf.file_meta.path())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            })
+            .collect();
+        write!(f, "[{}]", parts.join(", "))
+    }
+}
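`FileGroupsDisplay` above is a borrowed newtype whose only job is to implement `Display` for a type the module does not own (`&[Vec<PartitionedFile>]`). A self-contained sketch of the same pattern, with plain `String` paths standing in for `PartitionedFile`:

```rust
use std::fmt::{Display, Formatter, Result};

// Unit struct that borrows the groups: no clone, and no intermediate
// Vec of formatted strings stored on the plan itself.
struct GroupsDisplay<'a>(&'a [Vec<String>]);

impl<'a> Display for GroupsDisplay<'a> {
    fn fmt(&self, f: &mut Formatter) -> Result {
        // Join files within a group, then join the groups, as the
        // patched `fmt` does.
        let parts: Vec<_> = self.0.iter().map(|group| group.join(", ")).collect();
        write!(f, "[{}]", parts.join(", "))
    }
}

fn main() {
    let groups = vec![
        vec!["a.parquet".to_string(), "b.parquet".to_string()],
        vec!["c.parquet".to_string()],
    ];
    // Prints: [a.parquet, b.parquet, c.parquet]
    println!("{}", GroupsDisplay(&groups));
}
```

Note that because both levels are joined with `", "`, group boundaries are not visible in the rendered string; that matches the patch as written, not an oversight in the sketch.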
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 77eed01f893c..f719b54fe0aa 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::{any::Any, convert::TryInto};
 
 use crate::datasource::file_format::parquet::ChunkObjectReader;
 use crate::datasource::object_store::ObjectStore;
+use crate::datasource::PartitionedFile;
 use crate::{
     error::{DataFusionError, Result},
     logical_plan::{Column, Expr},
@@ -59,14 +60,13 @@ use tokio::{
 
 use async_trait::async_trait;
 
-use crate::datasource::{FilePartition, PartitionedFile};
-
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     object_store: Arc<dyn ObjectStore>,
-    /// Parquet partitions to read
-    partitions: Vec<ParquetPartition>,
+    /// Grouped list of files. Each group will be processed together by one
+    /// partition of the `ExecutionPlan`.
+    file_groups: Vec<Vec<PartitionedFile>>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
@@ -83,23 +83,6 @@ pub struct ParquetExec {
     limit: Option<usize>,
 }
 
-/// Represents one partition of a Parquet data set and this currently means one Parquet file.
-///
-/// In the future it would be good to support subsets of files based on ranges of row groups
-/// so that we can better parallelize reads of large files across available cores (see
-/// [ARROW-10995](https://issues.apache.org/jira/browse/ARROW-10995)).
-///
-/// We may also want to support reading Parquet files that are partitioned based on a key and
-/// in this case we would want this partition struct to represent multiple files for a given
-/// partition key (see [ARROW-11019](https://issues.apache.org/jira/browse/ARROW-11019)).
-#[derive(Debug, Clone)]
-pub struct ParquetPartition {
-    /// The Parquet filename for this partition
-    pub file_partition: FilePartition,
-    /// Execution metrics
-    metrics: ExecutionPlanMetricsSet,
-}
-
 /// Stores metrics about the parquet execution for a particular parquet file
 #[derive(Debug, Clone)]
 struct ParquetFileMetrics {
@@ -115,7 +98,7 @@ impl ParquetExec {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         object_store: Arc<dyn ObjectStore>,
-        files: Vec<Vec<PartitionedFile>>,
+        file_groups: Vec<Vec<PartitionedFile>>,
         statistics: Statistics,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
@@ -123,16 +106,8 @@ impl ParquetExec {
         batch_size: usize,
         limit: Option<usize>,
     ) -> Self {
-        debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
-        files, projection, predicate, limit);
-
-        let metrics = ExecutionPlanMetricsSet::new();
-
-        let partitions = files
-            .into_iter()
-            .enumerate()
-            .map(|(i, f)| ParquetPartition::new(f, i, metrics.clone()))
-            .collect::<Vec<_>>();
+        debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
+        file_groups, projection, predicate, limit);
 
         let metrics = ExecutionPlanMetricsSet::new();
         let predicate_creation_errors =
@@ -162,7 +137,7 @@ impl ParquetExec {
 
         Self {
             object_store,
-            partitions,
+            file_groups,
             schema: projected_schema,
             projection,
             metrics,
@@ -204,11 +179,8 @@ impl ParquetExec {
     }
 
     /// List of data files
-    pub fn partitions(&self) -> Vec<&[PartitionedFile]> {
-        self.partitions
-            .iter()
-            .map(|fp| fp.file_partition.files.as_slice())
-            .collect()
+    pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
+        &self.file_groups
     }
     /// Optional projection for which columns to load
     pub fn projection(&self) -> &[usize] {
@@ -225,20 +197,6 @@
     }
 }
 
-impl ParquetPartition {
-    /// Create a new parquet partition
-    pub fn new(
-        files: Vec<PartitionedFile>,
-        index: usize,
-        metrics: ExecutionPlanMetricsSet,
-    ) -> Self {
-        Self {
-            file_partition: FilePartition { index, files },
-            metrics,
-        }
-    }
-}
-
 impl ParquetFileMetrics {
     /// Create new metrics
     pub fn new(
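The field change above collapses `ParquetPartition` and `FilePartition` into a plain `Vec<Vec<PartitionedFile>>`. The contract that shape encodes is that group `i` is exactly what plan partition `i` reads, which is why the old explicit `index` field can go: the position in the outer vector is the index. A sketch of just that contract, with stand-in types (the real `ParquetExec` also threads metrics, projection, and predicate state through):

```rust
struct File {
    path: String,
}

struct Exec {
    // Outer Vec: one entry per plan partition; inner Vec: the files
    // that partition scans.
    file_groups: Vec<Vec<File>>,
}

impl Exec {
    // Mirrors `output_partitioning`: one plan partition per file group.
    fn output_partition_count(&self) -> usize {
        self.file_groups.len()
    }

    // Mirrors `execute(partition_index)`: a task only sees its group.
    fn execute(&self, partition_index: usize) -> impl Iterator<Item = &File> {
        self.file_groups[partition_index].iter()
    }
}

fn main() {
    let exec = Exec {
        file_groups: vec![
            vec![
                File { path: "a.parquet".into() },
                File { path: "b.parquet".into() },
            ],
            vec![File { path: "c.parquet".into() }],
        ],
    };
    assert_eq!(exec.output_partition_count(), 2);
    for f in exec.execute(1) {
        println!("partition 1 reads {}", f.path);
    }
}
```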
@@ -279,7 +237,7 @@ impl ExecutionPlan for ParquetExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partitions.len())
+        Partitioning::UnknownPartitioning(self.file_groups.len())
     }
 
     fn with_new_children(
@@ -304,7 +262,7 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let partition = self.partitions[partition_index].clone();
+        let partition = self.file_groups[partition_index].clone();
         let metrics = self.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
@@ -338,18 +296,12 @@ impl ExecutionPlan for ParquetExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                let files: Vec<_> = self
-                    .partitions
-                    .iter()
-                    .map(|pp| format!("{}", pp.file_partition))
-                    .collect();
-
                 write!(
                     f,
-                    "ParquetExec: batch_size={}, limit={:?}, partitions=[{}]",
+                    "ParquetExec: batch_size={}, limit={:?}, partitions={}",
                     self.batch_size,
                     self.limit,
-                    files.join(", ")
+                    super::FileGroupsDisplay(&self.file_groups)
                 )
             }
         }
@@ -493,7 +445,7 @@ fn build_row_group_predicate(
 fn read_partition(
     object_store: &dyn ObjectStore,
     partition_index: usize,
-    partition: ParquetPartition,
+    partition: Vec<PartitionedFile>,
     metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
@@ -502,8 +454,7 @@ fn read_partition(
     limit: Option<usize>,
 ) -> Result<()> {
     let mut total_rows = 0;
-    let all_files = partition.file_partition.files;
-    'outer: for partitioned_file in all_files {
+    'outer: for partitioned_file in partition {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             &*partitioned_file.file_meta.path(),
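With `ParquetPartition` gone, `read_partition` above takes the `Vec<PartitionedFile>` directly and iterates it, keeping the labeled-loop limit handling. A toy sketch of that control flow — file names, batch sizes, and the limit are invented for the example, and real batches come from the Parquet reader rather than an array:

```rust
fn read_partition(partition: Vec<&str>, limit: Option<usize>) {
    let mut total_rows = 0;
    // Iterate the group directly; the old code first unwrapped
    // `partition.file_partition.files`.
    'outer: for file in partition {
        // Pretend each file yields three batches of 100 rows.
        for batch_rows in [100usize, 100, 100] {
            total_rows += batch_rows;
            println!("{file}: {total_rows} rows read so far");
            // `break 'outer` exits both the batch loop and the file
            // loop as soon as the limit is reached.
            if limit.map(|l| total_rows >= l).unwrap_or(false) {
                break 'outer;
            }
        }
    }
}

fn main() {
    read_partition(vec!["a.parquet", "b.parquet"], Some(250));
}
```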