From cc7af852462d353bea9c69d9ae4dc2760b197237 Mon Sep 17 00:00:00 2001 From: jihuayu Date: Tue, 6 Feb 2024 17:11:03 +0800 Subject: [PATCH 1/4] update google drive docs --- core/src/services/gdrive/docs.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/services/gdrive/docs.md b/core/src/services/gdrive/docs.md index 325a26c37a7e..9f56dc4d3b35 100644 --- a/core/src/services/gdrive/docs.md +++ b/core/src/services/gdrive/docs.md @@ -7,9 +7,9 @@ This service can be used to: - [x] write - [x] delete - [x] create_dir -- [ ] list -- [ ] copy -- [ ] rename +- [x] list +- [x] copy +- [x] rename - [ ] batch From 0e552a2d6a6e90dce34d0e36898b1ab300b178bb Mon Sep 17 00:00:00 2001 From: jihuayu Date: Thu, 8 Feb 2024 10:43:32 +0800 Subject: [PATCH 2/4] add hdfs_native rename stat delete --- core/src/services/hdfs_native/backend.rs | 62 +++++++++++++++++++----- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 48ef1caae699..13c8d5f65a8d 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -131,7 +131,7 @@ impl Builder for HdfsNativeBuilder { Some(v) => v, None => { return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty") - .with_context("service", Scheme::HdfsNative)) + .with_context("service", Scheme::HdfsNative)); } }; @@ -181,7 +181,30 @@ impl Accessor for HdfsNativeBackend { type BlockingLister = (); fn info(&self) -> AccessorInfo { - todo!() + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::HdfsNative) + .set_root(&self.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_seek: true, + + write: true, + write_can_append: self._enable_append, + + create_dir: true, + delete: true, + + list: true, + + rename: true, + blocking: true, + + ..Default::default() + }); + + am } async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result { @@ -218,20 +241,37 @@ impl Accessor for HdfsNativeBackend { Ok((RpWrite::new(), w)) } - async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) -> Result { - todo!() - } + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + let from_path = build_rooted_abs_path(&self.root, from); + let to_path = build_rooted_abs_path(&self.root, to); + + self.client + .rename(&from_path, &to_path, false) + .await + .map_err(parse_hdfs_error)?; - async fn rename(&self, _from: &str, _to: &str, _args: OpRename) -> Result { - todo!() + Ok(RpRename::default()) } - async fn stat(&self, _path: &str, _args: OpStat) -> Result { - todo!() + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let p = build_rooted_abs_path(&self.root, path); + + self.client + .get_file_info(&p) + .await + .map_err(parse_hdfs_error)?; + Ok(RpStat::default()) } - async fn delete(&self, _path: &str, _args: OpDelete) -> Result { - todo!() + async fn delete(&self, path: &str, _args: OpDelete) -> Result { + let p = build_rooted_abs_path(&self.root, path); + + self.client + .delete(&p, true) + .await + .map_err(parse_hdfs_error)?; + + Ok(RpDelete::new()) } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { From d3bbb903a068d349fc182d0a292ac5f316015c70 Mon Sep 17 00:00:00 2001 From: jihuayu <8042833@qq.com> Date: Fri, 9 Feb 2024 10:21:21 +0800 Subject: [PATCH 3/4] update hdfs-native --- core/src/services/hdfs_native/backend.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 13c8d5f65a8d..c1f10bcebc11 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -256,11 +256,26 @@ impl Accessor for HdfsNativeBackend { async fn stat(&self, path: &str, _args: OpStat) -> Result { let p = build_rooted_abs_path(&self.root, path); - self.client + let status: hdfs_native::client::FileStatus = self + .client .get_file_info(&p) .await .map_err(parse_hdfs_error)?; - Ok(RpStat::default()) + + let mode = if status.isdir { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + let mut metadata = Metadata::new(mode); + metadata + .set_last_modified(parse_datetime_from_from_timestamp_millis( + status.modification_time as i64, + )?) + .set_content_length(status.length as u64); + + Ok(RpStat::new(metadata)) } async fn delete(&self, path: &str, _args: OpDelete) -> Result { @@ -271,7 +286,7 @@ impl Accessor for HdfsNativeBackend { .await .map_err(parse_hdfs_error)?; - Ok(RpDelete::new()) + Ok(RpDelete::default()) } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { From 6db073d3af8a7936805049b9b93da3c1dc95e2c4 Mon Sep 17 00:00:00 2001 From: jihuayu <8042833@qq.com> Date: Fri, 9 Feb 2024 11:44:29 +0800 Subject: [PATCH 4/4] update doc --- core/src/services/hdfs_native/docs.md | 34 +++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/core/src/services/hdfs_native/docs.md b/core/src/services/hdfs_native/docs.md index 7d720d49865a..4f7e245b9d82 100644 --- a/core/src/services/hdfs_native/docs.md +++ b/core/src/services/hdfs_native/docs.md @@ -1 +1,35 @@ A distributed file system that provides high-throughput access to application data. +Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native). + +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] rename +- [x] list +- [x] blocking +- [x] append + +## Differences with webhdfs + +[Webhdfs][crate::services::Webhdfs] is powered by hdfs's RESTful HTTP API. + +## Differences with hdfs + +[hdfs][crate::services::Hdfs] is powered by libhdfs and require the Java dependencies + +## Features + +HDFS-native support needs to enable feature `services-hdfs-native`. + +## Configuration + +- `root`: Set the work dir for backend. +- `url`: Set the url for backend. +- `enable_append`: enable the append capacity. Default is false. +