diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 89f2c1ca6834..c0457e55fac6 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -28,6 +28,7 @@ use super::core::*; use super::lister::FsLister; use super::reader::FsReader; use super::writer::FsWriter; +use super::writer::FsWriters; use crate::raw::*; use crate::*; @@ -169,7 +170,7 @@ pub struct FsBackend { impl Access for FsBackend { type Reader = FsReader; - type Writer = FsWriter; + type Writer = FsWriters; type Lister = Option>; type BlockingReader = FsReader; type BlockingWriter = FsWriter; @@ -313,7 +314,20 @@ impl Access for FsBackend { .await .map_err(new_std_io_error)?; - Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) + let w = FsWriter::new(target_path, tmp_path, f); + + let w = if op.append() { + FsWriters::One(w) + } else { + debug!("write with concurrent: {}", op.concurrent()); + FsWriters::Two(oio::PositionWriter::new( + w, + op.executor().cloned(), + op.concurrent(), + )) + }; + + Ok((RpWrite::default(), w)) } async fn delete(&self, path: &str, _: OpDelete) -> Result { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index c0d0688750e4..9e5259d2fc15 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -16,6 +16,7 @@ // under the License. use std::io::Write; +use std::os::unix::fs::FileExt; use std::path::PathBuf; use bytes::Buf; @@ -24,6 +25,9 @@ use tokio::io::AsyncWriteExt; use crate::raw::*; use crate::*; +pub type FsWriters = + TwoWays, oio::PositionWriter>>; + pub struct FsWriter { target_path: PathBuf, tmp_path: Option, @@ -101,3 +105,23 @@ impl oio::BlockingWrite for FsWriter { Ok(()) } } + +impl oio::PositionWrite for FsWriter { + async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> { + let f = self.f.as_ref().expect("FsWriter must be initialized"); + + let f = f + .try_clone() + .await + .map_err(new_std_io_error)? + .into_std() + .await; + + tokio::task::spawn_blocking(move || { + f.write_all_at(buf.chunk(), offset) + .map_err(new_std_io_error) + }) + .await + .map_err(new_task_join_error)? + } +}