From 36ee07a4add01fb2c67293af9688b06cb25d1164 Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Sat, 23 Mar 2024 11:52:31 +0000 Subject: [PATCH 1/8] Adding support for `rewinddir` by restarting readdir if offset is zero. Signed-off-by: Andres Santana --- mountpoint-s3/src/fs.rs | 9 ++ mountpoint-s3/src/inode/readdir.rs | 35 ++++++ mountpoint-s3/tests/fs.rs | 165 +++++++++++++++++++++++------ 3 files changed, 174 insertions(+), 35 deletions(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 4969b65a6..eec4a86cd 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -992,12 +992,20 @@ where .ok_or_else(|| err!(libc::EBADF, "invalid directory handle"))? }; + // special case where we need to rewind and restart the streaming + if offset == 0 && dir_handle.offset() != 0 { + // only do this if this is not the first call with offset 0 + dir_handle.offset.store(0, Ordering::SeqCst); + dir_handle.handle.rewind().await; + } + if offset != dir_handle.offset() { // POSIX allows seeking an open directory. That's a pain for us since we are streaming // the directory entries and don't want to keep them all in memory. But one common case // we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that // request offset 0 twice in a row. So we remember the last response and, if repeated, // we return it again. 
+ let last_response = dir_handle.last_response.lock().await; if let Some((last_offset, entries)) = last_response.as_ref() { if offset == *last_offset { @@ -1016,6 +1024,7 @@ where return Ok(reply); } } + return Err(err!( libc::EINVAL, "out-of-order readdir, expected={}, actual={}", diff --git a/mountpoint-s3/src/inode/readdir.rs b/mountpoint-s3/src/inode/readdir.rs index 8f1e5db39..912bb6f7a 100644 --- a/mountpoint-s3/src/inode/readdir.rs +++ b/mountpoint-s3/src/inode/readdir.rs @@ -186,6 +186,11 @@ impl ReaddirHandle { self.inner.update_from_remote(self.dir_ino, entry.name(), remote_lookup) } + pub async fn rewind(&self) { + self.readded.lock().unwrap().take(); + self.iter.lock().await.rewind(); + } + #[cfg(test)] pub(super) async fn collect(&self, client: &OC) -> Result, InodeError> { let mut result = vec![]; @@ -290,6 +295,13 @@ impl ReaddirIter { Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries)) } + fn rewind(&mut self) { + match self { + Self::Ordered(iter) => iter.rewind(), + Self::Unordered(iter) => iter.rewind() + } + } + async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { match self { Self::Ordered(iter) => iter.next(client).await, @@ -391,6 +403,11 @@ impl RemoteIter { Ok(self.entries.pop_front()) } + + fn rewind(&mut self) { + self.state = RemoteIterState::InProgress(None); + self.entries.clear(); + } } /// Iterator implementation for S3 implementations that provide lexicographically ordered LIST. @@ -425,6 +442,14 @@ mod ordered { } } + pub fn rewind(&mut self) { + self.next_remote = None; + self.next_local = None; + self.last_entry = None; + self.remote.rewind(); + self.local.rewind(); + } + /// Return the next [ReaddirEntry] for the directory stream. If the stream is finished, returns /// `Ok(None)`. 
pub(super) async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { @@ -491,6 +516,10 @@ mod ordered { fn next(&mut self) -> Option { self.entries.pop_front() } + + fn rewind(&mut self) { + self.entries.clear(); + } } } @@ -549,5 +578,11 @@ mod unordered { Ok(self.local_iter.pop_front()) } + + pub fn rewind(&mut self) { + self.remote.rewind(); + self.local.clear(); + self.local_iter.clear(); + } } } diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index d2f1f5407..315e05fd8 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -2,7 +2,7 @@ use fuser::FileType; use libc::S_IFREG; -use mountpoint_s3::fs::{CacheConfig, ToErrno, FUSE_ROOT_INODE}; +use mountpoint_s3::fs::{CacheConfig, S3Personality, ToErrno, FUSE_ROOT_INODE}; use mountpoint_s3::prefix::Prefix; use mountpoint_s3::S3FilesystemConfig; use mountpoint_s3_client::failure_client::countdown_failure_client; @@ -21,7 +21,7 @@ use std::time::{Duration, SystemTime}; use test_case::test_case; mod common; -use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply}; +use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply, TestS3Filesystem}; #[test_case(""; "unprefixed")] #[test_case("test_prefix/"; "prefixed")] @@ -1276,7 +1276,7 @@ async fn test_flexible_retrieval_objects() { } #[tokio::test] -async fn test_readdir_rewind() { +async fn test_readdir_rewind_ordered() { let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), Default::default()); for i in 0..10 { @@ -1304,45 +1304,34 @@ async fn test_readdir_rewind() { .expect_err("out of order"); // Requesting the same buffer size should work fine - let mut new_reply = DirectoryReply::new(5); - let _ = fs - .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply) - .await - .unwrap(); - let new_entries = new_reply - .entries - .iter() - .map(|e| (e.ino, e.name.clone())) - .collect::>(); + let 
new_entries = ls(&fs, dir_handle, 0, 5).await; assert_eq!(entries, new_entries); // Requesting a smaller buffer works fine and returns a prefix - let mut new_reply = DirectoryReply::new(3); - let _ = fs - .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply) - .await - .unwrap(); - let new_entries = new_reply - .entries - .iter() - .map(|e| (e.ino, e.name.clone())) - .collect::>(); + let new_entries = ls(&fs, dir_handle, 0, 3).await; assert_eq!(&entries[..3], new_entries); - // Requesting a larger buffer works fine, but only partially fills (which is allowed) - let mut new_reply = DirectoryReply::new(10); - let _ = fs - .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply) - .await - .unwrap(); - let new_entries = new_reply - .entries - .iter() - .map(|e| (e.ino, e.name.clone())) - .collect::>(); - assert_eq!(entries, new_entries); + // Requesting same offset (non zero) works fine by returning last response + let _ = ls(&fs, dir_handle, 0, 5).await; + let new_entries = ls(&fs, dir_handle, 5, 5).await; + let new_entries_repeat = ls(&fs, dir_handle, 5, 5).await; + assert_eq!(new_entries, new_entries_repeat); + + // Request all entries + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 12); // 10 files + 2 dirs (. and ..) = 12 entries + + // Request more entries but there is no more + let new_entries = ls(&fs, dir_handle, 12, 20).await; + assert_eq!(new_entries.len(), 0); + + // Request everything from zero one more time + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 12); // 10 files + 2 dirs (. and ..) = 12 entries // And we can resume the stream from the end of the first request + // but let's rewind first + let _ = ls(&fs, dir_handle, 0, 5).await; let mut next_page = DirectoryReply::new(0); let _ = fs .readdirplus( @@ -1353,6 +1342,7 @@ async fn test_readdir_rewind() { ) .await .unwrap(); + assert_eq!(next_page.entries.len(), 7); // 10 directory entries + . + .. 
= 12, minus the 5 we already saw assert_eq!(next_page.entries.front().unwrap().name, "foo3"); @@ -1365,3 +1355,108 @@ async fn test_readdir_rewind() { } } } + +#[tokio::test] +async fn test_readdir_rewind_unordered() { + let config = S3FilesystemConfig { + s3_personality: S3Personality::ExpressOneZone, + ..Default::default() + }; + let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), config); + + for i in 0..10 { + client.add_object(&format!("foo{i}"), b"foo".into()); + } + + let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh; + + // Requesting same offset (non zero) works fine by returning last response + let _ = ls(&fs, dir_handle, 0, 5).await; + let new_entries = ls(&fs, dir_handle, 5, 5).await; + let new_entries_repeat = ls(&fs, dir_handle, 5, 5).await; + assert_eq!(new_entries, new_entries_repeat); + + // Request all entries + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 12); // 10 files + 2 dirs (. and ..) = 12 entries + + // Request more entries but there is no more + let new_entries = ls(&fs, dir_handle, 12, 20).await; + assert_eq!(new_entries.len(), 0); + + // Request everything from zero one more time + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 12); // 10 files + 2 dirs (. and ..) 
= 12 entries +} + +#[test_case(Default::default())] +#[test_case(S3FilesystemConfig {s3_personality: S3Personality::ExpressOneZone, ..Default::default()})] +#[tokio::test] +async fn test_readdir_rewind_with_new_files(s3_fs_config: S3FilesystemConfig) { + let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), s3_fs_config); + + for i in 0..10 { + client.add_object(&format!("foo{i}"), b"foo".into()); + } + + let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh; + + // Let's add a new local file + let mode = libc::S_IFREG | libc::S_IRWXU; // regular file + 0700 permissions + let dentry = fs + .mknod(FUSE_ROOT_INODE, "newfile.bin".as_ref(), mode, 0, 0) + .await + .unwrap(); + assert_eq!(dentry.attr.size, 0); + let file_ino = dentry.attr.ino; + + let fh = fs + .open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0) + .await + .unwrap() + .fh; + + let slice = &[0xaa; 27]; + let written = fs.write(file_ino, fh, 0, slice, 0, 0, None).await.unwrap(); + assert_eq!(written as usize, slice.len()); + fs.fsync(file_ino, fh, true).await.unwrap(); + + // Let's add a new remote file + client.add_object(&format!("foo10"), b"foo".into()); + + // Requesting same offset (non zero) works fine by returning last response + let _ = ls(&fs, dir_handle, 0, 5).await; + let new_entries = ls(&fs, dir_handle, 5, 5).await; + let new_entries_repeat = ls(&fs, dir_handle, 5, 5).await; + assert_eq!(new_entries, new_entries_repeat); + + // Request all entries + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 14); // 10 original remote files + 1 new local file + 1 new remote file + 2 dirs (. and ..) 
= 14 entries + + // Request more entries but there is no more + let new_entries = ls(&fs, dir_handle, 14, 20).await; + assert_eq!(new_entries.len(), 0); + + // Request everything from zero one more time + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 14); // 10 original remote files + 1 new local file + 1 new remote file + 2 dirs (. and ..) = 14 entries +} + +async fn ls( + fs: &TestS3Filesystem>, + dir_handle: u64, + offset: i64, + max_entries: usize, +) -> Vec<(u64, OsString)> { + let mut reply = DirectoryReply::new(max_entries); + let _ = fs + .readdirplus(FUSE_ROOT_INODE, dir_handle, offset, &mut reply) + .await + .unwrap(); + reply + .entries + .iter() + .map(|e| (e.ino, e.name.clone())) + .collect::>() +} From eafb4f4f545986a8fadf87c0fde7b307109a3b14 Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Sat, 23 Mar 2024 15:43:15 +0000 Subject: [PATCH 2/8] Adding mention to rewinddir to semantics doc. Signed-off-by: Andres Santana --- doc/SEMANTICS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/SEMANTICS.md b/doc/SEMANTICS.md index 37878008c..3c21464e7 100644 --- a/doc/SEMANTICS.md +++ b/doc/SEMANTICS.md @@ -203,7 +203,7 @@ the following behavior: ### Directory operations -Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported. However, seeking (`lseek`) on directory handles is not supported. +Basic read-only directory operations (`opendir`, `readdir`, `closedir`, `rewinddir`) are supported. However, seeking (`lseek`) on directory handles is not supported. Sorting order of `readdir` results: * For general purpose buckets, `readdir` returns results in lexicographical order. From ab2d4131053a25726af2a1552d6c1aef15296b01 Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Mon, 25 Mar 2024 18:18:10 +0000 Subject: [PATCH 3/8] Replace rewind method with a new ReaddirHandle. 
Signed-off-by: Andres Santana --- mountpoint-s3/src/fs.rs | 19 ++++++++-------- mountpoint-s3/src/inode/readdir.rs | 35 ------------------------------ 2 files changed, 10 insertions(+), 44 deletions(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index eec4a86cd..d285f8def 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -37,7 +37,7 @@ pub const FUSE_ROOT_INODE: InodeNo = 1u64; struct DirHandle { #[allow(unused)] ino: InodeNo, - handle: ReaddirHandle, + handle: AsyncMutex, offset: AtomicI64, last_response: AsyncMutex)>>, } @@ -943,7 +943,7 @@ where let fh = self.next_handle(); let handle = DirHandle { ino: parent, - handle: inode_handle, + handle: AsyncMutex::new(inode_handle), offset: AtomicI64::new(0), last_response: AsyncMutex::new(None), }; @@ -996,7 +996,8 @@ where if offset == 0 && dir_handle.offset() != 0 { // only do this if this is not the first call with offset 0 dir_handle.offset.store(0, Ordering::SeqCst); - dir_handle.handle.rewind().await; + let new_handle = self.superblock.readdir(&self.client, parent, 1000).await?; + *dir_handle.handle.lock().await = new_handle; } if offset != dir_handle.offset() { @@ -1018,7 +1019,7 @@ where // must remember it again, except that readdirplus specifies that . and .. // are never incremented. if is_readdirplus && entry.name != "." && entry.name != ".." 
{ - dir_handle.handle.remember(&entry.lookup); + dir_handle.handle.lock().await.remember(&entry.lookup); } } return Ok(reply); @@ -1079,11 +1080,11 @@ where if dir_handle.offset() < 2 { let lookup = self .superblock - .getattr(&self.client, dir_handle.handle.parent(), false) + .getattr(&self.client, dir_handle.handle.lock().await.parent(), false) .await?; let attr = self.make_attr(&lookup); let entry = DirectoryEntry { - ino: dir_handle.handle.parent(), + ino: dir_handle.handle.lock().await.parent(), offset: dir_handle.offset() + 1, name: "..".into(), attr, @@ -1098,7 +1099,7 @@ where } loop { - let next = match dir_handle.handle.next(&self.client).await? { + let next = match dir_handle.handle.lock().await.next(&self.client).await? { None => return Ok(reply.finish(offset, &dir_handle).await), Some(next) => next, }; @@ -1115,11 +1116,11 @@ where }; if reply.add(entry) { - dir_handle.handle.readd(next); + dir_handle.handle.lock().await.readd(next); return Ok(reply.finish(offset, &dir_handle).await); } if is_readdirplus { - dir_handle.handle.remember(&next); + dir_handle.handle.lock().await.remember(&next); } dir_handle.next_offset(); } diff --git a/mountpoint-s3/src/inode/readdir.rs b/mountpoint-s3/src/inode/readdir.rs index 912bb6f7a..8f1e5db39 100644 --- a/mountpoint-s3/src/inode/readdir.rs +++ b/mountpoint-s3/src/inode/readdir.rs @@ -186,11 +186,6 @@ impl ReaddirHandle { self.inner.update_from_remote(self.dir_ino, entry.name(), remote_lookup) } - pub async fn rewind(&self) { - self.readded.lock().unwrap().take(); - self.iter.lock().await.rewind(); - } - #[cfg(test)] pub(super) async fn collect(&self, client: &OC) -> Result, InodeError> { let mut result = vec![]; @@ -295,13 +290,6 @@ impl ReaddirIter { Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries)) } - fn rewind(&mut self) { - match self { - Self::Ordered(iter) => iter.rewind(), - Self::Unordered(iter) => iter.rewind() - } - } - async fn next(&mut self, client: &impl 
ObjectClient) -> Result, InodeError> { match self { Self::Ordered(iter) => iter.next(client).await, @@ -403,11 +391,6 @@ impl RemoteIter { Ok(self.entries.pop_front()) } - - fn rewind(&mut self) { - self.state = RemoteIterState::InProgress(None); - self.entries.clear(); - } } /// Iterator implementation for S3 implementations that provide lexicographically ordered LIST. @@ -442,14 +425,6 @@ mod ordered { } } - pub fn rewind(&mut self) { - self.next_remote = None; - self.next_local = None; - self.last_entry = None; - self.remote.rewind(); - self.local.rewind(); - } - /// Return the next [ReaddirEntry] for the directory stream. If the stream is finished, returns /// `Ok(None)`. pub(super) async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { @@ -516,10 +491,6 @@ mod ordered { fn next(&mut self) -> Option { self.entries.pop_front() } - - fn rewind(&mut self) { - self.entries.clear(); - } } } @@ -578,11 +549,5 @@ mod unordered { Ok(self.local_iter.pop_front()) } - - pub fn rewind(&mut self) { - self.remote.rewind(); - self.local.clear(); - self.local_iter.clear(); - } } } From b9495dd6a78d5e808dac53780a8535ac3c3747de Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Mon, 25 Mar 2024 18:27:53 +0000 Subject: [PATCH 4/8] Adding a rewind_offset fn. 
Signed-off-by: Andres Santana --- mountpoint-s3/src/fs.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index d285f8def..db41c08a7 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -50,6 +50,10 @@ impl DirHandle { fn next_offset(&self) { self.offset.fetch_add(1, Ordering::SeqCst); } + + fn rewind_offset(&self) { + self.offset.store(0, Ordering::SeqCst); + } } #[derive(Debug)] @@ -995,7 +999,7 @@ where // special case where we need to rewind and restart the streaming if offset == 0 && dir_handle.offset() != 0 { // only do this if this is not the first call with offset 0 - dir_handle.offset.store(0, Ordering::SeqCst); + dir_handle.rewind_offset(); let new_handle = self.superblock.readdir(&self.client, parent, 1000).await?; *dir_handle.handle.lock().await = new_handle; } From 1aec473f948874621fe66780ea019827f2e17ebd Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Tue, 26 Mar 2024 06:59:58 +0000 Subject: [PATCH 5/8] Adding more tests. Signed-off-by: Andres Santana --- mountpoint-s3/tests/fs.rs | 76 +++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 18 deletions(-) diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index 315e05fd8..1a8902dd9 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -1,5 +1,6 @@ //! 
Manually implemented tests executing the FUSE protocol against [S3Filesystem] +use assert_cmd::assert; use fuser::FileType; use libc::S_IFREG; use mountpoint_s3::fs::{CacheConfig, S3Personality, ToErrno, FUSE_ROOT_INODE}; @@ -1402,24 +1403,8 @@ async fn test_readdir_rewind_with_new_files(s3_fs_config: S3FilesystemConfig) { let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh; // Let's add a new local file - let mode = libc::S_IFREG | libc::S_IRWXU; // regular file + 0700 permissions - let dentry = fs - .mknod(FUSE_ROOT_INODE, "newfile.bin".as_ref(), mode, 0, 0) - .await - .unwrap(); - assert_eq!(dentry.attr.size, 0); - let file_ino = dentry.attr.ino; - - let fh = fs - .open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0) - .await - .unwrap() - .fh; - - let slice = &[0xaa; 27]; - let written = fs.write(file_ino, fh, 0, slice, 0, 0, None).await.unwrap(); - assert_eq!(written as usize, slice.len()); - fs.fsync(file_ino, fh, true).await.unwrap(); + let file_name = "newfile.bin"; + new_local_file(&fs, &file_name).await; // Let's add a new remote file client.add_object(&format!("foo10"), b"foo".into()); @@ -1434,6 +1419,11 @@ async fn test_readdir_rewind_with_new_files(s3_fs_config: S3FilesystemConfig) { let new_entries = ls(&fs, dir_handle, 0, 20).await; assert_eq!(new_entries.len(), 14); // 10 original remote files + 1 new local file + 1 new remote file + 2 dirs (. and ..) = 14 entries + // assert entries contain new local file + assert!(new_entries + .iter() + .any(|(_, name)| name.as_os_str().to_str().unwrap() == file_name)); + // Request more entries but there is no more + let new_entries = ls(&fs, dir_handle, 14, 20).await; + assert_eq!(new_entries.len(), 0); + + // Request everything from zero one more time + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 14); // 10 original remote files + 1 new local file + 1 new remote file + 2 dirs (. and ..) 
= 14 entries } +#[tokio::test] +async fn test_readdir_rewind_with_local_files_only() { + let (_, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), Default::default()); + + let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh; + + // Let's add a new local file + let file_name = "newfile.bin"; + new_local_file(&fs, &file_name).await; + + // Requesting same offset (non zero) works fine by returning last response + let _ = ls(&fs, dir_handle, 0, 5).await; + let new_entries = ls(&fs, dir_handle, 3, 5).await; + let new_entries_repeat = ls(&fs, dir_handle, 3, 5).await; + assert_eq!(new_entries, new_entries_repeat); + + // Request all entries + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 3); // 1 new local file + 2 dirs (. and ..) = 3 entries + + // assert entries contain new local file + assert!(new_entries.iter().any(|(_, name)| name == file_name)); + + // Request more entries but there is no more + let new_entries = ls(&fs, dir_handle, 3, 20).await; + assert_eq!(new_entries.len(), 0); + + // Request everything from zero one more time + let new_entries = ls(&fs, dir_handle, 0, 20).await; + assert_eq!(new_entries.len(), 3); // 1 new local file + 2 dirs (. and ..) 
= 3 entries +} + +async fn new_local_file(fs: &TestS3Filesystem>, filename: &str) { + let mode = libc::S_IFREG | libc::S_IRWXU; // regular file + 0700 permissions + let dentry = fs.mknod(FUSE_ROOT_INODE, filename.as_ref(), mode, 0, 0).await.unwrap(); + assert_eq!(dentry.attr.size, 0); + let file_ino = dentry.attr.ino; + + let fh = fs + .open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0) + .await + .unwrap() + .fh; + + let slice = &[0xaa; 27]; + let written = fs.write(file_ino, fh, 0, slice, 0, 0, None).await.unwrap(); + assert_eq!(written as usize, slice.len()); + fs.fsync(file_ino, fh, true).await.unwrap(); +} + async fn ls( fs: &TestS3Filesystem>, dir_handle: u64, From e1b6eae2165fb8fcf616be987e01a1229dd2968b Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Tue, 26 Mar 2024 07:10:56 +0000 Subject: [PATCH 6/8] Creating a fn for creating a default handle. Signed-off-by: Andres Santana --- mountpoint-s3/src/fs.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index db41c08a7..a03b787ee 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -939,10 +939,14 @@ where Ok(len) } + async fn default_handle(&self, parent: InodeNo) -> Result { + self.superblock.readdir(&self.client, parent, 1000).await + } + pub async fn opendir(&self, parent: InodeNo, _flags: i32) -> Result { trace!("fs:opendir with parent {:?} flags {:#b}", parent, _flags); - let inode_handle = self.superblock.readdir(&self.client, parent, 1000).await?; + let inode_handle = self.default_handle(parent).await?; let fh = self.next_handle(); let handle = DirHandle { @@ -1000,7 +1004,7 @@ where if offset == 0 && dir_handle.offset() != 0 { // only do this if this is not the first call with offset 0 dir_handle.rewind_offset(); - let new_handle = self.superblock.readdir(&self.client, parent, 1000).await?; + let new_handle = self.default_handle( parent).await?; *dir_handle.handle.lock().await = new_handle; } From 
c96c06b1ad6741c8a14179b8c529a13330068c60 Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Tue, 26 Mar 2024 07:19:12 +0000 Subject: [PATCH 7/8] Fix clippy and format. Signed-off-by: Andres Santana --- mountpoint-s3/src/fs.rs | 2 +- mountpoint-s3/tests/fs.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index a03b787ee..2652cd313 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -1004,7 +1004,7 @@ where if offset == 0 && dir_handle.offset() != 0 { // only do this if this is not the first call with offset 0 dir_handle.rewind_offset(); - let new_handle = self.default_handle( parent).await?; + let new_handle = self.default_handle(parent).await?; *dir_handle.handle.lock().await = new_handle; } diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index 1a8902dd9..2f4de4d49 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -1,6 +1,5 @@ //! Manually implemented tests executing the FUSE protocol against [S3Filesystem] -use assert_cmd::assert; use fuser::FileType; use libc::S_IFREG; use mountpoint_s3::fs::{CacheConfig, S3Personality, ToErrno, FUSE_ROOT_INODE}; @@ -1404,10 +1403,10 @@ async fn test_readdir_rewind_with_new_files(s3_fs_config: S3FilesystemConfig) { // Let's add a new local file let file_name = "newfile.bin"; - new_local_file(&fs, &file_name).await; + new_local_file(&fs, file_name).await; // Let's add a new remote file - client.add_object(&format!("foo10"), b"foo".into()); + client.add_object("foo10", b"foo".into()); // Requesting same offset (non zero) works fine by returning last response let _ = ls(&fs, dir_handle, 0, 5).await; @@ -1441,7 +1440,7 @@ async fn test_readdir_rewind_with_local_files_only() { // Let's add a new local file let file_name = "newfile.bin"; - new_local_file(&fs, &file_name).await; + new_local_file(&fs, file_name).await; // Requesting same offset (non zero) works fine by returning last response let _ = 
ls(&fs, dir_handle, 0, 5).await; From e8ae6218b7dc40796830208a52c9a5a2e14cba89 Mon Sep 17 00:00:00 2001 From: Andres Santana Date: Thu, 28 Mar 2024 12:48:49 +0000 Subject: [PATCH 8/8] Rename to readdir_handle. Move to single lock. Signed-off-by: Andres Santana --- mountpoint-s3/src/fs.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 2652cd313..8bd795c68 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -939,14 +939,15 @@ where Ok(len) } - async fn default_handle(&self, parent: InodeNo) -> Result { + /// Creates a new ReaddirHandle for the provided parent and default page size + async fn readdir_handle(&self, parent: InodeNo) -> Result { self.superblock.readdir(&self.client, parent, 1000).await } pub async fn opendir(&self, parent: InodeNo, _flags: i32) -> Result { trace!("fs:opendir with parent {:?} flags {:#b}", parent, _flags); - let inode_handle = self.default_handle(parent).await?; + let inode_handle = self.readdir_handle(parent).await?; let fh = self.next_handle(); let handle = DirHandle { @@ -1000,14 +1001,15 @@ where .ok_or_else(|| err!(libc::EBADF, "invalid directory handle"))? }; - // special case where we need to rewind and restart the streaming + // special case where we need to rewind and restart the streaming but only when it is not the first time we see offset 0 if offset == 0 && dir_handle.offset() != 0 { - // only do this if this is not the first call with offset 0 - dir_handle.rewind_offset(); - let new_handle = self.default_handle(parent).await?; + let new_handle = self.readdir_handle(parent).await?; *dir_handle.handle.lock().await = new_handle; + dir_handle.rewind_offset(); } + let readdir_handle = dir_handle.handle.lock().await; + if offset != dir_handle.offset() { // POSIX allows seeking an open directory. That's a pain for us since we are streaming // the directory entries and don't want to keep them all in memory. 
But one common case @@ -1027,7 +1029,7 @@ where // must remember it again, except that readdirplus specifies that . and .. // are never incremented. if is_readdirplus && entry.name != "." && entry.name != ".." { - dir_handle.handle.lock().await.remember(&entry.lookup); + readdir_handle.remember(&entry.lookup); } } return Ok(reply); @@ -1088,11 +1090,11 @@ where if dir_handle.offset() < 2 { let lookup = self .superblock - .getattr(&self.client, dir_handle.handle.lock().await.parent(), false) + .getattr(&self.client, readdir_handle.parent(), false) .await?; let attr = self.make_attr(&lookup); let entry = DirectoryEntry { - ino: dir_handle.handle.lock().await.parent(), + ino: readdir_handle.parent(), offset: dir_handle.offset() + 1, name: "..".into(), attr, @@ -1107,7 +1109,7 @@ where } loop { - let next = match dir_handle.handle.lock().await.next(&self.client).await? { + let next = match readdir_handle.next(&self.client).await? { None => return Ok(reply.finish(offset, &dir_handle).await), Some(next) => next, }; @@ -1124,11 +1126,11 @@ where }; if reply.add(entry) { - dir_handle.handle.lock().await.readd(next); + readdir_handle.readd(next); return Ok(reply.finish(offset, &dir_handle).await); } if is_readdirplus { - dir_handle.handle.lock().await.remember(&next); + readdir_handle.remember(&next); } dir_handle.next_offset(); }