feat(services/hdfs_native): Add read,write,list implementation for hdfs_native #4505

Draft · shbhmrzd wants to merge 23 commits into main from native_hdfs_read_write

Commits (23):
8cfcd8b  implementation for hdfs-native read,write.list (shbhmrzd, Apr 19, 2024)
5b8bce6  add behvaviour tests for hdfs_native (shbhmrzd, Apr 19, 2024)
1a709df  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, Apr 19, 2024)
e9e417b  fix doc build (shbhmrzd, Apr 19, 2024)
af6e959  review comments (shbhmrzd, Apr 22, 2024)
42848ea  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, Apr 22, 2024)
a8a6d17  review comments (shbhmrzd, Apr 22, 2024)
5b80bec  use url (shbhmrzd, Apr 22, 2024)
75ef053  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, Apr 22, 2024)
bcc4aaf  use localhost url (shbhmrzd, Apr 22, 2024)
c23d576  java bindings hdfs-native (shbhmrzd, Apr 22, 2024)
76ea03d  add hdfs-native scheme (shbhmrzd, Apr 22, 2024)
a274223  restrict node and python bindings for hdfs-native (shbhmrzd, Apr 22, 2024)
b3b4435  remove hdfs-native from java binding (shbhmrzd, Apr 22, 2024)
77acd20  revert scheme and test plan changes (shbhmrzd, Apr 23, 2024)
c3178c0  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, Apr 23, 2024)
fb8965d  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, Apr 28, 2024)
914ead7  update supporting list, read and write (shbhmrzd, Apr 28, 2024)
63c2919  add read, write, list capability in accessor info (shbhmrzd, Apr 28, 2024)
3346eef  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, May 26, 2024)
e795ffb  use read instead of read_range as it panics (shbhmrzd, May 26, 2024)
7be3838  revert changes (shbhmrzd, May 26, 2024)
3681e1f  Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write (shbhmrzd, Jan 10, 2025)
46 changes: 46 additions & 0 deletions .github/services/hdfs_native/hdfs/action.yml
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: hdfs_native
description: 'Behavior test for hdfs_native'

runs:
using: "composite"
steps:
- name: Setup java env
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: "11"
- name: Setup
shell: bash
run: |
curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner

export HADOOP_HOME="/home/runner/hadoop-3.3.5"
export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)

cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml

cat << EOF >> $GITHUB_ENV
HADOOP_HOME=${HADOOP_HOME}
CLASSPATH=${CLASSPATH}
LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
OPENDAL_HDFS_NATIVE_ROOT=/tmp/opendal/
OPENDAL_HDFS_NATIVE_URL=hdfs://127.0.0.1:9000
OPENDAL_HDFS_NATIVE_ENABLE_APPEND=false
EOF
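For reproducing this setup locally, here is a minimal sketch of pointing an operator at the namenode this action provisions. It assumes the `HdfsNative` builder exposes `root()` and `url()` setters matching the `OPENDAL_HDFS_NATIVE_*` config keys above:

```rust
use opendal::services::HdfsNative;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Values mirror the env vars the action writes to $GITHUB_ENV.
    let builder = HdfsNative::default()
        .root("/tmp/opendal/")
        .url("hdfs://127.0.0.1:9000");

    let op = Operator::new(builder)?.finish();

    // Probe the cluster with a stat on the root directory.
    println!("{:?}", op.stat("/").await?);
    Ok(())
}
```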
2 changes: 2 additions & 0 deletions bindings/java/Cargo.toml
@@ -64,6 +64,7 @@ services-all = [
"services-gdrive",
# FIXME how to support HDFS services in bindings?
# "services-hdfs",
# "services-hdfs-native",
"services-huggingface",
"services-ipfs",
"services-memcached",
@@ -126,6 +127,7 @@ services-ftp = ["opendal/services-ftp"]
services-gdrive = ["opendal/services-gdrive"]
services-gridfs = ["opendal/services-gridfs"]
services-hdfs = ["opendal/services-hdfs"]
services-hdfs-native = ["opendal/services-hdfs-native"]
services-huggingface = ["opendal/services-huggingface"]
services-ipfs = ["opendal/services-ipfs"]
services-koofr = ["opendal/services-koofr"]
2 changes: 2 additions & 0 deletions bindings/nodejs/Cargo.toml
@@ -62,6 +62,7 @@ services-all = [
"services-gdrive",
# FIXME how to support HDFS services in bindings?
# "services-hdfs",
# "services-hdfs-native",
"services-huggingface",
"services-ipfs",
"services-memcached",
@@ -124,6 +125,7 @@ services-ftp = ["opendal/services-ftp"]
services-gdrive = ["opendal/services-gdrive"]
services-gridfs = ["opendal/services-gridfs"]
services-hdfs = ["opendal/services-hdfs"]
services-hdfs-native = ["opendal/services-hdfs-native"]
services-huggingface = ["opendal/services-huggingface"]
services-ipfs = ["opendal/services-ipfs"]
services-koofr = ["opendal/services-koofr"]
2 changes: 2 additions & 0 deletions bindings/python/Cargo.toml
@@ -64,6 +64,7 @@ services-all = [
"services-gdrive",
# FIXME how to support HDFS services in bindings?
# "services-hdfs",
# "services-hdfs-native",
"services-huggingface",
"services-ipfs",
"services-memcached",
@@ -126,6 +127,7 @@ services-ftp = ["opendal/services-ftp"]
services-gdrive = ["opendal/services-gdrive"]
services-gridfs = ["opendal/services-gridfs"]
services-hdfs = ["opendal/services-hdfs"]
services-hdfs-native = ["opendal/services-hdfs-native"]
services-huggingface = ["opendal/services-huggingface"]
services-ipfs = ["opendal/services-ipfs"]
services-koofr = ["opendal/services-koofr"]
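All three bindings keep `services-hdfs-native` out of `services-all` (under the same FIXME as `services-hdfs`), so Rust users opt in explicitly, e.g. `opendal = { version = "...", features = ["services-hdfs-native"] }`. Below is a sketch of building the operator from string config pairs, the route the bindings take internally; `Operator::via_iter` and the exact config keys are assumptions here, not part of this diff:

```rust
use opendal::{Operator, Scheme};

fn build_from_config() -> opendal::Result<Operator> {
    // Keys match the OPENDAL_HDFS_NATIVE_* variables in the CI action.
    Operator::via_iter(
        Scheme::HdfsNative,
        [
            ("root".to_string(), "/tmp/opendal/".to_string()),
            ("url".to_string(), "hdfs://127.0.0.1:9000".to_string()),
        ],
    )
}
```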
10 changes: 6 additions & 4 deletions core/src/services/hdfs_native/backend.rs
@@ -152,24 +152,26 @@ impl Access for HdfsNativeBackend {
type BlockingLister = ();
type BlockingDeleter = ();

fn info(&self) -> Arc<AccessorInfo> {
fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::HdfsNative)
.set_root(&self.root)
.set_native_capability(Capability {
stat: true,
list: true,
read: true,
write: true,
stat_has_last_modified: true,
stat_has_content_length: true,

delete: true,
rename: true,

shared: true,
blocking: true,

..Default::default()
});

am.into()
am
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
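Once the capability flags above are set, callers can feature-detect at runtime instead of hard-coding per-service knowledge. A minimal sketch, assuming the public `op.info().full_capability()` accessor:

```rust
use opendal::Operator;

fn report_capabilities(op: &Operator) {
    let cap = op.info().full_capability();
    // With this PR these should all report true for hdfs_native.
    println!("read: {}, write: {}, list: {}", cap.read, cap.write, cap.list);
    println!("stat: {}, delete: {}, rename: {}", cap.stat, cap.delete, cap.rename);
}
```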
6 changes: 3 additions & 3 deletions core/src/services/hdfs_native/docs.md
@@ -6,12 +6,12 @@ Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
This service can be used to:

- [x] stat
- [ ] read
- [ ] write
- [x] read
- [x] write
- [ ] create_dir
- [x] delete
- [x] rename
- [ ] list
- [x] list
- [x] blocking
- [ ] append

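With read, write, and list checked off, the basic round trip works end to end. An illustrative sketch against any configured hdfs_native operator (method names follow the current `Operator` API; treat this as a sketch, not the service docs' official example):

```rust
use opendal::Operator;

async fn round_trip(op: &Operator) -> opendal::Result<()> {
    // write -> read -> list: the three capabilities this PR enables.
    op.write("dir/hello.txt", b"Hello, HDFS!".to_vec()).await?;

    let bs = op.read("dir/hello.txt").await?;
    assert_eq!(bs.to_vec(), b"Hello, HDFS!");

    for entry in op.list("dir/").await? {
        println!("{} ({:?})", entry.path(), entry.metadata().mode());
    }
    Ok(())
}
```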
52 changes: 43 additions & 9 deletions core/src/services/hdfs_native/lister.rs
@@ -15,28 +15,62 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use chrono::DateTime;
use hdfs_native::client::ListStatusIterator;

use crate::raw::oio;
use crate::raw::oio::Entry;
use crate::raw::{build_rel_path, oio};
use crate::services::hdfs_native::error::parse_hdfs_error;
use crate::*;

pub struct HdfsNativeLister {
_path: String,
_client: Arc<hdfs_native::Client>,
root: String,
lsi: ListStatusIterator,
}

impl HdfsNativeLister {
pub fn new(path: String, client: Arc<hdfs_native::Client>) -> Self {
HdfsNativeLister {
_path: path,
_client: client,
pub fn new(root: &str, lsi: ListStatusIterator) -> Self {
Self {
root: root.to_string(),
lsi,
}
}
}

impl oio::List for HdfsNativeLister {
async fn next(&mut self) -> Result<Option<Entry>> {
todo!()
if let Some(de) = self
.lsi
.next()
.await
.transpose()
.map_err(parse_hdfs_error)?
{
let path = build_rel_path(&self.root, &de.path);

let entry = if !de.isdir {
let odt = DateTime::from_timestamp(de.modification_time as i64, 0);

let Some(dt) = odt else {
return Err(Error::new(
ErrorKind::Unexpected,
&format!("Failure in extracting modified_time for {}", path),
));
};
let meta = Metadata::new(EntryMode::FILE)
.with_content_length(de.length as u64)
.with_last_modified(dt);
oio::Entry::new(&path, meta)
} else if de.isdir {
// Make sure we are returning the correct path.
oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
} else {
oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
};

Ok(Some(entry))
} else {
Ok(None)
}
}
}
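On the consumer side these entries surface through `Operator::lister` as an async stream. A small sketch of draining it (pulls in the `futures` crate for `try_next`; that dependency is assumed, not part of this diff):

```rust
use futures::TryStreamExt;
use opendal::Operator;

async fn walk(op: &Operator) -> opendal::Result<()> {
    // Entries arrive one at a time, mirroring HdfsNativeLister::next above;
    // directory entries carry the trailing '/' the lister appends.
    let mut lister = op.lister("dir/").await?;
    while let Some(entry) = lister.try_next().await? {
        println!("{}", entry.path());
    }
    Ok(())
}
```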
16 changes: 14 additions & 2 deletions core/src/services/hdfs_native/reader.rs
@@ -18,19 +18,31 @@
use hdfs_native::file::FileReader;

use crate::raw::*;
use crate::services::hdfs_native::error::parse_hdfs_error;
use crate::*;

pub struct HdfsNativeReader {
_f: FileReader,
f: FileReader,
}

impl HdfsNativeReader {
pub fn new(f: FileReader) -> Self {
HdfsNativeReader { _f: f }
HdfsNativeReader { f }
}
}

impl oio::Read for HdfsNativeReader {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
// Perform the read operation using read_range
let bytes = self
.f
.read_range(offset as usize, limit)
.await
.map_err(parse_hdfs_error)?;

Ok(Buffer::from(bytes))
}

async fn read(&mut self) -> Result<Buffer> {
todo!()
}
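Note that the streaming `read` above is still `todo!()`; only the positioned `read_at` path is wired up, and the commit history records a detour where `read_range` panicked and was temporarily swapped out before being reverted. A caller-side sketch of exercising the positioned path, assuming the `read_with(...).range(...)` entry point on `Operator`:

```rust
use opendal::Operator;

async fn read_head(op: &Operator, path: &str) -> opendal::Result<()> {
    // Whole-object read.
    let all = op.read(path).await?;

    // Ranged read: this is what lands in HdfsNativeReader::read_at,
    // which forwards to hdfs_native's read_range.
    let head = op.read_with(path).range(0..16).await?;

    println!("{} bytes total, first chunk {} bytes", all.len(), head.len());
    Ok(())
}
```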
13 changes: 8 additions & 5 deletions core/src/services/hdfs_native/writer.rs
@@ -18,25 +18,28 @@
use hdfs_native::file::FileWriter;

use crate::raw::oio;
use crate::services::hdfs_native::error::parse_hdfs_error;
use crate::*;

pub struct HdfsNativeWriter {
_f: FileWriter,
f: FileWriter,
}

impl HdfsNativeWriter {
pub fn new(f: FileWriter) -> Self {
HdfsNativeWriter { _f: f }
HdfsNativeWriter { f }
}
}

impl oio::Write for HdfsNativeWriter {
async fn write(&mut self, _bs: Buffer) -> Result<()> {
todo!()
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let bytes = bs.to_bytes();
let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(n)
}

async fn close(&mut self) -> Result<()> {
todo!()
self.f.close().await.map_err(parse_hdfs_error)
}

async fn abort(&mut self) -> Result<()> {
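The writer now maps `write` and `close` directly onto `hdfs_native::file::FileWriter` (the `abort` body is truncated in this view). A caller-side sketch of the chunked-write path, assuming the standard `Operator::writer` handle:

```rust
use opendal::Operator;

async fn write_in_chunks(op: &Operator, path: &str) -> opendal::Result<()> {
    // Each write() call lands in HdfsNativeWriter::write; close() flushes
    // and finalizes the file so it becomes visible to readers.
    let mut w = op.writer(path).await?;
    w.write(b"hello ".to_vec()).await?;
    w.write(b"world".to_vec()).await?;
    w.close().await?;
    Ok(())
}
```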