Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for rewinddir by restarting readdir if offset is zero. #825

Merged
merged 8 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/SEMANTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ the following behavior:

### Directory operations

Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported. However, seeking (`lseek`) on directory handles is not supported.
Basic read-only directory operations (`opendir`, `readdir`, `closedir`, `rewinddir`) are supported. However, seeking (`lseek`) on directory handles is not supported.

Sorting order of `readdir` results:
* For general purpose buckets, `readdir` returns results in lexicographical order.
Expand Down
38 changes: 29 additions & 9 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub const FUSE_ROOT_INODE: InodeNo = 1u64;
struct DirHandle {
#[allow(unused)]
ino: InodeNo,
handle: ReaddirHandle,
handle: AsyncMutex<ReaddirHandle>,
offset: AtomicI64,
last_response: AsyncMutex<Option<(i64, Vec<DirectoryEntry>)>>,
}
Expand All @@ -50,6 +50,10 @@ impl DirHandle {
fn next_offset(&self) {
self.offset.fetch_add(1, Ordering::SeqCst);
}

fn rewind_offset(&self) {
self.offset.store(0, Ordering::SeqCst);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -935,15 +939,20 @@ where
Ok(len)
}

/// Creates a new ReaddirHandle for the provided parent and default page size
async fn readdir_handle(&self, parent: InodeNo) -> Result<ReaddirHandle, InodeError> {
self.superblock.readdir(&self.client, parent, 1000).await
}

pub async fn opendir(&self, parent: InodeNo, _flags: i32) -> Result<Opened, Error> {
trace!("fs:opendir with parent {:?} flags {:#b}", parent, _flags);

let inode_handle = self.superblock.readdir(&self.client, parent, 1000).await?;
let inode_handle = self.readdir_handle(parent).await?;

let fh = self.next_handle();
let handle = DirHandle {
ino: parent,
handle: inode_handle,
handle: AsyncMutex::new(inode_handle),
offset: AtomicI64::new(0),
last_response: AsyncMutex::new(None),
};
Expand Down Expand Up @@ -992,12 +1001,22 @@ where
.ok_or_else(|| err!(libc::EBADF, "invalid directory handle"))?
};

// special case where we need to rewind and restart the streaming but only when it is not the first time we see offset 0
if offset == 0 && dir_handle.offset() != 0 {
let new_handle = self.readdir_handle(parent).await?;
*dir_handle.handle.lock().await = new_handle;
dir_handle.rewind_offset();
}

let readdir_handle = dir_handle.handle.lock().await;

if offset != dir_handle.offset() {
// POSIX allows seeking an open directory. That's a pain for us since we are streaming
// the directory entries and don't want to keep them all in memory. But one common case
// we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that
// request offset 0 twice in a row. So we remember the last response and, if repeated,
// we return it again.

let last_response = dir_handle.last_response.lock().await;
if let Some((last_offset, entries)) = last_response.as_ref() {
if offset == *last_offset {
Expand All @@ -1010,12 +1029,13 @@ where
// must remember it again, except that readdirplus specifies that . and ..
// are never incremented.
if is_readdirplus && entry.name != "." && entry.name != ".." {
dir_handle.handle.remember(&entry.lookup);
readdir_handle.remember(&entry.lookup);
}
}
return Ok(reply);
}
}
arsh marked this conversation as resolved.
Show resolved Hide resolved

return Err(err!(
libc::EINVAL,
"out-of-order readdir, expected={}, actual={}",
Expand Down Expand Up @@ -1070,11 +1090,11 @@ where
if dir_handle.offset() < 2 {
let lookup = self
.superblock
.getattr(&self.client, dir_handle.handle.parent(), false)
.getattr(&self.client, readdir_handle.parent(), false)
.await?;
let attr = self.make_attr(&lookup);
let entry = DirectoryEntry {
ino: dir_handle.handle.parent(),
ino: readdir_handle.parent(),
offset: dir_handle.offset() + 1,
name: "..".into(),
attr,
Expand All @@ -1089,7 +1109,7 @@ where
}

loop {
let next = match dir_handle.handle.next(&self.client).await? {
let next = match readdir_handle.next(&self.client).await? {
None => return Ok(reply.finish(offset, &dir_handle).await),
Some(next) => next,
};
Expand All @@ -1106,11 +1126,11 @@ where
};

if reply.add(entry) {
dir_handle.handle.readd(next);
readdir_handle.readd(next);
return Ok(reply.finish(offset, &dir_handle).await);
}
if is_readdirplus {
dir_handle.handle.remember(&next);
readdir_handle.remember(&next);
}
dir_handle.next_offset();
}
Expand Down
204 changes: 169 additions & 35 deletions mountpoint-s3/tests/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use fuser::FileType;
use libc::S_IFREG;
use mountpoint_s3::fs::{CacheConfig, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::fs::{CacheConfig, S3Personality, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::failure_client::countdown_failure_client;
Expand All @@ -21,7 +21,7 @@ use std::time::{Duration, SystemTime};
use test_case::test_case;

mod common;
use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply};
use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply, TestS3Filesystem};

#[test_case(""; "unprefixed")]
#[test_case("test_prefix/"; "prefixed")]
Expand Down Expand Up @@ -1276,7 +1276,7 @@ async fn test_flexible_retrieval_objects() {
}

#[tokio::test]
async fn test_readdir_rewind() {
async fn test_readdir_rewind_ordered() {
let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), Default::default());

for i in 0..10 {
Expand Down Expand Up @@ -1304,45 +1304,34 @@ async fn test_readdir_rewind() {
.expect_err("out of order");

// Requesting the same buffer size should work fine
let mut new_reply = DirectoryReply::new(5);
let _ = fs
.readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply)
.await
.unwrap();
let new_entries = new_reply
.entries
.iter()
.map(|e| (e.ino, e.name.clone()))
.collect::<Vec<_>>();
let new_entries = ls(&fs, dir_handle, 0, 5).await;
assert_eq!(entries, new_entries);

// Requesting a smaller buffer works fine and returns a prefix
let mut new_reply = DirectoryReply::new(3);
let _ = fs
.readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply)
.await
.unwrap();
let new_entries = new_reply
.entries
.iter()
.map(|e| (e.ino, e.name.clone()))
.collect::<Vec<_>>();
let new_entries = ls(&fs, dir_handle, 0, 3).await;
assert_eq!(&entries[..3], new_entries);

// Requesting a larger buffer works fine, but only partially fills (which is allowed)
let mut new_reply = DirectoryReply::new(10);
let _ = fs
.readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply)
.await
.unwrap();
let new_entries = new_reply
.entries
.iter()
.map(|e| (e.ino, e.name.clone()))
.collect::<Vec<_>>();
assert_eq!(entries, new_entries);
// Requesting same offset (non zero) works fine by returning last response
let _ = ls(&fs, dir_handle, 0, 5).await;
let new_entries = ls(&fs, dir_handle, 5, 5).await;
let new_entries_repeat = ls(&fs, dir_handle, 5, 5).await;
assert_eq!(new_entries, new_entries_repeat);

// Request all entries
let new_entries = ls(&fs, dir_handle, 0, 20).await;
assert_eq!(new_entries.len(), 12); // 10 files + 2 dirs (. and ..) = 12 entries

// Request more entries but there is no more
let new_entries = ls(&fs, dir_handle, 12, 20).await;
assert_eq!(new_entries.len(), 0);

// Request everything from zero one more time
let new_entries = ls(&fs, dir_handle, 0, 20).await;
assert_eq!(new_entries.len(), 12); // 10 files + 2 dirs (. and ..) = 12 entries

// And we can resume the stream from the end of the first request
// but let's rewind first
let _ = ls(&fs, dir_handle, 0, 5).await;
let mut next_page = DirectoryReply::new(0);
let _ = fs
.readdirplus(
Expand All @@ -1353,6 +1342,7 @@ async fn test_readdir_rewind() {
)
.await
.unwrap();

assert_eq!(next_page.entries.len(), 7); // 10 directory entries + . + .. = 12, minus the 5 we already saw
assert_eq!(next_page.entries.front().unwrap().name, "foo3");

Expand All @@ -1365,3 +1355,147 @@ async fn test_readdir_rewind() {
}
}
}

#[tokio::test]
async fn test_readdir_rewind_unordered() {
    // Use the S3 Express personality, whose listings are not lexicographically ordered.
    let config = S3FilesystemConfig {
        s3_personality: S3Personality::ExpressOneZone,
        ..Default::default()
    };
    let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), config);

    // Populate the bucket with ten remote objects.
    for i in 0..10 {
        client.add_object(&format!("foo{i}"), b"foo".into());
    }

    let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh;

    // Repeating a non-zero offset replays the remembered last response.
    let _ = ls(&fs, dir_handle, 0, 5).await;
    let replayed = ls(&fs, dir_handle, 5, 5).await;
    let replayed_again = ls(&fs, dir_handle, 5, 5).await;
    assert_eq!(replayed, replayed_again);

    // A full listing from offset 0 covers everything: 10 files + "." + ".." = 12 entries.
    let full_listing = ls(&fs, dir_handle, 0, 20).await;
    assert_eq!(full_listing.len(), 12);

    // Past the end of the stream, nothing is left to return.
    let past_end = ls(&fs, dir_handle, 12, 20).await;
    assert_eq!(past_end.len(), 0);

    // Rewinding to offset 0 restarts the stream and yields all 12 entries again.
    let rewound = ls(&fs, dir_handle, 0, 20).await;
    assert_eq!(rewound.len(), 12);
}

/// Verify rewinddir behavior when new local and remote files appear after `opendir`:
/// rewinding to offset 0 must re-list the directory and pick up the new entries.
#[test_case(Default::default())]
#[test_case(S3FilesystemConfig {s3_personality: S3Personality::ExpressOneZone, ..Default::default()})]
#[tokio::test]
async fn test_readdir_rewind_with_new_files(s3_fs_config: S3FilesystemConfig) {
    let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), s3_fs_config);

    for i in 0..10 {
        client.add_object(&format!("foo{i}"), b"foo".into());
    }

    let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh;

    // Let's add a new local file
    let file_name = "newfile.bin";
    new_local_file(&fs, file_name).await;

    // Let's add a new remote file
    client.add_object("foo10", b"foo".into());

    // Requesting same offset (non zero) works fine by returning last response
    let _ = ls(&fs, dir_handle, 0, 5).await;
    let new_entries = ls(&fs, dir_handle, 5, 5).await;
    let new_entries_repeat = ls(&fs, dir_handle, 5, 5).await;
    assert_eq!(new_entries, new_entries_repeat);

    // Request all entries
    let new_entries = ls(&fs, dir_handle, 0, 20).await;
    assert_eq!(new_entries.len(), 14); // 10 original remote files + 1 new local file + 1 new remote file + 2 dirs (. and ..) = 14 entries

    // assert entries contain new local file
    assert!(new_entries
        .iter()
        .any(|(_, name)| name.as_os_str().to_str().unwrap() == file_name));

    // Request more entries but there is no more
    let new_entries = ls(&fs, dir_handle, 14, 20).await;
    assert_eq!(new_entries.len(), 0);

    // Request everything from zero one more time
    let new_entries = ls(&fs, dir_handle, 0, 20).await;
    assert_eq!(new_entries.len(), 14); // 10 original remote files + 1 new local file + 1 new remote file + 2 dirs (. and ..) = 14 entries
}

#[tokio::test]
async fn test_readdir_rewind_with_local_files_only() {
    let (_, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), Default::default());

    let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh;

    // Create a single local (not yet uploaded) file under the root.
    let file_name = "newfile.bin";
    new_local_file(&fs, file_name).await;

    // Re-requesting the same non-zero offset replays the remembered response.
    let _ = ls(&fs, dir_handle, 0, 5).await;
    let replayed = ls(&fs, dir_handle, 3, 5).await;
    let replayed_again = ls(&fs, dir_handle, 3, 5).await;
    assert_eq!(replayed, replayed_again);

    // A listing from offset 0: the local file plus "." and ".." = 3 entries.
    let full_listing = ls(&fs, dir_handle, 0, 20).await;
    assert_eq!(full_listing.len(), 3);

    // The local file must appear in the listing.
    assert!(full_listing.iter().any(|(_, name)| name == file_name));

    // Nothing remains past the last entry.
    let past_end = ls(&fs, dir_handle, 3, 20).await;
    assert_eq!(past_end.len(), 0);

    // Rewinding to zero produces the full listing once more.
    let rewound = ls(&fs, dir_handle, 0, 20).await;
    assert_eq!(rewound.len(), 3);
}

/// Create a local file named `filename` under the root inode, write a small
/// payload to it, and fsync so it is visible as a complete local entry.
async fn new_local_file(fs: &TestS3Filesystem<Arc<MockClient>>, filename: &str) {
    // Regular file with 0700 permissions.
    let mode = libc::S_IFREG | libc::S_IRWXU;
    let dentry = fs.mknod(FUSE_ROOT_INODE, filename.as_ref(), mode, 0, 0).await.unwrap();
    assert_eq!(dentry.attr.size, 0);
    let ino = dentry.attr.ino;

    // Open the new inode for writing.
    let opened = fs
        .open(ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
        .await
        .unwrap();

    // Write a small payload and check that every byte was accepted.
    let payload = &[0xaa; 27];
    let bytes_written = fs.write(ino, opened.fh, 0, payload, 0, 0, None).await.unwrap();
    assert_eq!(bytes_written as usize, payload.len());

    // Flush so the write is durable before the caller lists the directory.
    fs.fsync(ino, opened.fh, true).await.unwrap();
}

/// Issue a single `readdirplus` on `dir_handle` at `offset` and return up to
/// `max_entries` results as `(inode, name)` pairs.
async fn ls(
    fs: &TestS3Filesystem<Arc<MockClient>>,
    dir_handle: u64,
    offset: i64,
    max_entries: usize,
) -> Vec<(u64, OsString)> {
    let mut reply = DirectoryReply::new(max_entries);
    let _ = fs
        .readdirplus(FUSE_ROOT_INODE, dir_handle, offset, &mut reply)
        .await
        .unwrap();

    // Flatten the reply into simple (ino, name) pairs for easy assertions.
    let mut listing = Vec::with_capacity(reply.entries.len());
    for entry in reply.entries.iter() {
        listing.push((entry.ino, entry.name.clone()));
    }
    listing
}
Loading