Skip to content

Commit

Permalink
feat: fs add concurrent write
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jun 27, 2024
1 parent 0ebb425 commit 4047e39
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
18 changes: 16 additions & 2 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::core::*;
use super::lister::FsLister;
use super::reader::FsReader;
use super::writer::FsWriter;
use super::writer::FsWriters;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -169,7 +170,7 @@ pub struct FsBackend {

impl Access for FsBackend {
type Reader = FsReader<tokio::fs::File>;
type Writer = FsWriter<tokio::fs::File>;
type Writer = FsWriters;
type Lister = Option<FsLister<tokio::fs::ReadDir>>;
type BlockingReader = FsReader<std::fs::File>;
type BlockingWriter = FsWriter<std::fs::File>;
Expand Down Expand Up @@ -313,7 +314,20 @@ impl Access for FsBackend {
.await
.map_err(new_std_io_error)?;

Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
let w = FsWriter::new(target_path, tmp_path, f);

let w = if op.append() {
FsWriters::One(w)
} else {
debug!("write with concurrent: {}", op.concurrent());
FsWriters::Two(oio::PositionWriter::new(
w,
op.executor().cloned(),
op.concurrent(),
))
};

Ok((RpWrite::default(), w))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
Expand Down
24 changes: 24 additions & 0 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::io::Write;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;

use bytes::Buf;
Expand All @@ -24,6 +25,9 @@ use tokio::io::AsyncWriteExt;
use crate::raw::*;
use crate::*;

pub type FsWriters =
TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;

pub struct FsWriter<F> {
target_path: PathBuf,
tmp_path: Option<PathBuf>,
Expand Down Expand Up @@ -101,3 +105,23 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
Ok(())
}
}

impl oio::PositionWrite for FsWriter<tokio::fs::File> {
async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let f = f
.try_clone()
.await
.map_err(new_std_io_error)?
.into_std()
.await;

tokio::task::spawn_blocking(move || {
f.write_all_at(buf.chunk(), offset)
.map_err(new_std_io_error)
})
.await
.map_err(new_task_join_error)?
}
}

0 comments on commit 4047e39

Please sign in to comment.