Resolve upload deadlock
We get the metadata for a file after opening it, which causes two file descriptors
to be used rather than one. To ensure that every future requires exactly one file
descriptor at a time, and therefore cannot deadlock on the OPEN_FILE_SEMAPHORE, we
simply get the metadata before we open the file.
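
As a rough sketch of the failure mode (plain tokio/futures, not NativeLink's actual fs wrapper; PERMITS, broken_upload, and fixed_upload are illustrative names, and a Cargo setup with the tokio and futures crates is assumed): when every future holds one permit of the bounded semaphore while awaiting a second, the pool wedges; fetching metadata before the open keeps each future at one permit, so the same load completes.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

// Stand-in for the open-file limit guarded by fs::OPEN_FILE_SEMAPHORE.
const PERMITS: usize = 2;

// Old ordering: open the file (one permit), then stat it (a second permit).
async fn broken_upload(sem: Arc<Semaphore>) {
    let _file = sem.acquire().await.unwrap(); // "open_file"
    let _meta = sem.acquire().await.unwrap(); // "metadata" -- never granted
}

// New ordering: stat first and release, then open; at most one permit held.
async fn fixed_upload(sem: Arc<Semaphore>) {
    {
        let _meta = sem.acquire().await.unwrap(); // "fs::metadata"
    } // metadata permit released here
    let _file = sem.acquire().await.unwrap(); // "open_file"
}

#[tokio::main]
async fn main() {
    // Saturate the semaphore: every task holds one permit and waits for a
    // second that no task can ever release -- the deadlock this commit fixes.
    let sem = Arc::new(Semaphore::new(PERMITS));
    let stuck: Vec<_> = (0..PERMITS)
        .map(|_| tokio::spawn(broken_upload(sem.clone())))
        .collect();
    let all_stuck = futures::future::join_all(stuck);
    assert!(tokio::time::timeout(Duration::from_secs(1), all_stuck)
        .await
        .is_err());

    // The same load, with metadata taken before the open, completes.
    let sem = Arc::new(Semaphore::new(PERMITS));
    let ok: Vec<_> = (0..PERMITS)
        .map(|_| tokio::spawn(fixed_upload(sem.clone())))
        .collect();
    futures::future::join_all(ok).await;
    println!("single-permit-at-a-time ordering completes");
}

The actual change below does exactly this: it computes std::fs::Metadata up front and passes it into upload_file.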
chrisstaite committed Mar 29, 2024
1 parent 31a1bf1 commit b8cc286
Showing 2 changed files with 218 additions and 70 deletions.
96 changes: 27 additions & 69 deletions nativelink-worker/src/running_actions_manager.rs
@@ -73,7 +73,6 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt};
 use tokio::process;
 use tokio::sync::{oneshot, watch};
 use tokio::task::spawn_blocking;
-use tokio::time::timeout;
 use tokio_stream::wrappers::ReadDirStream;
 use tonic::Request;
 use tracing::{error, info};
@@ -256,44 +255,32 @@ fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) ->
 }
 
 async fn upload_file(
-    mut resumeable_file: fs::ResumeableFileSlot,
     cas_store: Pin<&dyn Store>,
     full_path: impl AsRef<Path> + Debug,
     hasher: DigestHasherFunc,
+    metadata: std::fs::Metadata,
 ) -> Result<FileInfo, Error> {
-    let (is_executable, file_size) = {
-        let file_handle = resumeable_file.as_reader().await.err_tip(|| {
-            "Could not get reader from file slot in RunningActionsManager::upload_file()"
-        })?;
-        let metadata = file_handle
-            .get_ref()
-            .as_ref()
-            .metadata()
-            .await
-            .err_tip(|| format!("While reading metadata for {:?}", full_path.as_ref()))?;
-        (is_executable(&metadata, &full_path), metadata.len())
-    };
-    let (digest, resumeable_file) = {
-        let (digest, mut resumeable_file) = hasher
-            .hasher()
-            .digest_for_file(resumeable_file, Some(file_size))
-            .await
-            .err_tip(|| {
-                format!("Failed to hash file in digest_for_file failed for {full_path:?}")
-            })?;
+    let is_executable = is_executable(&metadata, &full_path);
+    let file_size = metadata.len();
+    let resumeable_file = fs::open_file(&full_path, u64::MAX)
+        .await
+        .err_tip(|| format!("Could not open file {full_path:?}"))?;
+
+    let (digest, mut resumeable_file) = hasher
+        .hasher()
+        .digest_for_file(resumeable_file, Some(file_size))
+        .await
+        .err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?;
+
+    resumeable_file
+        .as_reader()
+        .await
+        .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?
+        .get_mut()
+        .rewind()
+        .await
+        .err_tip(|| "Could not rewind file")?;
+
-        resumeable_file
-            .as_reader()
-            .await
-            .err_tip(|| {
-                "Could not get reader from file slot in RunningActionsManager::upload_file()"
-            })?
-            .get_mut()
-            .rewind()
-            .await
-            .err_tip(|| "Could not rewind file")?;
-        (digest, resumeable_file)
-    };
     cas_store
         .update_with_whole_file(
             digest,
@@ -396,10 +383,7 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
     // futures with all the work we are wanting to do then execute it. It allows us to
     // close the directory iterator file descriptor, then open the child files/folders.
     while let Some(entry) = dir_stream.next().await {
-        let entry = match entry {
-            Ok(entry) => entry,
-            Err(e) => return Err(e).err_tip(|| "Error while iterating directory")?,
-        };
+        let entry = entry.err_tip(|| "Error while iterating directory")?;
         let file_type = entry
             .file_type()
             .await
@@ -445,11 +429,8 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
             );
         } else if file_type.is_file() {
             file_futures.push(async move {
-                let file_handle =
-                    fs::open_file(full_path.as_os_str().to_os_string(), u64::MAX)
-                        .await
-                        .err_tip(|| format!("Could not open file {full_path:?}"))?;
-                upload_file(file_handle, cas_store, &full_path, hasher)
+                let metadata = fs::metadata(&full_path).await?;
+                upload_file(cas_store, &full_path, hasher, metadata)
                     .map_ok(|v| v.into())
                     .await
             });
@@ -1018,8 +999,7 @@ impl RunningActionImpl {
             let work_directory = &self.work_directory;
             output_path_futures.push(async move {
                 let metadata = {
-                    let mut resumeable_file = match fs::open_file(full_path.clone(), u64::MAX).await
-                    {
+                    let metadata = match fs::symlink_metadata(&full_path).await {
                         Ok(file) => file,
                         Err(e) => {
                             if e.code == Code::NotFound {
@@ -1030,32 +1010,10 @@
                             return Err(e).err_tip(|| format!("Could not open file {full_path:?}"));
                         }
                     };
-                    // We cannot rely on the file_handle's metadata, because it follows symlinks, so
-                    // we need to instead use `symlink_metadata`.
-                    let metadata_fut = fs::symlink_metadata(&full_path);
-                    tokio::pin!(metadata_fut);
-
-                    // Just in case we are starved for open file descriptors, we timeout the metadata
-                    // call and close the file, then try again.
-                    let metadata = match timeout(
-                        fs::idle_file_descriptor_timeout(),
-                        &mut metadata_fut,
-                    )
-                    .await
-                    {
-                        Ok(result) => result,
-                        Err(_) => {
-                            resumeable_file
-                                .close_file()
-                                .await
-                                .err_tip(|| "In inner_upload_results()")?;
-                            (&mut metadata_fut).await
-                        }
-                    }
-                    .err_tip(|| format!("While querying symlink metadata for {entry}"))?;
 
                     if metadata.is_file() {
                         return Ok(OutputType::File(
-                            upload_file(resumeable_file, cas_store, &full_path, hasher)
+                            upload_file(cas_store, &full_path, hasher, metadata)
                                 .await
                                 .map(|mut file_info| {
                                     file_info.name_or_path = NameOrPath::Path(entry);
192 changes: 191 additions & 1 deletion nativelink-worker/tests/running_actions_manager_test.rs
@@ -26,7 +26,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
-use futures::{FutureExt, TryFutureExt};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use nativelink_config::cas_server::EnvironmentSource;
 use nativelink_error::{make_input_err, Code, Error, ResultExt};
 #[cfg_attr(target_family = "windows", allow(unused_imports))]
@@ -2930,4 +2930,194 @@ exit 1
);
Ok(())
}

// We've experienced deadlocks when uploading, so make only a single permit available and
// check it's able to handle uploading directories with files in them.
// By default this test is ignored because it *must* be run single-threaded. To run this
// test, execute:
// cargo test -p nativelink-worker --test running_actions_manager_test -- --test-threads=1 --ignored
#[tokio::test]
#[ignore]
async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";

fn test_monotonic_clock() -> SystemTime {
static CLOCK: AtomicU64 = AtomicU64::new(0);
monotonic_clock(&CLOCK)
}

let (_, slow_store, cas_store, ac_store) = setup_stores().await?;
let root_work_directory = make_temp_path("root_work_directory");
fs::create_dir_all(&root_work_directory).await?;

// Take all but one FD permit away.
let _permits = futures::stream::iter(1..fs::OPEN_FILE_SEMAPHORE.available_permits())
.then(|_| fs::OPEN_FILE_SEMAPHORE.acquire())
.try_collect::<Vec<_>>()
.await?;
assert_eq!(1, fs::OPEN_FILE_SEMAPHORE.available_permits());

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks(
RunningActionsManagerArgs {
root_work_directory,
execution_configuration: ExecutionConfiguration::default(),
cas_store: Pin::into_inner(cas_store.clone()),
ac_store: Some(Pin::into_inner(ac_store.clone())),
historical_store: Pin::into_inner(cas_store.clone()),
upload_action_result_config:
&nativelink_config::cas_server::UploadActionResultConfig {
upload_ac_results_strategy:
nativelink_config::cas_server::UploadCacheResultsStrategy::never,
..Default::default()
},
max_action_timeout: Duration::MAX,
timeout_handled_externally: false,
},
Callbacks {
now_fn: test_monotonic_clock,
sleep_fn: |_duration| Box::pin(futures::future::pending()),
},
)?);
let action_result = {
const SALT: u64 = 55;
#[cfg(target_family = "unix")]
let arguments = vec![
"sh".to_string(),
"-c".to_string(),
"printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr '"
.to_string(),
];
#[cfg(target_family = "windows")]
let arguments = vec![
"cmd".to_string(),
"/C".to_string(),
// Note: Windows adds two spaces after 'set /p=XXX'.
"echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0"
.to_string(),
];
let working_directory = "some_cwd";
let command = Command {
arguments,
output_paths: vec!["test.txt".to_string(), "tst".to_string()],
working_directory: working_directory.to_string(),
..Default::default()
};
let command_digest = serialize_and_upload_message(
&command,
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
let input_root_digest = serialize_and_upload_message(
&Directory {
directories: vec![DirectoryNode {
name: working_directory.to_string(),
digest: Some(
serialize_and_upload_message(
&Directory::default(),
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?
.into(),
),
}],
..Default::default()
},
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
let action = Action {
command_digest: Some(command_digest.into()),
input_root_digest: Some(input_root_digest.into()),
..Default::default()
};
let action_digest = serialize_and_upload_message(
&action,
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;

let running_action_impl = running_actions_manager
.create_and_add_action(
WORKER_ID.to_string(),
StartExecute {
execute_request: Some(ExecuteRequest {
action_digest: Some(action_digest.into()),
..Default::default()
}),
salt: SALT,
queued_timestamp: None,
},
)
.await?;

run_action(running_action_impl.clone()).await?
};
let file_content = slow_store
.as_ref()
.get_part_unchunked(action_result.output_files[0].digest, 0, None, None)
.await?;
assert_eq!(from_utf8(&file_content)?, "123 ");
let stdout_content = slow_store
.as_ref()
.get_part_unchunked(action_result.stdout_digest, 0, None, None)
.await?;
assert_eq!(from_utf8(&stdout_content)?, "foo-stdout ");
let stderr_content = slow_store
.as_ref()
.get_part_unchunked(action_result.stderr_digest, 0, None, None)
.await?;
assert_eq!(from_utf8(&stderr_content)?, "bar-stderr ");
let mut clock_time = make_system_time(0);
assert_eq!(
action_result,
ActionResult {
output_files: vec![FileInfo {
name_or_path: NameOrPath::Path("test.txt".to_string()),
digest: DigestInfo::try_new(
"c69e10a5f54f4e28e33897fbd4f8701595443fa8c3004aeaa20dd4d9a463483b",
4
)?,
is_executable: false,
}],
stdout_digest: DigestInfo::try_new(
"15019a676f057d97d1ad3af86f3cc1e623cb33b18ff28422bbe3248d2471cc94",
11
)?,
stderr_digest: DigestInfo::try_new(
"2375ab8a01ca11e1ea7606dfb58756c153d49733cde1dbfb5a1e00f39afacf06",
12
)?,
exit_code: 0,
output_folders: vec![DirectoryInfo {
path: "tst".to_string(),
tree_digest: DigestInfo::try_new(
"95711c1905d4898a70209dd6e98241dcafb479c00241a1ea4ed8415710d706f3",
166,
)?,
},],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
server_logs: HashMap::new(),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
worker_start_timestamp: increment_clock(&mut clock_time),
input_fetch_start_timestamp: increment_clock(&mut clock_time),
input_fetch_completed_timestamp: increment_clock(&mut clock_time),
execution_start_timestamp: increment_clock(&mut clock_time),
execution_completed_timestamp: increment_clock(&mut clock_time),
output_upload_start_timestamp: increment_clock(&mut clock_time),
output_upload_completed_timestamp: increment_clock(&mut clock_time),
worker_completed_timestamp: increment_clock(&mut clock_time),
},
error: None,
message: String::new(),
}
);
Ok(())
}
}
