From 3e8f108a82671660ab44328e372a4bbc846c853c Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Thu, 20 Jul 2023 13:41:45 -0700
Subject: [PATCH 01/14] Add test for sort preserving merge on dict

---
 .../sorts/sort_preserving_merge.rs | 84 ++++++++++++++++++-
 1 file changed, 82 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 97d5813610e2..e9517511c71d 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -271,7 +271,7 @@ mod tests {

     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
-    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::datatypes::{DataType, Field, Schema, Int32Type};
     use arrow::record_batch::RecordBatch;
     use datafusion_execution::config::SessionConfig;
     use futures::{FutureExt, StreamExt};
@@ -287,10 +287,90 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};

     use super::*;

+    #[tokio::test]
+    async fn test_dict_merge() {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
+        let a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("c"),
+            Some("e"),
+            Some("g"),
+            Some("i"),
+            Some("k"),
+            Some("m"),
+            Some("o"),
+            Some("q"),
+        ]));
+        let batch_1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+
+        let values = StringArray::from_iter_values(["d", "e", "f"]);
+        let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
+        let a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("b"),
+            Some("d"),
+            Some("f"),
+            Some("h"),
+            Some("j"),
+            Some("l"),
+            Some("n"),
+            Some("p"),
+            Some("r"),
+        ]));
+        let batch_2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+
+        let schema = batch_1.schema();
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            }
+        ];
+        let exec = MemoryExec::try_new(&[vec![batch_1], vec![batch_2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+        let collected = collect(merge, task_ctx).await.unwrap();
+        // collected.iter().for_each(|batch| {
+        //     arrow::util::pretty::pretty_format_batches(&[batch.clone()])
+        //         .unwrap()
+        //         .to_string();
+        // });
+
+        let expected = vec![
+            "+---+---+",
+            "| a | b |",
+            "+---+---+",
+            "| a | a |",
+            "| d | b |",
+            "| a | c |",
+            "| d | d |",
+            "| b | e |",
+            "| e | f |",
+            "| c | g |",
+            "| f | h |",
+            "| c | i |",
+            "| f | j |",
+            "| b | k |",
+            "| e | l |",
+            "| b | m |",
+            "| e | n |",
+            "| a | o |",
+            "| d | p |",
+            "| c | q |",
+            "| f | r |",
+            "+---+---+",
+        ];
+        assert_batches_eq!(expected, collected.as_slice());
+    }
+
     #[tokio::test]
     async fn test_merge_interleave() {
         let task_ctx = Arc::new(TaskContext::default());
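Note: for readers unfamiliar with Arrow dictionary encoding, the columns this test builds look like the standalone sketch below (arrow-rs only; the array contents mirror the test's first batch). Keys index into the values array, so each distinct string is stored once while the column logically repeats them.

use std::sync::Arc;
use arrow::array::{DictionaryArray, Int32Array, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Dictionary values: the distinct strings, stored once.
    let values = StringArray::from_iter_values(["a", "b", "c"]);
    // Keys index into `values`: 0 -> "a", 1 -> "b", 2 -> "c".
    let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
    let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
    // Logically this column is ["a", "a", "b", "c", "c", "b", "b", "a", "c"].
    assert_eq!(dict.len(), 9);
}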
From e984dc028a542e93b161bb16d947c124e8b2d971 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Tue, 25 Jul 2023 12:03:56 -0400
Subject: [PATCH 02/14] Add code to generate an infinite stream of batches

---
 .../sorts/sort_preserving_merge.rs | 53 +++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e9517511c71d..9a6f0f8cda94 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -289,8 +289,61 @@ mod tests {
     use crate::{assert_batches_eq, test_util};
     use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};

+    use crate::datasource::streaming::StreamingTableExec;
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+
     use super::*;

+    struct InfiniteStream {
+        schema: SchemaRef,
+        batches: Vec<RecordBatch>,
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // We create an iterator from the record batches and map them into Ok values,
+            // converting the iterator into a futures::stream::Stream
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                futures::stream::iter(self.batches.clone()).map(Ok),
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_inf() {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        // Need to generate batches of batches
+        let batches: Vec<_> = AccessLogGenerator::new()
+            .with_row_limit(1000)
+            .with_max_batch_size(50)
+            .collect();
+
+        let schema = batches[0].schema();
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            }
+        ];
+
+        let exec = StreamingTableExec::try_new(schema, vec![Arc::new(InfiniteStream {
+            schema: batches[0].schema(),
+            batches: batches.clone(),
+        })], true).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+        let collected = collect(merge, task_ctx).await.unwrap();
+
+    }
+
     #[tokio::test]
     async fn test_dict_merge() {
         let session_ctx = SessionContext::new();
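Note: the `PartitionStream` above hands DataFusion a `SendableRecordBatchStream`, which is a pinned, boxed `Stream` of `Result<RecordBatch>`. A minimal sketch of draining such a stream (hypothetical helper, not part of the patch):

use futures::StreamExt;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;

// Pull every batch out of the stream and count the rows it carried.
async fn drain(mut stream: SendableRecordBatchStream) -> Result<usize> {
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows(); // each item is a Result<RecordBatch>
    }
    Ok(rows)
}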
From c7602c018026289548f6e25256f18e344587fae8 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Wed, 26 Jul 2023 07:22:19 -0400
Subject: [PATCH 03/14] Try with integer

---
 .../sorts/sort_preserving_merge.rs | 146 ++++++++++--------
 1 file changed, 81 insertions(+), 65 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 9a6f0f8cda94..bfe0be01de49 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -268,6 +268,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
 #[cfg(test)]
 mod tests {
     use std::iter::FromIterator;
+    use rand::Rng;

     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
@@ -289,7 +290,7 @@ mod tests {
     use crate::{assert_batches_eq, test_util};
     use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};

-    use crate::datasource::streaming::StreamingTableExec;
+    // use crate::datasource::streaming::StreamingTableExec;
     use crate::physical_plan::streaming::PartitionStream;
     use crate::physical_plan::stream::RecordBatchStreamAdapter;

     use super::*;
@@ -318,29 +319,63 @@ mod tests {
     #[tokio::test]
     async fn test_dict_merge_inf() {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-
-        // Need to generate batches of batches
-        let batches: Vec<_> = AccessLogGenerator::new()
-            .with_row_limit(1000)
-            .with_max_batch_size(50)
-            .collect();
+        let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
+
+        // build a set of 2 batches of random data
+        let mut batches: Vec<RecordBatch> = Vec::new();
+        for _i in 1..=2 {
+            // building col `a`
+            let values =
+                StringArray::from_iter_values(["x", "y", "z"]);
+            let mut keys_vector: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                keys_vector.push(rand::thread_rng().gen_range(0..=2));
+            }
+            let keys = Int32Array::from(keys_vector);
+            let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+
+            // building col `b`
+            let mut values: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                values.push(rand::thread_rng().gen_range(1..=999));
+            }
+            let col_b: ArrayRef = Arc::new(Int32Array::from(values));
+
+            // build a record batch out of col `a` and col `b`
+            let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+            // print the batch
+            // println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()]).unwrap().to_string());
+
+            batches.push(batch);
+        }

+        // get the schema of the stream of batches
         let schema = batches[0].schema();

+        // sort the batches by col `b`
         let sort = vec![
             PhysicalSortExpr {
                 expr: col("b", &schema).unwrap(),
                 options: Default::default(),
             }
         ];
-
-        let exec = StreamingTableExec::try_new(schema, vec![Arc::new(InfiniteStream {
-            schema: batches[0].schema(),
-            batches: batches.clone(),
-        })], true).unwrap();
+
+        // create an in-memory exec node over the generated batches
+        let exec = MemoryExec::try_new(&[batches], schema, None).unwrap();
+
+        // create a sort preserving merge exec node
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        // execute and collect the result
         let collected = collect(merge, task_ctx).await.unwrap();
+        collected.iter().for_each(|batch| {
+            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
+                .unwrap()
+                .to_string());
+
+            println!("batch len: {}", batch.num_rows());
+        });
     }

@@ -352,33 +387,14 @@ mod tests {
         let values = StringArray::from_iter_values(["a", "b", "c"]);
         let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
         let a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
-        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
-            Some("a"),
-            Some("c"),
-            Some("e"),
-            Some("g"),
-            Some("i"),
-            Some("k"),
-            Some("m"),
-            Some("o"),
-            Some("q"),
-        ]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![10, 15, 12, 56, 34, 76, 2, 15, 29]));
         let batch_1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();

         let values = StringArray::from_iter_values(["d", "e", "f"]);
         let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
         let a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
-        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
-            Some("b"),
-            Some("d"),
-            Some("f"),
-            Some("h"),
-            Some("j"),
-            Some("l"),
-            Some("n"),
-            Some("p"),
-            Some("r"),
-        ]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![11, 16, 13, 57, 35, 77, 4, 17, 34]));
+
         let batch_2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();

         let schema = batch_1.schema();
@@ -391,37 +407,37 @@ mod tests {
         let exec = MemoryExec::try_new(&[vec![batch_1], vec![batch_2]], schema, None).unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
         let collected = collect(merge, task_ctx).await.unwrap();
-        // collected.iter().for_each(|batch| {
-        //     arrow::util::pretty::pretty_format_batches(&[batch.clone()])
-        //         .unwrap()
-        //         .to_string();
-        // });
+        collected.iter().for_each(|batch| {
+            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
+                .unwrap()
+                .to_string());
+        });

-        let expected = vec![
-            "+---+---+",
-            "| a | b |",
-            "+---+---+",
-            "| a | a |",
-            "| d | b |",
-            "| a | c |",
-            "| d | d |",
-            "| b | e |",
-            "| e | f |",
-            "| c | g |",
-            "| f | h |",
-            "| c | i |",
-            "| f | j |",
-            "| b | k |",
-            "| e | l |",
-            "| b | m |",
-            "| e | n |",
-            "| a | o |",
-            "| d | p |",
-            "| c | q |",
-            "| f | r |",
-            "+---+---+",
-        ];
-        assert_batches_eq!(expected, collected.as_slice());
+        // let expected = vec![
+        //     "+---+---+",
+        //     "| a | b |",
+        //     "+---+---+",
+        //     "| a | a |",
+        //     "| d | b |",
+        //     "| a | c |",
+        //     "| d | d |",
+        //     "| b | e |",
+        //     "| e | f |",
+        //     "| c | g |",
+        //     "| f | h |",
+        //     "| c | i |",
+        //     "| f | j |",
+        //     "| b | k |",
+        //     "| e | l |",
+        //     "| b | m |",
+        //     "| e | n |",
+        //     "| a | o |",
+        //     "| d | p |",
+        //     "| c | q |",
+        //     "| f | r |",
+        //     "+---+---+",
+        // ];
+        // assert_batches_eq!(expected, collected.as_slice());
     }

     #[tokio::test]

From abdad4df03f1f126b6cb11273d4381e505d5945a Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Wed, 26 Jul 2023 07:58:30 -0400
Subject: [PATCH 04/14] Use second col as string

---
 .../sorts/sort_preserving_merge.rs | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index bfe0be01de49..7ad7c5524340 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -267,6 +267,7 @@ impl ExecutionPlan for SortPreservingMergeExec {

 #[cfg(test)]
 mod tests {
+    use std::char::from_u32;
     use std::iter::FromIterator;
     use rand::Rng;

@@ -328,24 +329,25 @@ mod tests {
             let values =
                 StringArray::from_iter_values(["x", "y", "z"]);
             let mut keys_vector: Vec<i32> = Vec::new();
-            for _i in 1..=8192 {
+            for _i in 1..=10 {
                 keys_vector.push(rand::thread_rng().gen_range(0..=2));
             }
             let keys = Int32Array::from(keys_vector);
             let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());

             // building col `b`
-            let mut values: Vec<i32> = Vec::new();
-            for _i in 1..=8192 {
-                values.push(rand::thread_rng().gen_range(1..=999));
+            let mut values: Vec<String> = Vec::new();
+            for _i in 1..=10 {
+                let ascii_value = rand::thread_rng().gen_range(97..=122);
+                values.push(String::from(from_u32(ascii_value).unwrap()));
             }
-            let col_b: ArrayRef = Arc::new(Int32Array::from(values));
+            let col_b: ArrayRef = Arc::new(StringArray::from(values));

             // build a record batch out of col `a` and col `b`
-            let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

             // print the batch
-            // println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()]).unwrap().to_string());
+            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()]).unwrap().to_string());

             batches.push(batch);
         }
@@ ... @@ mod tests {
             println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
                 .unwrap()
                 .to_string());
-
-            println!("batch len: {}", batch.num_rows());
         });
     }
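Note: the random characters above are generated unsorted, yet SortPreservingMergeExec assumes every input partition is already ordered on the sort key. A sketch of one way to produce a sorted random column, sorting once after generation rather than on every push (hypothetical helper; only `rand` is assumed):

use rand::Rng;

// Generate `n` random lowercase single-character strings in ascending order.
fn sorted_random_column(n: usize) -> Vec<String> {
    let mut rng = rand::thread_rng();
    let mut values: Vec<String> = (0..n)
        .map(|_| char::from_u32(rng.gen_range(97..=122)).unwrap().to_string())
        .collect();
    values.sort(); // sort once, after all values exist
    values
}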
From 0424265b51dccadb1dfcffa0858f68b72c91c723 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Wed, 26 Jul 2023 13:52:09 -0400
Subject: [PATCH 05/14] Fixes

---
 .../sorts/sort_preserving_merge.rs | 50 +++++++++++++++----
 1 file changed, 40 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 7ad7c5524340..0319e6258132 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -291,9 +291,9 @@ mod tests {
     use crate::{assert_batches_eq, test_util};
     use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};

-    // use crate::datasource::streaming::StreamingTableExec;
     use crate::physical_plan::streaming::PartitionStream;
     use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};

     use super::*;

@@ -325,9 +325,14 @@ mod tests {
         let mut batches: Vec<RecordBatch> = Vec::new();
         for _i in 1..=2 {
+            // println!("i: {}", _i);
             // building col `a`
             let values =
-                StringArray::from_iter_values(["x", "y", "z"]);
+                StringArray::from_iter_values([
+                    "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
+                    "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy",
+                    "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
+                ]);
             let mut keys_vector: Vec<i32> = Vec::new();
             for _i in 1..=10 {
                 keys_vector.push(rand::thread_rng().gen_range(0..=2));
@@ -338,9 +343,11 @@ mod tests {
             // building col `b`
             let mut values: Vec<String> = Vec::new();
             for _i in 1..=10 {
-                let ascii_value = rand::thread_rng().gen_range(97..=122);
+                let ascii_value = rand::thread_rng().gen_range(97..=110);
                 values.push(String::from(from_u32(ascii_value).unwrap()));
+                values.sort();
             }
+
             let col_b: ArrayRef = Arc::new(StringArray::from(values));

             // build a record batch out of col `a` and col `b`
@@ -352,10 +359,11 @@ mod tests {
             batches.push(batch);
         }

+        println!("Result: ");
+
         // get the schema of the stream of batches
         let schema = batches[0].schema();

-        // sort the batches by col `b`
         let sort = vec![
             PhysicalSortExpr {
                 expr: col("b", &schema).unwrap(),
                 options: Default::default(),
             }
         ];

-        // create an in-memory exec node over the generated batches
-        let exec = MemoryExec::try_new(&[batches], schema, None).unwrap();
-
-        // create a sort preserving merge exec node
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+        let provider = StreamingTable::try_new(schema, vec![Arc::new(InfiniteStream {
+            schema: batches[0].schema(),
+            batches: batches.clone(),
+        })]).unwrap();

-        // execute and collect the result
-        let collected = collect(merge, task_ctx).await.unwrap();
+        let exec = provider.scan(&session_ctx.state(), None, &[], None).await.unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, exec));
+        let collected = collect(merge, task_ctx).await.unwrap();
         collected.iter().for_each(|batch| {
             println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
                 .unwrap()
                 .to_string());
         });
+        // // sort the batches by col `b`
+        // let sort = vec![
+        //     PhysicalSortExpr {
+        //         expr: col("b", &schema).unwrap(),
+        //         options: Default::default(),
+        //     }
+        // ];

+        // // create an in-memory exec node over the generated batches
+        // let exec = MemoryExec::try_new(&[batches], schema, None).unwrap();

+        // // create a sort preserving merge exec node
+        // let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

+        // // execute and collect the result
+        // let collected = collect(merge, task_ctx).await.unwrap();
+        // collected.iter().for_each(|batch| {
+        //     println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
+        //         .unwrap()
+        //         .to_string());
+        // });

     }
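Note: the StreamingTable path above goes through the `TableProvider` trait. `scan` takes the session state, an optional column projection, pushed-down filters, and an optional row limit, and yields an `ExecutionPlan`. A condensed sketch of that call (hypothetical wrapper around the same API the test uses):

use std::sync::Arc;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::ExecutionPlan;

// Turn any TableProvider into a physical plan with no projection,
// no filters, and no row limit.
async fn scan_all(
    provider: &dyn TableProvider,
    ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    provider.scan(&ctx.state(), None, &[], None).await
}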
From ddaaaef642381a2792144b72e76932372b9bdd3a Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Wed, 26 Jul 2023 17:00:50 -0400
Subject: [PATCH 06/14] Fixes

---
 .../sorts/sort_preserving_merge.rs | 131 ++++++++----------
 1 file changed, 60 insertions(+), 71 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 0319e6258132..0e23db9d9db3 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -270,14 +270,15 @@ mod tests {
     use std::char::from_u32;
     use std::iter::FromIterator;
     use rand::Rng;
+    use std::pin::Pin;

     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, Int32Type};
     use arrow::record_batch::RecordBatch;
     use datafusion_execution::config::SessionConfig;
-    use futures::{FutureExt, StreamExt};
     use tempfile::TempDir;
+    use futures::{FutureExt, StreamExt, stream::BoxStream};

     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
@@ -296,36 +297,17 @@ mod tests {
     use crate::datasource::{streaming::StreamingTable, TableProvider};

     use super::*;
-
-    struct InfiniteStream {
-        schema: SchemaRef,
-        batches: Vec<RecordBatch>,
-    }
-
-    impl PartitionStream for InfiniteStream {
-        fn schema(&self) -> &SchemaRef {
-            &self.schema
-        }
-
-        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-            // We create an iterator from the record batches and map them into Ok values,
-            // converting the iterator into a futures::stream::Stream
-            Box::pin(RecordBatchStreamAdapter::new(
-                self.schema.clone(),
-                futures::stream::iter(self.batches.clone()).map(Ok),
-            ))
-        }
-    }
-
-    #[tokio::test]
-    async fn test_dict_merge_inf() {
-        let session_ctx = SessionContext::new();
-        let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
+    fn make_infinite_sorted_stream() -> BoxStream<'static, RecordBatch> {
+        futures::stream::unfold(0, |state| async move {
+            // stop the stream after 1 batch for now.
+            // Need to figure out how all the columns in the batches are sorted.
+            if state >= 1 {
+                return None;
+            }

-        // build a set of 2 batches of random data
-        let mut batches: Vec<RecordBatch> = Vec::new();
-        for _i in 1..=2 {
-            // println!("i: {}", _i);
+            let next_state = state + 1;
+
             // building col `a`
             let values =
                 StringArray::from_iter_values([
                     "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                     "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy",
                     "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
                 ]);
             let mut keys_vector: Vec<i32> = Vec::new();
-            for _i in 1..=10 {
+            for _i in 1..=8192 {
                 keys_vector.push(rand::thread_rng().gen_range(0..=2));
             }
             let keys = Int32Array::from(keys_vector);
             let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());

             // building col `b`
             let mut values: Vec<String> = Vec::new();
-            for _i in 1..=10 {
+            for _i in 1..=8192 {
                 let ascii_value = rand::thread_rng().gen_range(97..=110);
                 values.push(String::from(from_u32(ascii_value).unwrap()));
                 values.sort();
             }
             let col_b: ArrayRef = Arc::new(StringArray::from(values));

             // build a record batch out of col `a` and col `b`
             let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

-            // print the batch
-            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()]).unwrap().to_string());
+            // println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()]).unwrap().to_string());

-            batches.push(batch);
-        }
+            Some((batch, next_state))
+        }).boxed()
+    }
@@ ... @@ mod tests {
+    struct InfiniteStream {
+        schema: SchemaRef,
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // We create an iterator from the record batches and map them into Ok values,
+            // converting the iterator into a futures::stream::Stream
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                make_infinite_sorted_stream().map(Ok)
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_inf() {
+        let session_ctx = SessionContext::new();
+        let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
+
+        let schema = SchemaRef::new(Schema::new(vec![
+            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
+
+        let stream_1 = Arc::new(InfiniteStream {
+            schema: schema.clone(),
+        });
+
+        let stream_2 = Arc::new(InfiniteStream {
+            schema: schema.clone(),
+        });
+
+        println!("SortPreservingMergeExec result: ");

         let sort = vec![
             PhysicalSortExpr {
                 expr: col("b", &schema).unwrap(),
                 options: Default::default(),
             }
         ];

-        let provider = StreamingTable::try_new(schema, vec![Arc::new(InfiniteStream {
-            schema: batches[0].schema(),
-            batches: batches.clone(),
-        })]).unwrap();
+        let provider = StreamingTable::try_new(schema, vec![stream_1, stream_2]).unwrap();

-        let exec = provider.scan(&session_ctx.state(), None, &[], None).await.unwrap();
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, exec));
-        let collected = collect(merge, task_ctx).await.unwrap();
-        collected.iter().for_each(|batch| {
-            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
+        let plan = provider.scan(&session_ctx.state(), None, &[], None).await.unwrap();
+        let exec = Arc::new(SortPreservingMergeExec::new(sort, plan));
+        let mut stream = exec.execute(0, task_ctx).unwrap();
+        while let Some(batch) = stream.next().await {
+            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.unwrap().clone()])
                 .unwrap()
                 .to_string());
-        });
-
-        // // sort the batches by col `b`
-        // let sort = vec![
-        //     PhysicalSortExpr {
-        //         expr: col("b", &schema).unwrap(),
-        //         options: Default::default(),
-        //     }
-        // ];
-
-        // // create an in-memory exec node over the generated batches
-        // let exec = MemoryExec::try_new(&[batches], schema, None).unwrap();
-
-        // // create a sort preserving merge exec node
-        // let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
-
-        // // execute and collect the result
-        // let collected = collect(merge, task_ctx).await.unwrap();
-        // collected.iter().for_each(|batch| {
-        //     println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()])
-        //         .unwrap()
-        //         .to_string());
-        // });
+        }
     }
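Note: the generator above is built on `futures::stream::unfold`, which threads a state value through successive async closure calls and ends the stream when the closure returns `None`. A tiny self-contained sketch of the same pattern:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Emit 0, 1, 2 and then stop: the state counts items produced so far.
    let mut s = futures::stream::unfold(0u32, |state| async move {
        if state >= 3 {
            return None;
        }
        Some((state, state + 1)) // (item, next_state)
    })
    .boxed();

    while let Some(x) = s.next().await {
        println!("item: {x}");
    }
}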
From 6a292e0ba2ad53c3fac7c71be80ea805d6594ed1 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Wed, 26 Jul 2023 17:01:32 -0400
Subject: [PATCH 07/14] Fixes

---
 .../core/src/physical_plan/sorts/sort_preserving_merge.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 0e23db9d9db3..149d964f14f8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -333,9 +333,6 @@ mod tests {

             // build a record batch out of col `a` and col `b`
             let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
-
-            // println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.clone()]).unwrap().to_string());
-
             Some((batch, next_state))
         }).boxed()
     }

From 39b369ac08fc141e123825e49180abf5103aea1c Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Wed, 26 Jul 2023 17:09:25 -0400
Subject: [PATCH 08/14] Fixes

---
 .../core/src/physical_plan/sorts/sort_preserving_merge.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 149d964f14f8..a9d537476f1b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -357,7 +357,7 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_dict_merge_inf() {
+    async fn test_dict_merge_infinite() {
         let session_ctx = SessionContext::new();
         let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
From a5066c307bafe221d49d78f37e21d3b961dd542d Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Mon, 31 Jul 2023 08:28:03 -0700
Subject: [PATCH 09/14] Complete implementation for infinite stream merge using
 integer values (#1)

* Remove unnecessary libs

* Generate infinite batches

* Generate high cardinality fields

* Generate high cardinality fields

* Merge wip

---
 .../sorts/sort_preserving_merge.rs | 56 ++++++++++---------
 1 file changed, 31 insertions(+), 25 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index a9d537476f1b..7f2278f64a19 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -267,10 +267,9 @@ impl ExecutionPlan for SortPreservingMergeExec {

 #[cfg(test)]
 mod tests {
-    use std::char::from_u32;
     use std::iter::FromIterator;
+    use arrow_array::UInt32Array;
     use rand::Rng;
-    use std::pin::Pin;

     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
@@ ... @@ mod tests {
-    fn make_infinite_sorted_stream() -> BoxStream<'static, RecordBatch> {
-        futures::stream::unfold(0, |state| async move {
-            // stop the stream after 1 batch for now.
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = col_b_init.clone();
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // stop the stream after 12000 batches for now.
             // Need to figure out how all the columns in the batches are sorted.
-            if state >= 1 {
+            if counter >= 12000 {
                 return None;
             }

-            let next_state = state + 1;
+            if counter % 5 == 0 {
+                col_b_ascii = col_b_ascii + 2;
+            }
+
+            counter = counter + 1;

             // building col `a`
-            let values =
-                StringArray::from_iter_values([
-                    "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
-                    "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy",
-                    "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
-                ]);
+            let mut values_vector: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                values_vector.push(rand::thread_rng().gen_range(1..=1000));
+            }
+            let values = Int32Array::from(values_vector);
+
             let mut keys_vector: Vec<i32> = Vec::new();
             for _i in 1..=8192 {
-                keys_vector.push(rand::thread_rng().gen_range(0..=2));
+                keys_vector.push(rand::thread_rng().gen_range(0..8192));
             }
             let keys = Int32Array::from(keys_vector);
             let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());

             // building col `b`
-            let mut values: Vec<String> = Vec::new();
+            let mut values: Vec<u32> = Vec::new();
             for _i in 1..=8192 {
-                let ascii_value = rand::thread_rng().gen_range(97..=110);
-                values.push(String::from(from_u32(ascii_value).unwrap()));
-                values.sort();
+                // let ascii_value = rand::thread_rng().gen_range(97..=110);
+                // values.push(String::from(from_u32(col_b_ascii).unwrap()));
+                values.push(col_b_ascii);
+                // values.sort();
             }
-            let col_b: ArrayRef = Arc::new(StringArray::from(values));
+            let col_b: ArrayRef = Arc::new(UInt32Array::from(values));

             // build a record batch out of col `a` and col `b`
             let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
-            Some((batch, next_state))
+            Some((batch, (counter, col_b_ascii)))
         }).boxed()
     }

     struct InfiniteStream {
         schema: SchemaRef,
+        col_b_init: u32
     }

     impl PartitionStream for InfiniteStream {
@@ ... @@ mod tests {
         fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
             // We create an iterator from the record batches and map them into Ok values,
             // converting the iterator into a futures::stream::Stream
             Box::pin(RecordBatchStreamAdapter::new(
                 self.schema.clone(),
-                make_infinite_sorted_stream().map(Ok)
+                make_infinite_sorted_stream(&self.col_b_init).map(Ok)
             ))
         }
     }
@@ ... @@ mod tests {
         let schema = SchemaRef::new(Schema::new(vec![
-            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
-            Field::new("b", DataType::Utf8, false),
+            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)), false),
+            Field::new("b", DataType::UInt32, false),
         ]));

         let stream_1 = Arc::new(InfiniteStream {
-            schema: schema.clone(),
+            schema: schema.clone(), col_b_init: 1
         });

         let stream_2 = Arc::new(InfiniteStream {
-            schema: schema.clone(),
+            schema: schema.clone(), col_b_init: 2
         });
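Note: with this schedule each partition's `b` column is non-decreasing, but the two partitions interleave: stream 1 (col_b_init = 1) emits 3, 3, ..., 5, 5, ... while stream 2 (col_b_init = 2) emits 4, 4, ..., 6, 6, ..., so the merge must keep alternating between inputs. The batch-key schedule in isolation (hypothetical helper, equivalent to the bump-every-5-batches logic above):

// `col_b_ascii` is bumped by 2 whenever counter % 5 == 0, i.e. before
// batches 0, 5, 10, ...; batch i therefore carries init + 2 * (i / 5 + 1).
fn key_for_batch(col_b_init: u32, batch_index: u32) -> u32 {
    col_b_init + 2 * (batch_index / 5 + 1)
}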
From f822a7935906a7d73e73cc99a4978cd183c4a0db Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Mon, 31 Jul 2023 08:57:19 -0700
Subject: [PATCH 10/14] Use string dicts

---
 .../sorts/sort_preserving_merge.rs | 24 ++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 7f2278f64a19..7ffce5cd0d16 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -270,6 +270,7 @@ mod tests {
     use std::iter::FromIterator;
     use arrow_array::UInt32Array;
     use rand::Rng;
+    use uuid::Uuid;

     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
@@ -316,6 +317,7 @@ mod tests {
             let mut values_vector: Vec<i32> = Vec::new();
             for _i in 1..=8192 {
                 values_vector.push(rand::thread_rng().gen_range(1..=1000));
+                // values_vector.push(String::from(Uuid::new_v4().to_string()));
             }
             let values = Int32Array::from(values_vector);

@@ -386,7 +388,27 @@ mod tests {
             PhysicalSortExpr {
                 expr: col("b", &schema).unwrap(),
                 options: Default::default(),
-            }
+            },
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
         ];

From 29b6420807925117be8f40b07caab505f02fe5db Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Mon, 31 Jul 2023 09:03:58 -0700
Subject: [PATCH 11/14] Use string dicts

---
 .../src/physical_plan/sorts/sort_preserving_merge.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 7ffce5cd0d16..129b6257bf8e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -314,12 +314,12 @@ mod tests {
             counter = counter + 1;

             // building col `a`
-            let mut values_vector: Vec<i32> = Vec::new();
+            let mut values_vector: Vec<String> = Vec::new();
             for _i in 1..=8192 {
-                values_vector.push(rand::thread_rng().gen_range(1..=1000));
-                // values_vector.push(String::from(Uuid::new_v4().to_string()));
+                // values_vector.push(rand::thread_rng().gen_range(1..=1000));
+                values_vector.push(String::from(Uuid::new_v4().to_string()));
             }
-            let values = Int32Array::from(values_vector);
+            let values = StringArray::from(values_vector);

             let mut keys_vector: Vec<i32> = Vec::new();
             for _i in 1..=8192 {
@@ -370,7 +370,7 @@ mod tests {
         let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();

         let schema = SchemaRef::new(Schema::new(vec![
-            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)), false),
+            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
             Field::new("b", DataType::UInt32, false),
         ]));
From 278a7b665edd79d9cc276c20d44d98cec103d882 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Mon, 31 Jul 2023 12:45:35 -0700
Subject: [PATCH 12/14] Sort on a also

---
 .../sorts/sort_preserving_merge.rs | 18 +-----------------
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 129b6257bf8e..994ab4e7b612 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -390,23 +390,7 @@ mod tests {
                 options: Default::default(),
             },
             PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
-                options: Default::default(),
-            },
-            PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
-                options: Default::default(),
-            },
-            PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
-                options: Default::default(),
-            },
-            PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
-                options: Default::default(),
-            },
-            PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
+                expr: col("a", &schema).unwrap(),
                 options: Default::default(),
             },
         ];
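Note: with this change the required ordering is lexicographic on (b, a), so the merge has to row-encode the high-cardinality dictionary column `a` as well — the setup that exposes the RowConverter memory growth addressed in the next patches. A minimal sketch of building the two-key sort spec with the same APIs the test uses:

use arrow::datatypes::Schema;
use datafusion::error::Result;
use datafusion::physical_plan::{expressions::col, PhysicalSortExpr};

// Lexicographic (b, a) sort specification over `schema`,
// with default sort options for both keys.
fn sort_spec(schema: &Schema) -> Result<Vec<PhysicalSortExpr>> {
    Ok(vec![
        PhysicalSortExpr {
            expr: col("b", schema)?,
            options: Default::default(),
        },
        PhysicalSortExpr {
            expr: col("a", schema)?,
            options: Default::default(),
        },
    ])
}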
e |", + "| e | f |", + "| c | g |", + "| f | h |", + "| c | i |", + "| f | j |", + "| b | k |", + "| e | l |", + "| b | m |", + "| e | n |", + "| a | o |", + "| d | p |", + "| c | q |", + "| f | r |", + "+---+---+", + ]; + assert_batches_eq!(expected, collected.as_slice()); } - + #[tokio::test] async fn test_merge_interleave() { let task_ctx = Arc::new(TaskContext::default()); diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index 9ef13b7eb25e..817f4b359a53 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -17,7 +17,7 @@ use crate::physical_plan::sorts::cursor::{FieldArray, FieldCursor, RowCursor}; use crate::physical_plan::SendableRecordBatchStream; -use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr}; +use crate::physical_plan::PhysicalSortExpr; use arrow::array::Array; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; @@ -26,7 +26,6 @@ use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; use futures::stream::{Fuse, StreamExt}; use std::marker::PhantomData; -use std::sync::Arc; use std::task::{ready, Context, Poll}; /// A [`Stream`](futures::Stream) that has multiple partitions that can @@ -82,11 +81,13 @@ pub struct RowCursorStream { /// Converter to convert output of physical expressions converter: RowConverter, /// The physical expressions to sort by - column_expressions: Vec>, + expressions: Vec, /// Input streams streams: FusedStreams, /// Tracks the memory used by `converter` reservation: MemoryReservation, + /// The schema of the input streams + schema: Schema, } impl RowCursorStream { @@ -108,26 +109,46 @@ impl RowCursorStream { let converter = RowConverter::new(sort_fields)?; Ok(Self { converter, - reservation, - column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), + expressions: expressions.to_vec(), streams: FusedStreams(streams), + reservation, + schema: schema.clone(), }) } fn convert_batch(&mut self, batch: &RecordBatch) -> Result { - let cols = self - .column_expressions + let column_expressions: Vec<_> = self.expressions.iter().map(|x| x.expr.clone()).collect(); + let cols = column_expressions .iter() .map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows()))) .collect::>>()?; - let rows = self.converter.convert_columns(&cols)?; - self.reservation.try_resize(self.converter.size())?; + let sort_fields = self + .expressions + .iter() + .map(|expr| { + let data_type = expr.expr.data_type(&self.schema)?; + Ok(SortField::new_with_options(data_type, expr.options)) + }) + .collect::>>()?; - // track the memory in the newly created Rows. + let old_converter: &mut RowConverter = &mut self.converter; + let mut old_rows = old_converter.convert_columns(&cols)?; + self.reservation.try_resize(old_converter.size())?; let mut rows_reservation = self.reservation.new_empty(); - rows_reservation.try_grow(rows.size())?; - Ok(RowCursor::new(rows, rows_reservation)) + rows_reservation.try_grow(old_rows.size())?; + + println!("Old converter size: {0}", old_converter.size()); + if old_converter.size() > 50*1024*1024 { + let mut new_converter = RowConverter::new(sort_fields)?; + let new_rows = new_converter.convert_columns( + &old_converter.convert_rows(&old_rows)? 
From 5dda1bc31f13aa51d09b0c0e04b36600474804f2 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Thu, 10 Aug 2023 09:15:53 -0700
Subject: [PATCH 14/14] Add rebase changes

---
 .../core/src/physical_plan/sorts/sort_preserving_merge.rs | 4 ++--
 datafusion/core/src/physical_plan/sorts/stream.rs         | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 05425c3191b6..4e97789486db 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -280,6 +280,7 @@ mod tests {
     use tempfile::TempDir;
     use futures::{FutureExt, StreamExt, stream::BoxStream};

+    use crate::execution::context::SessionContext;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
@@ -409,8 +410,7 @@ mod tests {

     #[tokio::test]
     async fn test_dict_merge() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());

         let values = StringArray::from_iter_values(["a", "b", "c"]);
         let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);

diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs
index 817f4b359a53..b9d8e8ddbe80 100644
--- a/datafusion/core/src/physical_plan/sorts/stream.rs
+++ b/datafusion/core/src/physical_plan/sorts/stream.rs
@@ -144,6 +144,7 @@ impl RowCursorStream {
             let new_rows = new_converter.convert_columns(
                 &old_converter.convert_rows(&old_rows)?
             )?;
+            rows_reservation.try_resize(new_rows.size())?;
             old_rows = new_rows;
             println!("Swapped old converter of size: {0} with new converter of size {1}", old_converter.size(), new_converter.size());
             self.converter = new_converter;
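Note: the core idea in patches 13-14 is that a `RowConverter` interns every distinct dictionary value it encounters, so on an unbounded high-cardinality stream its memory grows without bound. The patch rebuilds a fresh converter once the old one crosses a 50 MB threshold and re-encodes the current batch's rows with it. A schematic sketch of that recycle step in isolation (simplified; the patch's reservation bookkeeping and logging are elided, and the helper name is hypothetical):

use arrow::error::Result;
use arrow::row::{RowConverter, Rows, SortField};

// Rebuild the converter once its interned state exceeds a threshold,
// re-encoding `rows` with the fresh converter before swapping it in.
fn maybe_recycle(
    converter: &mut RowConverter,
    rows: Rows,
    sort_fields: Vec<SortField>,
) -> Result<Rows> {
    const LIMIT: usize = 50 * 1024 * 1024; // 50 MB, as in the patch
    if converter.size() <= LIMIT {
        return Ok(rows);
    }
    // Decode back to columns with the old converter...
    let cols = converter.convert_rows(&rows)?;
    // ...then re-encode with a fresh converter, which starts with an
    // empty dictionary mapping, and replace the old one.
    let mut fresh = RowConverter::new(sort_fields)?;
    let new_rows = fresh.convert_columns(&cols)?;
    *converter = fresh;
    Ok(new_rows)
}

One caveat worth noting about this design: rows emitted before a swap were encoded by the old converter, so the swap is only safe at points where no previously encoded rows still need to be compared against new ones.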