Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue preventing reads after flush on a file handle #751

Merged
merged 10 commits into from
Feb 16, 2024
4 changes: 4 additions & 0 deletions mountpoint-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## Unreleased

### Other changes
* Fix an issue where read file handles could be closed too early, leading to bad file descriptor errors on subsequent reads. As a consequence of this fix, opening an existing file to overwrite it **immediately** after closing a read file handle may occasionally fail with an "Operation not permitted" error. In such cases, Mountpoint logs will also report that the file is "not writable while being read". ([#751](https://github.com/awslabs/mountpoint-s3/pull/751))
* File handles are no longer initialized lazily. Lazy initialization was introduced in version 1.4.0 but is reverted in this change. If upgrading from 1.4.0, you may see errors that were previously deferred until read/write now raised at open time. ([#751](https://github.com/awslabs/mountpoint-s3/pull/751))

## v1.4.0 (January 26, 2024)

### New features
Expand Down
157 changes: 28 additions & 129 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,10 @@ where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
/// A state where the file handle is created but the type is not yet determined
Created { lookup: LookedUp, flags: i32, pid: u32 },
/// The file handle has been assigned as a read handle
Read {
request: Prefetcher::PrefetchResult<Client>,
pid: u32,
},
Read(Prefetcher::PrefetchResult<Client>),
/// The file handle has been assigned as a write handle
Write(UploadState<Client>),
/// The file handle is already closed, currently only used to tell that the read is finished
Closed,
}

impl<Client, Prefetcher> std::fmt::Debug for FileHandleState<Client, Prefetcher>
Expand All @@ -86,15 +79,8 @@ where
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FileHandleState::Created { lookup, flags, pid } => f
.debug_struct("Created")
.field("lookup", lookup)
.field("flags", flags)
.field("pid", pid)
.finish(),
FileHandleState::Read { request: _, pid } => f.debug_struct("Read").field("pid", pid).finish(),
FileHandleState::Read(_) => f.debug_struct("Read").finish(),
FileHandleState::Write(arg0) => f.debug_tuple("Write").field(arg0).finish(),
FileHandleState::Closed => f.debug_struct("Closed").finish(),
}
}
}
Expand All @@ -104,22 +90,13 @@ where
Client: ObjectClient + Send + Sync,
Prefetcher: Prefetch,
{
async fn new(lookup: LookedUp, flags: i32, pid: u32) -> Self {
metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
FileHandleState::Created { lookup, flags, pid }
}

async fn new_write_handle(
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Prefetcher>,
) -> Result<FileHandleState<Client, Prefetcher>, Error> {
if flags & libc::O_ACCMODE == libc::O_RDONLY {
return Err(err!(libc::EBADF, "file handle is not open for writes"));
}

let is_truncate = flags & libc::O_TRUNC != 0;
let handle = fs
.superblock
Expand All @@ -146,13 +123,8 @@ where

async fn new_read_handle(
lookup: &LookedUp,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Prefetcher>,
) -> Result<FileHandleState<Client, Prefetcher>, Error> {
if flags & libc::O_WRONLY != 0 {
return Err(err!(libc::EBADF, "file handle is not open for reads",));
}
if !lookup.stat.is_readable {
return Err(err!(
libc::EACCES,
Expand All @@ -169,7 +141,7 @@ where
let request = fs
.prefetcher
.prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone());
let handle = FileHandleState::Read { request, pid };
let handle = FileHandleState::Read(request);
metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "read");
Ok(handle)
}
Expand Down Expand Up @@ -726,10 +698,29 @@ where
return Err(err!(libc::EINVAL, "O_SYNC and O_DSYNC are not supported"));
}

// All file handles will be lazy initialized on first read/write.
let state = FileHandleState::new(lookup, flags, pid).await.into();
let state = if flags & libc::O_RDWR != 0 {
dannycjones marked this conversation as resolved.
Show resolved Hide resolved
let is_truncate = flags & libc::O_TRUNC != 0;
if !remote_file || (self.config.allow_overwrite && is_truncate) {
dannycjones marked this conversation as resolved.
Show resolved Hide resolved
// If the file is new or opened in truncate mode, we know it must be a write handle.
debug!("fs:open choosing write handle for O_RDWR");
FileHandleState::new_write_handle(&lookup, lookup.inode.ino(), flags, pid, self).await?
} else {
// Otherwise, it must be a read handle.
debug!("fs:open choosing read handle for O_RDWR");
FileHandleState::new_read_handle(&lookup, self).await?
}
} else if flags & libc::O_WRONLY != 0 {
FileHandleState::new_write_handle(&lookup, lookup.inode.ino(), flags, pid, self).await?
} else {
FileHandleState::new_read_handle(&lookup, self).await?
vladem marked this conversation as resolved.
Show resolved Hide resolved
};

let fh = self.next_handle();
let handle = FileHandle { inode, full_key, state };
let handle = FileHandle {
inode,
full_key,
state: AsyncMutex::new(state),
};
debug!(fh, ino, "new file handle created");
self.file_handles.write().await.insert(fh, Arc::new(handle));

Expand Down Expand Up @@ -766,19 +757,8 @@ where
logging::record_name(handle.inode.name());
let mut state = handle.state.lock().await;
let request = match &mut *state {
FileHandleState::Created { lookup, flags, pid, .. } => {
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");

*state = FileHandleState::new_read_handle(lookup, *flags, *pid, self).await?;
if let FileHandleState::Read { request, .. } = &mut *state {
request
} else {
unreachable!("handle type always be assigned above");
}
}
FileHandleState::Read { request, .. } => request,
FileHandleState::Read(request) => request,
FileHandleState::Write(_) => return Err(err!(libc::EBADF, "file handle is not open for reads")),
FileHandleState::Closed => return Err(err!(libc::EBADF, "file handle is already closed")),
};

match request.read(offset as u64, size as usize).await {
Expand Down Expand Up @@ -870,18 +850,8 @@ where
let len = {
let mut state = handle.state.lock().await;
let request = match &mut *state {
FileHandleState::Created { lookup, flags, pid } => {
*state = FileHandleState::new_write_handle(lookup, ino, *flags, *pid, self).await?;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
if let FileHandleState::Write(request) = &mut *state {
request
} else {
unreachable!("handle type always be assigned above");
}
}
FileHandleState::Read { .. } => return Err(err!(libc::EBADF, "file handle is not open for writes")),
FileHandleState::Write(request) => request,
FileHandleState::Closed => return Err(err!(libc::EBADF, "file handle is already closed")),
};

request.write(offset, data, &handle.full_key).await?
Expand Down Expand Up @@ -1097,32 +1067,8 @@ where
logging::record_name(file_handle.inode.name());
let mut state = file_handle.state.lock().await;
let request = match &mut *state {
FileHandleState::Created { lookup, flags, pid } => {
// This happens when users call fsync without any read() or write() requests,
// since we don't know what type of handle it would be we need to consider what
// to do next for both cases.
// * if the file is new or opened in truncate mode, we know it must be a write
// handle so we can start an upload and complete it immediately, result in an
// empty file.
// * if the file already exists and it is not opened in truncate mode, we still
// can't be sure of its type so we will do nothing and just return ok.
let is_new_file = !lookup.inode.is_remote()?;
let is_truncate = *flags & libc::O_TRUNC != 0;
if is_new_file || is_truncate {
*state = FileHandleState::new_write_handle(lookup, lookup.inode.ino(), *flags, *pid, self).await?;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
if let FileHandleState::Write(request) = &mut *state {
request
} else {
unreachable!("handle type always be assigned above");
}
} else {
return Ok(());
}
}
FileHandleState::Read { .. } => return Ok(()),
FileHandleState::Write(request) => request,
FileHandleState::Closed => return Ok(()),
};
self.complete_upload(request, &file_handle.full_key, false, None).await
}
Expand All @@ -1141,10 +1087,6 @@ where
// process. In many cases, the child will then immediately close (flush) the duplicated
// file descriptors. We will not complete the upload if we can detect that the process
// invoking flush is different from the one that originally opened the file.
//
// The same for read path. We want to stop the prefetcher and decrease the reader count
// as soon as users close a file descriptor so that we don't block users from doing other
// operation like overwrite the file.
let file_handle = {
let file_handles = self.file_handles.read().await;
match file_handles.get(&fh) {
Expand All @@ -1155,30 +1097,11 @@ where
logging::record_name(file_handle.inode.name());
let mut state = file_handle.state.lock().await;
match &mut *state {
FileHandleState::Created { .. } => Ok(()),
FileHandleState::Read { pid: open_pid, .. } => {
passaro marked this conversation as resolved.
Show resolved Hide resolved
if !are_from_same_process(*open_pid, pid) {
trace!(
file_handle.full_key,
pid,
open_pid,
"not stopping prefetch because current pid differs from pid at open"
);
return Ok(());
}
// TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough?
file_handle.inode.finish_reading()?;
vladem marked this conversation as resolved.
Show resolved Hide resolved

// Mark the file handle state as closed so we only update the reader count once
*state = FileHandleState::Closed;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read");
Ok(())
}
FileHandleState::Read { .. } => Ok(()),
FileHandleState::Write(request) => {
self.complete_upload(request, &file_handle.full_key, true, Some(pid))
.await
}
FileHandleState::Closed => Ok(()),
}
}

Expand Down Expand Up @@ -1210,38 +1133,14 @@ where
}
};

let mut state = file_handle.state.into_inner();
let request = match state {
FileHandleState::Created { lookup, flags, pid } => {
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
// This happens when release is called before any read() or write(),
// since we don't know what type of handle it would be we need to consider
// what to do next for both cases.
// * if the file is new or opened in truncate mode, we know it must be a write
// handle so we can start an upload from here.
// * if the file already exists and it is not opened in truncate mode, we still
// can't be sure of its type so we will just drop it.
let is_new_file = !lookup.inode.is_remote()?;
let is_truncate = flags & libc::O_TRUNC != 0;
if is_new_file || is_truncate {
state = FileHandleState::new_write_handle(&lookup, lookup.inode.ino(), flags, pid, self).await?;
if let FileHandleState::Write(request) = state {
request
} else {
unreachable!("handle type always be assigned above");
}
} else {
return Ok(());
}
}
let request = match file_handle.state.into_inner() {
FileHandleState::Read { .. } => {
// TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough?
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read");
file_handle.inode.finish_reading()?;
return Ok(());
}
FileHandleState::Write(request) => request,
FileHandleState::Closed => return Ok(()),
};

let result = request.complete_if_in_progress(&file_handle.full_key).await;
Expand Down
48 changes: 9 additions & 39 deletions mountpoint-s3/tests/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,8 @@ async fn test_sequential_write(write_size: usize) {
let file_ino = dentry.attr.ino;

// First let's check that we can't write it again
let fh = fs
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.unwrap()
.fh;
let result = fs
.write(file_ino, fh, offset, &[0xaa; 27], 0, 0, None)
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.expect_err("file should not be overwritable")
.to_errno();
Expand Down Expand Up @@ -700,22 +695,13 @@ async fn test_duplicate_write_fails() {
assert_eq!(dentry.attr.size, 0);
let file_ino = dentry.attr.ino;

let opened = fs
let _opened = fs
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.unwrap();
_ = fs
vladem marked this conversation as resolved.
Show resolved Hide resolved
.write(file_ino, opened.fh, 0, &[0xaa; 27], 0, 0, None)
.await
.expect("first write should succeed");

let opened = fs
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.expect("open should succeed");
// Should not be allowed to write the file a second time
let err = fs
.write(file_ino, opened.fh, 0, &[0xaa; 27], 0, 0, None)
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.expect_err("should not be able to write twice")
.to_errno();
Expand Down Expand Up @@ -1248,20 +1234,12 @@ async fn test_flexible_retrieval_objects() {
let getattr = fs.getattr(entry.ino).await.unwrap();
assert_eq!(flexible_retrieval, getattr.attr.perm == 0);

let open = fs
.open(entry.ino, libc::O_RDONLY, 0)
.await
.expect("open should succeed");
let read_result = fs.read(entry.ino, open.fh, 0, 4096, 0, None).await;
let open = fs.open(entry.ino, libc::O_RDONLY, 0).await;
if flexible_retrieval {
let err = read_result.expect_err("can't read flexible retrieval objects");
let err = open.expect_err("can't open flexible retrieval objects");
assert_eq!(err.to_errno(), libc::EACCES);
} else {
assert_eq!(
&read_result.unwrap()[..],
b"hello world",
"instant retrieval files are readable"
);
let open = open.expect("instant retrieval files are readable");
fs.release(entry.ino, open.fh, 0, None, true).await.unwrap();
}
}
Expand All @@ -1287,20 +1265,12 @@ async fn test_flexible_retrieval_objects() {
let getattr = fs.getattr(lookup.attr.ino).await.unwrap();
assert_eq!(flexible_retrieval, getattr.attr.perm == 0);

let open = fs
.open(lookup.attr.ino, libc::O_RDONLY, 0)
.await
.expect("open should succeed");
let read_result = fs.read(lookup.attr.ino, open.fh, 0, 4096, 0, None).await;
let open = fs.open(lookup.attr.ino, libc::O_RDONLY, 0).await;
if flexible_retrieval {
let err = read_result.expect_err("can't read flexible retrieval objects");
let err = open.expect_err("can't open flexible retrieval objects");
assert_eq!(err.to_errno(), libc::EACCES);
} else {
assert_eq!(
&read_result.unwrap()[..],
b"hello world",
"instant retrieval files are readable"
);
let open = open.expect("instant retrieval files are readable");
fs.release(lookup.attr.ino, open.fh, 0, None, true).await.unwrap();
}
}
Expand Down
Loading
Loading