Skip to content

Commit

Permalink
feat(services/hdfs-native): Add capabilities for hdfs-native service (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jihuayu authored Feb 9, 2024
1 parent 9f0ebb4 commit 688fde2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 11 deletions.
77 changes: 66 additions & 11 deletions core/src/services/hdfs_native/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Builder for HdfsNativeBuilder {
Some(v) => v,
None => {
return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty")
.with_context("service", Scheme::HdfsNative))
.with_context("service", Scheme::HdfsNative));
}
};

Expand Down Expand Up @@ -181,7 +181,30 @@ impl Accessor for HdfsNativeBackend {
type BlockingLister = ();

fn info(&self) -> AccessorInfo {
todo!()
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::HdfsNative)
.set_root(&self.root)
.set_native_capability(Capability {
stat: true,

read: true,
read_can_seek: true,

write: true,
write_can_append: self._enable_append,

create_dir: true,
delete: true,

list: true,

rename: true,
blocking: true,

..Default::default()
});

am
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
Expand Down Expand Up @@ -218,20 +241,52 @@ impl Accessor for HdfsNativeBackend {
Ok((RpWrite::new(), w))
}

async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) -> Result<RpCopy> {
todo!()
}
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from_path = build_rooted_abs_path(&self.root, from);
let to_path = build_rooted_abs_path(&self.root, to);

async fn rename(&self, _from: &str, _to: &str, _args: OpRename) -> Result<RpRename> {
todo!()
self.client
.rename(&from_path, &to_path, false)
.await
.map_err(parse_hdfs_error)?;

Ok(RpRename::default())
}

async fn stat(&self, _path: &str, _args: OpStat) -> Result<RpStat> {
todo!()
async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let status: hdfs_native::client::FileStatus = self
.client
.get_file_info(&p)
.await
.map_err(parse_hdfs_error)?;

let mode = if status.isdir {
EntryMode::DIR
} else {
EntryMode::FILE
};

let mut metadata = Metadata::new(mode);
metadata
.set_last_modified(parse_datetime_from_from_timestamp_millis(
status.modification_time as i64,
)?)
.set_content_length(status.length as u64);

Ok(RpStat::new(metadata))
}

async fn delete(&self, _path: &str, _args: OpDelete) -> Result<RpDelete> {
todo!()
async fn delete(&self, path: &str, _args: OpDelete) -> Result<RpDelete> {
let p = build_rooted_abs_path(&self.root, path);

self.client
.delete(&p, true)
.await
.map_err(parse_hdfs_error)?;

Ok(RpDelete::default())
}

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
Expand Down
34 changes: 34 additions & 0 deletions core/src/services/hdfs_native/docs.md
Original file line number Diff line number Diff line change
@@ -1 +1,35 @@
A distributed file system that provides high-throughput access to application data.
Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).

## Capabilities

This service can be used to:

- [x] stat
- [x] read
- [x] write
- [x] create_dir
- [x] delete
- [x] rename
- [x] list
- [x] blocking
- [x] append

## Differences with webhdfs

[Webhdfs][crate::services::Webhdfs] is powered by hdfs's RESTful HTTP API.

## Differences with hdfs

[hdfs][crate::services::Hdfs] is powered by libhdfs and require the Java dependencies

## Features

HDFS-native support needs to enable feature `services-hdfs-native`.

## Configuration

- `root`: Set the work dir for backend.
- `url`: Set the url for backend.
- `enable_append`: enable the append capacity. Default is false.

0 comments on commit 688fde2

Please sign in to comment.