Skip to content

Commit

Permalink
refine test in repartition.rs & coalesce_batches.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Jan 30, 2022
1 parent 18ced8d commit c26652b
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 45 deletions.
20 changes: 1 addition & 19 deletions datafusion/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,8 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use arrow::array::UInt32Array;
use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -325,23 +324,6 @@ mod tests {
Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
}

fn create_vec_batches(schema: &Arc<Schema>, num_batches: usize) -> Vec<RecordBatch> {
let batch = create_batch(schema);
let mut vec = Vec::with_capacity(num_batches);
for _ in 0..num_batches {
vec.push(batch.clone());
}
vec
}

fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
)
.unwrap()
}

async fn coalesce_batches(
schema: &SchemaRef,
input_partitions: Vec<Vec<RecordBatch>>,
Expand Down
6 changes: 2 additions & 4 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {
/// Schema
/// Schema wrapped by Arc
schema: SchemaRef,
}

Expand Down Expand Up @@ -384,9 +384,7 @@ impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) => *n,
Hash(_, n) => *n,
UnknownPartitioning(n) => *n,
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ pub trait PhysicalPlanner {
///
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `e`
/// `input_dfschema`: the logical plan schema for evaluating `expr`
///
/// `input_schema`: the physical schema for evaluating `e`
/// `input_schema`: the physical schema for evaluating `expr`
fn create_physical_expr(
&self,
expr: &Expr,
Expand Down
22 changes: 3 additions & 19 deletions datafusion/src/physical_plan/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ struct RepartitionStream {
/// Number of input partitions that have finished sending batches to this output channel
num_input_partitions_processed: usize,

/// Schema
/// Schema wrapped by Arc
schema: SchemaRef,

/// channel containing the repartitioned batches
Expand Down Expand Up @@ -494,6 +494,7 @@ impl RecordBatchStream for RepartitionStream {
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::test::create_vec_batches;
use crate::{
assert_batches_sorted_eq,
physical_plan::{collect, expressions::col, memory::MemoryExec},
Expand All @@ -508,7 +509,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
array::{ArrayRef, StringArray, UInt32Array},
array::{ArrayRef, StringArray},
error::ArrowError,
};
use futures::FutureExt;
Expand Down Expand Up @@ -601,23 +602,6 @@ mod tests {
Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
}

fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
let batch = create_batch(schema);
let mut vec = Vec::with_capacity(n);
for _ in 0..n {
vec.push(batch.clone());
}
vec
}

fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
)
.unwrap()
}

async fn repartition(
schema: &SchemaRef,
input_partitions: Vec<Vec<RecordBatch>>,
Expand Down
22 changes: 21 additions & 1 deletion datafusion/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Common unit test utility methods

use crate::arrow::array::UInt32Array;
use crate::datasource::object_store::local::local_unpartitioned_file;
use crate::datasource::{MemTable, PartitionedFile, TableProvider};
use crate::error::Result;
Expand Down Expand Up @@ -223,7 +224,7 @@ fn make_decimal() -> RecordBatch {
/// "millis" --> TimestampMillisecondArray
/// "secs" --> TimestampSecondArray
/// "names" --> StringArray
pub fn make_timestamps() -> RecordBatch {
fn make_timestamps() -> RecordBatch {
let ts_strings = vec![
Some("2018-11-13T17:11:10.011375885995"),
Some("2011-12-13T11:13:10.12345"),
Expand Down Expand Up @@ -302,6 +303,25 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
assert!(poll.is_pending());
}

/// Builds a `Vec` of `n` identical single-column record batches that all
/// share the given schema (test fixture helper).
pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
    // One template batch, cloned per slot; RecordBatch clones are cheap
    // (column arrays are Arc-backed).
    let template = create_batch(schema);
    (0..n).map(|_| template.clone()).collect()
}

/// Builds a single-column `RecordBatch` holding the UInt32 values 1 through 8.
fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
    let values = UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
    RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(values)]).unwrap()
}

pub mod exec;
pub mod object_store;
pub mod user_defined;
Expand Down

0 comments on commit c26652b

Please sign in to comment.