diff --git a/Cargo.toml b/Cargo.toml
index cd88e18fe17c..d4230f496862 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -91,4 +91,4 @@ lto = false
 opt-level = 3
 overflow-checks = false
 panic = 'unwind'
-rpath = false
\ No newline at end of file
+rpath = false
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index ae1e879d0da1..f6c310fb5da1 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -29,13 +29,18 @@ use crate::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
 };
+use arrow_ipc::reader::FileDecoder;
 use arrow_schema::SchemaRef;
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::Statistics;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
 use futures::StreamExt;
-use object_store::{GetResultPayload, ObjectStore};
+use itertools::Itertools;
+use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
+
+use super::FileGroupPartitioner;
 
 /// Execution plan for scanning Arrow data source
 #[derive(Debug, Clone)]
@@ -117,6 +122,28 @@ impl ExecutionPlan for ArrowExec {
         Ok(self)
     }
 
+    /// Redistribute files across partitions according to their size
+    /// See comments on [`FileGroupPartitioner`] for more detail.
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        config: &ConfigOptions,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
+        let repartitioned_file_groups_option = FileGroupPartitioner::new()
+            .with_target_partitions(target_partitions)
+            .with_repartition_file_min_size(repartition_file_min_size)
+            .with_preserve_order_within_groups(self.output_ordering().is_some())
+            .repartition_file_groups(&self.base_config.file_groups);
+
+        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
+            let mut new_plan = self.clone();
+            new_plan.base_config.file_groups = repartitioned_file_groups;
+            return Ok(Some(Arc::new(new_plan)));
+        }
+        Ok(None)
+    }
+
     fn execute(
         &self,
         partition: usize,
@@ -155,19 +182,125 @@ impl FileOpener for ArrowOpener {
         let object_store = self.object_store.clone();
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
-            let r = object_store.get(file_meta.location()).await?;
-            match r.payload {
-                GetResultPayload::File(file, _) => {
-                    let arrow_reader =
-                        arrow::ipc::reader::FileReader::try_new(file, projection)?;
-                    Ok(futures::stream::iter(arrow_reader).boxed())
+            let range = file_meta.range.clone();
+            match range {
+                None => {
+                    let r = object_store.get(file_meta.location()).await?;
+                    match r.payload {
+                        GetResultPayload::File(file, _) => {
+                            let arrow_reader = arrow::ipc::reader::FileReader::try_new(
+                                file, projection,
+                            )?;
+                            Ok(futures::stream::iter(arrow_reader).boxed())
+                        }
+                        GetResultPayload::Stream(_) => {
+                            let bytes = r.bytes().await?;
+                            let cursor = std::io::Cursor::new(bytes);
+                            let arrow_reader = arrow::ipc::reader::FileReader::try_new(
+                                cursor, projection,
+                            )?;
+                            Ok(futures::stream::iter(arrow_reader).boxed())
+                        }
+                    }
                 }
-                GetResultPayload::Stream(_) => {
-                    let bytes = r.bytes().await?;
-                    let cursor = std::io::Cursor::new(bytes);
-                    let arrow_reader =
-                        arrow::ipc::reader::FileReader::try_new(cursor, projection)?;
-                    Ok(futures::stream::iter(arrow_reader).boxed())
+                Some(range) => {
+                    // range is not None: the file may be split into multiple parts to scan in parallel
+                    // read the footer length first
+                    let get_option = GetOptions {
+                        range: Some(GetRange::Suffix(10)),
+                        ..Default::default()
+                    };
+                    let get_result = object_store
+                        .get_opts(file_meta.location(), get_option)
+                        .await?;
+                    let footer_len_buf = get_result.bytes().await?;
+                    let footer_len = arrow_ipc::reader::read_footer_length(
+                        footer_len_buf[..].try_into().unwrap(),
+                    )?;
+                    // read the footer according to footer_len
+                    let get_option = GetOptions {
+                        range: Some(GetRange::Suffix(10 + footer_len)),
+                        ..Default::default()
+                    };
+                    let get_result = object_store
+                        .get_opts(file_meta.location(), get_option)
+                        .await?;
+                    let footer_buf = get_result.bytes().await?;
+                    let footer = arrow_ipc::root_as_footer(
+                        footer_buf[..footer_len].try_into().unwrap(),
+                    )
+                    .map_err(|err| {
+                        arrow_schema::ArrowError::ParseError(format!(
+                            "Unable to get root as footer: {err:?}"
+                        ))
+                    })?;
+                    // build a decoder from the footer and projection
+                    let schema =
+                        arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
+                    let mut decoder = FileDecoder::new(schema.into(), footer.version());
+                    if let Some(projection) = projection {
+                        decoder = decoder.with_projection(projection);
+                    }
+                    let dict_ranges = footer
+                        .dictionaries()
+                        .iter()
+                        .flatten()
+                        .map(|block| {
+                            let block_len = block.bodyLength() as usize
+                                + block.metaDataLength() as usize;
+                            let block_offset = block.offset() as usize;
+                            block_offset..block_offset + block_len
+                        })
+                        .collect_vec();
+                    let dict_results = object_store
+                        .get_ranges(file_meta.location(), &dict_ranges)
+                        .await?;
+                    for (dict_block, dict_result) in
+                        footer.dictionaries().iter().flatten().zip(dict_results)
+                    {
+                        decoder.read_dictionary(dict_block, &dict_result.into())?;
+                    }
+
+                    // keep only the record batches whose start offset falls within range
+                    let recordbatches = footer
+                        .recordBatches()
+                        .iter()
+                        .flatten()
+                        .filter(|block| {
+                            let block_offset = block.offset() as usize;
+                            block_offset >= range.start as usize
+                                && block_offset < range.end as usize
+                        })
+                        .copied()
+                        .collect_vec();
+
+                    let recordbatch_ranges = recordbatches
+                        .iter()
+                        .map(|block| {
+                            let block_len = block.bodyLength() as usize
+                                + block.metaDataLength() as usize;
+                            let block_offset = block.offset() as usize;
+                            block_offset..block_offset + block_len
+                        })
+                        .collect_vec();
+
+                    let recordbatch_results = object_store
+                        .get_ranges(file_meta.location(), &recordbatch_ranges)
+                        .await?;
+
+                    Ok(futures::stream::iter(
+                        recordbatches
+                            .into_iter()
+                            .zip(recordbatch_results)
+                            .filter_map(move |(block, data)| {
+                                match decoder.read_record_batch(&block, &data.into()) {
+                                    Ok(Some(record_batch)) => Some(Ok(record_batch)),
+                                    Ok(None) => None,
+                                    Err(err) => Some(Err(err)),
+                                }
+                            }),
+                    )
+                    .boxed())
                 }
             }
         }))
diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml
index 911b46c0bcf4..71855082895a 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -61,6 +61,7 @@ postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-protocol"]
 [dev-dependencies]
 env_logger = { workspace = true }
 num_cpus = { workspace = true }
+tokio = { version = "1.0", features = ["rt-multi-thread"] }
 
 [[test]]
 harness = false
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 5ee0da2d33e8..002177a122f7 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -253,7 +253,16 @@ query TT
 EXPLAIN SELECT * FROM arrow_table
 ----
 logical_plan TableScan: arrow_table projection=[f0, f1, f2]
-physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2]
+physical_plan ArrowExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:0..461], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:461..922], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:922..1383], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:1383..1842]]}, projection=[f0, f1, f2]
+
+# verify that a parallel scan returns the correct content
+query ITB
+SELECT * FROM arrow_table
+----
+1 foo true
+2 bar NULL
+3 baz false
+4 NULL true
 
 # Cleanup
 statement ok
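
A note on the `GetRange::Suffix(10)` requests above: an Arrow IPC file ends with the footer flatbuffer, followed by a 4-byte little-endian footer length and the 6-byte `ARROW1` magic, so the final 10 bytes are exactly the footer length plus magic. The sketch below walks the same footer on a local file with `arrow-ipc` directly, with no `ObjectStore` involved; the file path and `main` scaffolding are illustrative assumptions, not part of this change.

```rust
// Standalone sketch (not part of this patch): list the record batch blocks
// of an Arrow IPC file, mirroring the footer walk in ArrowOpener above.
// Assumes `arrow-ipc` as a direct dependency.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let buf = std::fs::read("datafusion/core/tests/data/example.arrow")?;

    // Last 10 bytes = 4-byte LE footer length + 6-byte "ARROW1" magic.
    let footer_len =
        arrow_ipc::reader::read_footer_length(buf[buf.len() - 10..].try_into()?)?;
    let footer_start = buf.len() - 10 - footer_len;
    let footer = arrow_ipc::root_as_footer(&buf[footer_start..buf.len() - 10])
        .map_err(|err| format!("Unable to get root as footer: {err:?}"))?;

    // Each Block carries the offset, metadata length, and body length that
    // the range filter and get_ranges calls in the patch rely on.
    for block in footer.recordBatches().iter().flatten() {
        println!(
            "record batch at offset {}: {} metadata bytes + {} body bytes",
            block.offset(),
            block.metaDataLength(),
            block.bodyLength()
        );
    }
    Ok(())
}
```

Note that `ArrowOpener` fetches and decodes every dictionary block regardless of the assigned byte range, since any record batch in the partition may reference them.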
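
On the new expected plan: the 1842-byte `example.arrow` is split into four groups of at most 461 bytes, where 461 = ceil(1842 / 4). The authoritative splitting logic lives in `FileGroupPartitioner` (and it only kicks in when the file size exceeds `datafusion.optimizer.repartition_file_min_size`, which this test file presumably lowers); the sketch below is only a rough model of the arithmetic that reproduces the groups above, and `split_ranges` is a hypothetical helper, not a DataFusion API.

```rust
use std::ops::Range;

/// Hypothetical model of the even byte-range split shown in the EXPLAIN
/// output: ceil-division chunks over a single file.
fn split_ranges(file_len: u64, partitions: u64) -> Vec<Range<u64>> {
    let chunk = (file_len + partitions - 1) / partitions; // ceil(file_len / partitions)
    (0..partitions)
        .map(|i| (i * chunk)..((i + 1) * chunk).min(file_len))
        .collect()
}

fn main() {
    // example.arrow is 1842 bytes; four target partitions reproduce the
    // groups in the expected plan.
    assert_eq!(
        split_ranges(1842, 4),
        vec![0..461, 461..922, 922..1383, 1383..1842]
    );
}
```

Because `ArrowOpener` decodes a record batch only when its starting offset falls inside the partition's half-open range, a batch that straddles a range boundary is decoded by exactly one partition, which is why the follow-up correctness query returns each row exactly once.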