From 7976ba50c2bed0254e6179d3825b6237807a5010 Mon Sep 17 00:00:00 2001 From: George Miao Date: Wed, 19 Jun 2024 11:52:35 +0900 Subject: [PATCH 1/3] fix reader --- core/src/services/compfs/reader.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/services/compfs/reader.rs b/core/src/services/compfs/reader.rs index 758c110894d..eca63db3ead 100644 --- a/core/src/services/compfs/reader.rs +++ b/core/src/services/compfs/reader.rs @@ -40,13 +40,14 @@ impl oio::Read for CompfsReader { async fn read(&mut self) -> Result { let mut bs = self.core.buf_pool.get(); + let pos = self.range.offset(); let len = self.range.size().expect("range size is always Some"); bs.reserve(len as _); let f = self.file.clone(); let mut bs = self .core .exec(move || async move { - let (_, bs) = buf_try!(@try f.read_at(bs, len).await); + let (_, bs) = buf_try!(@try f.read_at(bs, pos).await); Ok(bs) }) .await?; From 07b1c3291a95c821904f722bb2ee5955eced6ca3 Mon Sep 17 00:00:00 2001 From: George Miao Date: Wed, 19 Jun 2024 12:02:28 +0900 Subject: [PATCH 2/3] builder --- core/src/services/compfs/backend.rs | 27 +++++++++++++++++++++++++-- core/src/services/compfs/lister.rs | 2 +- core/src/services/compfs/reader.rs | 2 +- core/src/services/compfs/writer.rs | 2 +- 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs index 65bd189b683..f6559fa4c8c 100644 --- a/core/src/services/compfs/backend.rs +++ b/core/src/services/compfs/backend.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use compio::dispatcher::Dispatcher; + use super::{core::CompfsCore, lister::CompfsLister, reader::CompfsReader, writer::CompfsWriter}; + use crate::raw::*; use crate::*; @@ -42,7 +45,7 @@ impl CompfsBuilder { impl Builder for CompfsBuilder { const SCHEME: Scheme = Scheme::Compfs; - type Accessor = (); + type Accessor = CompfsBackend; fn from_map(map: HashMap) -> Self { let mut builder = CompfsBuilder::default(); @@ -53,7 +56,27 @@ impl Builder for CompfsBuilder { } fn build(&mut self) -> Result { - todo!() + let root = match self.root.take() { + Some(root) => Ok(root), + None => Err(Error::new( + ErrorKind::ConfigInvalid, + "root is not specified", + )), + }?; + let dispatcher = Dispatcher::new().map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "failed to initiate compio dispatcher", + ) + })?; + let core = CompfsCore { + root, + dispatcher, + buf_pool: oio::PooledBuf::new(16), + }; + Ok(CompfsBackend { + core: Arc::new(core), + }) } } diff --git a/core/src/services/compfs/lister.rs b/core/src/services/compfs/lister.rs index 7a380d48b71..ac12fc7820e 100644 --- a/core/src/services/compfs/lister.rs +++ b/core/src/services/compfs/lister.rs @@ -28,7 +28,7 @@ pub struct CompfsLister { } impl CompfsLister { - pub fn new(core: Arc, read_dir: ReadDir) -> Self { + pub(super) fn new(core: Arc, read_dir: ReadDir) -> Self { Self { core, read_dir: Some(read_dir), diff --git a/core/src/services/compfs/reader.rs b/core/src/services/compfs/reader.rs index eca63db3ead..421efd8af11 100644 --- a/core/src/services/compfs/reader.rs +++ b/core/src/services/compfs/reader.rs @@ -31,7 +31,7 @@ pub struct CompfsReader { } impl CompfsReader { - pub fn new(core: Arc, file: compio::fs::File, range: BytesRange) -> Self { + pub(super) fn new(core: Arc, file: compio::fs::File, range: BytesRange) -> Self { Self { core, file, range } } } diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs index 95a97519104..d46c1b69b7c 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -30,7 +30,7 @@ pub struct CompfsWriter { } impl CompfsWriter { - pub fn new(core: Arc, file: Cursor) -> Self { + pub(super) fn new(core: Arc, file: Cursor) -> Self { Self { core, file } } } From f4689cb6840df07747e59d6dc7728c29d7d58dd0 Mon Sep 17 00:00:00 2001 From: George Miao Date: Thu, 20 Jun 2024 16:08:47 +0900 Subject: [PATCH 3/3] implement aux fn's --- core/src/services/compfs/backend.rs | 81 +++++++++++++++++++++++++++-- core/src/services/compfs/core.rs | 4 ++ 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs index f6559fa4c8c..9926e04f977 100644 --- a/core/src/services/compfs/backend.rs +++ b/core/src/services/compfs/backend.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use compio::dispatcher::Dispatcher; +use compio::{dispatcher::Dispatcher, fs::OpenOptions}; use super::{core::CompfsCore, lister::CompfsLister, reader::CompfsReader, writer::CompfsWriter}; @@ -112,7 +112,6 @@ impl Access for CompfsBackend { copy: true, rename: true, - blocking: true, ..Default::default() }); @@ -120,8 +119,79 @@ impl Access for CompfsBackend { am } + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + let path = self.core.prepare_path(path); + + self.core + .exec(move || async move { compio::fs::create_dir_all(path).await }) + .await?; + + Ok(RpCreateDir::default()) + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let path = self.core.prepare_path(path); + + let meta = self + .core + .exec(move || async move { compio::fs::metadata(path).await }) + .await?; + let ty = meta.file_type(); + let mode = if ty.is_dir() { + EntryMode::DIR + } else if ty.is_file() { + EntryMode::FILE + } else { + EntryMode::Unknown + }; + let last_mod = meta.modified().map_err(new_std_io_error)?.into(); + let ret = Metadata::new(mode).with_last_modified(last_mod); + + Ok(RpStat::new(ret)) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let path = self.core.prepare_path(path); + + self.core + .exec(move || async move { compio::fs::remove_file(path).await }) + .await?; + + Ok(RpDelete::default()) + } + + async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result { + let from = self.core.prepare_path(from); + let to = self.core.prepare_path(to); + + self.core + .exec(move || async move { + let from = OpenOptions::new().read(true).open(from).await?; + let to = OpenOptions::new().write(true).create(true).open(to).await?; + + let (mut from, mut to) = (Cursor::new(from), Cursor::new(to)); + compio::io::copy(&mut from, &mut to).await?; + + Ok(()) + }) + .await?; + + Ok(RpCopy::default()) + } + + async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result { + let from = self.core.prepare_path(from); + let to = self.core.prepare_path(to); + + self.core + .exec(move || async move { compio::fs::rename(from, to).await }) + .await?; + + Ok(RpRename::default()) + } + async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> { - let path = self.core.root.join(path.trim_end_matches('/')); + let path = self.core.prepare_path(path); let file = self .core @@ -133,7 +203,7 @@ impl Access for CompfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let path = self.core.root.join(path.trim_end_matches('/')); + let path = self.core.prepare_path(path); let append = args.append(); let file = self .core @@ -153,7 +223,8 @@ impl Access for CompfsBackend { } async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - let path = self.core.root.join(path.trim_end_matches('/')); + let path = self.core.prepare_path(path); + let read_dir = match self .core .exec_blocking(move || std::fs::read_dir(path)) diff --git a/core/src/services/compfs/core.rs b/core/src/services/compfs/core.rs index 60c68106f15..e600edbe7d2 100644 --- a/core/src/services/compfs/core.rs +++ b/core/src/services/compfs/core.rs @@ -44,6 +44,10 @@ pub(super) struct CompfsCore { } impl CompfsCore { + pub fn prepare_path(&self, path: &str) -> PathBuf { + self.root.join(path.trim_end_matches('/')) + } + pub async fn exec(&self, f: Fn) -> crate::Result where Fn: FnOnce() -> Fut + Send + 'static,