Skip to content

Commit

Permalink
Auto merge of #12706 - ehuss:cache-lock-mode, r=epage
Browse files Browse the repository at this point in the history
Add new package cache lock modes

The way locking worked before this PR is that only one cargo could write to the package cache at once (otherwise it could cause corruption). However, it allowed cargo's to read from the package cache while running a build under the assumption that writers are append-only and won't affect reading. This allows multiple builds to run concurrently, only blocking on the part where it is not possible to run concurrently (downloading to the cache).

This introduces a new package cache locking strategy to support the ability to safely modify existing cache entries while other cargos are potentially reading from the cache. It has different locking modes:

- `MutateExclusive` (new) — Held when cargo wants to modify existing cache entries (such as being introduced for garbage collection in #12634), and ensures only one cargo has access to the cache during that time.
- `DownloadExclusive`  (renamed) — This is a more specialized name for the lock that was before this PR.   A caller should acquire this when downloading into the cache and doing resolution.  It ensures that only one cargo can append to the cache, but allows other cargos to concurrently read from the cache.
- `Shared` (new) — This is to preserve the old concurrent build behavior by allowing multiple concurrent cargos to hold this while a build is running when it is reading from the cache

**Reviewing suggestions:**
There are a few commits needed to help with testing which are first. The main commit has the following:
- `src/cargo/util/cache_lock.rs` is an abstraction around package cache locks, and is the heart of the change. It should have comments and notes which should guide what it is doing. The `CacheLocker` is stored in `Config` along with all our other global stuff.
- Every call to `config.acquire_package_cache_lock()` has been changed to explicitly state which lock mode it wants to lock the package cache in.
- `Context::compile` is the key point where the `Shared` lock is acquired, ensuring that no mutation is done while the cache is being read.
- `MutateExclusive` is not used in this PR, but is being added in preparation for #12634.
- The non-blocking `try_acquire_package_cache_lock` API is not used in this PR, but is being added in preparation for #12634 to allow automatic gc to skip running if another cargo is already running (to avoid unnecessary blocking).
- `src/cargo/util/flock.rs` has been updated with some code cleanup (removing unused stuff), adds support for non-blocking locks, and renames some functions to make their operation clearer.
- `tests/testsuite/cache_lock.rs` contains tests for all the different permutations of ways of acquiring locks.
  • Loading branch information
bors committed Oct 9, 2023
2 parents 0871c0e + 78bb7c5 commit b48c41a
Show file tree
Hide file tree
Showing 29 changed files with 1,239 additions and 228 deletions.
1 change: 1 addition & 0 deletions crates/cargo-test-support/src/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ fn substitute_macros(input: &str) -> String {
("[SKIPPING]", " Skipping"),
("[WAITING]", " Waiting"),
("[PUBLISHED]", " Published"),
("[BLOCKING]", " Blocking"),
];
let mut result = input.to_owned();
for &(pat, subst) in &macros {
Expand Down
48 changes: 48 additions & 0 deletions crates/cargo-test-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Output};
use std::str;
use std::sync::OnceLock;
use std::thread::JoinHandle;
use std::time::{self, Duration};

use anyhow::{bail, Result};
Expand Down Expand Up @@ -1470,3 +1471,50 @@ pub fn symlink_supported() -> bool {
pub fn no_such_file_err_msg() -> String {
std::io::Error::from_raw_os_error(2).to_string()
}

/// Helper to retry a function `n` times.
///
/// The function should return `Some` when it is ready.
pub fn retry<F, R>(n: u32, mut f: F) -> R
where
F: FnMut() -> Option<R>,
{
let mut count = 0;
let start = std::time::Instant::now();
loop {
if let Some(r) = f() {
return r;
}
count += 1;
if count > n {
panic!(
"test did not finish within {n} attempts ({:?} total)",
start.elapsed()
);
}
sleep_ms(100);
}
}

#[test]
#[should_panic(expected = "test did not finish")]
fn retry_fails() {
retry(2, || None::<()>);
}

/// Helper that waits for a thread to finish, up to `n` tenths of a second.
pub fn thread_wait_timeout<T>(n: u32, thread: JoinHandle<T>) -> T {
retry(n, || thread.is_finished().then_some(()));
thread.join().unwrap()
}

/// Helper that runs some function, and waits up to `n` tenths of a second for
/// it to finish.
pub fn threaded_timeout<F, R>(n: u32, f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let thread = std::thread::spawn(|| f());
thread_wait_timeout(n, thread)
}
3 changes: 2 additions & 1 deletion crates/xtask-bump-check/src/xtask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use cargo::core::Registry;
use cargo::core::SourceId;
use cargo::core::Workspace;
use cargo::sources::source::QueryKind;
use cargo::util::cache_lock::CacheLockMode;
use cargo::util::command_prelude::*;
use cargo::util::ToSemver;
use cargo::CargoResult;
Expand Down Expand Up @@ -347,7 +348,7 @@ fn check_crates_io<'a>(
) -> CargoResult<()> {
let source_id = SourceId::crates_io(config)?;
let mut registry = PackageRegistry::new(config)?;
let _lock = config.acquire_package_cache_lock()?;
let _lock = config.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
registry.lock_patches();
config.shell().status(
STATUS,
Expand Down
8 changes: 8 additions & 0 deletions src/cargo/core/compiler/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use crate::core::compiler::compilation::{self, UnitOutput};
use crate::core::compiler::{self, artifact, Unit};
use crate::core::PackageId;
use crate::util::cache_lock::CacheLockMode;
use crate::util::errors::CargoResult;
use crate::util::profile;
use anyhow::{bail, Context as _};
Expand Down Expand Up @@ -132,6 +133,13 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
///
/// [`ops::cargo_compile`]: ../../../ops/cargo_compile/index.html
pub fn compile(mut self, exec: &Arc<dyn Executor>) -> CargoResult<Compilation<'cfg>> {
// A shared lock is held during the duration of the build since rustc
// needs to read from the `src` cache, and we don't want other
// commands modifying the `src` cache while it is running.
let _lock = self
.bcx
.config
.acquire_package_cache_lock(CacheLockMode::Shared)?;
let mut queue = JobQueue::new(self.bcx);
let mut plan = BuildPlan::new();
let build_plan = self.bcx.build_config.build_plan;
Expand Down
10 changes: 7 additions & 3 deletions src/cargo/core/compiler/future_incompat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::core::compiler::BuildContext;
use crate::core::{Dependency, PackageId, Workspace};
use crate::sources::source::QueryKind;
use crate::sources::SourceConfigMap;
use crate::util::cache_lock::CacheLockMode;
use crate::util::{iter_join, CargoResult};
use anyhow::{bail, format_err, Context};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -166,7 +167,7 @@ impl OnDiskReports {
let on_disk = serde_json::to_vec(&self).unwrap();
if let Err(e) = ws
.target_dir()
.open_rw(
.open_rw_exclusive_create(
FUTURE_INCOMPAT_FILE,
ws.config(),
"Future incompatibility report",
Expand All @@ -190,7 +191,7 @@ impl OnDiskReports {

/// Loads the on-disk reports.
pub fn load(ws: &Workspace<'_>) -> CargoResult<OnDiskReports> {
let report_file = match ws.target_dir().open_ro(
let report_file = match ws.target_dir().open_ro_shared(
FUTURE_INCOMPAT_FILE,
ws.config(),
"Future incompatible report",
Expand Down Expand Up @@ -297,7 +298,10 @@ fn render_report(per_package_reports: &[FutureIncompatReportPackage]) -> BTreeMa
/// This is best-effort - if an error occurs, `None` will be returned.
fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet<PackageId>) -> Option<String> {
// This in general ignores all errors since this is opportunistic.
let _lock = ws.config().acquire_package_cache_lock().ok()?;
let _lock = ws
.config()
.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)
.ok()?;
// Create a set of updated registry sources.
let map = SourceConfigMap::new(ws.config()).ok()?;
let mut package_ids: BTreeSet<_> = package_ids
Expand Down
2 changes: 1 addition & 1 deletion src/cargo/core/compiler/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Layout {
// For now we don't do any more finer-grained locking on the artifact
// directory, so just lock the entire thing for the duration of this
// compile.
let lock = dest.open_rw(".cargo-lock", ws.config(), "build directory")?;
let lock = dest.open_rw_exclusive_create(".cargo-lock", ws.config(), "build directory")?;
let root = root.into_path_unlocked();
let dest = dest.into_path_unlocked();
let deps = dest.join("deps");
Expand Down
11 changes: 8 additions & 3 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::core::resolver::{HasDevUnits, Resolve};
use crate::core::{Dependency, Manifest, PackageId, SourceId, Target};
use crate::core::{Summary, Workspace};
use crate::sources::source::{MaybePackage, SourceMap};
use crate::util::config::PackageCacheLock;
use crate::util::cache_lock::{CacheLock, CacheLockMode};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::http::http_handle_and_timeout;
Expand Down Expand Up @@ -367,7 +367,7 @@ pub struct Downloads<'a, 'cfg> {
next_speed_check_bytes_threshold: Cell<u64>,
/// Global filesystem lock to ensure only one Cargo is downloading at a
/// time.
_lock: PackageCacheLock<'cfg>,
_lock: CacheLock<'cfg>,
}

struct Download<'cfg> {
Expand Down Expand Up @@ -465,7 +465,9 @@ impl<'cfg> PackageSet<'cfg> {
timeout,
next_speed_check: Cell::new(Instant::now()),
next_speed_check_bytes_threshold: Cell::new(0),
_lock: self.config.acquire_package_cache_lock()?,
_lock: self
.config
.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
})
}

Expand All @@ -478,6 +480,9 @@ impl<'cfg> PackageSet<'cfg> {

pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
let mut pkgs = Vec::new();
let _lock = self
.config
.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
let mut downloads = self.enable_download()?;
for id in ids {
pkgs.extend(downloads.start(id)?);
Expand Down
5 changes: 4 additions & 1 deletion src/cargo/ops/cargo_add/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::core::Shell;
use crate::core::Summary;
use crate::core::Workspace;
use crate::sources::source::QueryKind;
use crate::util::cache_lock::CacheLockMode;
use crate::util::style;
use crate::util::toml_mut::dependency::Dependency;
use crate::util::toml_mut::dependency::GitSource;
Expand Down Expand Up @@ -77,7 +78,9 @@ pub fn add(workspace: &Workspace<'_>, options: &AddOptions<'_>) -> CargoResult<(
let mut registry = PackageRegistry::new(options.config)?;

let deps = {
let _lock = options.config.acquire_package_cache_lock()?;
let _lock = options
.config
.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
registry.lock_patches();
options
.dependencies
Expand Down
5 changes: 4 additions & 1 deletion src/cargo/ops/cargo_generate_lockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::core::resolver::features::{CliFeatures, HasDevUnits};
use crate::core::{PackageId, PackageIdSpec};
use crate::core::{Resolve, SourceId, Workspace};
use crate::ops;
use crate::util::cache_lock::CacheLockMode;
use crate::util::config::Config;
use crate::util::style;
use crate::util::CargoResult;
Expand Down Expand Up @@ -48,7 +49,9 @@ pub fn update_lockfile(ws: &Workspace<'_>, opts: &UpdateOptions<'_>) -> CargoRes

// Updates often require a lot of modifications to the registry, so ensure
// that we're synchronized against other Cargos.
let _lock = ws.config().acquire_package_cache_lock()?;
let _lock = ws
.config()
.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;

let max_rust_version = ws.rust_version();

Expand Down
5 changes: 3 additions & 2 deletions src/cargo/ops/cargo_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::core::{registry::PackageRegistry, resolver::HasDevUnits};
use crate::core::{Feature, Shell, Verbosity, Workspace};
use crate::core::{Package, PackageId, PackageSet, Resolve, SourceId};
use crate::sources::PathSource;
use crate::util::cache_lock::CacheLockMode;
use crate::util::config::JobsConfig;
use crate::util::errors::CargoResult;
use crate::util::toml::TomlManifest;
Expand Down Expand Up @@ -132,7 +133,7 @@ pub fn package_one(
let dir = ws.target_dir().join("package");
let mut dst = {
let tmp = format!(".{}", filename);
dir.open_rw(&tmp, config, "package scratch space")?
dir.open_rw_exclusive_create(&tmp, config, "package scratch space")?
};

// Package up and test a temporary tarball and only move it to the final
Expand Down Expand Up @@ -806,7 +807,7 @@ pub fn check_yanked(
) -> CargoResult<()> {
// Checking the yanked status involves taking a look at the registry and
// maybe updating files, so be sure to lock it here.
let _lock = config.acquire_package_cache_lock()?;
let _lock = config.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;

let mut sources = pkg_set.sources_mut();
let mut pending: Vec<PackageId> = resolve.iter().collect();
Expand Down
11 changes: 7 additions & 4 deletions src/cargo/ops/common_for_install_and_uninstall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::ops::{self, CompileFilter, CompileOptions};
use crate::sources::source::QueryKind;
use crate::sources::source::Source;
use crate::sources::PathSource;
use crate::util::cache_lock::CacheLockMode;
use crate::util::errors::CargoResult;
use crate::util::Config;
use crate::util::{FileLock, Filesystem};
Expand Down Expand Up @@ -97,8 +98,10 @@ pub struct CrateListingV1 {
impl InstallTracker {
/// Create an InstallTracker from information on disk.
pub fn load(config: &Config, root: &Filesystem) -> CargoResult<InstallTracker> {
let v1_lock = root.open_rw(Path::new(".crates.toml"), config, "crate metadata")?;
let v2_lock = root.open_rw(Path::new(".crates2.json"), config, "crate metadata")?;
let v1_lock =
root.open_rw_exclusive_create(Path::new(".crates.toml"), config, "crate metadata")?;
let v2_lock =
root.open_rw_exclusive_create(Path::new(".crates2.json"), config, "crate metadata")?;

let v1 = (|| -> CargoResult<_> {
let mut contents = String::new();
Expand Down Expand Up @@ -536,7 +539,7 @@ where
// This operation may involve updating some sources or making a few queries
// which may involve frobbing caches, as a result make sure we synchronize
// with other global Cargos
let _lock = config.acquire_package_cache_lock()?;
let _lock = config.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;

if needs_update {
source.invalidate_cache();
Expand Down Expand Up @@ -604,7 +607,7 @@ where
// This operation may involve updating some sources or making a few queries
// which may involve frobbing caches, as a result make sure we synchronize
// with other global Cargos
let _lock = config.acquire_package_cache_lock()?;
let _lock = config.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;

source.invalidate_cache();

Expand Down
6 changes: 3 additions & 3 deletions src/cargo/ops/lockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub fn load_pkg_lockfile(ws: &Workspace<'_>) -> CargoResult<Option<Resolve>> {
return Ok(None);
}

let mut f = lock_root.open_ro("Cargo.lock", ws.config(), "Cargo.lock file")?;
let mut f = lock_root.open_ro_shared("Cargo.lock", ws.config(), "Cargo.lock file")?;

let mut s = String::new();
f.read_to_string(&mut s)
Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn write_pkg_lockfile(ws: &Workspace<'_>, resolve: &mut Resolve) -> CargoRes

// Ok, if that didn't work just write it out
lock_root
.open_rw("Cargo.lock", ws.config(), "Cargo.lock file")
.open_rw_exclusive_create("Cargo.lock", ws.config(), "Cargo.lock file")
.and_then(|mut f| {
f.file().set_len(0)?;
f.write_all(out.as_bytes())?;
Expand All @@ -100,7 +100,7 @@ fn resolve_to_string_orig(
) -> (Option<String>, String, Filesystem) {
// Load the original lock file if it exists.
let lock_root = lock_root(ws);
let orig = lock_root.open_ro("Cargo.lock", ws.config(), "Cargo.lock file");
let orig = lock_root.open_ro_shared("Cargo.lock", ws.config(), "Cargo.lock file");
let orig = orig.and_then(|mut f| {
let mut s = String::new();
f.read_to_string(&mut s)?;
Expand Down
3 changes: 2 additions & 1 deletion src/cargo/ops/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::core::SourceId;
use crate::sources::source::Source;
use crate::sources::{RegistrySource, SourceConfigMap};
use crate::util::auth;
use crate::util::cache_lock::CacheLockMode;
use crate::util::config::{Config, PathAndArgs};
use crate::util::errors::CargoResult;
use crate::util::network::http::http_handle;
Expand Down Expand Up @@ -131,7 +132,7 @@ fn registry(
}

let cfg = {
let _lock = config.acquire_package_cache_lock()?;
let _lock = config.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
let mut src = RegistrySource::remote(source_ids.replacement, &HashSet::new(), config)?;
// Only update the index if `force_update` is set.
if force_update {
Expand Down
3 changes: 2 additions & 1 deletion src/cargo/ops/registry/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::sources::source::QueryKind;
use crate::sources::SourceConfigMap;
use crate::sources::CRATES_IO_REGISTRY;
use crate::util::auth;
use crate::util::cache_lock::CacheLockMode;
use crate::util::config::JobsConfig;
use crate::util::Progress;
use crate::util::ProgressStyle;
Expand Down Expand Up @@ -233,7 +234,7 @@ fn wait_for_publish(
progress.tick_now(0, max, "")?;
let is_available = loop {
{
let _lock = config.acquire_package_cache_lock()?;
let _lock = config.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
// Force re-fetching the source
//
// As pulling from a git source is expensive, we track when we've done it within the
Expand Down
5 changes: 4 additions & 1 deletion src/cargo/ops/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use crate::core::Feature;
use crate::core::{GitReference, PackageId, PackageIdSpec, PackageSet, SourceId, Workspace};
use crate::ops;
use crate::sources::PathSource;
use crate::util::cache_lock::CacheLockMode;
use crate::util::errors::CargoResult;
use crate::util::RustVersion;
use crate::util::{profile, CanonicalUrl};
Expand Down Expand Up @@ -289,7 +290,9 @@ pub fn resolve_with_previous<'cfg>(
) -> CargoResult<Resolve> {
// We only want one Cargo at a time resolving a crate graph since this can
// involve a lot of frobbing of the global caches.
let _lock = ws.config().acquire_package_cache_lock()?;
let _lock = ws
.config()
.acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;

// Here we place an artificial limitation that all non-registry sources
// cannot be locked at more than one revision. This means that if a Git
Expand Down
5 changes: 4 additions & 1 deletion src/cargo/sources/git/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::sources::source::MaybePackage;
use crate::sources::source::QueryKind;
use crate::sources::source::Source;
use crate::sources::PathSource;
use crate::util::cache_lock::CacheLockMode;
use crate::util::errors::CargoResult;
use crate::util::hex::short_hash;
use crate::util::Config;
Expand Down Expand Up @@ -212,7 +213,9 @@ impl<'cfg> Source for GitSource<'cfg> {
// Ignore errors creating it, in case this is a read-only filesystem:
// perhaps the later operations can succeed anyhow.
let _ = git_fs.create_dir();
let git_path = self.config.assert_package_cache_locked(&git_fs);
let git_path = self
.config
.assert_package_cache_locked(CacheLockMode::DownloadExclusive, &git_fs);

// Before getting a checkout, make sure that `<cargo_home>/git` is
// marked as excluded from indexing and backups. Older versions of Cargo
Expand Down
Loading

0 comments on commit b48c41a

Please sign in to comment.