diff --git a/BUILD.bazel b/BUILD.bazel index b3af22cc3..e24b4f294 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -25,7 +25,6 @@ rust_binary( "@crates//:async-lock", "@crates//:axum", "@crates//:clap", - "@crates//:console-subscriber", "@crates//:futures", "@crates//:hyper", "@crates//:mimalloc", @@ -39,7 +38,6 @@ rust_binary( "@crates//:tonic", "@crates//:tower", "@crates//:tracing", - "@crates//:tracing-subscriber", ], ) diff --git a/Cargo.lock b/Cargo.lock index 06e6f306c..168bf8bfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1740,7 +1740,6 @@ dependencies = [ "async-lock", "axum", "clap", - "console-subscriber", "futures", "hyper", "mimalloc", @@ -1762,7 +1761,6 @@ dependencies = [ "tonic 0.11.0", "tower", "tracing", - "tracing-subscriber", ] [[package]] @@ -1901,6 +1899,7 @@ dependencies = [ "redis", "redis-test", "serde", + "serial_test", "sha2", "shellexpand", "tempfile", @@ -1920,6 +1919,7 @@ dependencies = [ "async-trait", "blake3", "bytes", + "console-subscriber", "futures", "hex", "hyper", @@ -1943,6 +1943,7 @@ dependencies = [ "tokio-util", "tonic 0.11.0", "tracing", + "tracing-subscriber", ] [[package]] @@ -2685,6 +2686,15 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +[[package]] +name = "scc" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ad2bbb0ae5100a07b7a6f2ed7ab5fd0045551a4c507989b7a620046ea3efdc" +dependencies = [ + "sdd", +] + [[package]] name = "schannel" version = "0.1.23" @@ -2710,6 +2720,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdd" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84345e4c9bd703274a082fb80caaa99b7612be48dfaa1dd9266577ec412309d" + [[package]] name = "sec1" version = "0.3.0" @@ -2818,6 +2834,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d" +dependencies = [ + "futures", + "log", + "once_cell", + "parking_lot", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/Cargo.toml b/Cargo.toml index 0ee59bc55..5ed2b9504 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,6 @@ async-lock = "3.3.0" axum = "0.6.20" clap = { version = "4.5.4", features = ["derive"] } -console-subscriber = { version = "0.2.0" } futures = "0.3.30" hyper = { version = "0.14.28" } mimalloc = "0.1.41" @@ -53,4 +52,3 @@ tokio-rustls = "0.25.0" tonic = { version = "0.11.0", features = ["gzip", "tls"] } tower = "0.4.13" tracing = "0.1.40" -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/nativelink-macro/src/lib.rs b/nativelink-macro/src/lib.rs index 0883fc15d..f37175569 100644 --- a/nativelink-macro/src/lib.rs +++ b/nativelink-macro/src/lib.rs @@ -34,6 +34,11 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream { #[allow(clippy::disallowed_methods)] #[tokio::test(#attr)] async fn #fn_name(#fn_inputs) #fn_output { + // Error means already initialized, which is ok. + let _ = nativelink_util::init_tracing(); + // If already set it's ok. + let _ = nativelink_util::fs::set_idle_file_descriptor_timeout(std::time::Duration::from_millis(100)); + #[warn(clippy::disallowed_methods)] ::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async( ::nativelink_util::__tracing::trace_span!("test"), async move { diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 365e1e7ff..772dd6aa0 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -116,10 +116,12 @@ rust_test_suite( "@crates//:hyper", "@crates//:memory-stats", "@crates//:once_cell", + "@crates//:parking_lot", "@crates//:pretty_assertions", "@crates//:rand", "@crates//:redis", "@crates//:redis-test", + "@crates//:serial_test", "@crates//:sha2", "@crates//:tokio", "@crates//:tokio-stream", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index 764437ed3..1e6cb6828 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -53,3 +53,4 @@ aws-smithy-types = "1.1.8" aws-sdk-s3 = { version = "1.24.0" } aws-smithy-runtime = { version = "1.3.1", features = ["test-util"] } aws-smithy-runtime-api = "1.4.0" +serial_test = { version = "3.1.1", features = ["async"] } diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 7d806a690..efa7d41d8 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -802,6 +802,9 @@ impl Store for FilesystemStore { digest, }), ); + // We are done with the file, if we hold a reference to the file here, it could + // result in a deadlock if `emplace_file()` also needs file descriptors. + drop(file); self.emplace_file(digest, Arc::new(entry)) .await .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?; diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 9209c4ac8..8d467f544 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::RefCell; use std::env; use std::ffi::{OsStr, OsString}; use std::fmt::{Debug, Formatter}; @@ -20,9 +19,8 @@ use std::marker::PhantomData; use std::ops::DerefMut; use std::path::Path; use std::pin::Pin; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use async_lock::RwLock; @@ -32,8 +30,9 @@ use filetime::{set_file_atime, FileTime}; use futures::executor::block_on; use futures::task::Poll; use futures::{poll, Future, FutureExt}; -use nativelink_error::{Code, Error, ResultExt}; +use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; +use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::{ digest_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore, }; @@ -44,7 +43,9 @@ use nativelink_util::origin_context::ContextAwareFuture; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use nativelink_util::{background_spawn, spawn}; use once_cell::sync::Lazy; +use parking_lot::Mutex; use rand::{thread_rng, Rng}; +use serial_test::serial; use sha2::{Digest, Sha256}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::Barrier; @@ -231,9 +232,25 @@ async fn write_file(file_name: &OsStr, data: &[u8]) -> Result<(), Error> { .err_tip(|| "Could not sync file") } +async fn wait_for_no_open_files() -> Result<(), Error> { + let mut counter = 0; + while fs::get_open_files_for_test() != 0 { + sleep(Duration::from_millis(1)).await; + counter += 1; + if counter > 1000 { + return Err(make_err!( + Code::Internal, + "Timed out waiting all files to close" + )); + } + } + Ok(()) +} + #[cfg(test)] mod filesystem_store_tests { use pretty_assertions::assert_eq; + use tokio::io::AsyncSeekExt; use super::*; // Must be declared in every module. @@ -242,6 +259,7 @@ mod filesystem_store_tests { const VALUE1: &str = "0123456789"; const VALUE2: &str = "9876543210"; + #[serial] #[nativelink_test] async fn valid_results_after_shutdown_test() -> Result<(), Error> { let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -292,6 +310,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -382,6 +401,7 @@ mod filesystem_store_tests { // This test ensures that if a file is overridden and an open stream to the file already // exists, the open stream will continue to work properly and when the stream is done the // temporary file (of the object that was deleted) is cleaned up. + #[serial] #[nativelink_test] async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -514,6 +534,7 @@ mod filesystem_store_tests { // Eviction has a different code path than a file replacement, so we check that if a // file is evicted and has an open stream on it, it will stay alive and eventually // get deleted. + #[serial] #[nativelink_test] async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -628,6 +649,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn atime_updates_on_get_part_test() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -677,6 +699,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> { // Note these are swapped to ensure they aren't in numerical order. @@ -731,6 +754,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn eviction_drops_file_test() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -783,6 +807,7 @@ mod filesystem_store_tests { // Test to ensure that if we are holding a reference to `FileEntry` and the contents are // replaced, the `FileEntry` continues to use the old data. // `FileEntry` file contents should be immutable for the lifetime of the object. + #[serial] #[nativelink_test] async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error> { let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -829,6 +854,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> { const SMALL_VALUE: &str = "01"; @@ -844,7 +870,7 @@ mod filesystem_store_tests { let path = Path::new(&path_str); let digest = digest_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap(); - UNREFED_DIGESTS.lock().unwrap().push(digest); + UNREFED_DIGESTS.lock().push(digest); Ok(()) })) .unwrap(); @@ -878,7 +904,7 @@ mod filesystem_store_tests { { // Our first digest should have been unrefed exactly once. - let unrefed_digests = UNREFED_DIGESTS.lock().unwrap(); + let unrefed_digests = UNREFED_DIGESTS.lock(); assert_eq!( unrefed_digests.len(), 1, @@ -890,6 +916,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] #[allow(clippy::await_holding_refcell_ref)] async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens( @@ -921,7 +948,7 @@ mod filesystem_store_tests { ); let (mut tx, rx) = make_buf_channel_pair(); - let update_fut = Rc::new(RefCell::new(store.as_ref().update( + let update_fut = Arc::new(async_lock::Mutex::new(store.as_ref().update( digest, rx, UploadSizeInfo::MaxSize(100), @@ -929,7 +956,7 @@ mod filesystem_store_tests { // This will process as much of the future as it can before it needs to pause. // Our temp file will be created and opened and ready to have contents streamed // to it. - assert_eq!(poll!(update_fut.borrow_mut().deref_mut())?, Poll::Pending); + assert_eq!(poll!(update_fut.lock().await.deref_mut())?, Poll::Pending); const INITIAL_CONTENT: &str = "hello"; tx.send(INITIAL_CONTENT.into()).await?; @@ -979,7 +1006,7 @@ mod filesystem_store_tests { // This will ensure we yield to our future and other potential spawns. tokio::task::yield_now().await; assert_eq!( - poll!(update_fut_clone.borrow_mut().deref_mut())?, + poll!(update_fut_clone.lock().await.deref_mut())?, Poll::Pending ); Ok(()) @@ -1001,7 +1028,7 @@ mod filesystem_store_tests { // Now finish waiting on update(). This should reuslt in an error because we deleted our dest // folder. - let update_result = update_fut.borrow_mut().deref_mut().await; + let update_result = update_fut.lock().await.deref_mut().await; assert!( update_result.is_err(), "Expected update to fail due to temp file being deleted before rename" @@ -1030,6 +1057,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn get_part_timeout_test() -> Result<(), Error> { let large_value = "x".repeat(1024); @@ -1082,6 +1110,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn get_part_is_zero_digest() -> Result<(), Error> { let digest = DigestInfo { @@ -1126,6 +1155,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn has_with_results_on_zero_digests() -> Result<(), Error> { let digest = DigestInfo { @@ -1197,12 +1227,13 @@ mod filesystem_store_tests { } /// Regression test for: https://github.com/TraceMachina/nativelink/issues/495. + #[serial] #[nativelink_test(flavor = "multi_thread")] async fn update_file_future_drops_before_rename() -> Result<(), Error> { let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; // Mutex can be used to signal to the rename function to pause execution. - static RENAME_REQUEST_PAUSE_MUX: Mutex<()> = Mutex::new(()); + static RENAME_REQUEST_PAUSE_MUX: async_lock::Mutex<()> = async_lock::Mutex::new(()); // Boolean used to know if the rename function is currently paused. static RENAME_IS_PAUSED: AtomicBool = AtomicBool::new(false); @@ -1219,9 +1250,11 @@ mod filesystem_store_tests { |from, to| { // If someone locked our mutex, it means we need to pause, so we // simply request a lock on the same mutex. - if RENAME_REQUEST_PAUSE_MUX.try_lock().is_err() { + if RENAME_REQUEST_PAUSE_MUX.try_lock().is_none() { RENAME_IS_PAUSED.store(true, Ordering::Release); - let _lock = RENAME_REQUEST_PAUSE_MUX.lock(); + while RENAME_REQUEST_PAUSE_MUX.try_lock().is_none() { + std::thread::sleep(Duration::from_millis(1)); + } RENAME_IS_PAUSED.store(false, Ordering::Release); } std::fs::rename(from, to) @@ -1242,7 +1275,7 @@ mod filesystem_store_tests { // the replace/update future. // 4. Then drop the lock. { - let rename_pause_request_lock = RENAME_REQUEST_PAUSE_MUX.lock(); + let rename_pause_request_lock = RENAME_REQUEST_PAUSE_MUX.lock().await; let mut update_fut = store.as_ref().update_oneshot(digest, VALUE2.into()).boxed(); loop { @@ -1287,6 +1320,7 @@ mod filesystem_store_tests { Ok(()) } + #[serial] #[nativelink_test] async fn deleted_file_removed_from_store() -> Result<(), Error> { let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; @@ -1332,6 +1366,7 @@ mod filesystem_store_tests { // assume block size 4K // 1B data size = 4K size on disk // 5K data size = 8K size on disk + #[serial] #[nativelink_test] async fn get_file_size_uses_block_size() -> Result<(), Error> { let content_path = make_temp_path("content_path"); @@ -1379,8 +1414,137 @@ mod filesystem_store_tests { Ok(()) } + #[serial] + #[nativelink_test] + async fn update_with_whole_file_closes_file() -> Result<(), Error> { + let mut permits = vec![]; + // Grab all permits to ensure only 1 permit is available. + { + wait_for_no_open_files().await?; + while fs::OPEN_FILE_SEMAPHORE.available_permits() > 1 { + permits.push(fs::get_permit().await); + } + assert_eq!( + fs::OPEN_FILE_SEMAPHORE.available_permits(), + 1, + "Expected 1 permit to be available" + ); + } + let content_path = make_temp_path("content_path"); + let temp_path = make_temp_path("temp_path"); + + let value = "x".repeat(1024); + + let digest = DigestInfo::try_new(HASH1, value.len())?; + + let store = Box::pin( + FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + content_path: content_path.clone(), + temp_path: temp_path.clone(), + read_buffer_size: 1, + ..Default::default() + }) + .await?, + ); + store + .as_ref() + .update_oneshot(digest, value.clone().into()) + .await?; + + let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?; + { + let writer = file.as_writer().await?; + writer.write_all(value.as_bytes()).await?; + writer.as_mut().sync_all().await?; + writer.seek(tokio::io::SeekFrom::Start(0)).await?; + } + + store + .as_ref() + .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .await?; + Ok(()) + } + + #[serial] + #[nativelink_test] + async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<(), Error> { + let mut permits = vec![]; + // Grab all permits to ensure only 1 permit is available. + { + wait_for_no_open_files().await?; + while fs::OPEN_FILE_SEMAPHORE.available_permits() > 1 { + permits.push(fs::get_permit().await); + } + assert_eq!( + fs::OPEN_FILE_SEMAPHORE.available_permits(), + 1, + "Expected 1 permit to be available" + ); + } + + let value = "x".repeat(1024); + + let digest = DigestInfo::try_new(HASH1, value.len())?; + + let store = Box::pin(FastSlowStore::new( + // Note: The config is not needed for this test, so use dummy data. + &nativelink_config::stores::FastSlowStore { + fast: nativelink_config::stores::StoreConfig::memory( + nativelink_config::stores::MemoryStore::default(), + ), + slow: nativelink_config::stores::StoreConfig::memory( + nativelink_config::stores::MemoryStore::default(), + ), + }, + Arc::new( + FilesystemStore::::new( + &nativelink_config::stores::FilesystemStore { + content_path: make_temp_path("content_path"), + temp_path: make_temp_path("temp_path"), + read_buffer_size: 1, + ..Default::default() + }, + ) + .await?, + ), + Arc::new( + FilesystemStore::::new( + &nativelink_config::stores::FilesystemStore { + content_path: make_temp_path("content_path1"), + temp_path: make_temp_path("temp_path1"), + read_buffer_size: 1, + ..Default::default() + }, + ) + .await?, + ), + )); + store + .as_ref() + .update_oneshot(digest, value.clone().into()) + .await?; + + let temp_path = make_temp_path("temp_path2"); + fs::create_dir_all(&temp_path).await?; + let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?; + { + let writer = file.as_writer().await?; + writer.write_all(value.as_bytes()).await?; + writer.as_mut().sync_all().await?; + writer.seek(tokio::io::SeekFrom::Start(0)).await?; + } + + store + .as_ref() + .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .await?; + Ok(()) + } + // Ensure that update_with_whole_file() moves the file without making a copy. #[cfg(target_family = "unix")] + #[serial] #[nativelink_test] async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> { use std::os::unix::fs::MetadataExt; diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 01015c4e3..3e1e1dd3b 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -41,6 +41,7 @@ rust_library( "@crates//:async-lock", "@crates//:blake3", "@crates//:bytes", + "@crates//:console-subscriber", "@crates//:futures", "@crates//:hex", "@crates//:hyper", @@ -57,6 +58,7 @@ rust_library( "@crates//:tokio-util", "@crates//:tonic", "@crates//:tracing", + "@crates//:tracing-subscriber", ], ) diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index 80a4f3d24..a71b0243b 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -12,6 +12,7 @@ async-lock = "3.3.0" async-trait = "0.1.80" blake3 = { version = "1.5.1", features = ["mmap"] } bytes = "1.6.0" +console-subscriber = { version = "0.2.0" } futures = "0.3.30" hex = "0.4.3" hyper = "0.14.28" @@ -29,6 +30,7 @@ tokio-stream = { version = "0.1.15", features = ["sync"] } tokio-util = { version = "0.7.10" } tonic = { version = "0.11.0", features = ["tls"] } tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs index 3b878400d..a2cfd3d55 100644 --- a/nativelink-util/src/fs.rs +++ b/nativelink-util/src/fs.rs @@ -369,7 +369,13 @@ pub async fn create_file(path: impl AsRef) -> Result Result<(), nativelink_error::Error> { + use tracing_subscriber::prelude::*; + + static LOGGING_INITIALIZED: parking_lot::Mutex = parking_lot::Mutex::new(false); + let mut logging_initized_guard = LOGGING_INITIALIZED.lock(); + if *logging_initized_guard { + return Err(nativelink_error::make_err!( + nativelink_error::Code::Internal, + "Logging already initialized" + )); + } + *logging_initized_guard = true; + let env_filter = tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing::metadata::LevelFilter::WARN.into()) + .from_env_lossy(); + + if cfg!(feature = "enable_tokio_console") { + tracing_subscriber::registry() + .with(console_subscriber::spawn()) + .with( + tracing_subscriber::fmt::layer() + .pretty() + .with_timer(tracing_subscriber::fmt::time::time()) + .with_filter(env_filter), + ) + .init(); + } else { + tracing_subscriber::fmt() + .pretty() + .with_timer(tracing_subscriber::fmt::time::time()) + .with_env_filter(env_filter) + .init(); + } + Ok(()) +} diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index d15362875..b399dfbac 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -21,16 +21,19 @@ use std::sync::{Arc, OnceLock}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use futures::{future, join, try_join, FutureExt}; +use futures::future::{select, Either}; +use futures::{join, try_join, FutureExt}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncSeekExt; +use tokio::time::timeout; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; use crate::digest_hasher::{default_digest_hasher_func, DigestHasher}; -use crate::fs; +use crate::fs::{self, idle_file_descriptor_timeout}; use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; @@ -77,12 +80,19 @@ pub async fn slow_update_store_with_file( file: &mut fs::ResumeableFileSlot, upload_size: UploadSizeInfo, ) -> Result<(), Error> { + file.as_writer() + .await + .err_tip(|| "Failed to get writer in upload_file_to_store")? + .rewind() + .await + .err_tip(|| "Failed to rewind in upload_file_to_store")?; let (tx, rx) = make_buf_channel_pair(); - future::join( - store - .update(digest, rx, upload_size) - .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")), - async move { + + let mut update_fut = store + .update(digest, rx, upload_size) + .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")); + let read_result = { + let read_data_fut = async { let (_, mut tx) = file .read_buf_cb( (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx), @@ -98,11 +108,24 @@ pub async fn slow_update_store_with_file( tx.send_eof() .err_tip(|| "Could not send EOF to store in upload_file_to_store")?; Ok(()) - }, - ) - // Ensure we get errors reported from both sides. - .map(|(upload_result, read_result)| upload_result.merge(read_result)) - .await + }; + tokio::pin!(read_data_fut); + match select(&mut update_fut, read_data_fut).await { + Either::Left((update_result, read_data_fut)) => { + return update_result.merge(read_data_fut.await) + } + Either::Right((read_result, _)) => read_result, + } + }; + match timeout(idle_file_descriptor_timeout(), &mut update_fut).await { + Ok(update_result) => update_result.merge(read_result), + Err(_) => { + file.close_file() + .await + .err_tip(|| "Failed to close file in upload_file_to_store")?; + update_fut.await.merge(read_result) + } + } } // TODO(allada) When 1.76.0 stabalizes more we can use `core::ptr::addr_eq` instead. diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 53b77e365..83dcafa76 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -54,7 +54,7 @@ use nativelink_util::store_trait::{ set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG, }; use nativelink_util::task::TaskExecutor; -use nativelink_util::{background_spawn, spawn, spawn_blocking}; +use nativelink_util::{background_spawn, init_tracing, spawn, spawn_blocking}; use nativelink_worker::local_worker::new_local_worker; use parking_lot::Mutex; use rustls_pemfile::{certs as extract_certs, crls as extract_crls}; @@ -70,7 +70,6 @@ use tonic::codec::CompressionEncoding; use tonic::transport::Server as TonicServer; use tower::util::ServiceExt; use tracing::{error_span, event, trace_span, Level}; -use tracing_subscriber::filter::{EnvFilter, LevelFilter}; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -853,31 +852,7 @@ async fn get_config() -> Result> { } fn main() -> Result<(), Box> { - use tracing_subscriber::prelude::*; - - let env_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::WARN.into()) - .from_env_lossy(); - - if cfg!(feature = "enable_tokio_console") { - tracing_subscriber::registry() - .with(console_subscriber::spawn()) - .with( - tracing_subscriber::fmt::layer() - .pretty() - // .with_span_events(FmtSpan::CLOSE) - .with_timer(tracing_subscriber::fmt::time::time()) - .with_filter(env_filter), - ) - .init(); - } else { - tracing_subscriber::fmt() - .pretty() - // .with_span_events(FmtSpan::CLOSE) - .with_timer(tracing_subscriber::fmt::time::time()) - .with_env_filter(env_filter) - .init(); - } + init_tracing()?; let mut cfg = futures::executor::block_on(get_config())?;