Skip to content

Commit

Permalink
Files will now close if held open too long
Browse files Browse the repository at this point in the history
In testing Buck2, it turns out they hammer the scheduler with jobs
and assume it can keep up (which is good). On local testing, where
all services are on the same machine it was deadlocking because
it was performing the following operations:
1. Open file1 for reading
2. Open file2 for writing
3. Streaming file1 -> file2

Since we allow users to limit the number of open files at any given
time, this was deadlocking because file1 was held open waiting
for file2 to open, which was waiting for a file to be closed. Since
buck2 goes crazy, it was causing a deadlock.

In most production systems this is not an issue because the CAS
is separated from the workers, but rarely might happen on the workers
if the `max_open_files` was set too low.

To get around this issue `ResumeableFileSlot` is introduced. It
allows callers to use a timeout and call `.close_file()` on it
and the next time the struct is used it will re-open the file.

related #222
  • Loading branch information
allada committed Aug 26, 2023
1 parent 1f48f4e commit 3c0897d
Show file tree
Hide file tree
Showing 18 changed files with 873 additions and 184 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"gencargo/ac_server",
"gencargo/ac_server_test",
"gencargo/ac_utils",
"gencargo/ac_utils_test",
"gencargo/action_messages",
"gencargo/action_messages_test",
"gencargo/async_fixed_buffer",
Expand Down Expand Up @@ -46,6 +47,7 @@ members = [
"gencargo/filesystem_store",
"gencargo/filesystem_store_test",
"gencargo/fs",
"gencargo/fs_test",
"gencargo/gen_protos_tool",
"gencargo/grpc_scheduler",
"gencargo/grpc_store",
Expand Down Expand Up @@ -149,6 +151,7 @@ uuid = { version = "1.4.0", features = ["v4"] }
ac_server = { path = "gencargo/ac_server" }
ac_server_test = { path = "gencargo/ac_server_test" }
ac_utils = { path = "gencargo/ac_utils" }
ac_utils_test = { path = "gencargo/ac_utils_test" }
action_messages = { path = "gencargo/action_messages" }
action_messages_test = { path = "gencargo/action_messages_test" }
async_fixed_buffer = { path = "gencargo/async_fixed_buffer" }
Expand Down Expand Up @@ -182,6 +185,7 @@ fastcdc_test = { path = "gencargo/fastcdc_test" }
filesystem_store = { path = "gencargo/filesystem_store" }
filesystem_store_test = { path = "gencargo/filesystem_store_test" }
fs = { path = "gencargo/fs" }
fs_test = { path = "gencargo/fs_test" }
gen_protos_tool = { path = "gencargo/gen_protos_tool" }
grpc_scheduler = { path = "gencargo/grpc_scheduler" }
grpc_store = { path = "gencargo/grpc_store" }
Expand Down
11 changes: 10 additions & 1 deletion cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use axum::Router;
use clap::Parser;
Expand All @@ -34,7 +35,7 @@ use ac_server::AcServer;
use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::set_open_file_limit;
use common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use common::log;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig};
use default_scheduler_factory::scheduler_factory;
Expand Down Expand Up @@ -461,14 +462,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_MAX_OPEN_FILES: usize = 512;
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000;
let global_cfg = if let Some(global_cfg) = &mut cfg.global {
if global_cfg.max_open_files == 0 {
global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
}
if global_cfg.idle_file_descriptor_timeout_millis == 0 {
global_cfg.idle_file_descriptor_timeout_millis = DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS;
}
*global_cfg
} else {
GlobalConfig {
max_open_files: DEFAULT_MAX_OPEN_FILES,
idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS,
disable_metrics: cfg.servers.iter().all(|v| {
let Some(service) = &v.services else {
return true;
Expand All @@ -478,6 +486,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};
set_open_file_limit(global_cfg.max_open_files);
set_idle_file_descriptor_timeout(Duration::from_millis(global_cfg.idle_file_descriptor_timeout_millis))?;
!global_cfg.disable_metrics
};
// Override metrics enabled if the environment variable is set.
Expand Down
16 changes: 16 additions & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,19 @@ rust_test(
],
proc_macro_deps = ["@crate_index//:async-trait"],
)

rust_test(
name = "ac_utils_test",
srcs = ["tests/ac_utils_test.rs"],
deps = [
":ac_utils",
":memory_store",
":store",
"//config",
"//util:common",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:rand",
"@crate_index//:tokio",
],
)
116 changes: 84 additions & 32 deletions cas/store/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use futures::{future::try_join, Future, FutureExt, TryFutureExt};
use prost::Message;
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::time::timeout;

use buf_channel::make_buf_channel_pair;
use common::DigestInfo;
use buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use common::{fs, DigestInfo};
use error::{Code, Error, ResultExt};
use fs::idle_file_descriptor_timeout;
use store::{Store, UploadSizeInfo};

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
Expand All @@ -36,6 +38,9 @@ pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
/// to use up all the memory on this machine.
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.

/// Default read buffer size for reading from an AsyncReader.
const DEFAULT_READ_BUFF_SIZE: usize = 4096;

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
Expand Down Expand Up @@ -77,30 +82,38 @@ pub async fn serialize_and_upload_message<'a, T: Message>(

/// Given a bytestream computes the digest for the data.
/// Note: This will happen in a new spawn since computing digests can be thread intensive.
pub fn compute_digest<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
) -> impl Future<Output = Result<(DigestInfo, R), Error>> {
tokio::spawn(async move {
const DEFAULT_READ_BUFF_SIZE: usize = 4096;
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
let mut hasher = Sha256::new();
let mut digest_size = 0;
loop {
reader
.read_buf(&mut chunk)
.await
.err_tip(|| "Could not read chunk during compute_digest")?;
if chunk.is_empty() {
break; // EOF.
}
digest_size += chunk.len();
hasher.update(&chunk);
chunk.clear();
pub async fn compute_digest<R: AsyncRead + Unpin + Send>(mut reader: R) -> Result<(DigestInfo, R), Error> {
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
let mut hasher = Sha256::new();
let mut digest_size = 0;
loop {
reader
.read_buf(&mut chunk)
.await
.err_tip(|| "Could not read chunk during compute_digest")?;
if chunk.is_empty() {
break; // EOF.
}
digest_size += chunk.len();
hasher.update(&chunk);
chunk.clear();
}

Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader))
})
.map(|r| r.err_tip(|| "Failed to launch spawn")?)
Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader))
// })
// .map(|r| r.err_tip(|| "Failed to launch spawn")?)
}

fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
read_data_fn: impl FnOnce(DropCloserWriteHalf) -> Fut,
) -> impl Future<Output = Result<(), Error>> + 'a {
let (tx, rx) = make_buf_channel_pair();
let upload_file_to_store_fut = cas_store
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
.map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
try_join(read_data_fn(tx), upload_file_to_store_fut).map_ok(|(_, _)| ())
}

/// Uploads data to our store for given digest.
Expand All @@ -114,13 +127,8 @@ pub fn upload_to_store<'a, R: AsyncRead + Unpin>(
digest: DigestInfo,
reader: &'a mut R,
) -> impl Future<Output = Result<(), Error>> + 'a {
let (mut tx, rx) = make_buf_channel_pair();
let upload_to_store_fut = cas_store
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
.map(|r| r.err_tip(|| "Could not upload data to store in upload_to_store"));
let read_data_fut = async move {
inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
loop {
const DEFAULT_READ_BUFF_SIZE: usize = 4096;
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
reader
.read_buf(&mut chunk)
Expand All @@ -137,6 +145,50 @@ pub fn upload_to_store<'a, R: AsyncRead + Unpin>(
.await
.err_tip(|| "Could not send EOF to store in upload_to_store")?;
Ok(())
};
try_join(read_data_fut, upload_to_store_fut).map_ok(|(_, _)| ())
})
}

/// Same as `upload_to_store`, however it specializes in dealing with a `ResumeableFileSlot`.
/// This will close the reading file to close if writing the data takes a while.
pub fn upload_file_to_store<'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
mut file_reader: fs::ResumeableFileSlot<'a>,
) -> impl Future<Output = Result<(), Error>> + 'a {
inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
loop {
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
file_reader
.as_reader()
.await
.err_tip(|| "Could not get reader from file slot in upload_file_to_store")?
.read_buf(&mut chunk)
.await
.err_tip(|| "Could not read chunk during upload_file_to_store")?;
if chunk.is_empty() {
break; // EOF.
}
let send_fut = tx.send(chunk.freeze());
tokio::pin!(send_fut);
loop {
match timeout(idle_file_descriptor_timeout(), &mut send_fut).await {
Ok(Ok(())) => break,
Ok(Err(err)) => {
return Err(err).err_tip(|| "Could not send buffer data to store in upload_file_to_store")
}
Err(_) => {
file_reader
.close_file()
.await
.err_tip(|| "Could not close file due to timeout in upload_file_to_store")?;
continue;
}
}
}
}
tx.send_eof()
.await
.err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
Ok(())
})
}
Loading

0 comments on commit 3c0897d

Please sign in to comment.