diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 3cf245d23..4e77904b2 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -73,7 +73,6 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::process; use tokio::sync::{oneshot, watch}; use tokio::task::spawn_blocking; -use tokio::time::timeout; use tokio_stream::wrappers::ReadDirStream; use tonic::Request; use tracing::{error, info}; @@ -256,44 +255,32 @@ fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef) -> } async fn upload_file( - mut resumeable_file: fs::ResumeableFileSlot, cas_store: Pin<&dyn Store>, full_path: impl AsRef + Debug, hasher: DigestHasherFunc, + metadata: std::fs::Metadata, ) -> Result { - let (is_executable, file_size) = { - let file_handle = resumeable_file.as_reader().await.err_tip(|| { - "Could not get reader from file slot in RunningActionsManager::upload_file()" - })?; - let metadata = file_handle - .get_ref() - .as_ref() - .metadata() - .await - .err_tip(|| format!("While reading metadata for {:?}", full_path.as_ref()))?; - (is_executable(&metadata, &full_path), metadata.len()) - }; - let (digest, resumeable_file) = { - let (digest, mut resumeable_file) = hasher - .hasher() - .digest_for_file(resumeable_file, Some(file_size)) - .await - .err_tip(|| { - format!("Failed to hash file in digest_for_file failed for {full_path:?}") - })?; + let is_executable = is_executable(&metadata, &full_path); + let file_size = metadata.len(); + let resumeable_file = fs::open_file(&full_path, u64::MAX) + .await + .err_tip(|| format!("Could not open file {full_path:?}"))?; + + let (digest, mut resumeable_file) = hasher + .hasher() + .digest_for_file(resumeable_file, Some(file_size)) + .await + .err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?; + + resumeable_file + .as_reader() + .await + .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")? + .get_mut() + .rewind() + .await + .err_tip(|| "Could not rewind file")?; - resumeable_file - .as_reader() - .await - .err_tip(|| { - "Could not get reader from file slot in RunningActionsManager::upload_file()" - })? - .get_mut() - .rewind() - .await - .err_tip(|| "Could not rewind file")?; - (digest, resumeable_file) - }; cas_store .update_with_whole_file( digest, @@ -395,11 +382,8 @@ fn upload_directory<'a, P: AsRef + Debug + Send + Sync + Clone + 'a>( // lived as possible. This is why we iterate the directory and then build a bunch of // futures with all the work we are wanting to do then execute it. It allows us to // close the directory iterator file descriptor, then open the child files/folders. - while let Some(entry) = dir_stream.next().await { - let entry = match entry { - Ok(entry) => entry, - Err(e) => return Err(e).err_tip(|| "Error while iterating directory")?, - }; + while let Some(entry_result) = dir_stream.next().await { + let entry = entry_result.err_tip(|| "Error while iterating directory")?; let file_type = entry .file_type() .await @@ -445,11 +429,10 @@ fn upload_directory<'a, P: AsRef + Debug + Send + Sync + Clone + 'a>( ); } else if file_type.is_file() { file_futures.push(async move { - let file_handle = - fs::open_file(full_path.as_os_str().to_os_string(), u64::MAX) - .await - .err_tip(|| format!("Could not open file {full_path:?}"))?; - upload_file(file_handle, cas_store, &full_path, hasher) + let metadata = fs::metadata(&full_path) + .await + .err_tip(|| format!("Could not open file {full_path:?}"))?; + upload_file(cas_store, &full_path, hasher, metadata) .map_ok(|v| v.into()) .await }); @@ -1025,8 +1008,7 @@ impl RunningActionImpl { let work_directory = &self.work_directory; output_path_futures.push(async move { let metadata = { - let mut resumeable_file = match fs::open_file(full_path.clone(), u64::MAX).await - { + let metadata = match fs::symlink_metadata(&full_path).await { Ok(file) => file, Err(e) => { if e.code == Code::NotFound { @@ -1037,32 +1019,10 @@ impl RunningActionImpl { return Err(e).err_tip(|| format!("Could not open file {full_path:?}")); } }; - // We cannot rely on the file_handle's metadata, because it follows symlinks, so - // we need to instead use `symlink_metadata`. - let metadata_fut = fs::symlink_metadata(&full_path); - tokio::pin!(metadata_fut); - - // Just in case we are starved for open file descriptors, we timeout the metadata - // call and close the file, then try again. - let metadata = match timeout( - fs::idle_file_descriptor_timeout(), - &mut metadata_fut, - ) - .await - { - Ok(result) => result, - Err(_) => { - resumeable_file - .close_file() - .await - .err_tip(|| "In inner_upload_results()")?; - (&mut metadata_fut).await - } - } - .err_tip(|| format!("While querying symlink metadata for {entry}"))?; + if metadata.is_file() { return Ok(OutputType::File( - upload_file(resumeable_file, cas_store, &full_path, hasher) + upload_file(cas_store, &full_path, hasher, metadata) .await .map(|mut file_info| { file_info.name_or_path = NameOrPath::Path(entry); diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index f24c8f5d6..b9fde47bb 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -26,7 +26,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::{FutureExt, TryFutureExt}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use nativelink_config::cas_server::EnvironmentSource; use nativelink_error::{make_input_err, Code, Error, ResultExt}; #[cfg_attr(target_family = "windows", allow(unused_imports))] @@ -3018,4 +3018,194 @@ exit 1 ); Ok(()) } + + // We've experienced deadlocks when uploading, so make only a single permit available and + // check it's able to handle uploading some directories with some files in. + // Be default this test is ignored because it *must* be run single threaded... to run this + // test execute: + // cargo test -p nativelink-worker --test running_actions_manager_test -- --test-threads=1 --ignored + #[tokio::test] + #[ignore] + async fn upload_with_single_permit() -> Result<(), Box> { + const WORKER_ID: &str = "foo_worker_id"; + + fn test_monotonic_clock() -> SystemTime { + static CLOCK: AtomicU64 = AtomicU64::new(0); + monotonic_clock(&CLOCK) + } + + let (_, slow_store, cas_store, ac_store) = setup_stores().await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; + + // Take all but one FD permit away. + let _permits = futures::stream::iter(1..fs::OPEN_FILE_SEMAPHORE.available_permits()) + .then(|_| fs::OPEN_FILE_SEMAPHORE.acquire()) + .try_collect::>() + .await?; + assert_eq!(1, fs::OPEN_FILE_SEMAPHORE.available_permits()); + + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( + RunningActionsManagerArgs { + root_action_directory, + execution_configuration: ExecutionConfiguration::default(), + cas_store: Pin::into_inner(cas_store.clone()), + ac_store: Some(Pin::into_inner(ac_store.clone())), + historical_store: Pin::into_inner(cas_store.clone()), + upload_action_result_config: + &nativelink_config::cas_server::UploadActionResultConfig { + upload_ac_results_strategy: + nativelink_config::cas_server::UploadCacheResultsStrategy::never, + ..Default::default() + }, + max_action_timeout: Duration::MAX, + timeout_handled_externally: false, + }, + Callbacks { + now_fn: test_monotonic_clock, + sleep_fn: |_duration| Box::pin(futures::future::pending()), + }, + )?); + let action_result = { + const SALT: u64 = 55; + #[cfg(target_family = "unix")] + let arguments = vec![ + "sh".to_string(), + "-c".to_string(), + "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr '" + .to_string(), + ]; + #[cfg(target_family = "windows")] + let arguments = vec![ + "cmd".to_string(), + "/C".to_string(), + // Note: Windows adds two spaces after 'set /p=XXX'. + "echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0" + .to_string(), + ]; + let working_directory = "some_cwd"; + let command = Command { + arguments, + output_paths: vec!["test.txt".to_string(), "tst".to_string()], + working_directory: working_directory.to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message( + &command, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let input_root_digest = serialize_and_upload_message( + &Directory { + directories: vec![DirectoryNode { + name: working_directory.to_string(), + digest: Some( + serialize_and_upload_message( + &Directory::default(), + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await? + .into(), + ), + }], + ..Default::default() + }, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = serialize_and_upload_message( + &action, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: SALT, + queued_timestamp: None, + }, + ) + .await?; + + run_action(running_action_impl.clone()).await? + }; + let file_content = slow_store + .as_ref() + .get_part_unchunked(action_result.output_files[0].digest, 0, None, None) + .await?; + assert_eq!(from_utf8(&file_content)?, "123 "); + let stdout_content = slow_store + .as_ref() + .get_part_unchunked(action_result.stdout_digest, 0, None, None) + .await?; + assert_eq!(from_utf8(&stdout_content)?, "foo-stdout "); + let stderr_content = slow_store + .as_ref() + .get_part_unchunked(action_result.stderr_digest, 0, None, None) + .await?; + assert_eq!(from_utf8(&stderr_content)?, "bar-stderr "); + let mut clock_time = make_system_time(0); + assert_eq!( + action_result, + ActionResult { + output_files: vec![FileInfo { + name_or_path: NameOrPath::Path("test.txt".to_string()), + digest: DigestInfo::try_new( + "c69e10a5f54f4e28e33897fbd4f8701595443fa8c3004aeaa20dd4d9a463483b", + 4 + )?, + is_executable: false, + }], + stdout_digest: DigestInfo::try_new( + "15019a676f057d97d1ad3af86f3cc1e623cb33b18ff28422bbe3248d2471cc94", + 11 + )?, + stderr_digest: DigestInfo::try_new( + "2375ab8a01ca11e1ea7606dfb58756c153d49733cde1dbfb5a1e00f39afacf06", + 12 + )?, + exit_code: 0, + output_folders: vec![DirectoryInfo { + path: "tst".to_string(), + tree_digest: DigestInfo::try_new( + "95711c1905d4898a70209dd6e98241dcafb479c00241a1ea4ed8415710d706f3", + 166, + )?, + },], + output_file_symlinks: vec![], + output_directory_symlinks: vec![], + server_logs: HashMap::new(), + execution_metadata: ExecutionMetadata { + worker: WORKER_ID.to_string(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: increment_clock(&mut clock_time), + input_fetch_start_timestamp: increment_clock(&mut clock_time), + input_fetch_completed_timestamp: increment_clock(&mut clock_time), + execution_start_timestamp: increment_clock(&mut clock_time), + execution_completed_timestamp: increment_clock(&mut clock_time), + output_upload_start_timestamp: increment_clock(&mut clock_time), + output_upload_completed_timestamp: increment_clock(&mut clock_time), + worker_completed_timestamp: increment_clock(&mut clock_time), + }, + error: None, + message: String::new(), + } + ); + Ok(()) + } }