Skip to content

Commit

Permalink
Allow repeated readdir offsets (#581)
Browse files Browse the repository at this point in the history
* Allow repeated readdir offsets

POSIX allows seeking an open directory handle, which in FUSE means the
`offset` can be any offset we've previously returned. This is pretty
annoying for us to implement since we're streaming directory entries
from S3 with ListObjects, which can't resume from an arbitrary index,
and can't fit its continuation tokens into a 64-bit offset anyway. So
we're probably never going to truly support seeking a directory handle.

But there's a special case we've seen come up a couple of times (#477, #520):
some applications read one page of directory entries and then seek back
to 0 and do it again. I don't fully understand _why_ they do this, but
it's common enough that it's worth special casing.

This change makes open directory handles remember their most recent
response so that they can repeat it if asked for the same offset again.
It's not too complicated other than needing to make sure we do
readdirplus correctly (managing the lookup counts for entries that are
being returned a second time).

I've tested this by running the PHP example from #477, which now works.

Signed-off-by: James Bornholt <bornholt@amazon.com>

* PR feedback

Signed-off-by: James Bornholt <bornholt@amazon.com>

* Changelog and docs

Signed-off-by: James Bornholt <bornholt@amazon.com>

---------

Signed-off-by: James Bornholt <bornholt@amazon.com>
  • Loading branch information
jamesbornholt authored Oct 27, 2023
1 parent 04f7499 commit 8e5688d
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 83 deletions.
2 changes: 1 addition & 1 deletion doc/SEMANTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ the following behavior:

### Directory operations

Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported.
Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported. However, seeking (`lseek`) on directory handles is not supported.

Creating directories (`mkdir`) is supported, with the following behavior:

Expand Down
6 changes: 6 additions & 0 deletions mountpoint-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## Unreleased

### Breaking changes
* No breaking changes.

### Other changes
* Some applications that read directory entries out of order (for example, [PHP](https://github.com/awslabs/mountpoint-s3/issues/477)) will now work correctly. ([#581](https://github.com/awslabs/mountpoint-s3/pull/581))

## v1.1.0 (October 23, 2023)

### Breaking changes
Expand Down
128 changes: 98 additions & 30 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures::task::Spawn;
use nix::unistd::{getgid, getuid};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::ffi::{OsStr, OsString};
use std::str::FromStr;
use std::time::{Duration, UNIX_EPOCH};
use time::OffsetDateTime;
Expand Down Expand Up @@ -35,6 +35,7 @@ struct DirHandle {
ino: InodeNo,
handle: ReaddirHandle,
offset: AtomicI64,
last_response: AsyncMutex<Option<(i64, Vec<DirectoryEntry>)>>,
}

impl DirHandle {
Expand Down Expand Up @@ -402,15 +403,18 @@ pub struct Opened {
pub trait DirectoryReplier {
/// Add a new dentry to the reply. Returns true if the buffer was full and so the entry was not
/// added.
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
offset: i64,
name: T,
attr: FileAttr,
generation: u64,
ttl: Duration,
) -> bool;
fn add(&mut self, entry: DirectoryEntry) -> bool;
}

#[derive(Debug, Clone)]
pub struct DirectoryEntry {
pub ino: u64,
pub offset: i64,
pub name: OsString,
pub attr: FileAttr,
pub generation: u64,
pub ttl: Duration,
lookup: LookedUp,
}

/// Reply to a `read` call. This is funky because we want the reply to happen with only a borrow of
Expand Down Expand Up @@ -717,6 +721,7 @@ where
ino: parent,
handle: inode_handle,
offset: AtomicI64::new(0),
last_response: AsyncMutex::new(None),
};

let mut dir_handles = self.dir_handles.write().await;
Expand Down Expand Up @@ -764,19 +769,77 @@ where
};

if offset != dir_handle.offset() {
// POSIX allows seeking an open directory. That's a pain for us since we are streaming
// the directory entries and don't want to keep them all in memory. But one common case
// we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that
// request offset 0 twice in a row. So we remember the last response and, if repeated,
// we return it again.
let last_response = dir_handle.last_response.lock().await;
if let Some((last_offset, entries)) = last_response.as_ref() {
if offset == *last_offset {
trace!(offset, "repeating readdir response");
for entry in entries {
if reply.add(entry.clone()) {
break;
}
// We are returning this result a second time, so the contract is that we
// must remember it again, except that readdirplus specifies that . and ..
// are never incremented.
if is_readdirplus && entry.name != "." && entry.name != ".." {
dir_handle.handle.remember(&entry.lookup);
}
}
return Ok(reply);
}
}
return Err(err!(
libc::EINVAL,
"offset mismatch, expected={}, actual={}",
"out-of-order readdir, expected={}, actual={}",
dir_handle.offset(),
offset
));
}

/// Wrap a replier to duplicate the entries and store them in `dir_handle.last_response` so
/// we can re-use them if the directory handle rewinds
struct Reply<R: DirectoryReplier> {
reply: R,
entries: Vec<DirectoryEntry>,
}

impl<R: DirectoryReplier> Reply<R> {
async fn finish(self, offset: i64, dir_handle: &DirHandle) -> R {
*dir_handle.last_response.lock().await = Some((offset, self.entries));
self.reply
}
}

impl<R: DirectoryReplier> DirectoryReplier for Reply<R> {
fn add(&mut self, entry: DirectoryEntry) -> bool {
let result = self.reply.add(entry.clone());
if !result {
self.entries.push(entry);
}
result
}
}

let mut reply = Reply { reply, entries: vec![] };

if dir_handle.offset() < 1 {
let lookup = self.superblock.getattr(&self.client, parent, false).await?;
let attr = self.make_attr(&lookup);
if reply.add(parent, dir_handle.offset() + 1, ".", attr, 0u64, lookup.validity()) {
return Ok(reply);
let entry = DirectoryEntry {
ino: parent,
offset: dir_handle.offset() + 1,
name: ".".into(),
attr,
generation: 0,
ttl: lookup.validity(),
lookup,
};
if reply.add(entry) {
return Ok(reply.finish(offset, &dir_handle).await);
}
dir_handle.next_offset();
}
Expand All @@ -786,36 +849,41 @@ where
.getattr(&self.client, dir_handle.handle.parent(), false)
.await?;
let attr = self.make_attr(&lookup);
if reply.add(
dir_handle.handle.parent(),
dir_handle.offset() + 1,
"..",
let entry = DirectoryEntry {
ino: dir_handle.handle.parent(),
offset: dir_handle.offset() + 1,
name: "..".into(),
attr,
0u64,
lookup.validity(),
) {
return Ok(reply);
generation: 0,
ttl: lookup.validity(),
lookup,
};
if reply.add(entry) {
return Ok(reply.finish(offset, &dir_handle).await);
}
dir_handle.next_offset();
}

loop {
let next = match dir_handle.handle.next(&self.client).await? {
None => return Ok(reply),
None => return Ok(reply.finish(offset, &dir_handle).await),
Some(next) => next,
};

let attr = self.make_attr(&next);
if reply.add(
attr.ino,
dir_handle.offset() + 1,
next.inode.name(),
let entry = DirectoryEntry {
ino: attr.ino,
offset: dir_handle.offset() + 1,
name: next.inode.name().into(),
attr,
0u64,
next.validity(),
) {
generation: 0,
ttl: next.validity(),
lookup: next.clone(),
};

if reply.add(entry) {
dir_handle.handle.readd(next);
return Ok(reply);
return Ok(reply.finish(offset, &dir_handle).await);
}
if is_readdirplus {
dir_handle.handle.remember(&next);
Expand Down
41 changes: 17 additions & 24 deletions mountpoint-s3/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ use futures::executor::block_on;
use futures::task::Spawn;
use std::ffi::OsStr;
use std::path::Path;
use std::time::{Duration, SystemTime};
use std::time::SystemTime;
use time::OffsetDateTime;
use tracing::{instrument, Instrument};

use crate::fs::{self, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno};
use crate::fs::{
self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno,
};
use crate::prefix::Prefix;
#[cfg(target_os = "macos")]
use fuser::ReplyXTimes;
use fuser::{
FileAttr, Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry,
ReplyIoctl, ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl,
ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
};
use mountpoint_s3_client::ObjectClient;

Expand Down Expand Up @@ -168,16 +170,8 @@ where
}

impl<'a> DirectoryReplier for ReplyDirectory<'a> {
fn add<T: AsRef<OsStr>>(
&mut self,
ino: InodeNo,
offset: i64,
name: T,
attr: FileAttr,
_generation: u64,
_ttl: Duration,
) -> bool {
let result = self.inner.add(ino, offset, attr.kind, name);
fn add(&mut self, entry: DirectoryEntry) -> bool {
let result = self.inner.add(entry.ino, entry.offset, entry.attr.kind, entry.name);
if !result {
*self.count += 1;
}
Expand Down Expand Up @@ -215,16 +209,15 @@ where
}

impl<'a> DirectoryReplier for ReplyDirectoryPlus<'a> {
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
offset: i64,
name: T,
attr: FileAttr,
generation: u64,
ttl: Duration,
) -> bool {
let result = self.inner.add(ino, offset, name, &ttl, &attr, generation);
fn add(&mut self, entry: DirectoryEntry) -> bool {
let result = self.inner.add(
entry.ino,
entry.offset,
entry.name,
&entry.ttl,
&entry.attr,
entry.generation,
);
if !result {
*self.count += 1;
}
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,8 @@ impl Inode {
pub fn dec_lookup_count(&self, n: u64) -> u64 {
let mut state = self.inner.sync.write().unwrap();
let lookup_count = &mut state.lookup_count;
*lookup_count -= n;
debug_assert!(n <= *lookup_count, "lookup count cannot go negative");
*lookup_count = lookup_count.saturating_sub(n);
trace!(new_lookup_count = lookup_count, "decremented lookup count");
*lookup_count
}
Expand Down
29 changes: 3 additions & 26 deletions mountpoint-s3/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use aws_sdk_s3::config::Region;
use aws_sdk_s3::primitives::ByteStream;
use fuser::{FileAttr, FileType};
use futures::executor::ThreadPool;
use mountpoint_s3::fs::{self, DirectoryReplier, ReadReplier, ToErrno};
use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::{S3Filesystem, S3FilesystemConfig};
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
Expand All @@ -11,10 +11,8 @@ use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use rand::rngs::OsRng;
use rand::RngCore;
use std::collections::VecDeque;
use std::ffi::{OsStr, OsString};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

pub fn make_test_filesystem(
bucket: &str,
Expand Down Expand Up @@ -105,39 +103,18 @@ pub fn assert_attr(attr: FileAttr, ftype: FileType, size: u64, uid: u32, gid: u3
assert_eq!(attr.perm, perm);
}

#[derive(Debug, Clone)]
pub struct DirectoryEntry {
pub ino: u64,
pub offset: i64,
pub attr: FileAttr,
pub name: OsString,
}

#[derive(Debug, Default)]
pub struct DirectoryReply {
readdir_limit: usize,
pub entries: VecDeque<DirectoryEntry>,
}

impl DirectoryReplier for &mut DirectoryReply {
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
offset: i64,
name: T,
attr: FileAttr,
_generation: u64,
_ttl: Duration,
) -> bool {
fn add(&mut self, entry: DirectoryEntry) -> bool {
if self.readdir_limit > 0 && !self.entries.is_empty() && self.entries.len() % self.readdir_limit == 0 {
true
} else {
self.entries.push_back(DirectoryEntry {
ino,
offset,
attr,
name: name.as_ref().to_os_string(),
});
self.entries.push_back(entry);
false
}
}
Expand Down
Loading

2 comments on commit 8e5688d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 8e5688d Previous: a50f1ca Ratio
random_read_four_threads 7.51611328125 MiB/s 15.94443359375 MiB/s 2.12
random_read_direct_io 1.20576171875 MiB/s 2.74296875 MiB/s 2.27
random_read 1.14990234375 MiB/s 2.5990234375 MiB/s 2.26

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 8e5688d Previous: a50f1ca Ratio
random_read_four_threads 7.50205078125 MiB/s 15.94443359375 MiB/s 2.13
random_read_direct_io 1.2796875 MiB/s 2.74296875 MiB/s 2.14

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.