Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,23 @@ pub fn project_schema(
Ok(schema)
}

/// Extracts a row at the specified index from a set of columns and stores it in the provided buffer.
/// Extracts a row at the specified index from a set of columns and stores it in the provided buffer.
///
/// The buffer is cleared first, then one [`ScalarValue`] per column is pushed,
/// so after a successful call `buf.len() == columns.len()`. Reusing a caller
/// provided buffer avoids a fresh allocation per row.
///
/// # Errors
///
/// Returns an error if any column cannot be converted to a `ScalarValue`
/// at `idx` (e.g. `idx` is out of bounds for that array).
pub fn extract_row_at_idx_to_buf(
    columns: &[ArrayRef],
    idx: usize,
    buf: &mut Vec<ScalarValue>,
) -> Result<()> {
    buf.clear();
    // Reserve once up front; the loop pushes exactly one value per column.
    buf.reserve(columns.len());

    for arr in columns {
        buf.push(ScalarValue::try_from_array(arr, idx)?);
    }

    Ok(())
}
/// Given column vectors, returns row at `idx`.
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
columns
Expand Down
27 changes: 26 additions & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::str;
use std::sync::Arc;

use crate::fuzz_cases::aggregation_fuzzer::{
Expand Down Expand Up @@ -88,6 +87,32 @@ async fn test_min() {
.await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_first_val() {
    let mut data_gen_config: DatasetGeneratorConfig = baseline_config();

    // Minimize the chance of identical values in the order by columns to make
    // the test more stable: force an explicit (maximal) distinct-count bound
    // on every column that does not already have one.
    for column in data_gen_config.columns.iter_mut() {
        if column.get_max_num_distinct().is_none() {
            *column = column.clone().with_max_num_distinct(usize::MAX);
        }
    }

    let query_builder = QueryBuilder::new()
        .with_table_name("fuzz_table")
        .with_aggregate_function("first_value")
        .with_aggregate_arguments(data_gen_config.all_columns())
        .set_group_by_columns(data_gen_config.all_columns());

    AggregationFuzzerBuilder::from(data_gen_config)
        .add_query_builder(query_builder)
        .build()
        .run()
        .await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_max() {
let data_gen_config = baseline_config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ impl ColumnDescr {
}
}

/// Returns the configured maximum number of distinct values for this column,
/// or `None` if no maximum has been set (in which case a limit is chosen
/// elsewhere — see `with_max_num_distinct` below).
pub fn get_max_num_distinct(&self) -> Option<usize> {
    self.max_num_distinct
}

/// set the maximum number of distinct values in this column
///
/// If `None`, the number of distinct values is randomly selected between 1
Expand Down
66 changes: 62 additions & 4 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::{collections::HashSet, str::FromStr};

use arrow::array::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion_common::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};

use crate::fuzz_cases::aggregation_fuzzer::{
Expand Down Expand Up @@ -452,7 +453,11 @@ impl QueryBuilder {
pub fn generate_query(&self) -> String {
let group_by = self.random_group_by();
let mut query = String::from("SELECT ");
query.push_str(&self.random_aggregate_functions().join(", "));
query.push_str(&group_by.join(", "));
if !group_by.is_empty() {
query.push_str(", ");
}
query.push_str(&self.random_aggregate_functions(&group_by).join(", "));
query.push_str(" FROM ");
query.push_str(&self.table_name);
if !group_by.is_empty() {
Expand All @@ -474,22 +479,42 @@ impl QueryBuilder {
/// * `function_names` are randomly selected from [`Self::aggregate_functions`]
/// * `<DISTINCT> argument` is randomly selected from [`Self::arguments`]
/// * `alias` is a unique alias `colN` for the column (to avoid duplicate column names)
fn random_aggregate_functions(&self) -> Vec<String> {
fn random_aggregate_functions(&self, group_by_cols: &[String]) -> Vec<String> {
const MAX_NUM_FUNCTIONS: usize = 5;
let mut rng = thread_rng();
let num_aggregate_functions = rng.gen_range(1..MAX_NUM_FUNCTIONS);

let mut alias_gen = 1;

let mut aggregate_functions = vec![];

let mut order_by_black_list: HashSet<String> =
group_by_cols.iter().cloned().collect();
// remove one random col
if let Some(first) = order_by_black_list.iter().next().cloned() {
order_by_black_list.remove(&first);
}

while aggregate_functions.len() < num_aggregate_functions {
let idx = rng.gen_range(0..self.aggregate_functions.len());
let (function_name, is_distinct) = &self.aggregate_functions[idx];
let argument = self.random_argument();
let alias = format!("col{}", alias_gen);
let distinct = if *is_distinct { "DISTINCT " } else { "" };
alias_gen += 1;
let function = format!("{function_name}({distinct}{argument}) as {alias}");

let (order_by, null_opt) = if function_name.eq("first_value") {
(
self.order_by(&order_by_black_list), /* Among the order by columns, at most one group by column can be included to avoid all order by column values being identical */
self.null_opt(),
)
} else {
("".to_string(), "".to_string())
};

let function = format!(
"{function_name}({distinct}{argument}{order_by}) {null_opt} as {alias}"
);
aggregate_functions.push(function);
}
aggregate_functions
Expand All @@ -502,6 +527,39 @@ impl QueryBuilder {
self.arguments[idx].clone()
}

fn order_by(&self, black_list: &HashSet<String>) -> String {
let mut available_columns: Vec<String> = self
.arguments
.iter()
.filter(|col| !black_list.contains(*col))
.cloned()
.collect();

available_columns.shuffle(&mut thread_rng());

let num_of_order_by_col = 12;
let column_count = std::cmp::min(num_of_order_by_col, available_columns.len());

let selected_columns = &available_columns[0..column_count];

let mut rng = thread_rng();
let mut result = String::from_str(" order by ").unwrap();
for col in selected_columns {
let order = if rng.gen_bool(0.5) { "ASC" } else { "DESC" };
result.push_str(&format!("{} {},", col, order));
}

result.strip_suffix(",").unwrap().to_string()
}

/// Randomly picks one of the two null-handling modifiers for an aggregate
/// call: `RESPECT NULLS` or `IGNORE NULLS`, each with probability 0.5.
fn null_opt(&self) -> String {
    let respect_nulls = thread_rng().gen_bool(0.5);
    let modifier = if respect_nulls {
        "RESPECT NULLS"
    } else {
        "IGNORE NULLS"
    };
    modifier.to_string()
}

/// Pick a random number of fields to group by (non-repeating)
///
/// Limited to 3 group by columns to ensure coverage for large groups. With
Expand Down
Loading