Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make rattler_index::index concurrency safe #955

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/rattler_index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rattler_package_streaming = { path="../rattler_package_streaming", version = "0.
serde_json = { workspace = true }
tracing = { workspace = true }
walkdir = { workspace = true }
fslock = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/rattler_index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ pub fn index(
};
}
let out_file = output_folder.join(platform).join("repodata.json");
let lock_file_path = out_file.with_extension("lock");
let mut lock = fslock::LockFile::open(&lock_file_path)?;
lock.lock_with_pid()?;
File::create(&out_file)?.write_all(serde_json::to_string_pretty(&repodata)?.as_bytes())?;
}

Expand Down
6 changes: 5 additions & 1 deletion crates/rattler_repodata_gateway/src/gateway/remote_subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ impl RemoteSubdirClient {
e => GatewayError::FetchRepoDataError(e),
})?;

let repo_data_json_path = repodata.repo_data_json_path.clone();
// `repodata` holds a file lock on the JSON file that the sparse client also needs, so release it first.
drop(repodata);

// Create a new sparse repodata client that can be used to read records from the repodata.
let sparse = LocalSubdirClient::from_channel_subdir(
&repodata.repo_data_json_path,
&repo_data_json_path,
channel.clone(),
platform.as_str(),
)
Expand Down
56 changes: 41 additions & 15 deletions crates/rattler_repodata_gateway/src/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

use std::{
collections::{HashSet, VecDeque},
fmt, io,
fmt,
fs::OpenOptions,
io,
marker::PhantomData,
path::Path,
};
Expand All @@ -26,6 +28,8 @@ use serde_json::value::RawValue;
use superslice::Ext;
use thiserror::Error;

use crate::utils::LockedFile;

/// A struct to enable loading records from a `repodata.json` file on demand.
/// Since most of the time you don't need all the records from the
/// `repodata.json` this can help provide some significant speedups.
Expand All @@ -43,6 +47,9 @@ pub struct SparseRepoData {
/// A function that can be used to patch the package record after it has
/// been parsed. This is mainly used to add `pip` to `python` if desired
patch_record_fn: Option<fn(&mut PackageRecord)>,

/// Lock held on the repodata file while it is memory-mapped, so the file is not modified underneath the mapping.
_lock: Option<LockedFile>,
}

enum SparseRepoDataInner {
Expand Down Expand Up @@ -104,20 +111,38 @@ impl SparseRepoData {
path: impl AsRef<Path>,
patch_function: Option<fn(&mut PackageRecord)>,
) -> Result<Self, io::Error> {
let file = fs::File::open(path.as_ref().to_owned())?;
let memory_map = unsafe { memmap2::Mmap::map(&file) }?;
Ok(SparseRepoData {
inner: SparseRepoDataInner::Memmapped(
MemmappedSparseRepoDataInnerTryBuilder {
memory_map,
repo_data_builder: |memory_map| serde_json::from_slice(memory_map.as_ref()),
}
.try_build()?,
),
subdir: subdir.into(),
channel,
patch_record_fn: patch_function,
})
if !path.as_ref().exists() {
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("file not found: {:?}", path.as_ref()),
))
} else {
let lock_file_path = path.as_ref().with_extension("lock");
if !lock_file_path.exists() {
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&lock_file_path)?;
}
let lock_file = LockedFile::open_ro(lock_file_path, "repodata cache")
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let file = fs::File::open(path.as_ref().to_owned())?;
let memory_map = unsafe { memmap2::Mmap::map(&file) }?;
Ok(SparseRepoData {
inner: SparseRepoDataInner::Memmapped(
MemmappedSparseRepoDataInnerTryBuilder {
memory_map,
repo_data_builder: |memory_map| serde_json::from_slice(memory_map.as_ref()),
}
.try_build()?,
),
subdir: subdir.into(),
channel,
patch_record_fn: patch_function,
_lock: Some(lock_file),
})
}
}

/// Construct an instance of self from a bytes and a [`Channel`].
Expand All @@ -141,6 +166,7 @@ impl SparseRepoData {
channel,
subdir: subdir.into(),
patch_record_fn: patch_function,
_lock: None,
})
}

Expand Down