From 8c24d3206fa2450fd95421efdb1882f93018a2e5 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 29 Nov 2023 14:59:41 +0100 Subject: [PATCH] Add integration test for the CONVERT TO DELTA statement --- tests/statements/convert.rs | 76 +++++++++++++++++++++++++++++++++++++ tests/statements/mod.rs | 3 ++ 2 files changed, 79 insertions(+) create mode 100644 tests/statements/convert.rs diff --git a/tests/statements/convert.rs b/tests/statements/convert.rs new file mode 100644 index 00000000..7b70e009 --- /dev/null +++ b/tests/statements/convert.rs @@ -0,0 +1,76 @@ +use crate::statements::*; + +#[tokio::test] +async fn test_convert_from_flat_parquet_table() -> Result<(), DataFusionError> { + let (context, maybe_test_dir) = make_context_with_pg(ObjectStoreType::Local).await; + + // Prepare a flat Parquet table + let table_uuid = Uuid::new_v4(); + let temp_dir = maybe_test_dir.expect("temporary data dir exists"); + let table_path = temp_dir.path().join(table_uuid.to_string()); + // Create the directory as otherwise the COPY will fail + create_dir(table_path.clone()).await?; + + // COPY the existing table multiple times to test converting flat table with more than one + // parquet file + context + .plan_query(&format!( + "COPY (VALUES (1, 'one'), (2, 'two')) TO '{}/file_1.parquet'", + table_path.display() + )) + .await?; + context + .plan_query(&format!( + "COPY (VALUES (3, 'three'), (4, 'four')) TO '{}/file_2.parquet'", + table_path.display() + )) + .await?; + context + .plan_query(&format!( + "COPY (VALUES (5, 'five'), (6, 'six')) TO '{}/file_3.parquet'", + table_path.display() + )) + .await?; + + // Now test the actual conversion + context + .plan_query(&format!("CONVERT '{table_uuid}' TO DELTA table_converted")) + .await?; + + // Finally test the contents of the converted table + let plan = context + .plan_query("SELECT * FROM table_converted ORDER BY column1") + .await?; + let results = context.collect(plan).await.unwrap(); + + let expected = [ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "| 5 | five |", + "| 6 | six |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &results); + + // Also check the final directory state + testutils::assert_uploaded_objects( + context + .internal_object_store + .get_log_store(table_uuid) + .object_store(), + vec![ + Path::from("_delta_log/00000000000000000000.json"), + Path::from("file_1.parquet"), + Path::from("file_2.parquet"), + Path::from("file_3.parquet"), + ], + ) + .await; + + Ok(()) +} diff --git a/tests/statements/mod.rs b/tests/statements/mod.rs index f15d0a43..04ed17d3 100644 --- a/tests/statements/mod.rs +++ b/tests/statements/mod.rs @@ -17,7 +17,9 @@ use serde_json::json; use sqlx::{any::install_default_drivers, AnyPool, Executor}; #[cfg(feature = "remote-tables")] use tempfile::{NamedTempFile, TempPath}; +use tokio::fs::create_dir; use tokio::time::sleep; +use uuid::Uuid; use rstest::rstest; use tempfile::TempDir; @@ -34,6 +36,7 @@ mod dml; mod query; // Hack because integration tests do not set cfg(test) // https://users.rust-lang.org/t/sharing-helper-function-between-unit-and-integration-tests/9941/2 +mod convert; #[allow(dead_code)] #[path = "../../src/testutils.rs"] mod testutils;