Skip to content

Commit

Permalink
Refactor fs.rs to use call_with_permit scheme
Browse files Browse the repository at this point in the history
Simplify filesystem functions by delegating permit acquisition and execution to a single callback
  • Loading branch information
Zach Birenbaum committed Mar 10, 2024
1 parent 35daf43 commit 53afa6a
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 137 deletions.
4 changes: 2 additions & 2 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ impl Store for FastSlowStore {
async fn update_with_whole_file(
self: Pin<&Self>,
digest: DigestInfo,
mut file: fs::ResumeableFileSlot<'static>,
mut file: fs::ResumeableFileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
let fast_store = self.fast_store.inner_store(Some(digest));
let slow_store = self.slow_store.inner_store(Some(digest));
if fast_store.optimized_for(StoreOptimizations::FileUpdates) {
Expand Down
21 changes: 9 additions & 12 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error>
) -> Result<(Self, fs::ResumeableFileSlot, OsString), Error>
where
Self: Sized;

Expand All @@ -149,7 +149,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;

/// Returns a reader that will read part of the underlying file.
async fn read_file_part<'a>(&'a self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot<'a>, Error>;
async fn read_file_part(&self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot, Error>;

/// This function is a safe way to extract the file name of the underlying file. To protect users from
/// accidentally creating undefined behavior we encourage users to do the logic they need to do with
Expand Down Expand Up @@ -191,7 +191,7 @@ impl FileEntry for FileEntryImpl {
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> {
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path().to_os_string();
let temp_file_result = fs::create_file(temp_full_path.clone())
.or_else(|mut err| async {
Expand Down Expand Up @@ -229,7 +229,7 @@ impl FileEntry for FileEntryImpl {
&self.encoded_file_path
}

async fn read_file_part<'a>(&'a self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot<'a>, Error> {
async fn read_file_part(&self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot, Error> {
let (mut file, full_content_path_for_debug_only) = self
.get_file_path_locked(|full_content_path| async move {
let file = fs::open_file(full_content_path.clone(), length)
Expand Down Expand Up @@ -544,7 +544,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
async fn update_file<'a>(
self: Pin<&'a Self>,
mut entry: Fe,
mut resumeable_temp_file: fs::ResumeableFileSlot<'a>,
mut resumeable_temp_file: fs::ResumeableFileSlot,
final_digest: DigestInfo,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
Expand Down Expand Up @@ -626,11 +626,8 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
// Internally tokio spawns fs commands onto a blocking thread anyways.
// Since we are already on a blocking thread, we just need the `fs` wrapper to manage
// an open-file permit (ensure we don't open too many files at once).
let result = fs::call_with_permit(|| {
(rename_fn)(from_path.as_ref(), final_path.as_ref())
.err_tip(|| format!("Failed to rename temp file to final path {final_path:?}"))
})
.await;
let result = (rename_fn)(&from_path, &final_path)
.err_tip(|| format!("Failed to rename temp file to final path {final_path:?}"));

// In the event our move from temp file to final file fails we need to ensure we remove
// the entry from our map.
Expand Down Expand Up @@ -715,9 +712,9 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
async fn update_with_whole_file(
self: Pin<&Self>,
digest: DigestInfo,
mut file: fs::ResumeableFileSlot<'static>,
mut file: fs::ResumeableFileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
let path = file.get_path().as_os_str().to_os_string();
let file_size = match upload_size {
UploadSizeInfo::ExactSize(size) => size as u64,
Expand Down
4 changes: 2 additions & 2 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> {
) -> Result<(Self, fs::ResumeableFileSlot, OsString), Error> {
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?;
Ok((
Self {
Expand All @@ -101,7 +101,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
self.inner.as_ref().unwrap().get_encoded_file_path()
}

async fn read_file_part<'a>(&'a self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot<'a>, Error> {
async fn read_file_part(&self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot, Error> {
self.inner.as_ref().unwrap().read_file_part(offset, length).await
}

Expand Down
12 changes: 6 additions & 6 deletions nativelink-util/src/digest_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ pub trait DigestHasher {
/// the file and feed it into the hasher.
fn digest_for_file(
self,
file: fs::ResumeableFileSlot<'static>,
file: fs::ResumeableFileSlot,
size_hint: Option<u64>,
) -> impl Future<Output = Result<(DigestInfo, fs::ResumeableFileSlot<'static>), Error>>;
) -> impl Future<Output = Result<(DigestInfo, fs::ResumeableFileSlot), Error>>;

/// Utility function to compute a hash from a generic reader.
fn compute_from_reader<R: AsyncRead + Unpin + Send>(
Expand Down Expand Up @@ -168,8 +168,8 @@ impl DigestHasherImpl {
#[inline]
async fn hash_file(
&mut self,
mut file: fs::ResumeableFileSlot<'static>,
) -> Result<(DigestInfo, fs::ResumeableFileSlot<'static>), Error> {
mut file: fs::ResumeableFileSlot,
) -> Result<(DigestInfo, fs::ResumeableFileSlot), Error> {
let reader = file.as_reader().await.err_tip(|| "In digest_for_file")?;
let digest = self
.compute_from_reader(reader)
Expand Down Expand Up @@ -202,9 +202,9 @@ impl DigestHasher for DigestHasherImpl {

async fn digest_for_file(
mut self,
mut file: fs::ResumeableFileSlot<'static>,
mut file: fs::ResumeableFileSlot,
size_hint: Option<u64>,
) -> Result<(DigestInfo, fs::ResumeableFileSlot<'static>), Error> {
) -> Result<(DigestInfo, fs::ResumeableFileSlot), Error> {
let file_position = file
.stream_position()
.await
Expand Down
Loading

0 comments on commit 53afa6a

Please sign in to comment.