-
Notifications
You must be signed in to change notification settings - Fork 512
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: fs add concurrent write #4817
Conversation
4047e39
to
29de2d8
Compare
@Xuanwo I don't understand why the behavior test is failing. Can you help me look at it? |
0c8a26a
to
acbc5e7
Compare
@@ -49,6 +49,12 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static { | |||
offset: u64, | |||
buf: Buffer, | |||
) -> impl Future<Output = Result<()>> + MaybeSend; | |||
|
|||
/// close is used to close the underlying storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
close will close the file instead of the storage.
/// close is used to close the underlying storage. | ||
fn close(&self, offset: u64, buf: Buffer) -> impl Future<Output = Result<()>> + MaybeSend; | ||
|
||
/// abort is used to abort the underlying storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same.
@@ -148,13 +154,16 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> { | |||
|
|||
if let Some(buffer) = self.cache.clone() { | |||
let offset = self.next_offset; | |||
self.w.write_all_at(offset, buffer).await?; | |||
self.w.close(offset, buffer).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why close will carry a buffer?
core/src/services/fs/writer.rs
Outdated
|
||
tokio::task::spawn_blocking(move || { | ||
let mut offset = offset; | ||
let mut buf: &[u8] = &buf.to_bytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to_bytes
will copy the entire buffer to bytes which is slow. Maybe we can use:
while !buf.is_empty() {
match f.seek_write(buf.chunk(), offset) {
Ok(n) => {
buf.advance(n);
offset += n as u64
}
Err(e) => return Err(e).map_err(new_std_io_error),
}
}
core/src/services/fs/writer.rs
Outdated
.await; | ||
|
||
tokio::task::spawn_blocking(move || { | ||
f.write_all_at(&buf.to_bytes(), offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't use to_bytes
whenever possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what method should i use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use buf.chunk()
and buf.advance()
. I think we can implement a write_at
under different target so we don't need to remove the write
impl.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like this?
00964ed
to
5ce79e3
Compare
@@ -149,12 +155,16 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> { | |||
if let Some(buffer) = self.cache.clone() { | |||
let offset = self.next_offset; | |||
self.w.write_all_at(offset, buffer).await?; | |||
self.w.as_ref().close().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should call close
no matter there are cache or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
core/src/services/fs/writer.rs
Outdated
f.seek_write(buf, offset).map_err(new_std_io_error) | ||
} | ||
|
||
#[cfg(not(target_os = "windows"))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use cfg(unix)
and cfg(windows)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
|
||
#[cfg(target_os = "windows")] | ||
fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> { | ||
f.seek_write(buf, offset).map_err(new_std_io_error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use use
here to avoid leave them too away to find.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
5ce79e3
to
93d5a6d
Compare
@Xuanwo any else problem? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Part #4526