From 06d2573b04f26205e75485b5ccfc35e95931a632 Mon Sep 17 00:00:00 2001
From: DreaMer963
Date: Sun, 30 Jan 2022 16:24:27 +0800
Subject: [PATCH] refine test in repartition.rs & coalesce_batches.rs

---
 .../src/physical_plan/coalesce_batches.rs   | 20 +----------------
 datafusion/src/physical_plan/mod.rs         |  6 ++---
 datafusion/src/physical_plan/planner.rs     |  4 ++--
 datafusion/src/physical_plan/repartition.rs | 22 +++----------------
 datafusion/src/test/mod.rs                  | 20 +++++++++++++++++
 5 files changed, 28 insertions(+), 44 deletions(-)

diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs
index 586b05219bdf..ec238ad68cf8 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -295,9 +295,8 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::from_slice::FromSlice;
     use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
-    use arrow::array::UInt32Array;
+    use crate::test::create_vec_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
     #[tokio::test(flavor = "multi_thread")]
@@ -325,23 +324,6 @@ mod tests {
         Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
     }
 
-    fn create_vec_batches(schema: &Arc<Schema>, num_batches: usize) -> Vec<RecordBatch> {
-        let batch = create_batch(schema);
-        let mut vec = Vec::with_capacity(num_batches);
-        for _ in 0..num_batches {
-            vec.push(batch.clone());
-        }
-        vec
-    }
-
-    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
-        RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
-        )
-        .unwrap()
-    }
-
     async fn coalesce_batches(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 216d4a65e639..24aa6ad38339 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -59,7 +59,7 @@
 pub type SendableRecordBatchStream = 
Pin usize { use Partitioning::*; match self { - RoundRobinBatch(n) => *n, - Hash(_, n) => *n, - UnknownPartitioning(n) => *n, + RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, } } } diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 2dcde9d11333..226e3f392497 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -226,9 +226,9 @@ pub trait PhysicalPlanner { /// /// `expr`: the expression to convert /// - /// `input_dfschema`: the logical plan schema for evaluating `e` + /// `input_dfschema`: the logical plan schema for evaluating `expr` /// - /// `input_schema`: the physical schema for evaluating `e` + /// `input_schema`: the physical schema for evaluating `expr` fn create_physical_expr( &self, expr: &Expr, diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 746075429a45..86866728cdda 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -447,7 +447,7 @@ struct RepartitionStream { /// Number of input partitions that have finished sending batches to this output channel num_input_partitions_processed: usize, - /// Schema + /// Schema wrapped by Arc schema: SchemaRef, /// channel containing the repartitioned batches @@ -494,6 +494,7 @@ impl RecordBatchStream for RepartitionStream { mod tests { use super::*; use crate::from_slice::FromSlice; + use crate::test::create_vec_batches; use crate::{ assert_batches_sorted_eq, physical_plan::{collect, expressions::col, memory::MemoryExec}, @@ -508,7 +509,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use arrow::{ - array::{ArrayRef, StringArray, UInt32Array}, + array::{ArrayRef, StringArray}, error::ArrowError, }; use futures::FutureExt; @@ -601,23 +602,6 @@ mod tests { Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) } - fn 
create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
-        let batch = create_batch(schema);
-        let mut vec = Vec::with_capacity(n);
-        for _ in 0..n {
-            vec.push(batch.clone());
-        }
-        vec
-    }
-
-    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
-        RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
-        )
-        .unwrap()
-    }
-
     async fn repartition(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 497bfe59e1a1..cebd9ee02d1c 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -17,6 +17,7 @@
 //! Common unit test utility methods
 
+use crate::arrow::array::UInt32Array;
 use crate::datasource::object_store::local::local_unpartitioned_file;
 use crate::datasource::{MemTable, PartitionedFile, TableProvider};
 use crate::error::Result;
@@ -212,6 +213,25 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
     assert!(poll.is_pending());
 }
 
+/// Create vector batches
+pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
+    let batch = create_batch(schema);
+    let mut vec = Vec::with_capacity(n);
+    for _ in 0..n {
+        vec.push(batch.clone());
+    }
+    vec
+}
+
+/// Create batch
+fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
+    RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
+    )
+    .unwrap()
+}
+
 pub mod exec;
 pub mod object_store;
 pub mod user_defined;