diff --git a/.github/services/hdfs_native/hdfs/action.yml b/.github/services/hdfs_native/hdfs/action.yml
new file mode 100644
index 000000000000..18a52345c4aa
--- /dev/null
+++ b/.github/services/hdfs_native/hdfs/action.yml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: hdfs_native
+description: 'Behavior test for hdfs_native'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup java env
+      uses: actions/setup-java@v4
+      with:
+        distribution: temurin
+        java-version: "11"
+    - name: Setup
+      shell: bash
+      run: |
+        curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+
+        export HADOOP_HOME="/home/runner/hadoop-3.3.5"
+        export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+
+        cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
+
+        cat << EOF >> $GITHUB_ENV
+        HADOOP_HOME=${HADOOP_HOME}
+        CLASSPATH=${CLASSPATH}
+        LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
+        OPENDAL_HDFS_NATIVE_ROOT=/tmp/opendal/
+        OPENDAL_HDFS_NATIVE_URL=hdfs://127.0.0.1:9000
+        OPENDAL_HDFS_NATIVE_ENABLE_APPEND=false
+        EOF
diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml
index c6b77acf2f8a..35b156a401a1 100644
--- a/bindings/java/Cargo.toml
+++ b/bindings/java/Cargo.toml
@@ -64,6 +64,7 @@ services-all = [
   "services-gdrive",
   # FIXME how to support HDFS services in bindings?
   # "services-hdfs",
+  # "services-hdfs-native",
   "services-huggingface",
   "services-ipfs",
   "services-memcached",
@@ -126,6 +127,7 @@ services-ftp = ["opendal/services-ftp"]
 services-gdrive = ["opendal/services-gdrive"]
 services-gridfs = ["opendal/services-gridfs"]
 services-hdfs = ["opendal/services-hdfs"]
+services-hdfs-native = ["opendal/services-hdfs-native"]
 services-huggingface = ["opendal/services-huggingface"]
 services-ipfs = ["opendal/services-ipfs"]
 services-koofr = ["opendal/services-koofr"]
diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml
index d4dcc4cafe37..3045b06accb1 100644
--- a/bindings/nodejs/Cargo.toml
+++ b/bindings/nodejs/Cargo.toml
@@ -62,6 +62,7 @@ services-all = [
   "services-gdrive",
   # FIXME how to support HDFS services in bindings?
# "services-hdfs", + # "services-hdfs-native", "services-huggingface", "services-ipfs", "services-memcached", @@ -124,6 +125,7 @@ services-ftp = ["opendal/services-ftp"] services-gdrive = ["opendal/services-gdrive"] services-gridfs = ["opendal/services-gridfs"] services-hdfs = ["opendal/services-hdfs"] +services-hdfs-native = ["opendal/services-hdfs-native"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 01a91ee29a39..289475666228 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -64,6 +64,7 @@ services-all = [ "services-gdrive", # FIXME how to support HDFS services in bindings? # "services-hdfs", + # "services-hdfs-native", "services-huggingface", "services-ipfs", "services-memcached", @@ -126,6 +127,7 @@ services-ftp = ["opendal/services-ftp"] services-gdrive = ["opendal/services-gdrive"] services-gridfs = ["opendal/services-gridfs"] services-hdfs = ["opendal/services-hdfs"] +services-hdfs-native = ["opendal/services-hdfs-native"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 80f607d5140b..3cf7812e67ca 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -152,24 +152,26 @@ impl Access for HdfsNativeBackend { type BlockingLister = (); type BlockingDeleter = (); - fn info(&self) -> Arc { + fn info(&self) -> AccessorInfo { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::HdfsNative) .set_root(&self.root) .set_native_capability(Capability { stat: true, + list: true, + read: true, + write: true, stat_has_last_modified: true, stat_has_content_length: true, delete: true, rename: true, - - shared: true, + blocking: true, 
                 ..Default::default()
             });
 
-        am.into()
+        am
     }
 
     async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
diff --git a/core/src/services/hdfs_native/docs.md b/core/src/services/hdfs_native/docs.md
index 2b5367c0c087..3db1e78115be 100644
--- a/core/src/services/hdfs_native/docs.md
+++ b/core/src/services/hdfs_native/docs.md
@@ -6,12 +6,12 @@ Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
 This service can be used to:
 
 - [x] stat
-- [ ] read
-- [ ] write
+- [x] read
+- [x] write
 - [ ] create_dir
 - [x] delete
 - [x] rename
-- [ ] list
+- [x] list
 - [x] blocking
 - [ ] append
 
diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs
index bd2863783f31..b5e2c3e01245 100644
--- a/core/src/services/hdfs_native/lister.rs
+++ b/core/src/services/hdfs_native/lister.rs
@@ -15,28 +15,62 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use chrono::DateTime;
+use hdfs_native::client::ListStatusIterator;
 
-use crate::raw::oio;
 use crate::raw::oio::Entry;
+use crate::raw::{build_rel_path, oio};
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeLister {
-    _path: String,
-    _client: Arc<Client>,
+    root: String,
+    lsi: ListStatusIterator,
 }
 
 impl HdfsNativeLister {
-    pub fn new(path: String, client: Arc<Client>) -> Self {
-        HdfsNativeLister {
-            _path: path,
-            _client: client,
+    pub fn new(root: &str, lsi: ListStatusIterator) -> Self {
+        Self {
+            root: root.to_string(),
+            lsi,
         }
     }
 }
 
 impl oio::List for HdfsNativeLister {
     async fn next(&mut self) -> Result<Option<Entry>> {
-        todo!()
+        if let Some(de) = self
+            .lsi
+            .next()
+            .await
+            .transpose()
+            .map_err(parse_hdfs_error)?
+        {
+            let path = build_rel_path(&self.root, &de.path);
+
+            let entry = if !de.isdir {
+                let odt = DateTime::from_timestamp(de.modification_time as i64, 0);
+
+                let Some(dt) = odt else {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("Failure in extracting modified_time for {}", path),
+                    ));
+                };
+                let meta = Metadata::new(EntryMode::FILE)
+                    .with_content_length(de.length as u64)
+                    .with_last_modified(dt);
+                oio::Entry::new(&path, meta)
+            } else if de.isdir {
+                // Make sure we are returning the correct path.
+                oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
+            } else {
+                oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
+            };
+
+            Ok(Some(entry))
+        } else {
+            Ok(None)
+        }
     }
 }
diff --git a/core/src/services/hdfs_native/reader.rs b/core/src/services/hdfs_native/reader.rs
index 0a51eaf8c854..32a339677c12 100644
--- a/core/src/services/hdfs_native/reader.rs
+++ b/core/src/services/hdfs_native/reader.rs
@@ -18,19 +18,31 @@
 use hdfs_native::file::FileReader;
 
 use crate::raw::*;
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeReader {
-    _f: FileReader,
+    f: FileReader,
 }
 
 impl HdfsNativeReader {
     pub fn new(f: FileReader) -> Self {
-        HdfsNativeReader { _f: f }
+        HdfsNativeReader { f }
     }
 }
 
 impl oio::Read for HdfsNativeReader {
+    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
+        // Perform the read operation using read_range
+        let bytes = self
+            .f
+            .read_range(offset as usize, limit)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        Ok(Buffer::from(bytes))
+    }
+
     async fn read(&mut self) -> Result<Buffer> {
         todo!()
     }
diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs
index 4cab45b3be46..f99380a428c4 100644
--- a/core/src/services/hdfs_native/writer.rs
+++ b/core/src/services/hdfs_native/writer.rs
@@ -18,25 +18,28 @@
 use hdfs_native::file::FileWriter;
 
 use crate::raw::oio;
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeWriter {
-    _f: FileWriter,
+    f: FileWriter,
 }
 
 impl HdfsNativeWriter {
     pub fn new(f: FileWriter) -> Self {
-        HdfsNativeWriter { _f: f }
+        HdfsNativeWriter { f }
     }
 }
 
 impl oio::Write for HdfsNativeWriter {
-    async fn write(&mut self, _bs: Buffer) -> Result<()> {
-        todo!()
+    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+        let bytes = bs.to_bytes();
+        let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
+        Ok(n)
     }
 
     async fn close(&mut self) -> Result<()> {
-        todo!()
+        self.f.close().await.map_err(parse_hdfs_error)
     }
 
     async fn abort(&mut self) -> Result<()> {