HDFS storage support via datafusion-objectstore-hdfs #1273

Closed · wants to merge 6 commits

1 change: 1 addition & 0 deletions README.adoc
@@ -32,6 +32,7 @@ link:https://github.com/rajasekarv/vega[vega], etc. It also provides bindings to
* AWS S3
* Azure Blob Storage / Azure Datalake Storage Gen2
* Google Cloud Storage
* HDFS

.Support features
|===
2 changes: 2 additions & 0 deletions rust/Cargo.toml
@@ -18,6 +18,7 @@ async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
cfg-if = "1"
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = ["hdfs3", "try_spawn_blocking"], optional = true }
errno = "0.3"
futures = "0.3"
itertools = "0.10"
@@ -119,6 +120,7 @@ s3 = [
"object_store/aws",
"object_store/aws_profile",
]
hdfs = ["datafusion-objectstore-hdfs"]
glue-native-tls = ["s3-native-tls", "rusoto_glue"]
glue = ["s3", "rusoto_glue/rustls"]
python = ["arrow/pyarrow"]
1 change: 1 addition & 0 deletions rust/README.md
@@ -49,6 +49,7 @@ cargo run --example read_delta_table
- `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
- `datafusion-ext` - DEPRECATED: alias for `datafusion` feature
- `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
- `hdfs` - enable the HDFS storage backend to work with Delta Tables in HDFS.

## Development

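For reviewers, here is a minimal sketch (not part of the diff) of how the new feature would be exercised from the crate once built with `--features hdfs`. The namenode address and table path are illustrative, and `deltalake::open_table` plus a tokio runtime are assumed to be available as in the crate's existing examples.

```rust
// Sketch only: read a Delta table from HDFS with the `hdfs` feature enabled.
// The namenode address and table path are illustrative.
#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // `hdfs://` URIs are routed to the new HadoopFileSystem-backed object store.
    let table = deltalake::open_table("hdfs://localhost:9000/warehouse/my_table").await?;
    println!("loaded table at version {}", table.version());
    Ok(())
}
```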
2 changes: 2 additions & 0 deletions rust/src/data_catalog/mod.rs
@@ -70,6 +70,8 @@ pub fn get_data_catalog(data_catalog: &str) -> Result<Box<dyn DataCatalog>, Data
"gcp" => unimplemented!("GCP Data Catalog is not implemented"),
#[cfg(feature = "azure")]
"azure" => unimplemented!("Azure Data Catalog is not implemented"),
#[cfg(feature = "hdfs")]
"hdfs" => unimplemented!("HDFS Data Catalog is not implemented"),
#[cfg(feature = "glue")]
"glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)),
_ => Err(DataCatalogError::InvalidDataCatalog {
19 changes: 19 additions & 0 deletions rust/src/storage/config.rs
@@ -13,6 +13,8 @@ use url::Url;

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
use super::s3::{S3StorageBackend, S3StorageOptions};
#[cfg(feature = "hdfs")]
use datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem;
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
#[cfg(feature = "azure")]
@@ -103,6 +105,7 @@ pub(crate) enum ObjectStoreKind {
S3,
Google,
Azure,
Hdfs,
}

impl ObjectStoreKind {
@@ -113,6 +116,7 @@ impl ObjectStoreKind {
"az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure),
"s3" | "s3a" => Ok(ObjectStoreKind::S3),
"gs" => Ok(ObjectStoreKind::Google),
"hdfs" => Ok(ObjectStoreKind::Hdfs),
"https" => {
let host = url.host_str().unwrap_or_default();
if host.contains("amazonaws.com") {
@@ -192,6 +196,21 @@
feature: "gcs",
url: storage_url.as_ref().into(),
}),
#[cfg(feature = "hdfs")]
ObjectStoreKind::Hdfs => {
let store = HadoopFileSystem::new(storage_url.as_ref()).ok_or_else(|| {
DeltaTableError::Generic(format!(
"failed to create HadoopFileSystem for {}",
storage_url.as_ref()
))
})?;
Ok(Self::url_prefix_handler(store, storage_url))
}
#[cfg(not(feature = "hdfs"))]
ObjectStoreKind::Hdfs => Err(DeltaTableError::MissingFeature {
feature: "hdfs",
url: storage_url.as_ref().into(),
}),
}
}

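To make the new dispatch concrete, a unit-test-style sketch (not part of the diff) is shown below; it assumes the URL-parsing helper shown above is named `ObjectStoreKind::parse_url`, takes a `&Url`, and remains crate-visible.

```rust
#[cfg(all(test, feature = "hdfs"))]
mod hdfs_scheme_tests {
    use super::*;
    use url::Url;

    // Sketch only: an `hdfs://` URL should resolve to the new Hdfs kind.
    // Assumes the parse helper above is `ObjectStoreKind::parse_url(&Url)`.
    #[test]
    fn hdfs_scheme_maps_to_hdfs_kind() {
        let url = Url::parse("hdfs://localhost:9000/tables/simple").unwrap();
        assert!(matches!(
            ObjectStoreKind::parse_url(&url),
            Ok(ObjectStoreKind::Hdfs)
        ));
    }
}
```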
54 changes: 54 additions & 0 deletions rust/src/test_utils.rs
@@ -52,6 +52,7 @@ impl IntegrationContext {
StorageIntegration::Microsoft => format!("az://{}", &bucket),
StorageIntegration::Google => format!("gs://{}", &bucket),
StorageIntegration::Local => format!("file://{}", &bucket),
StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &bucket),
};
// the "storage_backend" will always point to the root of the object store.
// TODO should we provide the store via object_store builders?
@@ -85,6 +86,7 @@ impl IntegrationContext {
StorageIntegration::Microsoft => format!("az://{}", &self.bucket),
StorageIntegration::Google => format!("gs://{}", &self.bucket),
StorageIntegration::Local => format!("file://{}", &self.bucket),
StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &self.bucket),
}
}

@@ -140,6 +142,9 @@ impl Drop for IntegrationContext {
gs_cli::delete_bucket(&self.bucket).unwrap();
}
StorageIntegration::Local => (),
StorageIntegration::Hdfs => {
hdfs_cli::delete_dir(&self.bucket).unwrap();
}
};
}
}
@@ -150,6 +155,7 @@ pub enum StorageIntegration {
Microsoft,
Google,
Local,
Hdfs,
}

impl StorageIntegration {
@@ -159,6 +165,7 @@ impl StorageIntegration {
Self::Amazon => s3_cli::prepare_env(),
Self::Google => gs_cli::prepare_env(),
Self::Local => (),
Self::Hdfs => (),
}
}

@@ -182,6 +189,10 @@ impl StorageIntegration {
Ok(())
}
Self::Local => Ok(()),
Self::Hdfs => {
hdfs_cli::create_dir(name)?;
Ok(())
}
}
}
}
@@ -447,3 +458,46 @@ pub mod gs_cli {
set_env_if_not_set("GOOGLE_ENDPOINT_URL", "http://localhost:4443/storage/v1/b");
}
}

/// small wrapper around hdfs cli
pub mod hdfs_cli {
use std::env;
use std::path::PathBuf;
// use super::set_env_if_not_set;
Review comment (Collaborator): Can we drop this?

Suggested change: remove the line `// use super::set_env_if_not_set;`

use std::process::{Command, ExitStatus};

fn hdfs_cli_path() -> PathBuf {
let hadoop_home =
env::var("HADOOP_HOME").expect("HADOOP_HOME environment variable not set");
PathBuf::from(hadoop_home).join("bin").join("hdfs")
}

pub fn create_dir(dir_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
let path = hdfs_cli_path();
let mut child = Command::new(&path)
.args([
"dfs",
"-mkdir",
"-p",
format!("/{}", dir_name.as_ref()).as_str(),
])
.spawn()
.expect("hdfs command is installed");
child.wait()
}

pub fn delete_dir(dir_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
let path = hdfs_cli_path();
let mut child = Command::new(&path)
.args([
"dfs",
"-rm",
"-r",
"-f",
format!("/{}", dir_name.as_ref()).as_str(),
])
.spawn()
.expect("hdfs command is installed");
child.wait()
}
}
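For context, a short sketch (not part of the diff) of how these helpers are driven by the integration harness above; the test body is elided, and `TestResult` is the alias already used by the integration tests.

```rust
// Sketch only: IntegrationContext::new(StorageIntegration::Hdfs) creates the
// bucket directory via hdfs_cli::create_dir, and dropping the context removes
// it via hdfs_cli::delete_dir. Requires HADOOP_HOME and a namenode listening
// on hdfs://localhost:9000, matching the root URI hard-coded above.
async fn hdfs_round_trip() -> TestResult {
    let context = IntegrationContext::new(StorageIntegration::Hdfs)?;
    // context.root_uri() is "hdfs://localhost:9000/<bucket>"; run assertions here.
    drop(context); // triggers cleanup of the HDFS directory
    Ok(())
}
```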
7 changes: 7 additions & 0 deletions rust/tests/command_filesystem_check.rs
@@ -38,6 +38,13 @@ async fn test_filesystem_check_gcp() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Google).await?)
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_hdfs() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Hdfs).await?)
}

async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
context.load_table(TestTables::Simple).await?;
20 changes: 20 additions & 0 deletions rust/tests/common/hdfs.rs
@@ -0,0 +1,20 @@
use super::TestContext;
use std::collections::HashMap;

pub struct Hdfs {
name_node: String,
}

pub fn setup_hdfs_context() -> TestContext {
let mut config = HashMap::new();

let name_node = "hdfs://localhost:9000".to_owned();

config.insert("URI".to_owned(), name_node.clone());

TestContext {
storage_context: Some(Box::new(Hdfs { name_node })),
config,
..TestContext::default()
}
}
4 changes: 4 additions & 0 deletions rust/tests/common/mod.rs
@@ -18,6 +18,8 @@ pub mod adls;
pub mod clock;
#[cfg(feature = "datafusion")]
pub mod datafusion;
#[cfg(feature = "hdfs")]
pub mod hdfs;
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
pub mod s3;
pub mod schemas;
@@ -47,6 +49,8 @@ impl TestContext {
Ok("AZURE_GEN2") => adls::setup_azure_gen2_context().await,
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
Ok("S3_LOCAL_STACK") => s3::setup_s3_context().await,
#[cfg(feature = "hdfs")]
Ok("HDFS") => hdfs::setup_hdfs_context(),
_ => panic!("Invalid backend for delta-rs tests"),
}
}
9 changes: 9 additions & 0 deletions rust/tests/integration_checkpoint.rs
@@ -40,6 +40,15 @@ async fn cleanup_metadata_gcp_test() -> TestResult {
Ok(())
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn cleanup_metadata_hdfs_test() -> TestResult {
let context = IntegrationContext::new(StorageIntegration::Hdfs)?;
cleanup_metadata_test(&context).await?;
Ok(())
}

// Last-Modified for S3 could not be altered by user, hence using system pauses which makes
// test to run longer but reliable
async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult {
7 changes: 7 additions & 0 deletions rust/tests/integration_commit.rs
@@ -36,6 +36,13 @@ async fn test_commit_tables_gcp() {
commit_tables(StorageIntegration::Google).await.unwrap();
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_commit_tables_hdfs() {
commit_tables(StorageIntegration::Hdfs).await.unwrap();
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
#[tokio::test]
#[serial]
9 changes: 9 additions & 0 deletions rust/tests/integration_concurrent_writes.rs
@@ -28,6 +28,15 @@ async fn test_concurrent_writes_azure() -> TestResult {
Ok(())
}

// tracked via https://github.com/datafusion-contrib/datafusion-objectstore-hdfs/issues/13
#[ignore]
#[cfg(feature = "hdfs")]
#[tokio::test]
async fn test_concurrent_writes_hdfs() -> TestResult {
test_concurrent_writes(StorageIntegration::Hdfs).await?;
Ok(())
}

async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(integration)?;
let (_table, table_uri) = prepare_table(&context).await?;
7 changes: 7 additions & 0 deletions rust/tests/integration_datafusion.rs
@@ -35,6 +35,13 @@ async fn test_datafusion_gcp() -> TestResult {
Ok(test_datafusion(StorageIntegration::Google).await?)
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_datafusion_hdfs() -> TestResult {
Ok(test_datafusion(StorageIntegration::Hdfs).await?)
}

async fn test_datafusion(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
context.load_table(TestTables::Simple).await?;
8 changes: 8 additions & 0 deletions rust/tests/integration_object_store.rs
@@ -39,6 +39,14 @@ async fn test_object_store_google() -> TestResult {
Ok(())
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_object_store_hdfs() -> TestResult {
test_object_store(StorageIntegration::Hdfs, false).await?;
Ok(())
}

async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult {
let context = IntegrationContext::new(integration)?;
let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
7 changes: 7 additions & 0 deletions rust/tests/integration_read.rs
@@ -22,6 +22,13 @@ async fn test_read_tables_azure() -> TestResult {
Ok(read_tables(StorageIntegration::Microsoft).await?)
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_read_tables_hdfs() -> TestResult {
Ok(read_tables(StorageIntegration::Hdfs).await?)
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
#[tokio::test]
#[serial]