Port tests in parquet.rs to sqllogictest (#8560)
* setup parquet.slt and port parquet_query test to it

* port parquet_with_sort_order_specified, but missing files

* port fixed_size_binary_columns test

* port window_fn_timestamp_tz test

* port parquet_single_nan_schema test

* port parquet_query_with_max_min test

* use COPY to create tables in parquet.slt to test partitioning over multi-file data

* remove unneeded optimizer setting; check type of timestamp column
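
The 304 added lines live in the new parquet.slt sqllogictest file, which this view does not expand. As a rough, hypothetical sketch of the sqllogictest style the ported tests use (the table names, paths, and COPY options below are assumptions for illustration, not the actual file contents):

# Hypothetical sketch only -- not the real parquet.slt.
# Write a source table out as Parquet with COPY, register the result as an
# external table, then assert query results inline.

statement ok
COPY (SELECT * FROM src_table)
TO 'test_files/scratch/parquet/test_table' (FORMAT parquet);

statement ok
CREATE EXTERNAL TABLE test_table
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table';

# "query I" declares one integer result column; the expected output follows
# the ---- marker (the value shown here is illustrative, not real data)
query I
SELECT count(*) FROM test_table;
----
3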
hiltontj authored Dec 19, 2023
1 parent 9bc61b3 commit f041e73
Showing 2 changed files with 304 additions and 292 deletions.
292 changes: 0 additions & 292 deletions datafusion/core/tests/sql/parquet.rs
@@ -15,207 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::{fs, path::Path};

use ::parquet::arrow::ArrowWriter;
use datafusion::{datasource::listing::ListingOptions, execution::options::ReadOptions};
use datafusion_common::cast::{as_list_array, as_primitive_array, as_string_array};
use tempfile::TempDir;

use super::*;

#[tokio::test]
async fn parquet_query() {
    let ctx = SessionContext::new();
    register_alltypes_parquet(&ctx).await;
    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
    // so we need an explicit cast
    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
    let actual = execute_to_batches(&ctx, sql).await;
    let expected = [
        "+----+---------------------------+",
        "| id | alltypes_plain.string_col |",
        "+----+---------------------------+",
        "| 4  | 0                         |",
        "| 5  | 1                         |",
        "| 6  | 0                         |",
        "| 7  | 1                         |",
        "| 2  | 0                         |",
        "| 3  | 1                         |",
        "| 0  | 0                         |",
        "| 1  | 1                         |",
        "+----+---------------------------+",
    ];

    assert_batches_eq!(expected, &actual);
}

#[tokio::test]
/// Test that if sort order is specified in ListingOptions, the sort
/// expressions make it all the way down to the ParquetExec
async fn parquet_with_sort_order_specified() {
    let parquet_read_options = ParquetReadOptions::default();
    let session_config = SessionConfig::new().with_target_partitions(2);

    // The sort order is not specified
    let options_no_sort = parquet_read_options.to_listing_options(&session_config);

    // The sort order is specified (not actually correct in this case)
    let file_sort_order = [col("string_col"), col("int_col")]
        .into_iter()
        .map(|e| {
            let ascending = true;
            let nulls_first = false;
            e.sort(ascending, nulls_first)
        })
        .collect::<Vec<_>>();

    let options_sort = parquet_read_options
        .to_listing_options(&session_config)
        .with_file_sort_order(vec![file_sort_order]);

    // This string appears in ParquetExec if the output ordering is
    // specified
    let expected_output_ordering =
        "output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]";

    // when sort not specified, should not appear in the explain plan
    let num_files = 1;
    assert_not_contains!(
        run_query_with_options(options_no_sort, num_files).await,
        expected_output_ordering
    );

    // when sort IS specified, SHOULD appear in the explain plan
    let num_files = 1;
    assert_contains!(
        run_query_with_options(options_sort.clone(), num_files).await,
        expected_output_ordering
    );

    // when sort IS specified, but there are too many files (greater
    // than the number of partitions) sort should not appear
    let num_files = 3;
    assert_not_contains!(
        run_query_with_options(options_sort, num_files).await,
        expected_output_ordering
    );
}

/// Runs a limit query against a parquet file that was registered from
/// options on num_files copies of all_types_plain.parquet
async fn run_query_with_options(options: ListingOptions, num_files: usize) -> String {
    let ctx = SessionContext::new();

    let testdata = datafusion::test_util::parquet_test_data();
    let file_path = format!("{testdata}/alltypes_plain.parquet");

    // Create a directory of parquet files with names
    // 0.parquet
    // 1.parquet
    let tmpdir = TempDir::new().unwrap();
    for i in 0..num_files {
        let target_file = tmpdir.path().join(format!("{i}.parquet"));
        println!("Copying {file_path} to {target_file:?}");
        std::fs::copy(&file_path, target_file).unwrap();
    }

    let provided_schema = None;
    let sql_definition = None;
    ctx.register_listing_table(
        "t",
        tmpdir.path().to_string_lossy(),
        options.clone(),
        provided_schema,
        sql_definition,
    )
    .await
    .unwrap();

    let batches = ctx.sql("explain select int_col, string_col from t order by string_col, int_col limit 10")
        .await
        .expect("planing worked")
        .collect()
        .await
        .expect("execution worked");

    arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string()
}

#[tokio::test]
async fn fixed_size_binary_columns() {
    let ctx = SessionContext::new();
    ctx.register_parquet(
        "t0",
        "tests/data/test_binary.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();
    let sql = "SELECT ids FROM t0 ORDER BY ids";
    let dataframe = ctx.sql(sql).await.unwrap();
    let results = dataframe.collect().await.unwrap();
    for batch in results {
        assert_eq!(466, batch.num_rows());
        assert_eq!(1, batch.num_columns());
    }
}

#[tokio::test]
async fn window_fn_timestamp_tz() {
    let ctx = SessionContext::new();
    ctx.register_parquet(
        "t0",
        "tests/data/timestamp_with_tz.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();

    let sql = "SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) FROM t0";
    let dataframe = ctx.sql(sql).await.unwrap();
    let results = dataframe.collect().await.unwrap();

    let mut num_rows = 0;
    for batch in results {
        num_rows += batch.num_rows();
        assert_eq!(2, batch.num_columns());

        let ty = batch.column(0).data_type().clone();
        assert_eq!(DataType::Int64, ty);

        let ty = batch.column(1).data_type().clone();
        assert_eq!(
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
            ty
        );
    }

    assert_eq!(131072, num_rows);
}

#[tokio::test]
async fn parquet_single_nan_schema() {
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    ctx.register_parquet(
        "single_nan",
        &format!("{testdata}/single_nan.parquet"),
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();
    let sql = "SELECT mycol FROM single_nan";
    let dataframe = ctx.sql(sql).await.unwrap();
    let results = dataframe.collect().await.unwrap();
    for batch in results {
        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.num_columns());
    }
}

#[tokio::test]
#[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"]
async fn parquet_list_columns() {
@@ -286,98 +89,3 @@ async fn parquet_list_columns() {
    assert_eq!(result.value(2), "hij");
    assert_eq!(result.value(3), "xyz");
}

#[tokio::test]
async fn parquet_query_with_max_min() {
    let tmp_dir = TempDir::new().unwrap();
    let table_dir = tmp_dir.path().join("parquet_test");
    let table_path = Path::new(&table_dir);

    let fields = vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Utf8, true),
        Field::new("c3", DataType::Int64, true),
        Field::new("c4", DataType::Date32, true),
    ];

    let schema = Arc::new(Schema::new(fields.clone()));

    if let Ok(()) = fs::create_dir(table_path) {
        let filename = "foo.parquet";
        let path = table_path.join(filename);
        let file = fs::File::create(path).unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
                .unwrap();

        // create mock record batch
        let c1s = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let c2s = Arc::new(StringArray::from(vec!["aaa", "bbb", "ccc"]));
        let c3s = Arc::new(Int64Array::from(vec![100, 200, 300]));
        let c4s = Arc::new(Date32Array::from(vec![Some(1), Some(2), Some(3)]));
        let rec_batch =
            RecordBatch::try_new(schema.clone(), vec![c1s, c2s, c3s, c4s]).unwrap();

        writer.write(&rec_batch).unwrap();
        writer.close().unwrap();
    }

    // query parquet
    let ctx = SessionContext::new();

    ctx.register_parquet(
        "foo",
        &format!("{}/foo.parquet", table_dir.to_str().unwrap()),
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();

    let sql = "SELECT max(c1) FROM foo";
    let actual = execute_to_batches(&ctx, sql).await;
    let expected = [
        "+-------------+",
        "| MAX(foo.c1) |",
        "+-------------+",
        "| 3           |",
        "+-------------+",
    ];

    assert_batches_eq!(expected, &actual);

    let sql = "SELECT min(c2) FROM foo";
    let actual = execute_to_batches(&ctx, sql).await;
    let expected = [
        "+-------------+",
        "| MIN(foo.c2) |",
        "+-------------+",
        "| aaa         |",
        "+-------------+",
    ];

    assert_batches_eq!(expected, &actual);

    let sql = "SELECT max(c3) FROM foo";
    let actual = execute_to_batches(&ctx, sql).await;
    let expected = [
        "+-------------+",
        "| MAX(foo.c3) |",
        "+-------------+",
        "| 300         |",
        "+-------------+",
    ];

    assert_batches_eq!(expected, &actual);

    let sql = "SELECT min(c4) FROM foo";
    let actual = execute_to_batches(&ctx, sql).await;
    let expected = [
        "+-------------+",
        "| MIN(foo.c4) |",
        "+-------------+",
        "| 1970-01-02  |",
        "+-------------+",
    ];

    assert_batches_eq!(expected, &actual);
}