feat(rust, python): add HDFS support via hdfs-native package (#2612)
# Description
Add support for HDFS using [hdfs-native](https://github.com/Kimahriman/hdfs-native), a pure* Rust
client for interacting with HDFS. This creates a new `hdfs` sub-crate, adds it as a feature to the
`deltalake` meta crate, and includes it in the Python wheels by default. There is a Rust integration
test that requires Hadoop and Java to be installed; it uses a small Maven program I ship under the
`integration-test` feature flag to run a MiniDFS server.

*Dynamically loads `libgssapi_krb5` using `libloading` for Kerberos
support
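
Since the Python wheels register the HDFS handler at import time (see the `python/src/lib.rs` change below), reading a table over HDFS should only need an `hdfs://` URI. A minimal sketch, assuming a reachable NameNode and an existing Delta table at a placeholder path:

```python
from deltalake import DeltaTable

# hdfs:// (and viewfs://) URIs are routed to the hdfs-native backend;
# the host, port, and table path below are placeholders for a real cluster.
dt = DeltaTable("hdfs://namenode:9000/data/delta/my_table")
print(dt.version())
print(dt.files()[:5])
```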

# Related Issue(s)
Resolves #2611 

# Documentation

Kimahriman authored Jun 21, 2024
1 parent a300218 commit d17ed97
Showing 12 changed files with 223 additions and 6 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/build.yml
@@ -43,13 +43,13 @@ jobs:
override: true

- name: build and lint with clippy
run: cargo clippy --features azure,datafusion,s3,gcs,glue --tests
run: cargo clippy --features azure,datafusion,s3,gcs,glue,hdfs --tests

- name: Spot-check build for native-tls features
run: cargo clippy --no-default-features --features azure,datafusion,s3-native-tls,gcs,glue --tests

- name: Check docs
run: cargo doc --features azure,datafusion,s3,gcs,glue
run: cargo doc --features azure,datafusion,s3,gcs,glue,hdfs

- name: Check no default features (except rustls)
run: cargo check --no-default-features --features rustls
@@ -114,12 +114,24 @@ jobs:
toolchain: stable
override: true

# Install Java and Hadoop for HDFS integration tests
- uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: "17"

- name: Download Hadoop
run: |
wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH
- name: Start emulated services
run: docker-compose up -d

- name: Run tests with rustls (default)
run: |
cargo test --features integration_test,azure,s3,gcs,datafusion
cargo test --features integration_test,azure,s3,gcs,datafusion,hdfs
- name: Run tests with native-tls
run: |
1 change: 1 addition & 0 deletions README.md
@@ -141,6 +141,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| Azure ADLS Gen2 | ![done] | ![done] | |
| Microsoft OneLake | ![done] | ![done] | |
| Google Cloud Storage | ![done] | ![done] | |
| HDFS | ![done] | ![done] | |

### Supported Operations

4 changes: 2 additions & 2 deletions crates/deltalake/Cargo.toml
@@ -13,14 +13,14 @@ rust-version.workspace = true

[package.metadata.docs.rs]
# We cannot use all_features because TLS features are mutually exclusive.
# We cannot use hdfs feature because it requires Java to be installed.
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "~0.18.0", path = "../core" }
deltalake-aws = { version = "0.1.1", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.1.1", path = "../azure", optional = true }
deltalake-gcp = { version = "0.2.1", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.1.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true }

[features]
@@ -32,7 +32,7 @@ datafusion = ["deltalake-core/datafusion"]
datafusion-ext = ["datafusion"]
gcs = ["deltalake-gcp"]
glue = ["deltalake-catalog-glue"]
hdfs = []
hdfs = ["deltalake-hdfs"]
json = ["deltalake-core/json"]
python = ["deltalake-core/python"]
s3-native-tls = ["deltalake-aws/native-tls"]
2 changes: 2 additions & 0 deletions crates/deltalake/src/lib.rs
@@ -9,3 +9,5 @@ pub use deltalake_aws as aws;
pub use deltalake_azure as azure;
#[cfg(feature = "gcs")]
pub use deltalake_gcp as gcp;
#[cfg(feature = "hdfs")]
pub use deltalake_hdfs as hdfs;
29 changes: 29 additions & 0 deletions crates/hdfs/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "deltalake-hdfs"
version = "0.1.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
edition.workspace = true
homepage.workspace = true
description.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = ">=0.17.0, <0.19.0", path = "../core" }
hdfs-native-object-store = "0.11"

# workspace dependencies
object_store = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }

[dev-dependencies]
serial_test = "3"
deltalake-test = { path = "../test" }
which = "4"

[features]
integration_test = ["hdfs-native-object-store/integration-test"]
48 changes: 48 additions & 0 deletions crates/hdfs/src/lib.rs
@@ -0,0 +1,48 @@
use std::sync::Arc;

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use hdfs_native_object_store::HdfsObjectStore;
use url::Url;

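/// [ObjectStoreFactory] and [LogStoreFactory] backed by [HdfsObjectStore], registered for the `hdfs` and `viewfs` schemes by [register_handlers].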
#[derive(Clone, Default, Debug)]
pub struct HdfsFactory {}

impl ObjectStoreFactory for HdfsFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let store: ObjectStoreRef = Arc::new(HdfsObjectStore::with_config(
url.as_str(),
options.0.clone(),
)?);
let prefix = Path::parse(url.path())?;
Ok((url_prefix_handler(store, prefix.clone()), prefix))
}
}

impl LogStoreFactory for HdfsFactory {
fn with_options(
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
Ok(default_logstore(store, location, options))
}
}

/// Register an [ObjectStoreFactory] for common HDFS [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
let factory = Arc::new(HdfsFactory {});
for scheme in ["hdfs", "viewfs"].iter() {
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), factory.clone());
logstores().insert(url.clone(), factory.clone());
}
}
60 changes: 60 additions & 0 deletions crates/hdfs/tests/context.rs
@@ -0,0 +1,60 @@
#![cfg(feature = "integration_test")]
use deltalake_hdfs::register_handlers;
use deltalake_test::utils::*;
use hdfs_native_object_store::minidfs::MiniDfs;
use std::{
collections::HashSet,
process::{Command, ExitStatus},
};

use which::which;

pub struct HdfsIntegration {
minidfs: MiniDfs,
}

impl Default for HdfsIntegration {
fn default() -> Self {
register_handlers(None);
let minidfs = MiniDfs::with_features(&HashSet::new());
Self { minidfs }
}
}

impl StorageIntegration for HdfsIntegration {
fn prepare_env(&self) {
println!("Preparing env");
}

fn create_bucket(&self) -> std::io::Result<ExitStatus> {
let hadoop_exc = which("hadoop").expect("Failed to find hadoop executable");

Ok(Command::new(hadoop_exc)
.args(["fs", "-mkdir", &self.root_uri()])
.status()
.unwrap())
}

fn bucket_name(&self) -> String {
"/test-deltalake".to_string()
}

fn root_uri(&self) -> String {
format!("{}{}", self.minidfs.url, self.bucket_name())
}

fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
println!("Copy directory called with {} {}", source, destination);
let hadoop_exc = which("hadoop").expect("Failed to find hadoop executable");
Ok(Command::new(hadoop_exc)
.args([
"fs",
"-copyFromLocal",
"-p",
source,
&format!("{}/{}", self.root_uri(), destination),
])
.status()
.unwrap())
}
}
16 changes: 16 additions & 0 deletions crates/hdfs/tests/integration.rs
@@ -0,0 +1,16 @@
#![cfg(feature = "integration_test")]
use deltalake_test::{test_read_tables, IntegrationContext, TestResult};
use serial_test::serial;

mod context;
use context::*;

#[tokio::test]
#[serial]
async fn test_read_tables_hdfs() -> TestResult {
let context = IntegrationContext::new(Box::<HdfsIntegration>::default())?;

test_read_tables(&context).await?;

Ok(())
}
46 changes: 46 additions & 0 deletions docs/integrations/object-storage/hdfs.md
@@ -0,0 +1,46 @@
# HDFS Storage Backend
HDFS support is provided via the [hdfs-native-object-store](https://github.com/datafusion-contrib/hdfs-native-object-store) package, which sits on top of [hdfs-native](https://github.com/Kimahriman/hdfs-native), an HDFS client written from scratch in Rust with no bindings to libhdfs and no use of Java. It supports most common cluster configurations, but not every possible client configuration.

## Supported Configurations
By default, the client looks for existing Hadoop configs in the following manner:
- If the `HADOOP_CONF_DIR` environment variable is defined, load configs from `$HADOOP_CONF_DIR/core-site.xml` and `$HADOOP_CONF_DIR/hdfs-site.xml`
- Otherwise, if the `HADOOP_HOME` environment variable is set, load configs from `$HADOOP_HOME/etc/hadoop/core-site.xml` and `$HADOOP_HOME/etc/hadoop/hdfs-site.xml`

Additionally, you can pass Hadoop configs as `storage_options`; these take precedence over the configs loaded from the files above.
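
As a sketch from Python (the cluster name, NameNode addresses, and table path below are placeholders), a highly-available name service can be configured entirely through `storage_options`:

```python
from deltalake import DeltaTable

# Placeholder HA configuration for a name service called "mycluster";
# these keys override anything loaded from core-site.xml / hdfs-site.xml.
storage_options = {
    "dfs.ha.namenodes.mycluster": "nn1,nn2",
    "dfs.namenode.rpc-address.mycluster.nn1": "nn1.example.com:8020",
    "dfs.namenode.rpc-address.mycluster.nn2": "nn2.example.com:8020",
}

dt = DeltaTable("hdfs://mycluster/data/delta/my_table", storage_options=storage_options)
print(dt.version())
```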

Currently the supported client configuration parameters are:
- `dfs.ha.namenodes.*` - name service support
- `dfs.namenode.rpc-address.*` - name service support
- `fs.viewfs.mounttable.*.link.*` - ViewFS links
- `fs.viewfs.mounttable.*.linkFallback` - ViewFS link fallback

If you find your setup is not supported, please file an issue in the [hdfs-native](https://github.com/Kimahriman/hdfs-native) repository.

## Secure Clusters
The client supports connecting to secure clusters through both Kerberos and token authentication, and all SASL protection types are supported. The highest supported protection mechanism advertised by the server will be used.

### Kerberos Support
Kerberos is supported by dynamically loading the `libgssapi_krb5` library. This must be installed separately through your package manager and currently only works on Linux and macOS.

Debian-based systems:
```bash
apt-get install libgssapi-krb5-2
```

RHEL-based systems:
```bash
yum install krb5-libs
```

macOS:
```bash
brew install krb5
```

Then simply run `kinit` to get your TGT, and authentication to HDFS should just work.

### Token Support
Token authentication is supported by looking for a token file at the path specified by the `HADOOP_TOKEN_FILE_LOCATION` environment variable. This is where systems like YARN automatically place a delegation token, so token authentication works out of the box inside YARN jobs.
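
As a sketch (the token path and table URI are placeholders), a process running outside YARN can point the client at a delegation token file explicitly, as long as the variable is set before the table is opened:

```python
import os
from deltalake import DeltaTable

# Placeholder paths; YARN exports HADOOP_TOKEN_FILE_LOCATION automatically
# for its containers, so this is only needed outside of YARN.
os.environ["HADOOP_TOKEN_FILE_LOCATION"] = "/path/to/delegation.token"
dt = DeltaTable("hdfs://namenode:9000/data/delta/my_table")
```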

## Issues
If you face any HDFS-specific issues, please report them to the [hdfs-native-object-store](https://github.com/datafusion-contrib/hdfs-native-object-store) repository.
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -81,6 +81,8 @@ nav:
- api/catalog.md
- api/exceptions.md
- Integrations:
- Object Storage:
- integrations/object-storage/hdfs.md
- Arrow: integrations/delta-lake-arrow.md
- Daft: integrations/delta-lake-daft.md
- Dagster: integrations/delta-lake-dagster.md
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -49,7 +49,7 @@ features = ["extension-module", "abi3", "abi3-py38"]
[dependencies.deltalake]
path = "../crates/deltalake"
version = "0"
features = ["azure", "gcs", "python", "datafusion", "unity-experimental"]
features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs"]

[features]
default = ["rustls"]
1 change: 1 addition & 0 deletions python/src/lib.rs
@@ -1868,6 +1868,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
deltalake::aws::register_handlers(None);
deltalake::azure::register_handlers(None);
deltalake::gcp::register_handlers(None);
deltalake::hdfs::register_handlers(None);
deltalake_mount::register_handlers(None);

let py = m.py();
