From 30fcf4f70b24c18278a7d9255db83849d05f7f67 Mon Sep 17 00:00:00 2001 From: nichmor Date: Thu, 28 Nov 2024 17:46:37 +0200 Subject: [PATCH 1/9] feat: add request coalescing for isolated tools --- crates/pixi_build_frontend/src/tool/cache.rs | 185 ++++++++++++------- crates/pixi_build_frontend/src/tool/mod.rs | 34 ++-- 2 files changed, 141 insertions(+), 78 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 30be3bdb8..574b55211 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -1,4 +1,8 @@ -use std::{fmt::Debug, hash::Hash, path::PathBuf}; +use std::{ + fmt::Debug, + path::PathBuf, + sync::{Arc, Weak}, +}; use dashmap::{DashMap, Entry}; use miette::{miette, IntoDiagnostic, Result}; @@ -15,11 +19,12 @@ use rattler_shell::{ use rattler_solve::{resolvo::Solver, SolverImpl, SolverTask}; use rattler_virtual_packages::{VirtualPackage, VirtualPackageOverrides}; use reqwest_middleware::ClientWithMiddleware; +use tokio::sync::broadcast; use super::IsolatedTool; use crate::{ tool::{SystemTool, Tool, ToolSpec}, - IsolatedToolSpec, SystemToolSpec, + IsolatedToolSpec, }; pub struct ToolContextBuilder { @@ -127,31 +132,28 @@ impl ToolContext { ) -> Result { let spec = match spec { ToolSpec::Io(ipc) => return Ok(Tool::Io(ipc)), - ToolSpec::Isolated(isolated) => CacheableToolSpec::Isolated(isolated), - ToolSpec::System(system) => CacheableToolSpec::System(system), - }; - - let cache_entry = match self.cache.cache.entry(spec.clone()) { - Entry::Occupied(entry) => return Ok(entry.get().clone().into()), - Entry::Vacant(entry) => entry, + ToolSpec::Isolated(isolated) => { + if isolated.specs.is_empty() { + return Err(ToolCacheError::Install(miette!( + "No build match specs provided for '{}' command.", + isolated.command + ))); + } + + isolated + } + + // I think we cannot bypass caching SystemTool as it is a wrapper around a spec command + ToolSpec::System(system) => return Ok(Tool::System(SystemTool::new(system.command))), }; - let tool: CachedTool = match spec { - CacheableToolSpec::Isolated(spec) => CachedTool::Isolated(if spec.specs.is_empty() { - return Err(ToolCacheError::Install(miette!( - "No build match specs provided for '{}' command.", - spec.command - ))); - } else { - self.install(&spec, channel_config) - .await - .map_err(ToolCacheError::Install)? - }), - CacheableToolSpec::System(spec) => SystemTool::new(spec.command).into(), - }; + let installed = self + .cache + .get_or_install_tool(spec, self, channel_config) + .await + .map_err(ToolCacheError::Install)?; - cache_entry.insert(tool.clone()); - Ok(tool.into()) + Ok(installed.into()) } /// Installed the tool in the isolated environment. @@ -266,15 +268,24 @@ impl ToolContext { } } -/// A [`ToolCache`] maintains a cache of environments for build tools. +/// A record that is either pending or has been fetched. +#[derive(Clone)] +enum PendingOrFetched { + Pending(Weak>), + Fetched(T), +} + +/// A [`ToolCache`] maintains a cache of environments for isolated build tools. /// /// This is useful to ensure that if we need to build multiple packages that use /// the same tool, we can reuse their environments. -/// (nichita): it can also be seen as a way to create tools itself -#[derive(Default, Debug)] +/// Implementation for request coalescing is inspired by: +/// * https://github.com/conda/rattler/blob/main/crates/rattler_repodata_gateway/src/gateway/mod.rs#L180 +/// * https://github.com/prefix-dev/rip/blob/main/crates/rattler_installs_packages/src/wheel_builder/mod.rs#L39 +#[derive(Default)] pub struct ToolCache { /// The cache of tools. - pub cache: DashMap, + cache: DashMap>>, } #[derive(thiserror::Error, Debug)] @@ -287,42 +298,6 @@ pub enum ToolCacheError { CacheDir(miette::Report), } -/// Describes the specification of the tool. This can be used to cache tool -/// information. -#[derive(Debug, Clone, Hash, Eq, PartialEq)] -pub enum CacheableToolSpec { - Isolated(IsolatedToolSpec), - System(SystemToolSpec), -} - -/// A tool that can be invoked. -#[derive(Debug, Clone)] -pub enum CachedTool { - Isolated(IsolatedTool), - System(SystemTool), -} - -impl From for Tool { - fn from(value: CachedTool) -> Self { - match value { - CachedTool::Isolated(tool) => Tool::Isolated(tool), - CachedTool::System(tool) => Tool::System(tool), - } - } -} - -impl From for CachedTool { - fn from(value: IsolatedTool) -> Self { - Self::Isolated(value) - } -} - -impl From for CachedTool { - fn from(value: SystemTool) -> Self { - Self::System(value) - } -} - impl ToolCache { /// Construct a new tool cache. pub fn new() -> Self { @@ -330,6 +305,87 @@ impl ToolCache { cache: DashMap::default(), } } + + pub async fn get_or_install_tool( + &self, + spec: IsolatedToolSpec, + context: &ToolContext, + channel_config: &ChannelConfig, + ) -> miette::Result> { + let sender = match self.cache.entry(spec.clone()) { + Entry::Vacant(entry) => { + // Construct a sender so other tasks can subscribe + let (sender, _) = broadcast::channel(1); + let sender = Arc::new(sender); + + // modify the current entry to the pending entry. + // this is an atomic operation + // because who holds the entry holds mutable access. + + entry.insert(PendingOrFetched::Pending(Arc::downgrade(&sender))); + + sender + } + Entry::Occupied(mut entry) => { + let tool = entry.get(); + match tool { + PendingOrFetched::Pending(sender) => { + let sender = sender.upgrade(); + if let Some(sender) = sender { + // Create a receiver before we drop the entry. While we hold on to + // the entry we have exclusive access to it, this means the task + // currently fetching the subdir will not be able to store a value + // until we drop the entry. + // By creating the receiver here we ensure that we are subscribed + // before the other tasks sends a value over the channel. + let mut receiver = sender.subscribe(); + + // Explicitly drop the entry, so we don't block any other tasks. + drop(entry); + + return match receiver.recv().await { + Ok(tool) => Ok(tool), + Err(_) => miette::bail!( + "a coalesced tool {} request install failed", + spec.command + ), + }; + } else { + // Construct a sender so other tasks can subscribe + let (sender, _) = broadcast::channel(1); + let sender = Arc::new(sender); + + // Modify the current entry to the pending entry, this is an atomic + // operation because who holds the entry holds mutable access. + entry.insert(PendingOrFetched::Pending(Arc::downgrade(&sender))); + + sender + } + } + PendingOrFetched::Fetched(tool) => return Ok(tool.clone()), + } + } + }; + + // At this point we have exclusive write access to this specific entry. All + // other tasks will find a pending entry and will wait for the records + // to become available. + // + // Let's start by creating the subdir. If an error occurs we immediately return + // the error. This will drop the sender and all other waiting tasks will + // receive an error. + let tool = Arc::new(context.install(&spec, channel_config).await?); + + // Store the fetched files in the entry. + self.cache + .insert(spec, PendingOrFetched::Fetched(tool.clone())); + + // Send the records to all waiting tasks. We don't care if there are no + // receivers, so we drop the error. + let _ = sender.send(tool.clone()); + + Ok(tool) + } } #[cfg(test)] @@ -340,7 +396,6 @@ mod tests { use rattler_conda_types::{ChannelConfig, MatchSpec, NamedChannelOrUrl, ParseStrictness}; use reqwest_middleware::ClientWithMiddleware; - // use super::ToolCache; use crate::{ tool::{ToolContext, ToolSpec}, IsolatedToolSpec, diff --git a/crates/pixi_build_frontend/src/tool/mod.rs b/crates/pixi_build_frontend/src/tool/mod.rs index 10a552478..bb237fe08 100644 --- a/crates/pixi_build_frontend/src/tool/mod.rs +++ b/crates/pixi_build_frontend/src/tool/mod.rs @@ -1,7 +1,7 @@ mod cache; mod spec; -use std::{collections::HashMap, path::PathBuf}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; pub use cache::{ToolCacheError, ToolContext}; pub use spec::{IsolatedToolSpec, SystemToolSpec, ToolSpec}; @@ -11,14 +11,14 @@ use crate::InProcessBackend; /// A tool that can be invoked. #[derive(Debug)] pub enum Tool { - Isolated(IsolatedTool), + Isolated(Arc), System(SystemTool), Io(InProcessBackend), } #[derive(Debug)] pub enum ExecutableTool { - Isolated(IsolatedTool), + Isolated(Arc), System(SystemTool), } @@ -43,6 +43,12 @@ impl From for Tool { } } +impl From> for Tool { + fn from(value: Arc) -> Self { + Self::Isolated(value) + } +} + /// A tool that is installed in its own isolated environment. #[derive(Debug, Clone)] pub struct IsolatedTool { @@ -69,11 +75,11 @@ impl IsolatedTool { } } -impl From for Tool { - fn from(value: IsolatedTool) -> Self { - Self::Isolated(value) - } -} +// impl From for Tool { +// fn from(value: IsolatedTool) -> Self { +// Self::Isolated(value) +// } +// } impl Tool { pub fn as_executable(&self) -> Option { @@ -105,11 +111,13 @@ impl ExecutableTool { /// Construct a new tool that calls another executable. pub fn with_executable(&self, executable: impl Into) -> Self { match self { - ExecutableTool::Isolated(tool) => ExecutableTool::Isolated(IsolatedTool::new( - executable, - tool.prefix.clone(), - tool.activation_scripts.clone(), - )), + ExecutableTool::Isolated(tool) => { + ExecutableTool::Isolated(Arc::new(IsolatedTool::new( + executable, + tool.prefix.clone(), + tool.activation_scripts.clone(), + ))) + } ExecutableTool::System(_) => ExecutableTool::System(SystemTool::new(executable)), } } From 61bc7263cc49a98226f8510c11b60d817338a6fa Mon Sep 17 00:00:00 2001 From: nichmor Date: Thu, 28 Nov 2024 17:50:22 +0200 Subject: [PATCH 2/9] misc: update comment --- crates/pixi_build_frontend/src/tool/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 574b55211..3c85d84eb 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -380,7 +380,7 @@ impl ToolCache { self.cache .insert(spec, PendingOrFetched::Fetched(tool.clone())); - // Send the records to all waiting tasks. We don't care if there are no + // Send the tool to all waiting tasks. We don't care if there are no // receivers, so we drop the error. let _ = sender.send(tool.clone()); From 548a17ec2c0690ffca0bfdba3065e1b19bdcde3f Mon Sep 17 00:00:00 2001 From: nichmor Date: Thu, 28 Nov 2024 17:50:36 +0200 Subject: [PATCH 3/9] misc: update comment --- crates/pixi_build_frontend/src/tool/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 3c85d84eb..2ceb8e41f 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -368,7 +368,7 @@ impl ToolCache { }; // At this point we have exclusive write access to this specific entry. All - // other tasks will find a pending entry and will wait for the records + // other tasks will find a pending entry and will wait for the tool // to become available. // // Let's start by creating the subdir. If an error occurs we immediately return From 47ead7a58eb311c3672e098ef2c1c4dd3c68afae Mon Sep 17 00:00:00 2001 From: nichmor Date: Fri, 29 Nov 2024 16:56:48 +0200 Subject: [PATCH 4/9] misc: add some tool installer --- crates/pixi_build_frontend/Cargo.toml | 5 + .../pixi_build_frontend/src/build_frontend.rs | 3 +- crates/pixi_build_frontend/src/tool/cache.rs | 468 ++++++++---------- .../pixi_build_frontend/src/tool/installer.rs | 276 +++++++++++ crates/pixi_build_frontend/src/tool/mod.rs | 15 +- 5 files changed, 500 insertions(+), 267 deletions(-) create mode 100644 crates/pixi_build_frontend/src/tool/installer.rs diff --git a/crates/pixi_build_frontend/Cargo.toml b/crates/pixi_build_frontend/Cargo.toml index 755a9fdea..a0aa8fd46 100644 --- a/crates/pixi_build_frontend/Cargo.toml +++ b/crates/pixi_build_frontend/Cargo.toml @@ -46,3 +46,8 @@ pixi_build_types = { path = "../pixi_build_types" } insta = { workspace = true, features = ["yaml", "filters"] } rstest = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true, features = [ + "process", + "io-std", + "rt-multi-thread", +] } diff --git a/crates/pixi_build_frontend/src/build_frontend.rs b/crates/pixi_build_frontend/src/build_frontend.rs index aa58a6933..d025bfa16 100644 --- a/crates/pixi_build_frontend/src/build_frontend.rs +++ b/crates/pixi_build_frontend/src/build_frontend.rs @@ -7,8 +7,7 @@ use rattler_conda_types::ChannelConfig; use crate::{ protocol, protocol_builder::{EnabledProtocols, ProtocolBuilder}, - tool::ToolContext, - Protocol, SetupRequest, + Protocol, SetupRequest, ToolContext, }; /// The frontend for building packages. diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 2ceb8e41f..6bfd96f4f 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -5,270 +5,13 @@ use std::{ }; use dashmap::{DashMap, Entry}; -use miette::{miette, IntoDiagnostic, Result}; -use pixi_consts::consts::{CACHED_BUILD_ENVS_DIR, CONDA_REPODATA_CACHE_DIR}; -use pixi_progress::wrap_in_progress; -use pixi_utils::{EnvironmentHash, PrefixGuard}; -use rattler::{install::Installer, package_cache::PackageCache}; -use rattler_conda_types::{Channel, ChannelConfig, GenericVirtualPackage, Platform}; -use rattler_repodata_gateway::Gateway; -use rattler_shell::{ - activation::{ActivationVariables, Activator}, - shell::ShellEnum, -}; -use rattler_solve::{resolvo::Solver, SolverImpl, SolverTask}; -use rattler_virtual_packages::{VirtualPackage, VirtualPackageOverrides}; -use reqwest_middleware::ClientWithMiddleware; +use rattler_conda_types::ChannelConfig; use tokio::sync::broadcast; -use super::IsolatedTool; -use crate::{ - tool::{SystemTool, Tool, ToolSpec}, - IsolatedToolSpec, -}; - -pub struct ToolContextBuilder { - gateway: Option, - client: ClientWithMiddleware, - cache_dir: PathBuf, - cache: ToolCache, -} - -impl Default for ToolContextBuilder { - fn default() -> Self { - Self::new() - } -} - -impl ToolContextBuilder { - /// Create a new tool context builder. - pub fn new() -> Self { - Self { - gateway: None, - client: ClientWithMiddleware::default(), - cache_dir: pixi_config::get_cache_dir().expect("we should have a cache dir"), - cache: ToolCache::default(), - } - } - - /// Set the gateway for the tool context. - pub fn with_gateway(mut self, gateway: Gateway) -> Self { - self.gateway = Some(gateway); - self - } - - /// Set the client for the tool context. - pub fn with_client(mut self, client: ClientWithMiddleware) -> Self { - self.client = client; - self - } - - /// Set the cache directory for the tool context. - pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self { - self.cache_dir = cache_dir; - self - } - - pub fn with_cache(mut self, cache: ToolCache) -> Self { - self.cache = cache; - self - } - - /// Build the `ToolContext` using builder configuration. - pub fn build(self) -> ToolContext { - let gateway = self.gateway.unwrap_or_else(|| { - Gateway::builder() - .with_client(self.client.clone()) - .with_cache_dir(self.cache_dir.join(CONDA_REPODATA_CACHE_DIR)) - .finish() - }); - - ToolContext { - cache_dir: self.cache_dir, - client: self.client, - cache: self.cache, - gateway, - } - } -} - -/// The tool context, -/// containing client, channels and gateway configuration -/// that will be used to resolve and install tools. -#[derive(Default)] -pub struct ToolContext { - // Authentication client to use for fetching repodata. - pub client: ClientWithMiddleware, - // The cache directory to use for the tools. - pub cache_dir: PathBuf, - // The gateway to use for fetching repodata. - pub gateway: Gateway, - // The cache to use for the tools. - pub cache: ToolCache, -} - -impl Debug for ToolContext { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ToolContext") - .field("client", &self.client) - .field("cache_dir", &self.cache_dir) - .finish() - } -} - -impl ToolContext { - /// Create a new tool context builder with the given channels. - pub fn builder() -> ToolContextBuilder { - ToolContextBuilder::new() - } - - /// Instantiate a tool from a specification. - /// - /// If the tool is not already cached, it will be created, installed and cached. - pub async fn instantiate( - &self, - spec: ToolSpec, - channel_config: &ChannelConfig, - ) -> Result { - let spec = match spec { - ToolSpec::Io(ipc) => return Ok(Tool::Io(ipc)), - ToolSpec::Isolated(isolated) => { - if isolated.specs.is_empty() { - return Err(ToolCacheError::Install(miette!( - "No build match specs provided for '{}' command.", - isolated.command - ))); - } - - isolated - } - - // I think we cannot bypass caching SystemTool as it is a wrapper around a spec command - ToolSpec::System(system) => return Ok(Tool::System(SystemTool::new(system.command))), - }; - - let installed = self - .cache - .get_or_install_tool(spec, self, channel_config) - .await - .map_err(ToolCacheError::Install)?; - - Ok(installed.into()) - } - - /// Installed the tool in the isolated environment. - pub async fn install( - &self, - spec: &IsolatedToolSpec, - channel_config: &ChannelConfig, - ) -> miette::Result { - let channels: Vec = spec - .channels - .iter() - .cloned() - .map(|channel| channel.into_channel(channel_config)) - .collect::, _>>() - .into_diagnostic()?; - - let repodata = self - .gateway - .query( - channels.clone(), - [Platform::current(), Platform::NoArch], - spec.specs.clone(), - ) - .recursive(true) - .execute() - .await - .into_diagnostic()?; - - // Determine virtual packages of the current platform - let virtual_packages = VirtualPackage::detect(&VirtualPackageOverrides::from_env()) - .unwrap() - .iter() - .cloned() - .map(GenericVirtualPackage::from) - .collect(); - - let solved_records = Solver - .solve(SolverTask { - specs: spec.specs.clone(), - virtual_packages, - ..SolverTask::from_iter(&repodata) - }) - .into_diagnostic()?; - - let cache = EnvironmentHash::new( - spec.command.clone(), - spec.specs.clone(), - channels.iter().map(|c| c.base_url.to_string()).collect(), - ); - - let cached_dir = self - .cache_dir - .join(CACHED_BUILD_ENVS_DIR) - .join(cache.name()); - - let mut prefix_guard = PrefixGuard::new(&cached_dir).into_diagnostic()?; - - let mut write_guard = - wrap_in_progress("acquiring write lock on prefix", || prefix_guard.write()) - .into_diagnostic()?; - - // If the environment already exists, we can return early. - if write_guard.is_ready() { - tracing::info!("reusing existing environment in {}", cached_dir.display()); - let _ = write_guard.finish(); +use super::{installer::ToolInstaller, IsolatedTool}; +use crate::IsolatedToolSpec; - // Get the activation scripts - let activator = - Activator::from_path(&cached_dir, ShellEnum::default(), Platform::current()) - .unwrap(); - - let activation_scripts = activator - .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) - .unwrap(); - - return Ok(IsolatedTool::new( - spec.command.clone(), - cached_dir, - activation_scripts, - )); - } - - // Update the prefix to indicate that we are installing it. - write_guard.begin().into_diagnostic()?; - - // Install the environment - Installer::new() - .with_download_client(self.client.clone()) - .with_package_cache(PackageCache::new( - self.cache_dir - .join(pixi_consts::consts::CONDA_PACKAGE_CACHE_DIR), - )) - .install(&cached_dir, solved_records) - .await - .into_diagnostic()?; - - // Get the activation scripts - let activator = - Activator::from_path(&cached_dir, ShellEnum::default(), Platform::current()).unwrap(); - - let activation_scripts = activator - .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) - .unwrap(); - - let _ = write_guard.finish(); - - Ok(IsolatedTool::new( - spec.command.clone(), - cached_dir, - activation_scripts, - )) - } -} - -/// A record that is either pending or has been fetched. +/// A entity that is either pending or has been fetched. #[derive(Clone)] enum PendingOrFetched { Pending(Weak>), @@ -309,7 +52,7 @@ impl ToolCache { pub async fn get_or_install_tool( &self, spec: IsolatedToolSpec, - context: &ToolContext, + context: &impl ToolInstaller, channel_config: &ChannelConfig, ) -> miette::Result> { let sender = match self.cache.entry(spec.clone()) { @@ -390,17 +133,46 @@ impl ToolCache { #[cfg(test)] mod tests { - use std::path::PathBuf; + use std::{ + collections::HashMap, + path::PathBuf, + sync::{atomic::AtomicU16, Arc}, + }; use pixi_config::Config; use rattler_conda_types::{ChannelConfig, MatchSpec, NamedChannelOrUrl, ParseStrictness}; use reqwest_middleware::ClientWithMiddleware; + use tokio::sync::{Barrier, Mutex}; use crate::{ - tool::{ToolContext, ToolSpec}, + tool::{installer::ToolContext, IsolatedTool, Tool, ToolInstaller, ToolSpec}, IsolatedToolSpec, }; + /// A test installer that will count how many times a tool was installed. + /// This is used to verify that we only install a tool once. + #[derive(Default, Clone)] + struct TestInstaller { + count: Arc>>, + } + + impl ToolInstaller for TestInstaller { + async fn install( + &self, + spec: &IsolatedToolSpec, + _channel_config: &ChannelConfig, + ) -> miette::Result { + let mut count = self.count.lock().await; + let count = count.entry(spec.clone()).or_insert(0); + *count += 1; + + let isolated_tool = + IsolatedTool::new(spec.command.clone(), PathBuf::new(), HashMap::default()); + + Ok(isolated_tool) + } + } + #[tokio::test] async fn test_tool_cache() { // let mut cache = ToolCache::new(); @@ -429,4 +201,172 @@ mod tests { exec.command().arg("hello").spawn().unwrap(); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_installing_is_synced() { + // This test verifies that we only install a tool once even if multiple tasks + // request the same tool at the same time. + + let mut config = Config::default(); + config.default_channels = vec![NamedChannelOrUrl::Name("conda-forge".to_string())]; + + let auth_client = ClientWithMiddleware::default(); + let channel_config = ChannelConfig::default_with_root_dir(PathBuf::new()); + + let tool_context = Arc::new( + ToolContext::builder() + .with_client(auth_client.clone()) + .build(), + ); + + let tool_installer = TestInstaller::default(); + + let tool_spec = IsolatedToolSpec { + specs: vec![MatchSpec::from_str("cowpy", ParseStrictness::Strict).unwrap()], + command: "cowpy".into(), + channels: Vec::from([NamedChannelOrUrl::Name("conda-forge".to_string())]), + }; + + // Let's imitate that we have 4 requests to install a tool + // we will use a barrier to ensure all tasks start at the same time. + let num_tasks = 4; + let barrier = Arc::new(Barrier::new(num_tasks)); + let mut handles = Vec::new(); + + for _ in 0..num_tasks { + let barrier = barrier.clone(); + // let tool_installer = tool_installer.clone(); + + let tool_context = tool_context.clone(); + + let tool_installer = tool_installer.clone(); + + let channel_config = channel_config.clone(); + let tool_spec = tool_spec.clone(); + + let handle = tokio::spawn(async move { + barrier.wait().await; + + let tool = tool_context + .cache + .get_or_install_tool(tool_spec, &tool_installer, &channel_config) + .await; + tool + }); + + handles.push(handle); + } + + // Wait for all tasks to complete + let tools = futures::future::join_all(handles) + .await + .into_iter() + .map(|tool| tool.unwrap()) + .collect::>(); + + // verify that we dont have any errors + let errors = tools.iter().filter(|tool| tool.is_err()).count(); + assert_eq!(errors, 0); + + // verify that only one was installed + let lock = tool_installer.count.lock().await; + let install_count = lock.get(&tool_spec).unwrap(); + assert_eq!(install_count, &1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_handle_a_failure() { + // This test verifies that during the installation of a tool, if an error occurs + // the tool is not cached and the next request will try to install the tool again. + + // A test installer that will fail on the first request. + #[derive(Default, Clone)] + struct TestInstaller { + count: Arc>>, + } + + impl ToolInstaller for TestInstaller { + async fn install( + &self, + spec: &IsolatedToolSpec, + _channel_config: &ChannelConfig, + ) -> miette::Result { + let mut count = self.count.lock().await; + let count = count.entry(spec.clone()).or_insert(0); + *count += 1; + + if count == &1 { + miette::bail!("error on first request"); + } + + let isolated_tool = + IsolatedTool::new(spec.command.clone(), PathBuf::new(), HashMap::default()); + Ok(isolated_tool) + } + } + + let mut config = Config::default(); + config.default_channels = vec![NamedChannelOrUrl::Name("conda-forge".to_string())]; + + let auth_client = ClientWithMiddleware::default(); + let channel_config = ChannelConfig::default_with_root_dir(PathBuf::new()); + + let tool_context = Arc::new( + ToolContext::builder() + .with_client(auth_client.clone()) + .build(), + ); + + let tool_installer = TestInstaller::default(); + + let tool_spec = IsolatedToolSpec { + specs: vec![MatchSpec::from_str("cowpy", ParseStrictness::Strict).unwrap()], + command: "cowpy".into(), + channels: Vec::from([NamedChannelOrUrl::Name("conda-forge".to_string())]), + }; + + // Let's imitate that we have 4 requests to install a tool + // we will use a barrier to ensure all tasks start at the same time. + let num_tasks = 4; + let barrier = Arc::new(Barrier::new(num_tasks)); + let mut handles = Vec::new(); + + for _ in 0..num_tasks { + let barrier = barrier.clone(); + + let tool_context = tool_context.clone(); + + let tool_installer = tool_installer.clone(); + + let channel_config = channel_config.clone(); + let tool_spec = tool_spec.clone(); + + let handle = tokio::spawn(async move { + barrier.wait().await; + + let tool = tool_context + .cache + .get_or_install_tool(tool_spec, &tool_installer, &channel_config) + .await; + tool + }); + + handles.push(handle); + } + + // Wait for all tasks to complete + let tools = futures::future::join_all(handles) + .await + .into_iter() + .map(|tool| tool.unwrap()) + .collect::>(); + + // now we need to validate that exactly one install was errored out + let errors = tools.iter().filter(|tool| tool.is_err()).count(); + assert_eq!(errors, 1); + + let lock = tool_installer.count.lock().await; + let install_count = lock.get(&tool_spec).unwrap(); + assert_eq!(install_count, &2); + } } diff --git a/crates/pixi_build_frontend/src/tool/installer.rs b/crates/pixi_build_frontend/src/tool/installer.rs new file mode 100644 index 000000000..2f927a488 --- /dev/null +++ b/crates/pixi_build_frontend/src/tool/installer.rs @@ -0,0 +1,276 @@ +use std::fmt::Debug; +use std::future::Future; +use std::path::PathBuf; + +use pixi_consts::consts::{CACHED_BUILD_ENVS_DIR, CONDA_REPODATA_CACHE_DIR}; +use pixi_progress::wrap_in_progress; +use pixi_utils::{EnvironmentHash, PrefixGuard}; +use rattler::{install::Installer, package_cache::PackageCache}; +use rattler_conda_types::{Channel, ChannelConfig, GenericVirtualPackage, Platform}; +use rattler_repodata_gateway::Gateway; +use rattler_shell::{ + activation::{ActivationVariables, Activator}, + shell::ShellEnum, +}; +use rattler_solve::{resolvo::Solver, SolverImpl, SolverTask}; +use rattler_virtual_packages::{VirtualPackage, VirtualPackageOverrides}; +use reqwest_middleware::ClientWithMiddleware; + +use super::{ + cache::ToolCache, IsolatedTool, IsolatedToolSpec, SystemTool, Tool, ToolCacheError, ToolSpec, +}; + +use miette::{miette, IntoDiagnostic}; + +/// A trait that is responsible for installing tools. +pub trait ToolInstaller { + /// Install the tool. + fn install( + &self, + tool: &IsolatedToolSpec, + channel_config: &ChannelConfig, + ) -> impl Future> + Send; +} + +pub struct ToolContextBuilder { + gateway: Option, + client: ClientWithMiddleware, + cache_dir: PathBuf, + cache: ToolCache, +} + +impl Default for ToolContextBuilder { + fn default() -> Self { + Self::new() + } +} + +impl ToolContextBuilder { + /// Create a new tool context builder. + pub fn new() -> Self { + Self { + gateway: None, + client: ClientWithMiddleware::default(), + cache_dir: pixi_config::get_cache_dir().expect("we should have a cache dir"), + cache: ToolCache::default(), + } + } + + /// Set the gateway for the tool context. + pub fn with_gateway(mut self, gateway: Gateway) -> Self { + self.gateway = Some(gateway); + self + } + + /// Set the client for the tool context. + pub fn with_client(mut self, client: ClientWithMiddleware) -> Self { + self.client = client; + self + } + + /// Set the cache directory for the tool context. + pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self { + self.cache_dir = cache_dir; + self + } + + pub fn with_cache(mut self, cache: ToolCache) -> Self { + self.cache = cache; + self + } + + /// Build the `ToolContext` using builder configuration. + pub fn build(self) -> ToolContext { + let gateway = self.gateway.unwrap_or_else(|| { + Gateway::builder() + .with_client(self.client.clone()) + .with_cache_dir(self.cache_dir.join(CONDA_REPODATA_CACHE_DIR)) + .finish() + }); + + ToolContext { + cache_dir: self.cache_dir, + client: self.client, + cache: self.cache, + gateway, + } + } +} + +/// The tool context, +/// containing client, channels and gateway configuration +/// that will be used to resolve and install tools. +#[derive(Default)] +pub struct ToolContext { + // Authentication client to use for fetching repodata. + pub client: ClientWithMiddleware, + // The cache directory to use for the tools. + pub cache_dir: PathBuf, + // The gateway to use for fetching repodata. + pub gateway: Gateway, + // The cache to use for the tools. + pub cache: ToolCache, +} + +impl Debug for ToolContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ToolContext") + .field("client", &self.client) + .field("cache_dir", &self.cache_dir) + .finish() + } +} + +impl ToolContext { + /// Create a new tool context builder with the given channels. + pub fn builder() -> ToolContextBuilder { + ToolContextBuilder::new() + } + + /// Instantiate a tool from a specification. + /// + /// If the tool is not already cached, it will be created, installed and cached. + pub async fn instantiate( + &self, + spec: ToolSpec, + channel_config: &ChannelConfig, + ) -> Result { + let spec = match spec { + ToolSpec::Io(ipc) => return Ok(Tool::Io(ipc)), + ToolSpec::Isolated(isolated) => { + if isolated.specs.is_empty() { + return Err(ToolCacheError::Install(miette!( + "No build match specs provided for '{}' command.", + isolated.command + ))); + } + + isolated + } + + // I think we cannot bypass caching SystemTool as it is a wrapper around a spec command + ToolSpec::System(system) => return Ok(Tool::System(SystemTool::new(system.command))), + }; + + let installed = self + .cache + .get_or_install_tool(spec, self, channel_config) + .await + .map_err(ToolCacheError::Install)?; + + Ok(installed.into()) + } +} + +impl ToolInstaller for ToolContext { + /// Installed the tool in the isolated environment. + async fn install( + &self, + spec: &IsolatedToolSpec, + channel_config: &ChannelConfig, + ) -> miette::Result { + let channels: Vec = spec + .channels + .iter() + .cloned() + .map(|channel| channel.into_channel(channel_config)) + .collect::, _>>() + .into_diagnostic()?; + + let repodata = self + .gateway + .query( + channels.clone(), + [Platform::current(), Platform::NoArch], + spec.specs.clone(), + ) + .recursive(true) + .execute() + .await + .into_diagnostic()?; + + // Determine virtual packages of the current platform + let virtual_packages = VirtualPackage::detect(&VirtualPackageOverrides::from_env()) + .unwrap() + .iter() + .cloned() + .map(GenericVirtualPackage::from) + .collect(); + + let solved_records = Solver + .solve(SolverTask { + specs: spec.specs.clone(), + virtual_packages, + ..SolverTask::from_iter(&repodata) + }) + .into_diagnostic()?; + + let cache = EnvironmentHash::new( + spec.command.clone(), + spec.specs.clone(), + channels.iter().map(|c| c.base_url.to_string()).collect(), + ); + + let cached_dir = self + .cache_dir + .join(CACHED_BUILD_ENVS_DIR) + .join(cache.name()); + + let mut prefix_guard = PrefixGuard::new(&cached_dir).into_diagnostic()?; + + let mut write_guard = + wrap_in_progress("acquiring write lock on prefix", || prefix_guard.write()) + .into_diagnostic()?; + + // If the environment already exists, we can return early. + if write_guard.is_ready() { + tracing::info!("reusing existing environment in {}", cached_dir.display()); + let _ = write_guard.finish(); + + // Get the activation scripts + let activator = + Activator::from_path(&cached_dir, ShellEnum::default(), Platform::current()) + .unwrap(); + + let activation_scripts = activator + .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) + .unwrap(); + + return Ok(IsolatedTool::new( + spec.command.clone(), + cached_dir, + activation_scripts, + )); + } + + // Update the prefix to indicate that we are installing it. + write_guard.begin().into_diagnostic()?; + + // Install the environment + Installer::new() + .with_download_client(self.client.clone()) + .with_package_cache(PackageCache::new( + self.cache_dir + .join(pixi_consts::consts::CONDA_PACKAGE_CACHE_DIR), + )) + .install(&cached_dir, solved_records) + .await + .into_diagnostic()?; + + // Get the activation scripts + let activator = + Activator::from_path(&cached_dir, ShellEnum::default(), Platform::current()).unwrap(); + + let activation_scripts = activator + .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) + .unwrap(); + + let _ = write_guard.finish(); + + Ok(IsolatedTool::new( + spec.command.clone(), + cached_dir, + activation_scripts, + )) + } +} diff --git a/crates/pixi_build_frontend/src/tool/mod.rs b/crates/pixi_build_frontend/src/tool/mod.rs index bb237fe08..d05a6cb4d 100644 --- a/crates/pixi_build_frontend/src/tool/mod.rs +++ b/crates/pixi_build_frontend/src/tool/mod.rs @@ -1,13 +1,16 @@ mod cache; +mod installer; mod spec; use std::{collections::HashMap, path::PathBuf, sync::Arc}; -pub use cache::{ToolCacheError, ToolContext}; +pub use cache::ToolCacheError; pub use spec::{IsolatedToolSpec, SystemToolSpec, ToolSpec}; use crate::InProcessBackend; +pub use installer::ToolContext; + /// A tool that can be invoked. #[derive(Debug)] pub enum Tool { @@ -16,6 +19,16 @@ pub enum Tool { Io(InProcessBackend), } +impl Tool { + pub fn as_isolated(&self) -> Option> { + match self { + Tool::Isolated(tool) => Some(tool.clone()), + Tool::System(_) => None, + Tool::Io(_) => None, + } + } +} + #[derive(Debug)] pub enum ExecutableTool { Isolated(Arc), From bc2badcfeab5be1ce236c57895fac89591c66e53 Mon Sep 17 00:00:00 2001 From: nichmor Date: Fri, 29 Nov 2024 16:59:05 +0200 Subject: [PATCH 5/9] misc: add some tool installer --- crates/pixi_build_frontend/src/tool/cache.rs | 4 ++-- crates/pixi_build_frontend/src/tool/mod.rs | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 6bfd96f4f..afe8df7e2 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -77,7 +77,7 @@ impl ToolCache { if let Some(sender) = sender { // Create a receiver before we drop the entry. While we hold on to // the entry we have exclusive access to it, this means the task - // currently fetching the subdir will not be able to store a value + // currently installing the tool will not be able to store a value // until we drop the entry. // By creating the receiver here we ensure that we are subscribed // before the other tasks sends a value over the channel. @@ -114,7 +114,7 @@ impl ToolCache { // other tasks will find a pending entry and will wait for the tool // to become available. // - // Let's start by creating the subdir. If an error occurs we immediately return + // Let's start by installing tool. If an error occurs we immediately return // the error. This will drop the sender and all other waiting tasks will // receive an error. let tool = Arc::new(context.install(&spec, channel_config).await?); diff --git a/crates/pixi_build_frontend/src/tool/mod.rs b/crates/pixi_build_frontend/src/tool/mod.rs index d05a6cb4d..352aa75f3 100644 --- a/crates/pixi_build_frontend/src/tool/mod.rs +++ b/crates/pixi_build_frontend/src/tool/mod.rs @@ -88,12 +88,6 @@ impl IsolatedTool { } } -// impl From for Tool { -// fn from(value: IsolatedTool) -> Self { -// Self::Isolated(value) -// } -// } - impl Tool { pub fn as_executable(&self) -> Option { match self { From 2c347b14847783f808302008e42f4dc5a3030940 Mon Sep 17 00:00:00 2001 From: nichmor Date: Fri, 29 Nov 2024 17:02:43 +0200 Subject: [PATCH 6/9] misc: remove comments --- crates/pixi_build_frontend/src/tool/cache.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index afe8df7e2..4485519cc 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -133,11 +133,7 @@ impl ToolCache { #[cfg(test)] mod tests { - use std::{ - collections::HashMap, - path::PathBuf, - sync::{atomic::AtomicU16, Arc}, - }; + use std::{collections::HashMap, path::PathBuf, sync::Arc}; use pixi_config::Config; use rattler_conda_types::{ChannelConfig, MatchSpec, NamedChannelOrUrl, ParseStrictness}; @@ -145,7 +141,10 @@ mod tests { use tokio::sync::{Barrier, Mutex}; use crate::{ - tool::{installer::ToolContext, IsolatedTool, Tool, ToolInstaller, ToolSpec}, + tool::{ + installer::{ToolContext, ToolInstaller}, + IsolatedTool, ToolSpec, + }, IsolatedToolSpec, }; @@ -235,7 +234,6 @@ mod tests { for _ in 0..num_tasks { let barrier = barrier.clone(); - // let tool_installer = tool_installer.clone(); let tool_context = tool_context.clone(); From 80fcb2e94c9fecb4dfcd3a79165d9603bb682cc8 Mon Sep 17 00:00:00 2001 From: nichmor Date: Fri, 29 Nov 2024 18:23:47 +0200 Subject: [PATCH 7/9] misc: return var directly --- crates/pixi_build_frontend/src/tool/cache.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 4485519cc..389aeef39 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -245,11 +245,10 @@ mod tests { let handle = tokio::spawn(async move { barrier.wait().await; - let tool = tool_context + tool_context .cache .get_or_install_tool(tool_spec, &tool_installer, &channel_config) - .await; - tool + .await }); handles.push(handle); @@ -342,11 +341,10 @@ mod tests { let handle = tokio::spawn(async move { barrier.wait().await; - let tool = tool_context + tool_context .cache .get_or_install_tool(tool_spec, &tool_installer, &channel_config) - .await; - tool + .await }); handles.push(handle); From fee4a941077a4ea3932db72c711d7f487de7e198 Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 2 Dec 2024 15:07:09 +0200 Subject: [PATCH 8/9] misc: make isolated tool non arc --- .../pixi_build_frontend/src/tool/installer.rs | 9 ++++++- crates/pixi_build_frontend/src/tool/mod.rs | 24 +++++++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/installer.rs b/crates/pixi_build_frontend/src/tool/installer.rs index 751cac509..f0ca379d0 100644 --- a/crates/pixi_build_frontend/src/tool/installer.rs +++ b/crates/pixi_build_frontend/src/tool/installer.rs @@ -179,7 +179,14 @@ impl ToolContext { .await .map_err(ToolCacheError::Install)?; - Ok(installed.into()) + // Return the installed tool as a non arc instance + let tool = IsolatedTool::new( + installed.command.clone(), + installed.prefix.clone(), + installed.activation_scripts.clone(), + ); + + Ok(tool.into()) } } diff --git a/crates/pixi_build_frontend/src/tool/mod.rs b/crates/pixi_build_frontend/src/tool/mod.rs index 352aa75f3..16aef0c8d 100644 --- a/crates/pixi_build_frontend/src/tool/mod.rs +++ b/crates/pixi_build_frontend/src/tool/mod.rs @@ -2,7 +2,7 @@ mod cache; mod installer; mod spec; -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf}; pub use cache::ToolCacheError; pub use spec::{IsolatedToolSpec, SystemToolSpec, ToolSpec}; @@ -14,13 +14,13 @@ pub use installer::ToolContext; /// A tool that can be invoked. #[derive(Debug)] pub enum Tool { - Isolated(Arc), + Isolated(IsolatedTool), System(SystemTool), Io(InProcessBackend), } impl Tool { - pub fn as_isolated(&self) -> Option> { + pub fn as_isolated(&self) -> Option { match self { Tool::Isolated(tool) => Some(tool.clone()), Tool::System(_) => None, @@ -31,7 +31,7 @@ impl Tool { #[derive(Debug)] pub enum ExecutableTool { - Isolated(Arc), + Isolated(IsolatedTool), System(SystemTool), } @@ -56,8 +56,8 @@ impl From for Tool { } } -impl From> for Tool { - fn from(value: Arc) -> Self { +impl From for Tool { + fn from(value: IsolatedTool) -> Self { Self::Isolated(value) } } @@ -118,13 +118,11 @@ impl ExecutableTool { /// Construct a new tool that calls another executable. pub fn with_executable(&self, executable: impl Into) -> Self { match self { - ExecutableTool::Isolated(tool) => { - ExecutableTool::Isolated(Arc::new(IsolatedTool::new( - executable, - tool.prefix.clone(), - tool.activation_scripts.clone(), - ))) - } + ExecutableTool::Isolated(tool) => ExecutableTool::Isolated(IsolatedTool::new( + executable, + tool.prefix.clone(), + tool.activation_scripts.clone(), + )), ExecutableTool::System(_) => ExecutableTool::System(SystemTool::new(executable)), } } From f3f1387ecda41f31cefbdea3908783ac260fc49a Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 2 Dec 2024 15:25:06 +0200 Subject: [PATCH 9/9] misc: return as reference --- crates/pixi_build_frontend/src/tool/installer.rs | 8 +------- crates/pixi_build_frontend/src/tool/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/installer.rs b/crates/pixi_build_frontend/src/tool/installer.rs index f0ca379d0..c69463c65 100644 --- a/crates/pixi_build_frontend/src/tool/installer.rs +++ b/crates/pixi_build_frontend/src/tool/installer.rs @@ -180,13 +180,7 @@ impl ToolContext { .map_err(ToolCacheError::Install)?; // Return the installed tool as a non arc instance - let tool = IsolatedTool::new( - installed.command.clone(), - installed.prefix.clone(), - installed.activation_scripts.clone(), - ); - - Ok(tool.into()) + Ok(installed.as_ref().clone().into()) } } diff --git a/crates/pixi_build_frontend/src/tool/mod.rs b/crates/pixi_build_frontend/src/tool/mod.rs index 16aef0c8d..ca21f38e4 100644 --- a/crates/pixi_build_frontend/src/tool/mod.rs +++ b/crates/pixi_build_frontend/src/tool/mod.rs @@ -20,9 +20,9 @@ pub enum Tool { } impl Tool { - pub fn as_isolated(&self) -> Option { + pub fn as_isolated(&self) -> Option<&IsolatedTool> { match self { - Tool::Isolated(tool) => Some(tool.clone()), + Tool::Isolated(tool) => Some(tool), Tool::System(_) => None, Tool::Io(_) => None, }