wip: Fix SortPreservingMerge OOM kill issue #7261

Closed
wants to merge 14 commits into from
202 changes: 199 additions & 3 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -268,15 +268,19 @@ impl ExecutionPlan for SortPreservingMergeExec {
#[cfg(test)]
mod tests {
use std::iter::FromIterator;
use arrow_array::UInt32Array;
use rand::Rng;
use uuid::Uuid;

use arrow::array::ArrayRef;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion_execution::config::SessionConfig;
use futures::{FutureExt, StreamExt};
use tempfile::TempDir;
use futures::{FutureExt, StreamExt, stream::BoxStream};

use crate::execution::context::SessionContext;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
@@ -287,10 +291,202 @@ mod tests {
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{self, assert_is_pending};
use crate::{assert_batches_eq, test_util};
use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};

use crate::physical_plan::streaming::PartitionStream;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::datasource::{streaming::StreamingTable, TableProvider};

use super::*;

// Builds a long (capped at 12000 batches) stream of 8192-row batches with a
// dictionary column `a` of random UUID strings and a UInt32 column `b` that is
// constant within a batch and non-decreasing across batches.
fn make_infinite_sorted_stream(col_b_init: u32) -> BoxStream<'static, RecordBatch> {
futures::stream::unfold((0, col_b_init), move |(mut counter, mut col_b_ascii)| async move {
// Stop after 12000 batches so the stream (and the test) eventually terminates.
// TODO: work out how all of the columns in the batches should be sorted.
if counter >= 12000 {
return None;
}

if counter % 5 == 0 {
col_b_ascii += 2;
}

counter += 1;

// build column `a`: random dictionary keys into a set of random UUID strings
let mut values_vector: Vec<String> = Vec::new();
for _i in 1..=8192 {
values_vector.push(Uuid::new_v4().to_string());
}
let values = StringArray::from(values_vector);

let mut keys_vector: Vec<i32> = Vec::new();
for _i in 1..=8192 {
keys_vector.push(rand::thread_rng().gen_range(0..8192));
}
let keys = Int32Array::from(keys_vector);
let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());

// build column `b`: every row in this batch has the same value, and the value
// only increases across batches, so each partition stays sorted on `b`
let values = vec![col_b_ascii; 8192];
let col_b: ArrayRef = Arc::new(UInt32Array::from(values));

// build a record batch out of col `a` and col `b`
let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
Some((batch, (counter, col_b_ascii)))
}).boxed()
}

struct InfiniteStream {
schema: SchemaRef,
col_b_init: u32
}

impl PartitionStream for InfiniteStream {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
// Wrap the infinite batch stream, mapping each RecordBatch into `Ok` so it
// can be returned as a SendableRecordBatchStream
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
make_infinite_sorted_stream(self.col_b_init).map(Ok)
))
}
}

#[tokio::test]
async fn test_dict_merge_infinite() {
let session_ctx = SessionContext::new();
let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();

let schema = SchemaRef::new(Schema::new(vec![
Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
Field::new("b", DataType::UInt32, false),
]));

let stream_1 = Arc::new(InfiniteStream {
schema: schema.clone(), col_b_init: 1
});

let stream_2 = Arc::new(InfiniteStream {
schema: schema.clone(), col_b_init: 2
});

println!("SortPreservingMergeExec result: ");

let sort = vec![
PhysicalSortExpr {
expr: col("b", &schema).unwrap(),
options: Default::default(),
},
PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: Default::default(),
},
];

let provider = StreamingTable::try_new(schema, vec![stream_1, stream_2]).unwrap();

let plan = provider.scan(&session_ctx.state(), None, &[], None).await.unwrap();
let exec = Arc::new(SortPreservingMergeExec::new(sort, plan));
let mut stream = exec.execute(0, task_ctx).unwrap();
while let Some(batch) = stream.next().await {
println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.unwrap().clone()])
.unwrap()
.to_string());
}
}

#[tokio::test]
async fn test_dict_merge() {
let task_ctx = Arc::new(TaskContext::default());

let values = StringArray::from_iter_values(["a", "b", "c"]);
let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
let a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
Some("a"),
Some("c"),
Some("e"),
Some("g"),
Some("i"),
Some("k"),
Some("m"),
Some("o"),
Some("q"),
]));
let batch_1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();

let values = StringArray::from_iter_values(["d", "e", "f"]);
let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
let a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
Some("b"),
Some("d"),
Some("f"),
Some("h"),
Some("j"),
Some("l"),
Some("n"),
Some("p"),
Some("r"),
]));
let batch_2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();

let schema = batch_1.schema();
let sort = vec![
PhysicalSortExpr {
expr: col("b", &schema).unwrap(),
options: Default::default(),
}
];
let exec = MemoryExec::try_new(&[vec![batch_1], vec![batch_2]], schema, None).unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
let collected = collect(merge, task_ctx).await.unwrap();
// collected.iter().for_each(|batch| {
// arrow::util::pretty::pretty_format_batches(&[batch.clone()])
// .unwrap()
// .to_string();
// });

let expected = vec![
"+---+---+",
"| a | b |",
"+---+---+",
"| a | a |",
"| d | b |",
"| a | c |",
"| d | d |",
"| b | e |",
"| e | f |",
"| c | g |",
"| f | h |",
"| c | i |",
"| f | j |",
"| b | k |",
"| e | l |",
"| b | m |",
"| e | n |",
"| a | o |",
"| d | p |",
"| c | q |",
"| f | r |",
"+---+---+",
];
assert_batches_eq!(expected, collected.as_slice());
}

#[tokio::test]
async fn test_merge_interleave() {
let task_ctx = Arc::new(TaskContext::default());
46 changes: 34 additions & 12 deletions datafusion/core/src/physical_plan/sorts/stream.rs
@@ -17,7 +17,7 @@

use crate::physical_plan::sorts::cursor::{FieldArray, FieldCursor, RowCursor};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr};
use crate::physical_plan::PhysicalSortExpr;
use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
@@ -26,7 +26,6 @@ use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::stream::{Fuse, StreamExt};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

/// A [`Stream`](futures::Stream) that has multiple partitions that can
@@ -82,11 +81,13 @@ pub struct RowCursorStream {
/// Converter to convert output of physical expressions
converter: RowConverter,
/// The physical expressions to sort by
column_expressions: Vec<Arc<dyn PhysicalExpr>>,
expressions: Vec<PhysicalSortExpr>,
/// Input streams
streams: FusedStreams,
/// Tracks the memory used by `converter`
reservation: MemoryReservation,
/// The schema of the input streams
schema: Schema,
}

impl RowCursorStream {
@@ -108,26 +109,47 @@ impl RowCursorStream {
let converter = RowConverter::new(sort_fields)?;
Ok(Self {
converter,
reservation,
column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
expressions: expressions.to_vec(),
streams: FusedStreams(streams),
reservation,
schema: schema.clone(),
})
}

fn convert_batch(&mut self, batch: &RecordBatch) -> Result<RowCursor> {
let cols = self
.column_expressions
let column_expressions: Vec<_> = self.expressions.iter().map(|x| x.expr.clone()).collect();
let cols = column_expressions
.iter()
.map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;

let rows = self.converter.convert_columns(&cols)?;
self.reservation.try_resize(self.converter.size())?;
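// Rebuild the sort fields so a replacement RowConverter can be constructed
// below if the current one has grown too large.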
let sort_fields = self
.expressions
.iter()
.map(|expr| {
let data_type = expr.expr.data_type(&self.schema)?;
Ok(SortField::new_with_options(data_type, expr.options))
})
.collect::<Result<Vec<_>>>()?;

// track the memory in the newly created Rows.
let old_converter: &mut RowConverter = &mut self.converter;
let mut old_rows = old_converter.convert_columns(&cols)?;
self.reservation.try_resize(old_converter.size())?;
let mut rows_reservation = self.reservation.new_empty();
rows_reservation.try_grow(rows.size())?;
Ok(RowCursor::new(rows, rows_reservation))
rows_reservation.try_grow(old_rows.size())?;

println!("Old converter size: {0}", old_converter.size());
if old_converter.size() > 50 * 1024 * 1024 {
let mut new_converter = RowConverter::new(sort_fields)?;
let new_rows = new_converter.convert_columns(
&old_converter.convert_rows(&old_rows)?
)?;
rows_reservation.try_resize(new_rows.size())?;
old_rows = new_rows;
println!("Swapped old converter of size: {0} with new converter of size {1}", old_converter.size(), new_converter.size());
self.converter = new_converter;
}
Ok(RowCursor::new(old_rows, rows_reservation))
Contributor
I believe you will also have to arrange to rewrite all the outstanding existing Rows in other streams too somehow
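
As a rough illustration of that point (not code from this PR), re-encoding the outstanding rows could look something like the sketch below. `OutstandingRows` and `rewrite_outstanding_rows` are hypothetical names; only the `convert_rows`/`convert_columns` calls mirror the ones used in `convert_batch` above. Because rows produced by different `RowConverter` instances are not comparable, every stream's cursor would need to be rewritten before the new converter is swapped in.

use arrow::row::{RowConverter, Rows};
use datafusion_common::Result;

/// Hypothetical container for the encoded rows a stream's cursor still holds.
struct OutstandingRows {
    rows: Rows,
}

/// Re-encode every outstanding `Rows` with `new_converter` so the old converter
/// (and the state it has accumulated) can be dropped. This must cover every
/// stream before the old converter is replaced, since rows from different
/// converters cannot be compared against each other.
fn rewrite_outstanding_rows(
    old_converter: &RowConverter,
    new_converter: &RowConverter,
    outstanding: &mut [OutstandingRows],
) -> Result<()> {
    for entry in outstanding.iter_mut() {
        // Decode back to arrays with the converter that produced the rows,
        // then re-encode with the fresh converter.
        let columns = old_converter.convert_rows(&entry.rows)?;
        entry.rows = new_converter.convert_columns(&columns)?;
    }
    Ok(())
}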

}
}
