Skip to content

Commit

Permalink
feat: fs add concurrent write
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jun 27, 2024
1 parent 0ebb425 commit 29de2d8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ services-dbfs = []
services-dropbox = []
services-etcd = ["dep:etcd-client", "dep:bb8"]
services-foundationdb = ["dep:foundationdb"]
services-fs = ["tokio/fs"]
services-fs = ["tokio/fs", "internal-tokio-rt"]
services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"]
services-gcs = [
"dep:reqsign",
Expand Down
18 changes: 16 additions & 2 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::core::*;
use super::lister::FsLister;
use super::reader::FsReader;
use super::writer::FsWriter;
use super::writer::FsWriters;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -169,7 +170,7 @@ pub struct FsBackend {

impl Access for FsBackend {
type Reader = FsReader<tokio::fs::File>;
type Writer = FsWriter<tokio::fs::File>;
type Writer = FsWriters;
type Lister = Option<FsLister<tokio::fs::ReadDir>>;
type BlockingReader = FsReader<std::fs::File>;
type BlockingWriter = FsWriter<std::fs::File>;
Expand Down Expand Up @@ -313,7 +314,20 @@ impl Access for FsBackend {
.await
.map_err(new_std_io_error)?;

Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
let w = FsWriter::new(target_path, tmp_path, f);

let w = if op.append() {
FsWriters::One(w)
} else {
debug!("write with concurrent: {}", op.concurrent());
FsWriters::Two(oio::PositionWriter::new(
w,
op.executor().cloned(),
op.concurrent(),
))
};

Ok((RpWrite::default(), w))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
Expand Down
24 changes: 24 additions & 0 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::io::Write;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;

use bytes::Buf;
Expand All @@ -24,6 +25,9 @@ use tokio::io::AsyncWriteExt;
use crate::raw::*;
use crate::*;

pub type FsWriters =
TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;

pub struct FsWriter<F> {
target_path: PathBuf,
tmp_path: Option<PathBuf>,
Expand Down Expand Up @@ -101,3 +105,23 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
Ok(())
}
}

impl oio::PositionWrite for FsWriter<tokio::fs::File> {
async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let f = f
.try_clone()
.await
.map_err(new_std_io_error)?
.into_std()
.await;

tokio::task::spawn_blocking(move || {
f.write_all_at(buf.chunk(), offset)
.map_err(new_std_io_error)
})
.await
.map_err(new_task_join_error)?
}
}

0 comments on commit 29de2d8

Please sign in to comment.