Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement O_DIRECT for open to bypass metadata cache #614

Merged
merged 4 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::time::{Duration, UNIX_EPOCH};
use time::OffsetDateTime;
use tracing::{debug, error, trace};

use fuser::consts::FOPEN_DIRECT_IO;
use fuser::{FileAttr, KernelConfig};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::ETag;
Expand Down Expand Up @@ -562,7 +563,12 @@ where
pub async fn open(&self, ino: InodeNo, flags: i32, pid: u32) -> Result<Opened, Error> {
trace!("fs:open with ino {:?} flags {:?} pid {:?}", ino, flags, pid);

let force_revalidate = !self.config.cache_config.serve_lookup_from_cache;
#[cfg(not(target_os = "linux"))]
let direct_io = false;
#[cfg(target_os = "linux")]
let direct_io = flags & libc::O_DIRECT != 0;

let force_revalidate = !self.config.cache_config.serve_lookup_from_cache || direct_io;
let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?;

match lookup.inode.kind() {
Expand Down Expand Up @@ -596,7 +602,9 @@ where
};
self.file_handles.write().await.insert(fh, Arc::new(handle));

Ok(Opened { fh, flags: 0 })
let reply_flags = if direct_io { FOPEN_DIRECT_IO } else { 0 };

Ok(Opened { fh, flags: reply_flags })
}

#[allow(clippy::too_many_arguments)] // We don't get to choose this interface
Expand Down
95 changes: 95 additions & 0 deletions mountpoint-s3/tests/fuse_tests/consistency_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,98 @@ fn page_cache_sharing_test_mock_with_cache(prefix: &str) {
prefix,
);
}

#[cfg(target_os = "linux")]
mod direct_io {
use super::*;

use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;
use std::time::Duration;

use test_case::test_case;

use mountpoint_s3::fs::{CacheConfig, S3FilesystemConfig};

fn cache_and_direct_io_test<F>(creator_fn: F, prefix: &str)
where
F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox),
{
const OBJECT_SIZE: usize = 8;

let test_session_conf = TestSessionConfig {
filesystem_config: S3FilesystemConfig {
cache_config: CacheConfig {
serve_lookup_from_cache: true,
dir_ttl: Duration::from_secs(600),
file_ttl: Duration::from_secs(600),
},
..Default::default()
},
..Default::default()
};
let (mount_point, _session, mut test_client) = creator_fn(prefix, test_session_conf);

let file_name = "file.bin";

// Create the first version of the file
let old_contents = vec![0x0fu8; OBJECT_SIZE];
test_client.put_object(file_name, &old_contents).unwrap();

// Open and read fully the file before updating it remotely
let old_file = File::open(mount_point.path().join(file_name)).unwrap();
let mut buf = vec![0u8; OBJECT_SIZE];
old_file.read_exact_at(&mut buf, 0).unwrap();
assert_eq!(buf, &old_contents[..buf.len()]);

let new_contents = vec![0xffu8; OBJECT_SIZE];
test_client.put_object(file_name, &new_contents).unwrap();

// Open the file again, which should be reading from cache
for _ in 0..2 {
let new_file = File::open(mount_point.path().join(file_name)).unwrap();
new_file
.read_exact_at(&mut buf, 0)
.expect("should be OK as result is cached");
assert_eq!(
buf,
&old_contents[..buf.len()],
"bytes read should be old object from cache"
);
}

// Open the file w/ O_DIRECT, which should see the new file on S3 despite the old file being cached
let mut buf = [0u8; OBJECT_SIZE];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I use vec![0u8; OBJECT_SIZE] here, I sometimes find that the buffer hasn't been written to (even though read_exact_at has returned Ok). This makes me super uncomfortable (which is why this PR is still draft, not ready to merge).

Copy link
Member

@jamesbornholt jamesbornholt Nov 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O_DIRECT is really hard to test under the Rust test runner because it has undefined behavior if run concurrently with a fork, and all of our FUSE tests involve a fork to spawn fusermount. We've seen it before in #114. Do you still see the issue if you filter to run only one of these tests?

You probably don't see it with this version because buf is on the stack, so less likely to be immediately scribbled over by malloc in the forked process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I saw those limitations with O_DIRECT and fork but wasn't expecting us to be using it in the tests. That sounds like a probable cause!

I don't see the issue when running the test on its own.

let new_file = OpenOptions::new()
.read(true)
.custom_flags(libc::O_DIRECT)
.open(mount_point.path().join(file_name))
.unwrap();
new_file
.read_exact_at(&mut buf, 0)
.expect("should be able to read file content from S3");
assert_eq!(
buf,
&new_contents[..buf.len()],
"bytes read should be new bytes from S3 client"
);
}

#[test_case(""; "no prefix")]
#[test_case("cache_and_direct_io_test_mock"; "prefix")]
fn cache_and_direct_io_test_mock(prefix: &str) {
cache_and_direct_io_test(
crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
prefix,
);
}

#[cfg(feature = "s3_tests")]
#[test]
fn cache_and_direct_io_test_s3() {
cache_and_direct_io_test(
crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
"cache_and_direct_io_test_s3",
);
}
}
Loading