Fuzz test for spillable sort (#1706)
yjshen authored Jan 30, 2022
1 parent 75c7578 commit a7f0156
Showing 8 changed files with 234 additions and 52 deletions.
1 change: 1 addition & 0 deletions datafusion/Cargo.toml
@@ -82,6 +82,7 @@ tempfile = "3"
[dev-dependencies]
criterion = "0.3"
doc-comment = "0.3"
fuzz-utils = { path = "fuzz-utils" }

[[bench]]
name = "aggregate_query_sql"
28 changes: 28 additions & 0 deletions datafusion/fuzz-utils/Cargo.toml
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "fuzz-utils"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "8.0.0", features = ["prettyprint"] }
rand = "0.8"
env_logger = "0.9.0"
73 changes: 73 additions & 0 deletions datafusion/fuzz-utils/src/lib.rs
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Common utils for fuzz tests
use arrow::{array::Int32Array, record_batch::RecordBatch};
use rand::prelude::StdRng;
use rand::Rng;

pub use env_logger;

/// Extracts the i32 values from the set of batches and returns them as a single Vec
pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
batches
.iter()
.map(|batch| {
assert_eq!(batch.num_columns(), 1);
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
})
.flatten()
.collect()
}

/// extract values from batches and sort them
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
let mut values: Vec<_> = partitions
.iter()
.map(|batches| batches_to_vec(batches).into_iter())
.flatten()
.collect();

values.sort_unstable();
values
}

/// Adds a random number of empty record batches into the stream
pub fn add_empty_batches(
batches: Vec<RecordBatch>,
rng: &mut StdRng,
) -> Vec<RecordBatch> {
let schema = batches[0].schema();

batches
.into_iter()
.map(|batch| {
// insert 0, or 1 empty batches before and after the current batch
let empty_batch = RecordBatch::new_empty(schema.clone());
std::iter::repeat(empty_batch.clone())
.take(rng.gen_range(0..2))
.chain(std::iter::once(batch))
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
})
.flatten()
.collect()
}
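
For orientation, here is a minimal, hedged sketch of how these helpers are meant to be combined by a fuzz test. It is illustrative only and not part of the commit; the operator under test is replaced by an identity pass over the batches.

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use rand::{prelude::StdRng, SeedableRng};
use std::sync::Arc;

use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};

fn main() {
    // Build one partition with a single batch of known, already-sorted values.
    let values = Int32Array::from_iter_values(0..100);
    let batch =
        RecordBatch::try_from_iter(vec![("x", Arc::new(values) as ArrayRef)]).unwrap();

    // Interleave random empty batches to exercise empty-batch handling.
    let mut rng = StdRng::seed_from_u64(0);
    let partition = add_empty_batches(vec![batch], &mut rng);

    // The expected output is the sorted values from all input partitions; in a
    // real test the actual output would come from the operator under test.
    let expected = partitions_to_sorted_vec(&[partition.clone()]);
    let actual = batches_to_vec(&partition);
    assert_eq!(expected, actual);
}
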
3 changes: 2 additions & 1 deletion datafusion/src/execution/memory_manager.rs
@@ -392,7 +392,8 @@ const GB: u64 = 1 << 30;
const MB: u64 = 1 << 20;
const KB: u64 = 1 << 10;

fn human_readable_size(size: usize) -> String {
/// Present size in human readable form
pub fn human_readable_size(size: usize) -> String {
let size = size as u64;
let (value, unit) = {
if size >= 2 * TB {
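
The body of human_readable_size is truncated in this hunk. As a rough, hedged sketch of the kind of logic involved, assuming thresholds that mirror the constants above (DataFusion's exact formatting is not shown here and may differ):

// Hypothetical re-implementation for illustration only; the real function in
// datafusion/src/execution/memory_manager.rs may format values differently.
const KB: u64 = 1 << 10;
const MB: u64 = 1 << 20;
const GB: u64 = 1 << 30;
const TB: u64 = 1 << 40;

fn human_readable_size_sketch(size: usize) -> String {
    let size = size as u64;
    let (value, unit) = if size >= 2 * TB {
        (size as f64 / TB as f64, "TB")
    } else if size >= 2 * GB {
        (size as f64 / GB as f64, "GB")
    } else if size >= 2 * MB {
        (size as f64 / MB as f64, "MB")
    } else if size >= 2 * KB {
        (size as f64 / KB as f64, "KB")
    } else {
        (size as f64, "B")
    };
    format!("{:.1} {}", value, unit)
}

fn main() {
    // e.g. 3 MiB renders as "3.0 MB" under this sketch
    println!("{}", human_readable_size_sketch(3 * 1024 * 1024));
}
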
4 changes: 3 additions & 1 deletion datafusion/src/execution/mod.rs
@@ -25,4 +25,6 @@ pub mod options;
pub mod runtime_env;

pub use disk_manager::DiskManager;
pub use memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
pub use memory_manager::{
human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager,
};
6 changes: 4 additions & 2 deletions datafusion/src/physical_plan/sorts/sort.rs
@@ -21,7 +21,7 @@

use crate::error::{DataFusionError, Result};
use crate::execution::memory_manager::{
ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
human_readable_size, ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
@@ -348,7 +348,9 @@ fn write_sorted(
writer.finish()?;
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches, writer.num_rows, writer.num_bytes
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes as usize),
);
Ok(())
}
50 changes: 2 additions & 48 deletions datafusion/tests/merge_fuzz.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Fuzz Test for various corner cases merging streams of RecordBatchs
//! Fuzz Test for various corner cases merging streams of RecordBatches
use std::sync::Arc;

use arrow::{
@@ -32,6 +32,7 @@ use datafusion::{
sorts::sort_preserving_merge::SortPreservingMergeExec,
},
};
use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::{prelude::StdRng, Rng, SeedableRng};

#[tokio::test]
@@ -147,35 +148,6 @@ async fn run_merge_test(input: Vec<Vec<RecordBatch>>) {
}
}

/// Extracts the i32 values from the set of batches and returns them as a single Vec
fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
batches
.iter()
.map(|batch| {
assert_eq!(batch.num_columns(), 1);
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
})
.flatten()
.collect()
}

// extract values from batches and sort them
fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
let mut values: Vec<_> = partitions
.iter()
.map(|batches| batches_to_vec(batches).into_iter())
.flatten()
.collect();

values.sort_unstable();
values
}

/// Return the values `low..high` in order, in randomly sized
/// record batches in a field named 'x' of type `Int32`
fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
@@ -199,24 +171,6 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
add_empty_batches(batches, &mut rng)
}

/// Adds a random number of empty record batches into the stream
fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) -> Vec<RecordBatch> {
let schema = batches[0].schema();

batches
.into_iter()
.map(|batch| {
// insert 0, or 1 empty batches before and after the current batch
let empty_batch = RecordBatch::new_empty(schema.clone());
std::iter::repeat(empty_batch.clone())
.take(rng.gen_range(0..2))
.chain(std::iter::once(batch))
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
})
.flatten()
.collect()
}

fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
v1.extend(v2);
v1
121 changes: 121 additions & 0 deletions datafusion/tests/order_spill_fuzz.rs
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Fuzz test for various corner cases where sorting RecordBatches exceeds available memory and should spill to disk

use arrow::{
array::{ArrayRef, Int32Array},
compute::SortOptions,
record_batch::RecordBatch,
};
use datafusion::execution::memory_manager::MemoryManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;

#[tokio::test]
async fn test_sort_1k_mem() {
run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await
}

#[tokio::test]
async fn test_sort_100k_mem() {
run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await
}

#[tokio::test]
async fn test_sort_unlimited_mem() {
run_sort(
usize::MAX,
vec![(5, false), (2000, false), (1000000, false)],
)
.await
}

/// Sort the input using SortExec and ensure the results are correct according to `Vec::sort`
async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
for (size, spill) in size_spill {
let input = vec![make_staggered_batches(size)];
let first_batch = input
.iter()
.map(|p| p.iter())
.flatten()
.next()
.expect("at least one batch");
let schema = first_batch.schema();

let sort = vec![PhysicalSortExpr {
expr: col("x", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
}];

let exec = MemoryExec::try_new(&input, schema, None).unwrap();
let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec)).unwrap());

let runtime_config = RuntimeConfig::new().with_memory_manager(
MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(),
);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
let collected = collect(sort.clone(), runtime).await.unwrap();

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);

if spill {
assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
} else {
assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
}

assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size);
}
}

/// Return randomly sized record batches in a field named 'x' of type `Int32`
/// with randomized i32 content
fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
let mut rng = rand::thread_rng();
let mut input: Vec<i32> = vec![0; len];
rng.fill(&mut input[..]);
let input = Int32Array::from_iter_values(input.into_iter());

// split into several record batches
let mut remainder =
RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();

let mut batches = vec![];

// use a random number generator to pick a random sized output
let mut rng = StdRng::seed_from_u64(42);
while remainder.num_rows() > 0 {
let batch_size = rng.gen_range(0..remainder.num_rows() + 1);

batches.push(remainder.slice(0, batch_size));
remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
}

add_empty_batches(batches, &mut rng)
}
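
As a side note, the slice/remainder loop above partitions the input without dropping or duplicating rows. A small standalone sketch (illustrative, not part of the commit) that checks this invariant:

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use rand::{prelude::StdRng, Rng, SeedableRng};
use std::sync::Arc;

fn main() {
    let input = Int32Array::from_iter_values(0..1000);
    let mut remainder =
        RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
    let total_rows = remainder.num_rows();

    let mut rng = StdRng::seed_from_u64(42);
    let mut batches = vec![];
    while remainder.num_rows() > 0 {
        // take a random (possibly empty) prefix as the next batch
        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
        batches.push(remainder.slice(0, batch_size));
        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
    }

    // every input row ends up in exactly one output batch
    let split_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(split_rows, total_rows);
}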
