diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 97d5813610e2..4e97789486db 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -268,15 +268,19 @@ impl ExecutionPlan for SortPreservingMergeExec {
 #[cfg(test)]
 mod tests {
     use std::iter::FromIterator;
 
+    use arrow_array::UInt32Array;
+    use rand::Rng;
+    use uuid::Uuid;
     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
-    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
     use arrow::record_batch::RecordBatch;
     use datafusion_execution::config::SessionConfig;
-    use futures::{FutureExt, StreamExt};
     use tempfile::TempDir;
+    use futures::{stream::BoxStream, FutureExt, StreamExt};
 
+    use crate::execution::context::SessionContext;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
@@ -287,10 +291,202 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{DictionaryArray, Int32Array, StringArray, TimestampNanosecondArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    /// Builds a sorted stream of 12,000 batches, each with a dictionary
+    /// column `a` of random UUID strings and a UInt32 column `b` whose
+    /// value never decreases from one batch to the next.
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = *col_b_init;
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // Stop the stream after 12,000 batches.
+            // TODO: verify how all the columns in the batches are sorted.
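+            // The unfold state is (batch counter, current `b` value); `b` is
+            // bumped by 2 on every fifth batch, so each partition stays
+            // non-decreasing on `b` and the two partitions interleave values.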
+            if counter >= 12000 {
+                return None;
+            }
+
+            if counter % 5 == 0 {
+                col_b_ascii += 2;
+            }
+
+            counter += 1;
+
+            // building col `a`: 8192 random UUID strings referenced by
+            // random Int32 dictionary keys
+            let mut values_vector: Vec<String> = Vec::new();
+            for _i in 1..=8192 {
+                values_vector.push(Uuid::new_v4().to_string());
+            }
+            let values = StringArray::from(values_vector);
+
+            let mut keys_vector: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                keys_vector.push(rand::thread_rng().gen_range(0..8192));
+            }
+            let keys = Int32Array::from(keys_vector);
+            let col_a: ArrayRef = Arc::new(
+                DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap(),
+            );
+
+            // building col `b`: one constant value per batch
+            let mut values: Vec<u32> = Vec::new();
+            for _i in 1..=8192 {
+                values.push(col_b_ascii);
+            }
+            let col_b: ArrayRef = Arc::new(UInt32Array::from(values));
+
+            // build a record batch out of col `a` and col `b`
+            let batch: RecordBatch =
+                RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            Some((batch, (counter, col_b_ascii)))
+        }).boxed()
+    }
+
+    struct InfiniteStream {
+        schema: SchemaRef,
+        col_b_init: u32,
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // We create an iterator from the record batches and map them into
+            // Ok values, converting the iterator into a futures::stream::Stream
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                make_infinite_sorted_stream(&self.col_b_init).map(Ok),
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_infinite() {
+        let session_ctx = SessionContext::new();
+        let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
+        let schema = SchemaRef::new(Schema::new(vec![
+            Field::new(
+                "a",
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                false,
+            ),
+            Field::new("b", DataType::UInt32, false),
+        ]));
+
+        let stream_1 = Arc::new(InfiniteStream {
+            schema: schema.clone(),
+            col_b_init: 1,
+        });
+
+        let stream_2 = Arc::new(InfiniteStream {
+            schema: schema.clone(),
+            col_b_init: 2,
+        });
+
+        println!("SortPreservingMergeExec result: ");
+
+        // Merge the two infinite partitions, sorting on `b` then `a`
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("a", &schema).unwrap(),
+                options: Default::default(),
+            },
+        ];
+
+        let provider = StreamingTable::try_new(schema, vec![stream_1, stream_2]).unwrap();
+
+        let plan = provider
+            .scan(&session_ctx.state(), None, &[], None)
+            .await
+            .unwrap();
+        let exec = Arc::new(SortPreservingMergeExec::new(sort, plan));
+        let mut stream = exec.execute(0, task_ctx).unwrap();
+        while let Some(batch) = stream.next().await {
+            println!(
+                "{}",
+                arrow::util::pretty::pretty_format_batches(&[batch.unwrap()]).unwrap()
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge() {
+        let task_ctx = Arc::new(TaskContext::default());
+
+        // First partition: dictionary values "a".."c" and string column `b`
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
+        let a: ArrayRef = Arc::new(
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap(),
+        );
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("c"),
+            Some("e"),
+            Some("g"),
+            Some("i"),
+            Some("k"),
+            Some("m"),
+            Some("o"),
+            Some("q"),
+        ]));
+        let batch_1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+
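+        // Second partition: a disjoint dictionary ("d".."f") keyed the same
+        // way, with `b` values that interleave with those of batch_1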
+        let values = StringArray::from_iter_values(["d", "e", "f"]);
+        let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
+        let a: ArrayRef = Arc::new(
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap(),
+        );
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("b"),
+            Some("d"),
+            Some("f"),
+            Some("h"),
+            Some("j"),
+            Some("l"),
+            Some("n"),
+            Some("p"),
+            Some("r"),
+        ]));
+        let batch_2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+
+        let schema = batch_1.schema();
+        let sort = vec![PhysicalSortExpr {
+            expr: col("b", &schema).unwrap(),
+            options: Default::default(),
+        }];
+        let exec = MemoryExec::try_new(&[vec![batch_1], vec![batch_2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+        let collected = collect(merge, task_ctx).await.unwrap();
+
+        // Dictionary keys resolve to their values, so the merged output
+        // interleaves the two dictionaries while `b` stays globally sorted.
+        let expected = vec![
+            "+---+---+",
+            "| a | b |",
+            "+---+---+",
+            "| a | a |",
+            "| d | b |",
+            "| a | c |",
+            "| d | d |",
+            "| b | e |",
+            "| e | f |",
+            "| c | g |",
+            "| f | h |",
+            "| c | i |",
+            "| f | j |",
+            "| b | k |",
+            "| e | l |",
+            "| b | m |",
+            "| e | n |",
+            "| a | o |",
+            "| d | p |",
+            "| c | q |",
+            "| f | r |",
+            "+---+---+",
+        ];
+        assert_batches_eq!(expected, collected.as_slice());
+    }
+
     #[tokio::test]
     async fn test_merge_interleave() {
         let task_ctx = Arc::new(TaskContext::default());
diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs
index 9ef13b7eb25e..b9d8e8ddbe80 100644
--- a/datafusion/core/src/physical_plan/sorts/stream.rs
+++ b/datafusion/core/src/physical_plan/sorts/stream.rs
@@ -17,7 +17,7 @@
 use crate::physical_plan::sorts::cursor::{FieldArray, FieldCursor, RowCursor};
 use crate::physical_plan::SendableRecordBatchStream;
-use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr};
+use crate::physical_plan::PhysicalSortExpr;
 use arrow::array::Array;
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
@@ -26,7 +26,6 @@
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 use futures::stream::{Fuse, StreamExt};
 use std::marker::PhantomData;
-use std::sync::Arc;
 use std::task::{ready, Context, Poll};
 
 /// A [`Stream`](futures::Stream) that has multiple partitions that can
@@ -82,11 +81,13 @@ pub struct RowCursorStream {
     /// Converter to convert output of physical expressions
     converter: RowConverter,
     /// The physical expressions to sort by
-    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    expressions: Vec<PhysicalSortExpr>,
     /// Input streams
     streams: FusedStreams,
     /// Tracks the memory used by `converter`
     reservation: MemoryReservation,
+    /// The schema of the input streams
+    schema: Schema,
 }
 
 impl RowCursorStream {
@@ -108,26 +109,47 @@ impl RowCursorStream {
         let converter = RowConverter::new(sort_fields)?;
         Ok(Self {
             converter,
-            reservation,
-            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            expressions: expressions.to_vec(),
             streams: FusedStreams(streams),
+            reservation,
+            schema: schema.clone(),
         })
     }
 
     fn convert_batch(&mut self, batch: &RecordBatch) -> Result<RowCursor> {
-        let cols = self
-            .column_expressions
+        let column_expressions: Vec<_> = self.expressions.iter().map(|x| x.expr.clone()).collect();
+        let cols = column_expressions
             .iter()
             .map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
             .collect::<Result<Vec<_>>>()?;
 
-        let rows = self.converter.convert_columns(&cols)?;
-        self.reservation.try_resize(self.converter.size())?;
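+        // Recompute the sort fields from the stored expressions and schema so
+        // a fresh RowConverter can be constructed when the old one has grown
+        // too large.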
+        let sort_fields = self
+            .expressions
+            .iter()
+            .map(|expr| {
+                let data_type = expr.expr.data_type(&self.schema)?;
+                Ok(SortField::new_with_options(data_type, expr.options))
+            })
+            .collect::<Result<Vec<_>>>()?;
 
-        // track the memory in the newly created Rows.
+        let old_converter: &mut RowConverter = &mut self.converter;
+        let mut old_rows = old_converter.convert_columns(&cols)?;
+        self.reservation.try_resize(old_converter.size())?;
+
+        // track the memory in the newly created Rows
         let mut rows_reservation = self.reservation.new_empty();
-        rows_reservation.try_grow(rows.size())?;
-        Ok(RowCursor::new(rows, rows_reservation))
+        rows_reservation.try_grow(old_rows.size())?;
+
+        println!("Old converter size: {0}", old_converter.size());
+        if old_converter.size() > 50 * 1024 * 1024 {
+            // The converter's interned dictionary mappings grow as new
+            // dictionary values are observed, so on high-cardinality
+            // dictionary columns its memory use is unbounded. Once it exceeds
+            // 50 MiB, swap in a fresh converter and re-encode the current
+            // batch's rows with it.
+            let mut new_converter = RowConverter::new(sort_fields)?;
+            let new_rows = new_converter.convert_columns(
+                &old_converter.convert_rows(&old_rows)?
+            )?;
+            rows_reservation.try_resize(new_rows.size())?;
+            old_rows = new_rows;
+            println!(
+                "Swapped old converter of size: {0} with new converter of size {1}",
+                old_converter.size(),
+                new_converter.size()
+            );
+            self.converter = new_converter;
+        }
+        Ok(RowCursor::new(old_rows, rows_reservation))
     }
 }