Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow repeated readdir offsets #581

Merged
merged 3 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 98 additions & 30 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures::task::Spawn;
use nix::unistd::{getgid, getuid};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::ffi::{OsStr, OsString};
use std::str::FromStr;
use std::time::{Duration, UNIX_EPOCH};
use time::OffsetDateTime;
Expand Down Expand Up @@ -35,6 +35,7 @@ struct DirHandle {
ino: InodeNo,
handle: ReaddirHandle,
offset: AtomicI64,
last_response: AsyncMutex<Option<(i64, Vec<DirectoryEntry>)>>,
}

impl DirHandle {
Expand Down Expand Up @@ -402,15 +403,18 @@ pub struct Opened {
pub trait DirectoryReplier {
/// Add a new dentry to the reply. Returns true if the buffer was full and so the entry was not
/// added.
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
offset: i64,
name: T,
attr: FileAttr,
generation: u64,
ttl: Duration,
) -> bool;
fn add(&mut self, entry: DirectoryEntry) -> bool;
}

#[derive(Debug, Clone)]
pub struct DirectoryEntry {
pub ino: u64,
pub offset: i64,
pub name: OsString,
pub attr: FileAttr,
pub generation: u64,
pub ttl: Duration,
lookup: LookedUp,
}

/// Reply to a `read` call. This is funky because we want the reply to happen with only a borrow of
Expand Down Expand Up @@ -717,6 +721,7 @@ where
ino: parent,
handle: inode_handle,
offset: AtomicI64::new(0),
last_response: AsyncMutex::new(None),
};

let mut dir_handles = self.dir_handles.write().await;
Expand Down Expand Up @@ -764,19 +769,77 @@ where
};

if offset != dir_handle.offset() {
// POSIX allows seeking an open directory. That's a pain for us since we are streaming
// the directory entries and don't want to keep them all in memory. But one common case
// we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that
// request offset 0 twice in a row. So we remember the last response and, if repeated,
// we return it again.
let last_response = dir_handle.last_response.lock().await;
if let Some((last_offset, entries)) = &*last_response {
jamesbornholt marked this conversation as resolved.
Show resolved Hide resolved
if offset == *last_offset {
trace!(offset, "repeating readdir response");
for entry in entries {
if reply.add(entry.clone()) {
break;
}
dannycjones marked this conversation as resolved.
Show resolved Hide resolved
// We are returning this result a second time, so the contract is that we
// must remember it again, except that readdirplus specifies that . and ..
// are never incremented.
if is_readdirplus && entry.name != "." && entry.name != ".." {
dir_handle.handle.remember(&entry.lookup);
}
}
return Ok(reply);
}
}
return Err(err!(
libc::EINVAL,
"offset mismatch, expected={}, actual={}",
"out-of-order readdir, expected={}, actual={}",
dir_handle.offset(),
offset
));
}

/// Wrap a replier to duplicate the entries and store them in `dir_handle.last_response` so
/// we can re-use them if the directory handle rewinds
struct Reply<R: DirectoryReplier> {
reply: R,
entries: Vec<DirectoryEntry>,
}

impl<R: DirectoryReplier> Reply<R> {
async fn finish(self, offset: i64, dir_handle: &DirHandle) -> R {
*dir_handle.last_response.lock().await = Some((offset, self.entries));
self.reply
}
}

impl<R: DirectoryReplier> DirectoryReplier for Reply<R> {
fn add(&mut self, entry: DirectoryEntry) -> bool {
let result = self.reply.add(entry.clone());
if !result {
self.entries.push(entry);
}
result
}
}

let mut reply = Reply { reply, entries: vec![] };

if dir_handle.offset() < 1 {
let lookup = self.superblock.getattr(&self.client, parent, false).await?;
let attr = self.make_attr(&lookup);
if reply.add(parent, dir_handle.offset() + 1, ".", attr, 0u64, lookup.validity()) {
return Ok(reply);
let entry = DirectoryEntry {
ino: parent,
offset: dir_handle.offset() + 1,
name: ".".into(),
attr,
generation: 0,
ttl: lookup.validity(),
lookup,
};
if reply.add(entry) {
return Ok(reply.finish(offset, &dir_handle).await);
}
dir_handle.next_offset();
}
Expand All @@ -786,36 +849,41 @@ where
.getattr(&self.client, dir_handle.handle.parent(), false)
.await?;
let attr = self.make_attr(&lookup);
if reply.add(
dir_handle.handle.parent(),
dir_handle.offset() + 1,
"..",
let entry = DirectoryEntry {
ino: dir_handle.handle.parent(),
offset: dir_handle.offset() + 1,
name: "..".into(),
attr,
0u64,
lookup.validity(),
) {
return Ok(reply);
generation: 0,
ttl: lookup.validity(),
lookup,
};
if reply.add(entry) {
return Ok(reply.finish(offset, &dir_handle).await);
}
dir_handle.next_offset();
}

loop {
let next = match dir_handle.handle.next(&self.client).await? {
None => return Ok(reply),
None => return Ok(reply.finish(offset, &dir_handle).await),
Some(next) => next,
};

let attr = self.make_attr(&next);
if reply.add(
attr.ino,
dir_handle.offset() + 1,
next.inode.name(),
let entry = DirectoryEntry {
ino: attr.ino,
offset: dir_handle.offset() + 1,
name: next.inode.name().into(),
attr,
0u64,
next.validity(),
) {
generation: 0,
ttl: next.validity(),
lookup: next.clone(),
};

if reply.add(entry) {
dir_handle.handle.readd(next);
return Ok(reply);
return Ok(reply.finish(offset, &dir_handle).await);
}
if is_readdirplus {
dir_handle.handle.remember(&next);
Expand Down
41 changes: 17 additions & 24 deletions mountpoint-s3/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ use futures::executor::block_on;
use futures::task::Spawn;
use std::ffi::OsStr;
use std::path::Path;
use std::time::{Duration, SystemTime};
use std::time::SystemTime;
use time::OffsetDateTime;
use tracing::{instrument, Instrument};

use crate::fs::{self, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno};
use crate::fs::{
self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno,
};
use crate::prefix::Prefix;
#[cfg(target_os = "macos")]
use fuser::ReplyXTimes;
use fuser::{
FileAttr, Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry,
ReplyIoctl, ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl,
ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
};
use mountpoint_s3_client::ObjectClient;

Expand Down Expand Up @@ -168,16 +170,8 @@ where
}

impl<'a> DirectoryReplier for ReplyDirectory<'a> {
fn add<T: AsRef<OsStr>>(
&mut self,
ino: InodeNo,
offset: i64,
name: T,
attr: FileAttr,
_generation: u64,
_ttl: Duration,
) -> bool {
let result = self.inner.add(ino, offset, attr.kind, name);
fn add(&mut self, entry: DirectoryEntry) -> bool {
let result = self.inner.add(entry.ino, entry.offset, entry.attr.kind, entry.name);
if !result {
*self.count += 1;
}
Expand Down Expand Up @@ -215,16 +209,15 @@ where
}

impl<'a> DirectoryReplier for ReplyDirectoryPlus<'a> {
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
offset: i64,
name: T,
attr: FileAttr,
generation: u64,
ttl: Duration,
) -> bool {
let result = self.inner.add(ino, offset, name, &ttl, &attr, generation);
fn add(&mut self, entry: DirectoryEntry) -> bool {
let result = self.inner.add(
entry.ino,
entry.offset,
entry.name,
&entry.ttl,
&entry.attr,
entry.generation,
);
if !result {
*self.count += 1;
}
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,8 @@ impl Inode {
pub fn dec_lookup_count(&self, n: u64) -> u64 {
let mut state = self.inner.sync.write().unwrap();
let lookup_count = &mut state.lookup_count;
*lookup_count -= n;
debug_assert!(n <= *lookup_count, "lookup count cannot go negative");
*lookup_count = lookup_count.saturating_sub(n);
trace!(new_lookup_count = lookup_count, "decremented lookup count");
*lookup_count
}
Expand Down
29 changes: 3 additions & 26 deletions mountpoint-s3/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use aws_sdk_s3::config::Region;
use aws_sdk_s3::primitives::ByteStream;
use fuser::{FileAttr, FileType};
use futures::executor::ThreadPool;
use mountpoint_s3::fs::{self, DirectoryReplier, ReadReplier, ToErrno};
use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::{S3Filesystem, S3FilesystemConfig};
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
Expand All @@ -11,10 +11,8 @@ use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use rand::rngs::OsRng;
use rand::RngCore;
use std::collections::VecDeque;
use std::ffi::{OsStr, OsString};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

pub fn make_test_filesystem(
bucket: &str,
Expand Down Expand Up @@ -105,39 +103,18 @@ pub fn assert_attr(attr: FileAttr, ftype: FileType, size: u64, uid: u32, gid: u3
assert_eq!(attr.perm, perm);
}

#[derive(Debug, Clone)]
pub struct DirectoryEntry {
pub ino: u64,
pub offset: i64,
pub attr: FileAttr,
pub name: OsString,
}

#[derive(Debug, Default)]
pub struct DirectoryReply {
readdir_limit: usize,
pub entries: VecDeque<DirectoryEntry>,
}

impl DirectoryReplier for &mut DirectoryReply {
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
offset: i64,
name: T,
attr: FileAttr,
_generation: u64,
_ttl: Duration,
) -> bool {
fn add(&mut self, entry: DirectoryEntry) -> bool {
if self.readdir_limit > 0 && !self.entries.is_empty() && self.entries.len() % self.readdir_limit == 0 {
true
} else {
self.entries.push_back(DirectoryEntry {
ino,
offset,
attr,
name: name.as_ref().to_os_string(),
});
self.entries.push_back(entry);
false
}
}
Expand Down
Loading
Loading