Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deadlocks when multiple uv processes lock resources #6790

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions crates/uv-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1897,12 +1897,9 @@ async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {

fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;

let lock: LockedFile = tokio::task::spawn_blocking({
let root = root.to_path_buf();
move || LockedFile::acquire(root.join(".lock"), root.display())
})
.await?
.map_err(Error::CacheWrite)?;
let lock = LockedFile::acquire(root.join(".lock"), root.display())
.await
.map_err(Error::CacheWrite)?;

Ok(lock)
}
3 changes: 2 additions & 1 deletion crates/uv-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fs-err = { workspace = true }
fs2 = { workspace = true }
path-slash = { workspace = true }
serde = { workspace = true, optional = true }
tokio = { workspace = true, optional = true}
tempfile = { workspace = true }
tracing = { workspace = true }
urlencoding = { workspace = true }
Expand All @@ -33,4 +34,4 @@ junction = { workspace = true }

[features]
default = []
tokio = ["fs-err/tokio", "backoff/tokio"]
tokio = ["dep:tokio", "fs-err/tokio", "backoff/tokio"]
31 changes: 27 additions & 4 deletions crates/uv-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ pub fn is_temporary(path: impl AsRef<Path>) -> bool {
pub struct LockedFile(fs_err::File);

impl LockedFile {
pub fn acquire(path: impl AsRef<Path>, resource: impl Display) -> Result<Self, std::io::Error> {
let file = fs_err::File::create(path.as_ref())?;
/// Inner implementation for [`LockedFile::acquire_blocking`] and [`LockedFile::acquire`].
fn lock_file_blocking(file: fs_err::File, resource: &str) -> Result<Self, std::io::Error> {
trace!("Checking lock for `{resource}`");
match file.file().try_lock_exclusive() {
Ok(()) => {
Expand All @@ -328,19 +328,42 @@ impl LockedFile {
warn_user!(
"Waiting to acquire lock for {} (lockfile: {})",
resource,
path.user_display(),
file.path().user_display(),
);
file.file().lock_exclusive().map_err(|err| {
// Not an fs_err method, we need to build our own path context
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Could not lock {}: {}", path.as_ref().user_display(), err),
format!("Could not lock {}: {}", file.path().user_display(), err),
)
})?;
Ok(Self(file))
}
}
}

/// The same as [`LockedFile::acquire`], but for synchronous contexts. Do not use from an async
/// context, as this can block the runtime while waiting for another process to release the
/// lock.
pub fn acquire_blocking(
path: impl AsRef<Path>,
resource: impl Display,
) -> Result<Self, std::io::Error> {
let file = fs_err::File::create(path.as_ref())?;
let resource = resource.to_string();
Self::lock_file_blocking(file, &resource)
}
Comment on lines +345 to +355
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could remove this entirely too.


/// Acquire a cross-process lock for a resource using a file at the provided path.
#[cfg(feature = "tokio")]
pub async fn acquire(
path: impl AsRef<Path>,
resource: impl Display,
) -> Result<Self, std::io::Error> {
let file = fs_err::File::create(path.as_ref())?;
let resource = resource.to_string();
Comment on lines +363 to +364
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to do this for spawn_blocking. Are there better ways?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok, I believe it needs to have a static lifetime (or be owned) for this...

tokio::task::spawn_blocking(move || Self::lock_file_blocking(file, &resource)).await?
}
}

impl Drop for LockedFile {
Expand Down
3 changes: 2 additions & 1 deletion crates/uv-git/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ impl GitResolver {
let _lock = LockedFile::acquire(
lock_dir.join(cache_key::cache_digest(&repository_url)),
&repository_url,
)?;
)
.await?;

// Fetch the Git repository.
let source = if let Some(reporter) = reporter {
Expand Down
9 changes: 5 additions & 4 deletions crates/uv-python/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,22 +189,23 @@ impl PythonEnvironment {
}

/// Grab a file lock for the environment to prevent concurrent writes across processes.
pub fn lock(&self) -> Result<LockedFile, std::io::Error> {
pub async fn lock(&self) -> Result<LockedFile, std::io::Error> {
if let Some(target) = self.0.interpreter.target() {
// If we're installing into a `--target`, use a target-specific lockfile.
LockedFile::acquire(target.root().join(".lock"), target.root().user_display())
LockedFile::acquire(target.root().join(".lock"), target.root().user_display()).await
} else if let Some(prefix) = self.0.interpreter.prefix() {
// Likewise, if we're installing into a `--prefix`, use a prefix-specific lockfile.
LockedFile::acquire(prefix.root().join(".lock"), prefix.root().user_display())
LockedFile::acquire(prefix.root().join(".lock"), prefix.root().user_display()).await
} else if self.0.interpreter.is_virtualenv() {
// If the environment a virtualenv, use a virtualenv-specific lockfile.
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.user_display())
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.user_display()).await
} else {
// Otherwise, use a global lockfile.
LockedFile::acquire(
env::temp_dir().join(format!("uv-{}.lock", cache_key::cache_digest(&self.0.root))),
self.0.root.user_display(),
)
.await
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/uv-python/src/installation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl PythonInstallation {
let installations = ManagedPythonInstallations::from_settings()?.init()?;
let installations_dir = installations.root();
let cache_dir = installations.cache();
let _lock = installations.acquire_lock()?;
let _lock = installations.lock().await?;

let download = ManagedPythonDownload::from_request(&request)?;
let client = client_builder.build();
Expand Down
10 changes: 4 additions & 6 deletions crates/uv-python/src/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,10 @@ impl ManagedPythonInstallations {
Self { root: root.into() }
}

/// Lock the toolchains directory.
pub fn acquire_lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(
self.root.join(".lock"),
self.root.user_display(),
)?)
/// Grab a file lock for the managed Python distribution directory to prevent concurrent access
/// across processes.
pub async fn lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(self.root.join(".lock"), self.root.user_display()).await?)
}

/// Prefer, in order:
Expand Down
9 changes: 3 additions & 6 deletions crates/uv-tool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,9 @@ impl InstalledTools {
}
}

/// Lock the tools directory.
pub fn acquire_lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(
self.root.join(".lock"),
self.root.user_display(),
)?)
/// Grab a file lock for the tools directory to prevent concurrent access across processes.
pub async fn lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(self.root.join(".lock"), self.root.user_display()).await?)
}

/// Add a receipt for a tool.
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/pip/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ pub(crate) async fn pip_install(
}
}

let _lock = environment.lock()?;
let _lock = environment.lock().await?;

// Determine the markers to use for the resolution.
let interpreter = environment.interpreter();
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/pip/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ pub(crate) async fn pip_sync(
}
}

let _lock = environment.lock()?;
let _lock = environment.lock().await?;

let interpreter = environment.interpreter();

Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/pip/uninstall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub(crate) async fn pip_uninstall(
}
}

let _lock = environment.lock()?;
let _lock = environment.lock().await?;

// Index the current `site-packages` directory.
let site_packages = uv_installer::SitePackages::from_environment(&environment)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/python/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) async fn install(
let installations = ManagedPythonInstallations::from_settings()?.init()?;
let installations_dir = installations.root();
let cache_dir = installations.cache();
let _lock = installations.acquire_lock()?;
let _lock = installations.lock().await?;

let targets = targets.into_iter().collect::<BTreeSet<_>>();
let requests: Vec<_> = if targets.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/python/uninstall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) async fn uninstall(
printer: Printer,
) -> Result<ExitStatus> {
let installations = ManagedPythonInstallations::from_settings()?.init()?;
let _lock = installations.acquire_lock()?;
let _lock = installations.lock().await?;

// Perform the uninstallation.
do_uninstall(&installations, targets, all, printer).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/tool/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub(crate) async fn install(
let options = ToolOptions::from(options);

let installed_tools = InstalledTools::from_settings()?.init()?;
let _lock = installed_tools.acquire_lock()?;
let _lock = installed_tools.lock().await?;

// Find the existing receipt, if it exists. If the receipt is present but malformed, we'll
// remove the environment and continue with the install.
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/tool/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::printer::Printer;
/// List installed tools.
pub(crate) async fn list(show_paths: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
let installed_tools = InstalledTools::from_settings()?;
let _lock = match installed_tools.acquire_lock() {
let _lock = match installed_tools.lock().await {
Ok(lock) => lock,
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
writeln!(printer.stderr(), "No tools installed")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/tool/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ async fn get_or_create_environment(
// Check if the tool is already installed in a compatible environment.
if !isolated && !target.is_latest() {
let installed_tools = InstalledTools::from_settings()?.init()?;
let _lock = installed_tools.acquire_lock()?;
let _lock = installed_tools.lock().await?;

let existing_environment =
installed_tools
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/tool/uninstall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::printer::Printer;
/// Uninstall a tool.
pub(crate) async fn uninstall(name: Option<PackageName>, printer: Printer) -> Result<ExitStatus> {
let installed_tools = InstalledTools::from_settings()?.init()?;
let _lock = match installed_tools.acquire_lock() {
let _lock = match installed_tools.lock().await {
Ok(lock) => lock,
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
if let Some(name) = name {
Expand Down
2 changes: 1 addition & 1 deletion crates/uv/src/commands/tool/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) async fn upgrade(
printer: Printer,
) -> Result<ExitStatus> {
let installed_tools = InstalledTools::from_settings()?.init()?;
let _lock = installed_tools.acquire_lock()?;
let _lock = installed_tools.lock().await?;

let names: BTreeSet<PackageName> =
name.map(|name| BTreeSet::from_iter([name]))
Expand Down
Loading