diff --git a/core/src/services/fs/appender.rs b/core/src/services/fs/appender.rs new file mode 100644 index 00000000000..8b0eca967fb --- /dev/null +++ b/core/src/services/fs/appender.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use bytes::Bytes; + +use tokio::io::AsyncWriteExt; + +use super::error::parse_io_error; +use crate::raw::*; +use crate::*; + +pub struct FsAppender { + f: F, +} + +impl FsAppender { + pub fn new(f: F) -> Self { + Self { f } + } +} + +#[async_trait] +impl oio::Append for FsAppender { + async fn append(&mut self, bs: Bytes) -> Result<()> { + self.f.write_all(&bs).await.map_err(parse_io_error)?; + + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + self.f.sync_all().await.map_err(parse_io_error)?; + + Ok(()) + } +} diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 3e914f550fe..51585aa5711 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -27,6 +27,7 @@ use chrono::DateTime; use log::debug; use uuid::Uuid; +use super::appender::FsAppender; use super::error::parse_io_error; use super::pager::FsPager; use super::writer::FsWriter; @@ -43,6 +44,7 @@ use crate::*; /// - [x] stat /// - [x] read /// - [x] write +/// - [x] append /// - [x] create_dir /// - [x] delete /// - [x] copy @@ -296,7 +298,7 @@ impl Accessor for FsBackend { type BlockingReader = oio::into_blocking_reader::FdReader; type Writer = FsWriter; type BlockingWriter = FsWriter; - type Appender = (); + type Appender = FsAppender; type Pager = Option>; type BlockingPager = Option>; @@ -316,6 +318,8 @@ impl Accessor for FsBackend { create_dir: true, delete: true, + append: true, + list: true, list_with_delimiter_slash: true, @@ -434,6 +438,20 @@ impl Accessor for FsBackend { Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) } + async fn append(&self, path: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> { + let path = Self::ensure_write_abs_path(&self.root, path).await?; + + let f = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .append(true) + .open(&path) + .await + .map_err(parse_io_error)?; + + Ok((RpAppend::new(), FsAppender::new(f))) + } + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { let from = self.root.join(from.trim_end_matches('/')); diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs index aa2a5fca1f6..81e92090162 100644 --- a/core/src/services/fs/mod.rs +++ b/core/src/services/fs/mod.rs @@ -18,6 +18,7 @@ mod backend; pub use backend::FsBuilder as Fs; +mod appender; mod error; mod pager; mod writer;