Commit
Fix integration_datafusion tests
rtyler committed Dec 30, 2023
1 parent defe803 commit 1d64783
Showing 2 changed files with 43 additions and 74 deletions.
2 changes: 1 addition & 1 deletion crates/deltalake-core/Cargo.toml
@@ -106,7 +106,7 @@ tempdir = { version = "0", optional = true }
[dev-dependencies]
criterion = "0.5"
ctor = "0"
-deltalake-test = { path = "../deltalake-test" }
+deltalake-test = { path = "../deltalake-test", features = ["datafusion"] }
dotenvy = "0"
hyper = { version = "0.14", features = ["server"] }
maplit = "1"
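The added `features = ["datafusion"]` is what exposes the DataFusion test helpers used throughout the test file below. A minimal sketch of the gating this implies inside deltalake-test's crate root (the exact module layout there is an assumption, not shown in this diff):

```rust
// Assumed shape of deltalake-test/src/lib.rs: the DataFusion helper module
// compiles only when a consumer enables the "datafusion" feature.
#[cfg(feature = "datafusion")]
pub mod datafusion; // provides context_with_delta_table_factory() and friends

pub mod utils; // provides IntegrationContext, TestResult, TestTables, ...
```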
115 changes: 42 additions & 73 deletions crates/deltalake-core/tests/integration_datafusion.rs
@@ -1,8 +1,8 @@
#![cfg(all(feature = "integration_test", feature = "datafusion"))]

use arrow::array::Int64Array;
-use common::datafusion::context_with_delta_table_factory;
-use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
+use deltalake_test::datafusion::*;
+use deltalake_test::utils::*;
use serial_test::serial;

use std::collections::{HashMap, HashSet};
@@ -32,13 +32,13 @@ use url::Url;

use deltalake_core::delta_datafusion::{DeltaPhysicalCodec, DeltaScan};
use deltalake_core::kernel::{DataType, MapType, PrimitiveType, StructField, StructType};
+use deltalake_core::logstore::logstore_for;
use deltalake_core::operations::create::CreateBuilder;
use deltalake_core::protocol::SaveMode;
use deltalake_core::writer::{DeltaWriter, RecordBatchWriter};
use deltalake_core::{
open_table,
operations::{write::WriteBuilder, DeltaOps},
-storage::config::configure_log_store,
DeltaTable, DeltaTableError,
};
use std::error::Error;
@@ -51,7 +51,9 @@ mod local {
#[tokio::test]
#[serial]
async fn test_datafusion_local() -> TestResult {
-test_datafusion(StorageIntegration::Local).await
+let storage = Box::new(LocalStorageIntegration::default());
+let context = IntegrationContext::new(storage)?;
+test_datafusion(&context).await
}

fn get_scanned_files(node: &dyn ExecutionPlan) -> HashSet<Label> {
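With the context injected as a value rather than selected via a `StorageIntegration` enum, backend-specific crates can drive the same shared body (the s3, azure, gcs, and hdfs modules are deleted from this file further down). A hedged sketch of what an equivalent harness for another backend could look like; `S3Integration` is a hypothetical name, not something this commit defines:

```rust
// Hypothetical backend harness reusing the shared test entry point below;
// only LocalStorageIntegration actually appears in this commit.
#[tokio::test]
#[serial]
async fn test_datafusion_s3() -> TestResult {
    let storage = Box::new(S3Integration::default()); // assumed type
    let context = IntegrationContext::new(storage)?;
    test_datafusion(&context).await
}
```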
@@ -124,7 +126,7 @@
let ctx = context_with_delta_table_factory();

let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
-d.push("tests/data/delta-0.8.0-partitioned");
+d.push("../deltalake-test/tests/data/delta-0.8.0-partitioned");
let sql = format!(
"CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION '{}'",
d.to_str().unwrap()
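The fixture paths gain a `../deltalake-test` prefix because the test data now lives in the sibling crate; `CARGO_MANIFEST_DIR` expands to this crate's directory (crates/deltalake-core), so one `..` reaches it. A quick illustration of the resolution, assuming the workspace layout implied by these paths:

```rust
// CARGO_MANIFEST_DIR == <workspace>/crates/deltalake-core at compile time,
// so the relative path lands in the deltalake-test crate's fixtures.
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("../deltalake-test/tests/data/delta-0.8.0-partitioned");
println!("fixture: {}", d.display());
```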
@@ -153,7 +155,7 @@
#[tokio::test]
async fn test_datafusion_simple_query_partitioned() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/delta-0.8.0-partitioned")
+let table = open_table("../deltalake-test/tests/data/delta-0.8.0-partitioned")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;
@@ -182,7 +184,7 @@
let source_scan_bytes = {
let ctx = SessionContext::new();
let state = ctx.state();
-let source_table = open_table("./tests/data/delta-0.8.0-date").await?;
+let source_table = open_table("../deltalake-test/tests/data/delta-0.8.0-date").await?;
let source_scan = source_table.scan(&state, None, &[], None).await?;
physical_plan_to_bytes_with_extension_codec(source_scan, &DeltaPhysicalCodec {})?
};
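For context, the bytes produced here are decoded later in the test with the same extension codec. A sketch of the round trip, assuming datafusion-proto's companion deserialization helper (`source_scan` and `ctx` as in the test above):

```rust
// Serialize a DeltaScan-bearing plan, then decode it back using the same
// Delta extension codec (the deserializer name is assumed from the
// datafusion-proto API of this era).
use datafusion_proto::physical_plan::{
    physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec,
};
use deltalake_core::delta_datafusion::DeltaPhysicalCodec;

let bytes = physical_plan_to_bytes_with_extension_codec(source_scan, &DeltaPhysicalCodec {})?;
let restored = physical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &DeltaPhysicalCodec {})?;
```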
@@ -219,13 +221,16 @@
);

// Register the missing source table object store
-let source_uri = source_scan
-.as_any()
-.downcast_ref::<DeltaScan>()
-.unwrap()
-.table_uri
-.clone();
-let source_store = configure_log_store(&source_uri, HashMap::new(), None).unwrap();
+let source_uri = Url::parse(
+&source_scan
+.as_any()
+.downcast_ref::<DeltaScan>()
+.unwrap()
+.table_uri
+.clone(),
+)
+.unwrap();
+let source_store = logstore_for(source_uri, HashMap::new()).unwrap();
let object_store_url = source_store.object_store_url();
let source_store_url: &Url = object_store_url.as_ref();
state
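The switch from `configure_log_store` to `logstore_for` also changes the URI type: the old helper took a string, the new one takes a parsed `url::Url`, which is why the scan's `table_uri` is now run through `Url::parse`. The new call shape in isolation (a file:// URI stands in for the real table location):

```rust
// Parse the table URI first, then resolve a log store for it, mirroring the
// logstore_for(Url, HashMap) usage in the diff above.
use std::collections::HashMap;

use deltalake_core::logstore::logstore_for;
use url::Url;

let url = Url::parse("file:///tmp/some-delta-table").unwrap();
let store = logstore_for(url, HashMap::new()).unwrap();
let object_store_url = store.object_store_url();
```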
@@ -260,7 +265,9 @@
#[tokio::test]
async fn test_datafusion_date_column() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/delta-0.8.0-date").await.unwrap();
+let table = open_table("../deltalake-test/tests/data/delta-0.8.0-date")
+.await
+.unwrap();
ctx.register_table("dates", Arc::new(table))?;

let batches = ctx
@@ -280,7 +287,9 @@
#[tokio::test]
async fn test_datafusion_stats() -> Result<()> {
// Validate a table that contains statistics for all files
-let table = open_table("./tests/data/delta-0.8.0").await.unwrap();
+let table = open_table("../deltalake-test/tests/data/delta-0.8.0")
+.await
+.unwrap();
let statistics = table.state.datafusion_table_statistics()?;

assert_eq!(statistics.num_rows, Precision::Exact(4_usize),);
@@ -319,7 +328,9 @@
assert_batches_sorted_eq!(&expected, &actual);

// Validate a table that does not contain column statistics
-let table = open_table("./tests/data/delta-0.2.0").await.unwrap();
+let table = open_table("../deltalake-test/tests/data/delta-0.2.0")
+.await
+.unwrap();
let statistics = table.state.datafusion_table_statistics()?;

assert_eq!(statistics.num_rows, Precision::Absent);
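The pair of assertions hinges on DataFusion's `Precision` wrapper: `Exact` when the Delta log carried file statistics, `Absent` when it did not. A tiny illustration (the import path is our reading of the DataFusion release this commit targets):

```rust
// Precision distinguishes known-exact statistics from missing ones.
use datafusion::common::stats::Precision;

let with_stats: Precision<usize> = Precision::Exact(4);
let without_stats: Precision<usize> = Precision::Absent;
assert_eq!(with_stats, Precision::Exact(4));
assert_eq!(without_stats, Precision::Absent);
```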
@@ -355,7 +366,7 @@
// In particular 'new_column' contains statistics for when it
// is introduced (10) but the commit following (11) does not contain
// statistics for this column.
-let table = open_table("./tests/data/delta-1.2.1-only-struct-stats")
+let table = open_table("../deltalake-test/tests/data/delta-1.2.1-only-struct-stats")
.await
.unwrap();
let schema = table.get_schema().unwrap();
@@ -838,7 +849,7 @@
#[tokio::test]
async fn test_datafusion_partitioned_types() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/delta-2.2.0-partitioned-types")
+let table = open_table("../deltalake-test/tests/data/delta-2.2.0-partitioned-types")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;
@@ -887,7 +898,7 @@
#[tokio::test]
async fn test_datafusion_scan_timestamps() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/table_with_edge_timestamps")
+let table = open_table("../deltalake-test/tests/data/table_with_edge_timestamps")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;
@@ -911,7 +922,9 @@
#[tokio::test]
async fn test_issue_1292_datafusion_sql_projection() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/http_requests").await.unwrap();
+let table = open_table("../deltalake-test/tests/data/http_requests")
+.await
+.unwrap();
ctx.register_table("http_requests", Arc::new(table))?;

let batches = ctx
@@ -940,7 +953,9 @@
#[tokio::test]
async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/http_requests").await.unwrap();
+let table = open_table("../deltalake-test/tests/data/http_requests")
+.await
+.unwrap();
ctx.register_table("http_requests", Arc::new(table))?;

let batches = ctx
@@ -971,7 +986,9 @@
#[tokio::test]
async fn test_issue_1374() -> Result<()> {
let ctx = SessionContext::new();
-let table = open_table("./tests/data/issue_1374").await.unwrap();
+let table = open_table("../deltalake-test/tests/data/issue_1374")
+.await
+.unwrap();
ctx.register_table("t", Arc::new(table))?;

let batches = ctx
@@ -1053,48 +1070,7 @@
}
}

-#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
-mod s3 {
-use super::*;
-#[tokio::test]
-#[serial]
-async fn test_datafusion_aws() -> TestResult {
-test_datafusion(StorageIntegration::Amazon).await
-}
-}
-
-#[cfg(feature = "azure")]
-mod azure {
-use super::*;
-#[tokio::test]
-#[serial]
-async fn test_datafusion_azure() -> TestResult {
-test_datafusion(StorageIntegration::Microsoft).await
-}
-}
-
-#[cfg(feature = "gcs")]
-mod gcs {
-use super::*;
-#[tokio::test]
-#[serial]
-async fn test_datafusion_gcp() -> TestResult {
-test_datafusion(StorageIntegration::Google).await
-}
-}
-
-#[cfg(feature = "hdfs")]
-mod hdfs {
-use super::*;
-#[tokio::test]
-#[serial]
-async fn test_datafusion_hdfs() -> TestResult {
-Ok(test_datafusion(StorageIntegration::Hdfs).await?)
-}
-}
-
-async fn test_datafusion(storage: StorageIntegration) -> TestResult {
-let context = IntegrationContext::new(storage)?;
+async fn test_datafusion(context: &IntegrationContext) -> TestResult {
context.load_table(TestTables::Simple).await?;

simple_query(&context).await?;
@@ -1105,14 +1081,7 @@ async fn test_datafusion(storage: StorageIntegration) -> TestResult {
async fn simple_query(context: &IntegrationContext) -> TestResult {
let table_uri = context.uri_for_table(TestTables::Simple);

-let dynamo_lock_option = "'DYNAMO_LOCK_OWNER_NAME' 's3::deltars/simple'".to_string();
-let options = match context.integration {
-StorageIntegration::Amazon => format!("'AWS_ALLOW_HTTP' '1', {dynamo_lock_option}"),
-StorageIntegration::Microsoft => {
-format!("'AZURE_STORAGE_ALLOW_HTTP' '1', {dynamo_lock_option}")
-}
-_ => dynamo_lock_option,
-};
+let options = "'DYNAMO_LOCK_OWNER_NAME' 's3::deltars/simple'".to_string();

let sql = format!(
"CREATE EXTERNAL TABLE demo \
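With the backend-specific branches gone, only the DynamoDB lock-owner pair remains, and it is spliced into the DDL that `simple_query` issues. A hedged reconstruction of how such a string typically lands in DataFusion DDL; the format string is truncated in this view, so the clause order is an assumption:

```rust
// Assumed completion of the truncated format! call above: DataFusion DDL
// accepts per-table key/value pairs through an OPTIONS (...) clause.
let options = "'DYNAMO_LOCK_OWNER_NAME' 's3::deltars/simple'".to_string();
let sql = format!(
    "CREATE EXTERNAL TABLE demo \
     STORED AS DELTATABLE \
     OPTIONS ({options}) \
     LOCATION '{table_uri}'"
);
```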
