From 5b44a8d2556c406f99acafaac3ab28983e86f9ce Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Tue, 9 Jul 2024 18:01:01 +0200 Subject: [PATCH 1/7] Refactor to move parse_url_opts into own storage module --- kernel/src/engine/default/mod.rs | 4 +++- kernel/src/engine/default/storage.rs | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 kernel/src/engine/default/storage.rs diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index a2e8a1636..5a8ef7af6 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -8,7 +8,8 @@ use std::sync::Arc; -use object_store::{parse_url_opts, path::Path, DynObjectStore}; +use self::storage::parse_url_opts; +use object_store::{path::Path, DynObjectStore}; use url::Url; use self::executor::TaskExecutor; @@ -25,6 +26,7 @@ pub mod file_stream; pub mod filesystem; pub mod json; pub mod parquet; +pub mod storage; #[derive(Debug)] pub struct DefaultEngine { diff --git a/kernel/src/engine/default/storage.rs b/kernel/src/engine/default/storage.rs new file mode 100644 index 000000000..4d82a3766 --- /dev/null +++ b/kernel/src/engine/default/storage.rs @@ -0,0 +1,13 @@ +use object_store::parse_url_opts as object_store_parse_url_opts; +use object_store::path::Path; +use object_store::{Error, ObjectStore}; +use url::Url; + +pub fn parse_url_opts(url: &Url, options: I) -> Result<(Box, Path), Error> +where + I: IntoIterator, + K: AsRef, + V: Into, +{ + object_store_parse_url_opts(url, options) +} From 55d052d2e1f48b26f63c4e10e6842a59edf36413 Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Tue, 9 Jul 2024 22:28:30 +0200 Subject: [PATCH 2/7] Add hdfs_native_object_store dependency to DefaultEngine --- Cargo.toml | 1 + kernel/Cargo.toml | 2 ++ 2 files changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 381bf2199..8d9efd8da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,4 @@ arrow-select = { version = "^52.0" } arrow-schema = { version = "^52.0" } parquet = { version = "^52.0", features = ["object_store"] } object_store = "^0.10.1" +hdfs-native-object-store = "0.11.0" diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 601feb0ed..db4f7e01e 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -45,6 +45,7 @@ arrow-ord = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } futures = { version = "0.3", optional = true } object_store = { workspace = true, optional = true } +hdfs-native-object-store = {workspace = true, optional = true} # Used in default and sync engine parquet = { workspace = true, optional = true } # Used for fetching direct urls (like pre-signed urls) @@ -76,6 +77,7 @@ default-engine = [ "parquet/object_store", "reqwest", "tokio", + "hdfs-native-object-store" ] developer-visibility = [] From 54364ddc63dc66b9561d6d64d6a424ef5566a181 Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Tue, 9 Jul 2024 22:32:43 +0200 Subject: [PATCH 3/7] Use HdfsObjectStore if scheme is equal to hdfs/viewfs --- kernel/src/engine/default/storage.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kernel/src/engine/default/storage.rs b/kernel/src/engine/default/storage.rs index 4d82a3766..2e19be09a 100644 --- a/kernel/src/engine/default/storage.rs +++ b/kernel/src/engine/default/storage.rs @@ -1,3 +1,4 @@ +use hdfs_native_object_store::HdfsObjectStore; use object_store::parse_url_opts as object_store_parse_url_opts; use object_store::path::Path; use object_store::{Error, ObjectStore}; @@ -9,5 +10,16 @@ where K: AsRef, V: Into, { + let scheme = url.scheme(); + if scheme == "hdfs" || scheme == "viewfs" { + let options_map = options + .into_iter() + .map(|(k, v)| (k.as_ref().to_string(), v.into())) + .collect(); + let store = HdfsObjectStore::with_config(url.as_str(), options_map)?; + let path = Path::parse(url.path())?; + return Ok((Box::new(store), path)); + } + object_store_parse_url_opts(url, options) } From 3b718f555b0f3023f4b85d812ee6c90c6ae527a0 Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Fri, 12 Jul 2024 14:28:02 +0200 Subject: [PATCH 4/7] Move hdfs-native-object-store dependency to cloud feature --- kernel/Cargo.toml | 2 +- kernel/src/engine/default/storage.rs | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index db4f7e01e..91692879c 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -62,6 +62,7 @@ cloud = [ "object_store/azure", "object_store/gcp", "object_store/http", + "hdfs-native-object-store", ] default = ["sync-engine"] default-engine = [ @@ -77,7 +78,6 @@ default-engine = [ "parquet/object_store", "reqwest", "tokio", - "hdfs-native-object-store" ] developer-visibility = [] diff --git a/kernel/src/engine/default/storage.rs b/kernel/src/engine/default/storage.rs index 2e19be09a..302bc1fa0 100644 --- a/kernel/src/engine/default/storage.rs +++ b/kernel/src/engine/default/storage.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "cloud")] use hdfs_native_object_store::HdfsObjectStore; use object_store::parse_url_opts as object_store_parse_url_opts; use object_store::path::Path; @@ -10,15 +11,18 @@ where K: AsRef, V: Into, { - let scheme = url.scheme(); - if scheme == "hdfs" || scheme == "viewfs" { - let options_map = options - .into_iter() - .map(|(k, v)| (k.as_ref().to_string(), v.into())) - .collect(); - let store = HdfsObjectStore::with_config(url.as_str(), options_map)?; - let path = Path::parse(url.path())?; - return Ok((Box::new(store), path)); + #[cfg(feature = "cloud")] + { + let scheme = url.scheme(); + if scheme == "hdfs" || scheme == "viewfs" { + let options_map = options + .into_iter() + .map(|(k, v)| (k.as_ref().to_string(), v.into())) + .collect(); + let store = HdfsObjectStore::with_config(url.as_str(), options_map)?; + let path = Path::parse(url.path())?; + return Ok((Box::new(store), path)); + } } object_store_parse_url_opts(url, options) From 1f57962207ae6256e7189a9ec5aa13be33d734f9 Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Fri, 12 Jul 2024 16:47:09 +0200 Subject: [PATCH 5/7] Add hdfs integration test --- Cargo.toml | 2 ++ kernel/Cargo.toml | 9 +++++ kernel/tests/hdfs.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 kernel/tests/hdfs.rs diff --git a/Cargo.toml b/Cargo.toml index 8d9efd8da..3ad11838f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,5 @@ arrow-schema = { version = "^52.0" } parquet = { version = "^52.0", features = ["object_store"] } object_store = "^0.10.1" hdfs-native-object-store = "0.11.0" +hdfs-native = "0.10.0" +walkdir = "2.5.0" \ No newline at end of file diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 91692879c..2f230ed18 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -54,6 +54,10 @@ reqwest = { version = "^0.12.0", optional = true } # optionally used with default engine (though not required) tokio = { version = "1", optional = true, features = ["rt-multi-thread"] } +# Used in integration tests +hdfs-native = {workspace = true, optional = true} +walkdir = {workspace = true, optional = true} + [features] arrow-conversion = ["arrow-schema"] arrow-expression = ["arrow-arith", "arrow-array", "arrow-ord", "arrow-schema"] @@ -89,6 +93,11 @@ sync-engine = [ "arrow-select", "parquet", ] +integration-test = [ + "hdfs-native-object-store/integration-test", + "hdfs-native", + "walkdir", +] [build-dependencies] rustc_version = "0.4.0" diff --git a/kernel/tests/hdfs.rs b/kernel/tests/hdfs.rs new file mode 100644 index 000000000..ee838e548 --- /dev/null +++ b/kernel/tests/hdfs.rs @@ -0,0 +1,80 @@ +// Hdfs integration tests +// +// In order to set up the MiniDFS cluster you need to have Java, Maven, Hadoop binaries and Kerberos +// tools available and on your path. Any Java version between 8 and 17 should work. + +#![cfg(feature = "integration-test")] +#![cfg(feature = "cloud")] + +use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; +use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::Table; +use hdfs_native::{Client, WriteOptions}; +use hdfs_native_object_store::minidfs::MiniDfs; +use std::collections::HashSet; +use std::fs; +use std::path::Path; +use std::sync::Arc; +extern crate walkdir; +use walkdir::WalkDir; + +async fn write_local_path_to_hdfs( + local_path: &Path, + remote_path: &Path, + client: &Client, +) -> Result<(), Box> { + for entry in WalkDir::new(local_path) { + let entry = entry?; + let path = entry.path(); + + let relative_path = path.strip_prefix(local_path)?; + let destination = remote_path.join(relative_path); + + if path.is_file() { + let bytes = fs::read(path)?; + let mut writer = client + .create( + destination.as_path().to_str().unwrap(), + WriteOptions::default(), + ) + .await?; + writer.write(bytes.into()).await?; + writer.close().await?; + } else { + client + .mkdirs(destination.as_path().to_str().unwrap(), 0o755, true) + .await?; + } + } + + Ok(()) +} + +#[tokio::test] +async fn read_table_version_hdfs() -> Result<(), Box> { + let minidfs = MiniDfs::with_features(&HashSet::new()); + let hdfs_client = Client::default(); + + // Copy table to MiniDFS + write_local_path_to_hdfs( + "./tests/data/app-txn-checkpoint".as_ref(), + "/my-delta-table".as_ref(), + &hdfs_client, + ) + .await?; + + let url_str = format!("{}/my-delta-table", minidfs.url); + let url = url::Url::parse(&url_str).unwrap(); + + let engine = DefaultEngine::try_new( + &url, + std::iter::empty::<(&str, &str)>(), + Arc::new(TokioBackgroundExecutor::new()), + )?; + + let table = Table::new(url); + let snapshot = table.snapshot(&engine, None)?; + assert_eq!(snapshot.version(), 1); + + Ok(()) +} From a60f5a223954648ab4d913bed9b3159adf84c22e Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Mon, 15 Jul 2024 13:40:46 +0200 Subject: [PATCH 6/7] Split hdfs code path in parse_url_opts to separate function --- kernel/src/engine/default/storage.rs | 37 +++++++++++++++++----------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/kernel/src/engine/default/storage.rs b/kernel/src/engine/default/storage.rs index 302bc1fa0..6c2d5746d 100644 --- a/kernel/src/engine/default/storage.rs +++ b/kernel/src/engine/default/storage.rs @@ -1,6 +1,6 @@ #[cfg(feature = "cloud")] use hdfs_native_object_store::HdfsObjectStore; -use object_store::parse_url_opts as object_store_parse_url_opts; +use object_store::parse_url_opts as parse_url_opts_object_store; use object_store::path::Path; use object_store::{Error, ObjectStore}; use url::Url; @@ -11,19 +11,28 @@ where K: AsRef, V: Into, { - #[cfg(feature = "cloud")] - { - let scheme = url.scheme(); - if scheme == "hdfs" || scheme == "viewfs" { - let options_map = options - .into_iter() - .map(|(k, v)| (k.as_ref().to_string(), v.into())) - .collect(); - let store = HdfsObjectStore::with_config(url.as_str(), options_map)?; - let path = Path::parse(url.path())?; - return Ok((Box::new(store), path)); - } + match url.scheme() { + #[cfg(feature = "cloud")] + "hdfs" | "viewfs" => parse_url_opts_hdfs_native(url, options), + _ => parse_url_opts_object_store(url, options), } +} - object_store_parse_url_opts(url, options) +#[cfg(feature = "cloud")] +pub fn parse_url_opts_hdfs_native( + url: &Url, + options: I, +) -> Result<(Box, Path), Error> +where + I: IntoIterator, + K: AsRef, + V: Into, +{ + let options_map = options + .into_iter() + .map(|(k, v)| (k.as_ref().to_string(), v.into())) + .collect(); + let store = HdfsObjectStore::with_config(url.as_str(), options_map)?; + let path = Path::parse(url.path())?; + return Ok((Box::new(store), path)); } From 55c3a4726a93868a8bbdca892233c88228143e98 Mon Sep 17 00:00:00 2001 From: Jan Schutte <4732389+SchutteJan@users.noreply.github.com> Date: Mon, 15 Jul 2024 13:42:30 +0200 Subject: [PATCH 7/7] Add comment on running hdfs integration test and combine feature flag macro --- kernel/tests/hdfs.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kernel/tests/hdfs.rs b/kernel/tests/hdfs.rs index ee838e548..5c1011d07 100644 --- a/kernel/tests/hdfs.rs +++ b/kernel/tests/hdfs.rs @@ -2,9 +2,14 @@ // // In order to set up the MiniDFS cluster you need to have Java, Maven, Hadoop binaries and Kerberos // tools available and on your path. Any Java version between 8 and 17 should work. - -#![cfg(feature = "integration-test")] -#![cfg(feature = "cloud")] +// +// Run these integration tests with: +// cargo test --features integration-test,cloud --test hdfs +#![cfg(all( + feature = "integration-test", + feature = "cloud", + not(target_os = "windows") +))] use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine;