From d093fddeb927b2ba453a061ca4da4aefe3572e8c Mon Sep 17 00:00:00 2001
From: allada
Date: Thu, 4 Nov 2021 21:08:00 -0700
Subject: [PATCH] Stores now support non-exact upload sizes & S3 store optimizations

The S3 store is now optimized and can upload multiple parts concurrently.

Non-exact upload sizes are supported in preparation for a follow-up change that will allow compression to and from backend stores.
---
 Cargo.lock | 86 ++++++++++-
 Cargo.toml | 1 +
 cas/grpc_service/ac_server.rs | 6 +-
 cas/grpc_service/bytestream_server.rs | 8 +-
 cas/grpc_service/cas_server.rs | 4 +-
 cas/grpc_service/tests/ac_server_test.rs | 8 +-
 .../tests/bytestream_server_test.rs | 14 +-
 cas/grpc_service/tests/cas_server_test.rs | 12 +-
 cas/store/BUILD | 3 +-
 cas/store/lib.rs | 2 +-
 cas/store/memory_store.rs | 11 +-
 cas/store/s3_store.rs | 146 ++++++++++++------
 cas/store/store_trait.rs | 15 +-
 cas/store/tests/memory_store_test.rs | 20 ++-
 cas/store/tests/s3_store_test.rs | 7 +-
 cas/store/tests/verify_store_test.rs | 48 +++++-
 cas/store/verify_store.rs | 38 +++--
 config/backends.rs | 13 ++
 third_party/BUILD.bazel | 9 ++
 third_party/crates.bzl | 100 +++++++++++-
 third_party/remote/BUILD.instant-0.1.12.bazel | 56 +++++++
 third_party/remote/BUILD.lease-0.2.3.bazel | 61 ++++++++
 third_party/remote/BUILD.lock_api-0.4.5.bazel | 54 +++++++
 third_party/remote/BUILD.lockfree-0.5.1.bazel | 54 +++++++
 .../remote/BUILD.owned-alloc-0.2.0.bazel | 53 +++++++
 .../remote/BUILD.parking_lot-0.11.2.bazel | 58 +++++++
 .../remote/BUILD.parking_lot_core-0.8.5.bazel | 103 ++++++++++++
 .../BUILD.rusoto_credential-0.46.0.bazel | 2 +-
 .../remote/BUILD.scopeguard-1.1.0.bazel | 55 +++++++
 third_party/remote/BUILD.smallvec-1.7.0.bazel | 57 +++++++
 third_party/remote/BUILD.winapi-0.3.9.bazel | 2 +
 ...-1.4.2.bazel => BUILD.zeroize-1.4.3.bazel} | 2 +-
 util/BUILD | 26 ----
 util/async_read_taker.rs | 65 --------
 util/tests/async_read_taker_test.rs | 77 ---------
 35 files changed, 993 insertions(+), 283 deletions(-)
 create mode 100644 third_party/remote/BUILD.instant-0.1.12.bazel
 create mode 100644 third_party/remote/BUILD.lease-0.2.3.bazel
 create mode 100644 third_party/remote/BUILD.lock_api-0.4.5.bazel
 create mode 100644 third_party/remote/BUILD.lockfree-0.5.1.bazel
 create mode 100644 third_party/remote/BUILD.owned-alloc-0.2.0.bazel
 create mode 100644 third_party/remote/BUILD.parking_lot-0.11.2.bazel
 create mode 100644 third_party/remote/BUILD.parking_lot_core-0.8.5.bazel
 create mode 100644 third_party/remote/BUILD.scopeguard-1.1.0.bazel
 create mode 100644 third_party/remote/BUILD.smallvec-1.7.0.bazel
 rename third_party/remote/{BUILD.zeroize-1.4.2.bazel => BUILD.zeroize-1.4.3.bazel} (97%)
 delete mode 100644 util/async_read_taker.rs
 delete mode 100644 util/tests/async_read_taker_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 931ab8800..b91d418e1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -436,6 +436,7 @@ dependencies = [
  "http",
  "json5",
  "lazy-init",
+ "lease",
  "log",
  "lru",
  "maplit",
@@ -862,6 +863,15 @@ dependencies = [
  "hashbrown",
 ]

+[[package]]
+name = "instant"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
+dependencies = [
+ "cfg-if 1.0.0",
+]
+
 [[package]]
 name = "itertools"
 version = "0.9.0"
@@ -909,12 +919,41 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"

+[[package]]
+name = "lease"
+version = "0.2.3"
+source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbc1ff844bcf8922037d4fc6557e912f697edd049ff61a49aaa126eebfaa140d" +dependencies = [ + "futures-core", + "lockfree", + "parking_lot", +] + [[package]] name = "libc" version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a60553f9a9e039a333b4e9b20573b9e9b9c0bb3a11e201ccc48ef4283456d673" +[[package]] +name = "lock_api" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "lockfree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" +dependencies = [ + "owned-alloc", +] + [[package]] name = "log" version = "0.4.14" @@ -1101,6 +1140,37 @@ dependencies = [ "winapi", ] +[[package]] +name = "owned-alloc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1607,6 +1677,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "security-framework" version = "2.4.2" @@ -1737,6 +1813,12 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +[[package]] +name = "smallvec" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" + [[package]] name = "socket2" version = "0.4.2" @@ -2380,6 +2462,6 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.4.2" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf68b08513768deaa790264a7fac27a58cbf2705cfcdc9448362229217d7e970" +checksum = "d68d9dcec5f9b43a30d38c49f91dfedfaac384cb8f085faca366c26207dd1619" diff --git a/Cargo.toml b/Cargo.toml index 2ed14a206..77aa87f2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ rusoto_core = "0.46.0" http = "^0.2" pin-project-lite = "0.2.7" fast-async-mutex = "0.6.7" +lease = "0.2.3" [dev-dependencies] clap = "2.33.3" diff --git a/cas/grpc_service/ac_server.rs b/cas/grpc_service/ac_server.rs index fd3150afd..0a3ca6836 100644 --- a/cas/grpc_service/ac_server.rs +++ b/cas/grpc_service/ac_server.rs @@ -18,7 +18,7 @@ use proto::build::bazel::remote::execution::v2::{ use common::{log, DigestInfo}; use config::cas_server::{AcStoreConfig, InstanceName}; use error::{make_input_err, Code, Error, ResultExt}; -use store::{Store, 
StoreManager}; +use store::{Store, StoreManager, UploadSizeInfo}; // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than // 1.2k. Giving a bit more just in case to reduce allocs. @@ -103,9 +103,9 @@ impl AcServer { .err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))? .as_ref(), ); - let expected_size = store_data.len(); + let size_info = UploadSizeInfo::ExactSize(store_data.len()); store - .update(digest, Box::new(Cursor::new(store_data)), expected_size) + .update(digest, Box::new(Cursor::new(store_data)), size_info) .await?; Ok(Response::new(action_result)) } diff --git a/cas/grpc_service/bytestream_server.rs b/cas/grpc_service/bytestream_server.rs index a5bb2ab16..58680b4de 100644 --- a/cas/grpc_service/bytestream_server.rs +++ b/cas/grpc_service/bytestream_server.rs @@ -19,7 +19,7 @@ use proto::google::bytestream::{ use common::{log, DigestInfo}; use config::cas_server::ByteStreamConfig; use error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; -use store::{Store, StoreManager}; +use store::{Store, StoreManager, UploadSizeInfo}; pub struct ByteStreamServer { stores: HashMap>, @@ -182,7 +182,11 @@ impl ByteStreamServer { let rx = Box::new(rx.take(expected_size as u64)); let store = Pin::new(store_clone.as_ref()); store - .update(DigestInfo::try_new(&hash, expected_size)?, rx, expected_size) + .update( + DigestInfo::try_new(&hash, expected_size)?, + rx, + UploadSizeInfo::ExactSize(expected_size), + ) .await }) }; diff --git a/cas/grpc_service/cas_server.rs b/cas/grpc_service/cas_server.rs index 059f5fada..64240ad77 100644 --- a/cas/grpc_service/cas_server.rs +++ b/cas/grpc_service/cas_server.rs @@ -23,7 +23,7 @@ use proto::google::rpc::Status as GrpcStatus; use common::{log, DigestInfo}; use config::cas_server::{CasStoreConfig, InstanceName}; use error::{error_if, make_input_err, Error, ResultExt}; -use store::{Store, StoreManager}; +use store::{Store, StoreManager, UploadSizeInfo}; pub struct CasServer { stores: HashMap>, @@ -109,7 +109,7 @@ impl CasServer { let cursor = Box::new(Cursor::new(request_data)); let store = Pin::new(store_owned.as_ref()); store - .update(digest_copy, cursor, size_bytes) + .update(digest_copy, cursor, UploadSizeInfo::ExactSize(size_bytes)) .await .err_tip(|| "Error writing to store") } diff --git a/cas/grpc_service/tests/ac_server_test.rs b/cas/grpc_service/tests/ac_server_test.rs index e0580d7ad..0badda5c0 100644 --- a/cas/grpc_service/tests/ac_server_test.rs +++ b/cas/grpc_service/tests/ac_server_test.rs @@ -13,7 +13,7 @@ use ac_server::AcServer; use common::DigestInfo; use config; use error::Error; -use store::{Store, StoreManager}; +use store::{Store, StoreManager, UploadSizeInfo}; const INSTANCE_NAME: &str = "foo_instance_name"; const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; @@ -28,7 +28,11 @@ async fn insert_into_store( let data_len = store_data.len(); let digest = DigestInfo::try_new(&hash, data_len as i64)?; store - .update(digest.clone(), Box::new(Cursor::new(store_data)), data_len) + .update( + digest.clone(), + Box::new(Cursor::new(store_data)), + UploadSizeInfo::ExactSize(data_len), + ) .await?; Ok(digest.size_bytes) } diff --git a/cas/grpc_service/tests/bytestream_server_test.rs b/cas/grpc_service/tests/bytestream_server_test.rs index 4004b9b2d..1699b1d53 100644 --- a/cas/grpc_service/tests/bytestream_server_test.rs +++ b/cas/grpc_service/tests/bytestream_server_test.rs @@ -13,7 +13,7 @@ use tonic::Request; use 
common::DigestInfo; use config; use error::{make_err, Code, Error, ResultExt}; -use store::StoreManager; +use store::{StoreManager, UploadSizeInfo}; const INSTANCE_NAME: &str = "foo_instance_name"; const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; @@ -197,7 +197,11 @@ pub mod read_tests { let digest = DigestInfo::try_new(&HASH1, VALUE1.len())?; store - .update(digest, Box::new(Cursor::new(VALUE1)), VALUE1.len()) + .update( + digest, + Box::new(Cursor::new(VALUE1)), + UploadSizeInfo::ExactSize(VALUE1.len()), + ) .await?; let read_request = ReadRequest { @@ -245,7 +249,11 @@ pub mod read_tests { let data_len = raw_data.len(); let digest = DigestInfo::try_new(&HASH1, data_len)?; store - .update(digest, Box::new(Cursor::new(raw_data.clone())), data_len) + .update( + digest, + Box::new(Cursor::new(raw_data.clone())), + UploadSizeInfo::ExactSize(data_len), + ) .await?; let read_request = ReadRequest { diff --git a/cas/grpc_service/tests/cas_server_test.rs b/cas/grpc_service/tests/cas_server_test.rs index 132d49780..54025fb2a 100644 --- a/cas/grpc_service/tests/cas_server_test.rs +++ b/cas/grpc_service/tests/cas_server_test.rs @@ -14,7 +14,7 @@ use cas_server::CasServer; use common::DigestInfo; use config; use error::Error; -use store::StoreManager; +use store::{StoreManager, UploadSizeInfo}; const INSTANCE_NAME: &str = "foo_instance_name"; const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; @@ -84,7 +84,7 @@ mod find_missing_blobs { .update( DigestInfo::try_new(HASH1, VALUE.len())?, Box::new(Cursor::new(VALUE)), - VALUE.len(), + UploadSizeInfo::ExactSize(VALUE.len()), ) .await?; let raw_response = cas_server @@ -115,7 +115,7 @@ mod find_missing_blobs { .update( DigestInfo::try_new(HASH1, VALUE.len())?, Box::new(Cursor::new(VALUE)), - VALUE.len(), + UploadSizeInfo::ExactSize(VALUE.len()), ) .await?; let raw_response = cas_server @@ -177,7 +177,7 @@ mod batch_update_blobs { .update( DigestInfo::try_new(&HASH1, VALUE1.len())?, Box::new(Cursor::new(VALUE1)), - VALUE1.len(), + UploadSizeInfo::ExactSize(VALUE1.len()), ) .await .expect("Update should have succeeded"); @@ -258,7 +258,7 @@ mod batch_read_blobs { .update( DigestInfo::try_new(&HASH1, VALUE1.len())?, Box::new(Cursor::new(VALUE1)), - VALUE1.len(), + UploadSizeInfo::ExactSize(VALUE1.len()), ) .await .expect("Update should have succeeded"); @@ -266,7 +266,7 @@ mod batch_read_blobs { .update( DigestInfo::try_new(&HASH2, VALUE2.len())?, Box::new(Cursor::new(VALUE2)), - VALUE2.len(), + UploadSizeInfo::ExactSize(VALUE2.len()), ) .await .expect("Update should have succeeded"); diff --git a/cas/store/BUILD b/cas/store/BUILD index e1767b0fc..f4755a9a6 100644 --- a/cas/store/BUILD +++ b/cas/store/BUILD @@ -70,14 +70,13 @@ rust_library( "//config", "//third_party:futures", "//third_party:http", + "//third_party:lease", "//third_party:rand", "//third_party:rusoto_core", "//third_party:rusoto_s3", "//third_party:tokio", "//third_party:tokio_util", - "//third_party:fast_async_mutex", "//util:common", - "//util:async_read_taker", "//util:error", "//util:retry", ":traits", diff --git a/cas/store/lib.rs b/cas/store/lib.rs index c88a852a8..1b4d9077a 100644 --- a/cas/store/lib.rs +++ b/cas/store/lib.rs @@ -9,7 +9,7 @@ use memory_store::MemoryStore; use s3_store::S3Store; use verify_store::VerifyStore; -pub use traits::{StoreTrait as Store, StoreType}; +pub use traits::{StoreTrait as Store, StoreType, UploadSizeInfo}; pub struct StoreManager { stores: HashMap>, diff --git 
a/cas/store/memory_store.rs b/cas/store/memory_store.rs index e316c0717..15461a5d8 100644 --- a/cas/store/memory_store.rs +++ b/cas/store/memory_store.rs @@ -12,7 +12,7 @@ use common::DigestInfo; use config; use error::{Code, ResultExt}; use evicting_map::EvictingMap; -use traits::{ResultFuture, StoreTrait}; +use traits::{ResultFuture, StoreTrait, UploadSizeInfo}; pub struct MemoryStore { map: Mutex>, @@ -41,11 +41,16 @@ impl StoreTrait for MemoryStore { self: std::pin::Pin<&'a Self>, digest: DigestInfo, mut reader: Box, - expected_size: usize, + size_info: UploadSizeInfo, ) -> ResultFuture<'a, ()> { Box::pin(async move { - let mut buffer = Vec::with_capacity(expected_size); + let max_size = match size_info { + UploadSizeInfo::ExactSize(sz) => sz, + UploadSizeInfo::MaxSize(sz) => sz, + }; + let mut buffer = Vec::with_capacity(max_size); reader.read_to_end(&mut buffer).await?; + buffer.shrink_to_fit(); let mut map = self.map.lock().await; map.insert(digest, Arc::new(buffer)); Ok(()) diff --git a/cas/store/s3_store.rs b/cas/store/s3_store.rs index d7a761f0b..dd82252c6 100644 --- a/cas/store/s3_store.rs +++ b/cas/store/s3_store.rs @@ -1,15 +1,16 @@ // Copyright 2021 Nathan (Blaise) Bruer. All rights reserved. use std::cmp; +use std::io::Cursor; use std::marker::Send; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use fast_async_mutex::mutex::Mutex; -use futures::stream::unfold; +use futures::stream::{unfold, FuturesUnordered, StreamExt}; use http::status::StatusCode; +use lease::{Lease, Pool as ObjectPool}; use rand::{rngs::OsRng, Rng}; use rusoto_core::{region::Region, ByteStream, RusotoError}; use rusoto_s3::{ @@ -17,7 +18,7 @@ use rusoto_s3::{ CreateMultipartUploadRequest, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectRequest, PutObjectRequest, S3Client, UploadPartRequest, S3, }; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::time::sleep; use tokio_util::io::ReaderStream; @@ -25,21 +26,25 @@ use common::{log, DigestInfo}; use config; use error::{make_err, make_input_err, Code, Error, ResultExt}; use retry::{ExponentialBackoff, Retrier, RetryResult}; -use traits::{ResultFuture, StoreTrait}; - -use async_read_taker::AsyncReadTaker; +use traits::{ResultFuture, StoreTrait, UploadSizeInfo}; // S3 parts cannot be smaller than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5mb. +// Size for the large vector pool if not specified. 
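+// Each pooled buffer is preallocated to MIN_MULTIPART_SIZE (5mb), so the default of
+// 50 buffers bounds the memory held for in-flight upload chunks at roughly 250mb.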
+const DEFAULT_BUFFER_POOL_SIZE: usize = 50;
+
+type ReaderType = Box;
+
 pub struct S3Store {
-    s3_client: S3Client,
+    s3_client: Arc<S3Client>,
     bucket: String,
     key_prefix: String,
     jitter_fn: Box Duration + Send + Sync>,
     retry: config::backends::Retry,
     retrier: Retrier,
+    large_vec_pool: ObjectPool<Vec<u8>>,
 }

 impl S3Store {
@@ -66,16 +71,28 @@ impl S3Store {
         s3_client: S3Client,
         jitter_fn: Box Duration + Send + Sync>,
     ) -> Result {
+        let mut buffer_pool_size = config.buffer_pool_size;
+        if buffer_pool_size == 0 {
+            buffer_pool_size = DEFAULT_BUFFER_POOL_SIZE;
+        }
         Ok(S3Store {
-            s3_client: s3_client,
+            s3_client: Arc::new(s3_client),
             bucket: config.bucket.to_owned(),
             key_prefix: config.key_prefix.as_ref().unwrap_or(&"".to_string()).to_owned(),
             jitter_fn: jitter_fn,
             retry: config.retry.to_owned(),
             retrier: Retrier::new(Box::new(|duration| Box::pin(sleep(duration)))),
+            large_vec_pool: ObjectPool::new(buffer_pool_size, || Vec::with_capacity(MIN_MULTIPART_SIZE)),
         })
     }

+    async fn get_large_vec(self: Pin<&Self>) -> Lease<Vec<u8>> {
+        let mut write_data = self.large_vec_pool.get_async().await;
+        write_data.clear();
+        write_data.shrink_to(MIN_MULTIPART_SIZE);
+        return write_data;
+    }
+
     fn make_s3_path(&self, digest: &DigestInfo) -> String {
         format!("{}{}-{}", self.key_prefix, digest.str(), digest.size_bytes)
     }
@@ -153,22 +170,41 @@ impl StoreTrait for S3Store {
     fn update<'a>(
         self: Pin<&'a Self>,
         digest: DigestInfo,
-        reader: Box,
-        expected_size: usize,
+        mut reader: ReaderType,
+        upload_size: UploadSizeInfo,
     ) -> ResultFuture<'a, ()> {
         Box::pin(async move {
             let s3_path = &self.make_s3_path(&digest);
-            // We make a major assumption here... If we cannot trust the size we assume it's an action cache.
-            // in this case it is extremely unlikely the payload will be greater than 5gb. If it is a CAS item
-            // it should be `trust_size = true` which we can upload in one chunk if small enough (and more efficient)
-            // but only if the size is small enough. We could use MAX_UPLOAD_PART_SIZE (5gb), but I think it's fine
-            // to use 5mb as a limit too.
-            if expected_size < MIN_MULTIPART_SIZE {
+            let max_size = match upload_size {
+                UploadSizeInfo::ExactSize(sz) => sz,
+                UploadSizeInfo::MaxSize(sz) => sz,
+            };
+            // NOTE(blaise.bruer) It might be more optimal to use a different heuristic here, but for
+            // simplicity we use a hard coded value. Anything going down this if-statement will have
+            // the advantage of only 1 network request for the upload instead of the minimum of 3 required
+            // for multipart upload requests.
+            if max_size < MIN_MULTIPART_SIZE {
+                let (reader, content_length) = if let UploadSizeInfo::ExactSize(sz) = upload_size {
+                    (reader, Some(sz as i64))
+                } else {
+                    let mut write_buf = self.get_large_vec().await;
+                    reader
+                        .take(max_size as u64)
+                        .read_to_end(&mut write_buf)
+                        .await
+                        .err_tip(|| "Failed to read file in upload to s3 in single chunk")?;
+                    let content_length = write_buf.len();
+                    (
+                        Box::new(Cursor::new(write_buf)) as ReaderType,
+                        Some(content_length as i64),
+                    )
+                };
+
                 let put_object_request = PutObjectRequest {
                     bucket: self.bucket.to_owned(),
                     key: s3_path.to_owned(),
-                    content_length: Some(expected_size as i64),
+                    content_length,
                     body: Some(ByteStream::new(ReaderStream::new(reader))),
                     ..Default::default()
                 };
@@ -182,7 +218,7 @@ impl StoreTrait for S3Store {

             // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
             // 5mb and can have up to 10,000 parts.
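+            // Each part below is read into a pooled buffer and uploaded on its own spawned task,
+            // so several parts can be in flight at once while the loop reads the next chunk.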
-            let bytes_per_upload_part = cmp::max(MIN_MULTIPART_SIZE, expected_size / (MIN_MULTIPART_SIZE - 1));
+            let bytes_per_upload_part = cmp::max(MIN_MULTIPART_SIZE, max_size / (MIN_MULTIPART_SIZE - 1));

             let response = self
                 .s3_client
@@ -201,45 +237,57 @@ impl StoreTrait for S3Store {

             let complete_result = {
                 let mut part_number: i64 = 1;
-                let reader = Arc::new(Mutex::new(reader));

                 // We might end up with one more capacity unit than needed, but that is the worst case.
-                let mut completed_parts = Vec::with_capacity((expected_size / bytes_per_upload_part) + 1);
+                let mut completed_part_futs = FuturesUnordered::new();
                 loop {
-                    let possible_last_chunk_size = expected_size - bytes_per_upload_part * ((part_number as usize) - 1);
-                    let content_length = cmp::min(possible_last_chunk_size, bytes_per_upload_part);
-                    let is_last_chunk = bytes_per_upload_part * (part_number as usize) >= expected_size;
-                    // Wrap `AsyncRead` so we can hold a copy of it in this scope between iterations.
-                    // This is quite difficult because we need to give full ownership of an AsyncRead
-                    // to `ByteStream` which has an unknown lifetime.
-                    // This wrapper will also ensure we only send `bytes_per_upload_part` then close the
-                    // stream.
-                    let taker = AsyncReadTaker::new(reader.clone(), content_length);
-                    {
-                        let body = Some(ByteStream::new(ReaderStream::new(taker)));
-                        let response = self
-                            .s3_client
-                            .upload_part(UploadPartRequest {
-                                bucket: self.bucket.to_owned(),
-                                key: s3_path.to_owned(),
-                                content_length: Some(content_length as i64),
-                                body,
-                                part_number,
-                                upload_id: upload_id.clone(),
-                                ..Default::default()
-                            })
+                    let mut write_buf = self.get_large_vec().await;
+                    let mut take = reader.take(bytes_per_upload_part as u64);
+                    take.read_to_end(&mut write_buf)
+                        .await
+                        .err_tip(|| "Failed to read chunk in s3_store")?;
+                    reader = take.into_inner();
+                    if write_buf.len() == 0 {
+                        break; // Reached EOF.
+                    }
+
+                    let content_length = Some(write_buf.len() as i64);
+                    let body = Some(ByteStream::new(ReaderStream::new(Cursor::new(write_buf))));
+                    let request = UploadPartRequest {
+                        bucket: self.bucket.to_owned(),
+                        key: s3_path.to_owned(),
+                        content_length,
+                        body,
+                        part_number,
+                        upload_id: upload_id.clone(),
+                        ..Default::default()
+                    };
+
+                    let s3_client = self.s3_client.clone();
+                    completed_part_futs.push(tokio::spawn(async move {
+                        let part_number = request.part_number;
+                        let mut response = s3_client
+                            .upload_part(request)
                             .await
                             .map_err(|e| make_err!(Code::Unknown, "Failed to upload part: {:?}", e))?;
-                        completed_parts.push(CompletedPart {
-                            e_tag: response.e_tag,
+                        let e_tag = response.e_tag.take();
+                        // Drop the response early to help ensure our Lease<Vec<u8>> is freed up and returned to the pool.
+                        drop(response);
+                        Result::<CompletedPart, Error>::Ok(CompletedPart {
+                            e_tag,
                             part_number: Some(part_number),
-                        });
-                    }
-                    if is_last_chunk {
-                        break;
-                    }
+                        })
+                    }));
                     part_number += 1;
                 }

+                let mut completed_parts = Vec::with_capacity(completed_part_futs.len());
+                while let Some(result) = completed_part_futs.next().await {
+                    completed_parts.push(
+                        result
+                            .err_tip(|| "Failed to join s3 chunk upload")? 
+                            .err_tip(|| "Failed to upload chunk")?,
+                    );
+                }
                 self.s3_client
                     .complete_multipart_upload(CompleteMultipartUploadRequest {
                         bucket: self.bucket.to_owned(),
diff --git a/cas/store/store_trait.rs b/cas/store/store_trait.rs
index 5791f16dc..54f1cfa9d 100644
--- a/cas/store/store_trait.rs
+++ b/cas/store/store_trait.rs
@@ -16,6 +16,19 @@ pub enum StoreType {

 pub type ResultFuture<'a, Res> = Pin<Box<dyn Future<Output = Result<Res, Error>> + 'a + Send>>;

+#[derive(Debug, PartialEq, Copy, Clone)]
+pub enum UploadSizeInfo {
+    /// When the data transfer amount is known to be an exact size, this variant should be used.
+    /// The receiver store can use this to better optimize the way the data is sent or stored.
+    ExactSize(usize),
+
+    /// When the data transfer amount is not known to be exact, the caller should use this variant
+    /// to provide the maximum size that could possibly be sent. This bypasses the exact size
+    /// checks, but still provides useful information about the data being sent that the
+    /// underlying store can use to optimize the upload process.
+    MaxSize(usize),
+}
+
 #[async_trait]
 pub trait StoreTrait: Sync + Send + Unpin {
     fn has<'a>(self: Pin<&'a Self>, digest: DigestInfo) -> ResultFuture<'a, bool>;
@@ -24,7 +37,7 @@ pub trait StoreTrait: Sync + Send + Unpin {
         self: Pin<&'a Self>,
         digest: DigestInfo,
         reader: Box,
-        upload_size: usize,
+        upload_size: UploadSizeInfo,
     ) -> ResultFuture<'a, ()>;

     fn get_part<'a>(
diff --git a/cas/store/tests/memory_store_test.rs b/cas/store/tests/memory_store_test.rs
index e91dd4571..227c26b0b 100644
--- a/cas/store/tests/memory_store_test.rs
+++ b/cas/store/tests/memory_store_test.rs
@@ -13,7 +13,7 @@ mod memory_store_tests {
     use common::DigestInfo;
     use config;
     use memory_store::MemoryStore;
-    use traits::StoreTrait;
+    use traits::{StoreTrait, UploadSizeInfo};

     const VALID_HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";

@@ -29,7 +29,7 @@ mod memory_store_tests {
             .update(
                 DigestInfo::try_new(&VALID_HASH1, VALUE1.len())?,
                 Box::new(Cursor::new(VALUE1)),
-                VALUE1.len(),
+                UploadSizeInfo::ExactSize(VALUE1.len()),
             )
             .await?;
         assert!(
@@ -47,7 +47,7 @@ mod memory_store_tests {
             .update(
                 DigestInfo::try_new(&VALID_HASH1, VALUE2.len())?,
                 Box::new(Cursor::new(VALUE2)),
-                VALUE2.len(),
+                UploadSizeInfo::ExactSize(VALUE2.len()),
             )
             .await?;
         store
@@ -80,7 +80,13 @@ mod memory_store_tests {
         const VALUE1: &str = "1234";
         let digest = DigestInfo::try_new(&VALID_HASH1, 4).unwrap();
-        store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 4).await?;
+        store
+            .update(
+                digest.clone(),
+                Box::new(Cursor::new(VALUE1)),
+                UploadSizeInfo::ExactSize(4),
+            )
+            .await?;

         let mut store_data = Vec::new();
         store
@@ -129,7 +135,11 @@ mod memory_store_tests {
         assert!(
             digest.is_err()
                 || store
-                    .update(digest.unwrap(), Box::new(Cursor::new(value)), expected_size)
+                    .update(
+                        digest.unwrap(),
+                        Box::new(Cursor::new(value)),
+                        UploadSizeInfo::ExactSize(expected_size)
+                    )
                     .await
                     .is_err(),
             ".has() should have failed: {} {} {}",
diff --git a/cas/store/tests/s3_store_test.rs b/cas/store/tests/s3_store_test.rs
index f83d16a0e..81b110b31 100644
--- a/cas/store/tests/s3_store_test.rs
+++ b/cas/store/tests/s3_store_test.rs
@@ -20,7 +20,7 @@ use config;
 use error::Error;
 use error::ResultExt;
 use s3_store::S3Store;
-use traits::StoreTrait;
+use traits::{StoreTrait, UploadSizeInfo};

 // Should match constant in s3_store.
 const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5mb. 
@@ -176,7 +176,7 @@ mod s3_store_tests { .update( DigestInfo::try_new(&VALID_HASH1, 199)?, Box::new(Cursor::new(send_data.clone())), - 199, + UploadSizeInfo::ExactSize(199), ) .await?; } @@ -340,7 +340,7 @@ mod s3_store_tests { Box::new(move |_delay| Duration::from_secs(0)), )?; let store_pin = Pin::new(&store); - let data_len = send_data.len(); + let data_len = UploadSizeInfo::ExactSize(send_data.len()); store_pin .update(digest, Box::new(Cursor::new(send_data.clone())), data_len) .await?; @@ -401,6 +401,7 @@ mod s3_store_tests { rt_data, "Expected data to match" ); + assert_eq!( from_utf8(&request.headers["content-length"][0]).unwrap(), format!("{}", rt_data.len()) diff --git a/cas/store/tests/verify_store_test.rs b/cas/store/tests/verify_store_test.rs index b337311f3..0bd10e62a 100644 --- a/cas/store/tests/verify_store_test.rs +++ b/cas/store/tests/verify_store_test.rs @@ -16,7 +16,7 @@ mod verify_store_tests { use config; use error::{Error, ResultExt}; use memory_store::MemoryStore; - use traits::StoreTrait; + use traits::{StoreTrait, UploadSizeInfo}; use verify_store::VerifyStore; const VALID_HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; @@ -36,7 +36,13 @@ mod verify_store_tests { const VALUE1: &str = "123"; let digest = DigestInfo::try_new(&VALID_HASH1, 100).unwrap(); - let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 3).await; + let result = store + .update( + digest.clone(), + Box::new(Cursor::new(VALUE1)), + UploadSizeInfo::ExactSize(3), + ) + .await; assert_eq!( result, Ok(()), @@ -66,7 +72,13 @@ mod verify_store_tests { const VALUE1: &str = "123"; let digest = DigestInfo::try_new(&VALID_HASH1, 100).unwrap(); - let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 100).await; + let result = store + .update( + digest.clone(), + Box::new(Cursor::new(VALUE1)), + UploadSizeInfo::ExactSize(100), + ) + .await; assert!(result.is_err(), "Expected error, got: {:?}", &result); const EXPECTED_ERR: &str = "Expected size 100 but got size 3 on insert"; let err = result.unwrap_err().to_string(); @@ -99,7 +111,13 @@ mod verify_store_tests { const VALUE1: &str = "123"; let digest = DigestInfo::try_new(&VALID_HASH1, 3).unwrap(); - let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 3).await; + let result = store + .update( + digest.clone(), + Box::new(Cursor::new(VALUE1)), + UploadSizeInfo::ExactSize(3), + ) + .await; assert_eq!(result, Ok(()), "Expected success, got: {:?}", result); assert_eq!( Pin::new(inner_store.as_ref()).has(digest).await?, @@ -126,7 +144,11 @@ mod verify_store_tests { let digest = DigestInfo::try_new(&VALID_HASH1, 6).unwrap(); let digest_clone = digest.clone(); - let future = tokio::spawn(async move { Pin::new(&store_owned).update(digest_clone, Box::new(rx), 6).await }); + let future = tokio::spawn(async move { + Pin::new(&store_owned) + .update(digest_clone, Box::new(rx), UploadSizeInfo::ExactSize(6)) + .await + }); tx.write_all("foo".as_bytes()).await?; tx.flush().await?; tx.write_all("bar".as_bytes()).await?; @@ -158,7 +180,13 @@ mod verify_store_tests { const HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3"; const VALUE: &str = "123"; let digest = DigestInfo::try_new(&HASH, 3).unwrap(); - let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE)), 3).await; + let result = store + .update( + digest.clone(), + Box::new(Cursor::new(VALUE)), + UploadSizeInfo::ExactSize(3), + ) + .await; assert_eq!(result, Ok(()), "Expected success, 
got: {:?}", result); assert_eq!( Pin::new(inner_store.as_ref()).has(digest).await?, @@ -185,7 +213,13 @@ mod verify_store_tests { const HASH: &str = "6b51d431df5d7f141cbececcf79edf3dd861c3b4069f0b11661a3eefacbba918"; const VALUE: &str = "123"; let digest = DigestInfo::try_new(&HASH, 3).unwrap(); - let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE)), 3).await; + let result = store + .update( + digest.clone(), + Box::new(Cursor::new(VALUE)), + UploadSizeInfo::ExactSize(3), + ) + .await; let err = result.unwrap_err().to_string(); const ACTUAL_HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3"; let expected_err = format!("Hashes do not match, got: {} but digest hash was {}", HASH, ACTUAL_HASH); diff --git a/cas/store/verify_store.rs b/cas/store/verify_store.rs index 2db39a809..cb1ecea7e 100644 --- a/cas/store/verify_store.rs +++ b/cas/store/verify_store.rs @@ -13,7 +13,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, WriteHalf}; use async_fixed_buffer::AsyncFixedBuf; use common::DigestInfo; use error::{error_if, Error, ResultExt}; -use traits::{ResultFuture, StoreTrait}; +use traits::{ResultFuture, StoreTrait, UploadSizeInfo}; pub struct VerifyStore { inner_store: Arc, @@ -38,7 +38,7 @@ impl VerifyStore { async fn inner_check_update<'a>( mut tx: WriteHalf>>, mut reader: Box, - expected_size: usize, + size_info: UploadSizeInfo, mut maybe_hasher: Option<([u8; 32], Sha256)>, ) -> Result<(), Error> { let mut buffer = vec![0u8; 1024 * 4]; @@ -58,12 +58,14 @@ async fn inner_check_update<'a>( if sz != 0 { continue; } - error_if!( - sum_size != expected_size, - "Expected size {} but got size {} on insert", - expected_size, - sum_size - ); + if let UploadSizeInfo::ExactSize(expected_size) = size_info { + error_if!( + sum_size != expected_size, + "Expected size {} but got size {} on insert", + expected_size, + sum_size + ); + } if let Some((original_hash, hasher)) = maybe_hasher { let hash_result: [u8; 32] = hasher.finalize().into(); error_if!( @@ -93,17 +95,19 @@ impl StoreTrait for VerifyStore { self: std::pin::Pin<&'a Self>, digest: DigestInfo, reader: Box, - expected_size: usize, + size_info: UploadSizeInfo, ) -> ResultFuture<'a, ()> { Box::pin(async move { let digest_size = usize::try_from(digest.size_bytes).err_tip(|| "Digest size_bytes was not convertible to usize")?; - error_if!( - self.verify_size && expected_size != digest_size, - "Expected size to match. Got {} but digest says {} on update", - expected_size, - digest.size_bytes - ); + if let UploadSizeInfo::ExactSize(expected_size) = size_info { + error_if!( + self.verify_size && expected_size != digest_size, + "Expected size to match. 
Got {} but digest says {} on update",
+                    expected_size,
+                    digest.size_bytes
+                );
+            }
             let mut raw_fixed_buffer = AsyncFixedBuf::new(vec![0u8; 1024 * 4].into_boxed_slice());
             let stream_closer_fut = raw_fixed_buffer.get_closer();
             let (rx, tx) = tokio::io::split(raw_fixed_buffer);
@@ -112,14 +116,14 @@ impl StoreTrait for VerifyStore {
             let inner_store_clone = self.inner_store.clone();
             let spawn_future = tokio::spawn(async move {
                 Pin::new(inner_store_clone.as_ref())
-                    .update(digest, Box::new(rx), expected_size)
+                    .update(digest, Box::new(rx), size_info)
                     .await
             });
             let mut hasher = None;
             if self.verify_hash {
                 hasher = Some((hash_copy, Sha256::new()));
             }
-            let result = inner_check_update(tx, reader, expected_size, hasher).await;
+            let result = inner_check_update(tx, reader, size_info, hasher).await;
             stream_closer_fut.await;
             result.merge(
                 spawn_future
diff --git a/config/backends.rs b/config/backends.rs
index 025f15321..5b8b0a219 100644
--- a/config/backends.rs
+++ b/config/backends.rs
@@ -98,6 +98,19 @@ pub struct S3Store {
     /// Retry configuration to use when a network request fails.
     #[serde(default)]
     pub retry: Retry,
+
+    /// The number of buffer objects available to this store; each buffer is preallocated
+    /// to 5MB. Due to the way S3Store buffers its data and can process multiple uploads
+    /// and downloads at a time (even for the same request), the local side might send
+    /// data much faster than S3 can receive it. If we do not use a pool of buffer
+    /// objects we might end up with a significant amount of data queued up for upload
+    /// in memory. This value prevents that by throttling a request from reading/writing
+    /// more data until a previously pooled buffer is released.
+    ///
+    /// Default: 50 - This is arbitrary and no research was performed to choose this number.
+    #[serde(default)]
+    pub buffer_pool_size: usize,
 }

 /// Retry configuration. 
This configuration is exponential and each iteration diff --git a/third_party/BUILD.bazel b/third_party/BUILD.bazel index 29fa88e9d..aaf22b85d 100644 --- a/third_party/BUILD.bazel +++ b/third_party/BUILD.bazel @@ -102,6 +102,15 @@ alias( ], ) +alias( + name = "lease", + actual = "@raze__lease__0_2_3//:lease", + tags = [ + "cargo-raze", + "manual", + ], +) + alias( name = "log", actual = "@raze__log__0_4_14//:log", diff --git a/third_party/crates.bzl b/third_party/crates.bzl index dfa6fa16a..871ecb06b 100644 --- a/third_party/crates.bzl +++ b/third_party/crates.bzl @@ -891,6 +891,16 @@ def raze_fetch_remote_crates(): build_file = Label("//third_party/remote:BUILD.indexmap-1.7.0.bazel"), ) + maybe( + http_archive, + name = "raze__instant__0_1_12", + url = "https://crates.io/api/v1/crates/instant/0.1.12/download", + type = "tar.gz", + sha256 = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c", + strip_prefix = "instant-0.1.12", + build_file = Label("//third_party/remote:BUILD.instant-0.1.12.bazel"), + ) + maybe( http_archive, name = "raze__itertools__0_10_1", @@ -951,6 +961,16 @@ def raze_fetch_remote_crates(): build_file = Label("//third_party/remote:BUILD.lazy_static-1.4.0.bazel"), ) + maybe( + http_archive, + name = "raze__lease__0_2_3", + url = "https://crates.io/api/v1/crates/lease/0.2.3/download", + type = "tar.gz", + sha256 = "bbc1ff844bcf8922037d4fc6557e912f697edd049ff61a49aaa126eebfaa140d", + strip_prefix = "lease-0.2.3", + build_file = Label("//third_party/remote:BUILD.lease-0.2.3.bazel"), + ) + maybe( http_archive, name = "raze__libc__0_2_106", @@ -961,6 +981,26 @@ def raze_fetch_remote_crates(): build_file = Label("//third_party/remote:BUILD.libc-0.2.106.bazel"), ) + maybe( + http_archive, + name = "raze__lock_api__0_4_5", + url = "https://crates.io/api/v1/crates/lock_api/0.4.5/download", + type = "tar.gz", + sha256 = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109", + strip_prefix = "lock_api-0.4.5", + build_file = Label("//third_party/remote:BUILD.lock_api-0.4.5.bazel"), + ) + + maybe( + http_archive, + name = "raze__lockfree__0_5_1", + url = "https://crates.io/api/v1/crates/lockfree/0.5.1/download", + type = "tar.gz", + sha256 = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23", + strip_prefix = "lockfree-0.5.1", + build_file = Label("//third_party/remote:BUILD.lockfree-0.5.1.bazel"), + ) + maybe( http_archive, name = "raze__log__0_4_14", @@ -1171,6 +1211,36 @@ def raze_fetch_remote_crates(): build_file = Label("//third_party/remote:BUILD.output_vt100-0.1.2.bazel"), ) + maybe( + http_archive, + name = "raze__owned_alloc__0_2_0", + url = "https://crates.io/api/v1/crates/owned-alloc/0.2.0/download", + type = "tar.gz", + sha256 = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6", + strip_prefix = "owned-alloc-0.2.0", + build_file = Label("//third_party/remote:BUILD.owned-alloc-0.2.0.bazel"), + ) + + maybe( + http_archive, + name = "raze__parking_lot__0_11_2", + url = "https://crates.io/api/v1/crates/parking_lot/0.11.2/download", + type = "tar.gz", + sha256 = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99", + strip_prefix = "parking_lot-0.11.2", + build_file = Label("//third_party/remote:BUILD.parking_lot-0.11.2.bazel"), + ) + + maybe( + http_archive, + name = "raze__parking_lot_core__0_8_5", + url = "https://crates.io/api/v1/crates/parking_lot_core/0.8.5/download", + type = "tar.gz", + sha256 = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216", + strip_prefix = 
"parking_lot_core-0.8.5", + build_file = Label("//third_party/remote:BUILD.parking_lot_core-0.8.5.bazel"), + ) + maybe( http_archive, name = "raze__percent_encoding__2_1_0", @@ -1615,6 +1685,16 @@ def raze_fetch_remote_crates(): build_file = Label("//third_party/remote:BUILD.schannel-0.1.19.bazel"), ) + maybe( + http_archive, + name = "raze__scopeguard__1_1_0", + url = "https://crates.io/api/v1/crates/scopeguard/1.1.0/download", + type = "tar.gz", + sha256 = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd", + strip_prefix = "scopeguard-1.1.0", + build_file = Label("//third_party/remote:BUILD.scopeguard-1.1.0.bazel"), + ) + maybe( http_archive, name = "raze__security_framework__2_4_2", @@ -1755,6 +1835,16 @@ def raze_fetch_remote_crates(): build_file = Label("//third_party/remote:BUILD.slab-0.4.5.bazel"), ) + maybe( + http_archive, + name = "raze__smallvec__1_7_0", + url = "https://crates.io/api/v1/crates/smallvec/1.7.0/download", + type = "tar.gz", + sha256 = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309", + strip_prefix = "smallvec-1.7.0", + build_file = Label("//third_party/remote:BUILD.smallvec-1.7.0.bazel"), + ) + maybe( http_archive, name = "raze__socket2__0_4_2", @@ -2397,10 +2487,10 @@ def raze_fetch_remote_crates(): maybe( http_archive, - name = "raze__zeroize__1_4_2", - url = "https://crates.io/api/v1/crates/zeroize/1.4.2/download", + name = "raze__zeroize__1_4_3", + url = "https://crates.io/api/v1/crates/zeroize/1.4.3/download", type = "tar.gz", - sha256 = "bf68b08513768deaa790264a7fac27a58cbf2705cfcdc9448362229217d7e970", - strip_prefix = "zeroize-1.4.2", - build_file = Label("//third_party/remote:BUILD.zeroize-1.4.2.bazel"), + sha256 = "d68d9dcec5f9b43a30d38c49f91dfedfaac384cb8f085faca366c26207dd1619", + strip_prefix = "zeroize-1.4.3", + build_file = Label("//third_party/remote:BUILD.zeroize-1.4.3.bazel"), ) diff --git a/third_party/remote/BUILD.instant-0.1.12.bazel b/third_party/remote/BUILD.instant-0.1.12.bazel new file mode 100644 index 000000000..340df800e --- /dev/null +++ b/third_party/remote/BUILD.instant-0.1.12.bazel @@ -0,0 +1,56 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # BSD-3-Clause from expression "BSD-3-Clause" +]) + +# Generated Targets + +rust_library( + name = "instant", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.1.12", + # buildifier: leave-alone + deps = [ + "@raze__cfg_if__1_0_0//:cfg_if", + ], +) + +# Unsupported target "wasm" with type "test" omitted diff --git a/third_party/remote/BUILD.lease-0.2.3.bazel b/third_party/remote/BUILD.lease-0.2.3.bazel new file mode 100644 index 000000000..653365634 --- /dev/null +++ b/third_party/remote/BUILD.lease-0.2.3.bazel @@ -0,0 +1,61 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! 
Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # MIT from expression "MIT OR Apache-2.0" +]) + +# Generated Targets + +rust_library( + name = "lease", + srcs = glob(["**/*.rs"]), + crate_features = [ + "async", + "default", + "futures-core", + ], + crate_root = "src/lib.rs", + data = [], + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.2.3", + # buildifier: leave-alone + deps = [ + "@raze__futures_core__0_3_17//:futures_core", + "@raze__lockfree__0_5_1//:lockfree", + "@raze__parking_lot__0_11_2//:parking_lot", + ], +) + +# Unsupported target "test_api" with type "test" omitted diff --git a/third_party/remote/BUILD.lock_api-0.4.5.bazel b/third_party/remote/BUILD.lock_api-0.4.5.bazel new file mode 100644 index 000000000..6d7b14feb --- /dev/null +++ b/third_party/remote/BUILD.lock_api-0.4.5.bazel @@ -0,0 +1,54 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # Apache-2.0 from expression "Apache-2.0 OR MIT" +]) + +# Generated Targets + +rust_library( + name = "lock_api", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.4.5", + # buildifier: leave-alone + deps = [ + "@raze__scopeguard__1_1_0//:scopeguard", + ], +) diff --git a/third_party/remote/BUILD.lockfree-0.5.1.bazel b/third_party/remote/BUILD.lockfree-0.5.1.bazel new file mode 100644 index 000000000..0802273a5 --- /dev/null +++ b/third_party/remote/BUILD.lockfree-0.5.1.bazel @@ -0,0 +1,54 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. 
+ "//visibility:public", +]) + +licenses([ + "notice", # MIT from expression "MIT" +]) + +# Generated Targets + +rust_library( + name = "lockfree", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2015", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.5.1", + # buildifier: leave-alone + deps = [ + "@raze__owned_alloc__0_2_0//:owned_alloc", + ], +) diff --git a/third_party/remote/BUILD.owned-alloc-0.2.0.bazel b/third_party/remote/BUILD.owned-alloc-0.2.0.bazel new file mode 100644 index 000000000..69833d9ef --- /dev/null +++ b/third_party/remote/BUILD.owned-alloc-0.2.0.bazel @@ -0,0 +1,53 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # MIT from expression "MIT" +]) + +# Generated Targets + +rust_library( + name = "owned_alloc", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2015", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.2.0", + # buildifier: leave-alone + deps = [ + ], +) diff --git a/third_party/remote/BUILD.parking_lot-0.11.2.bazel b/third_party/remote/BUILD.parking_lot-0.11.2.bazel new file mode 100644 index 000000000..648308409 --- /dev/null +++ b/third_party/remote/BUILD.parking_lot-0.11.2.bazel @@ -0,0 +1,58 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # Apache-2.0 from expression "Apache-2.0 OR MIT" +]) + +# Generated Targets + +rust_library( + name = "parking_lot", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.11.2", + # buildifier: leave-alone + deps = [ + "@raze__instant__0_1_12//:instant", + "@raze__lock_api__0_4_5//:lock_api", + "@raze__parking_lot_core__0_8_5//:parking_lot_core", + ], +) + +# Unsupported target "issue_203" with type "test" omitted diff --git a/third_party/remote/BUILD.parking_lot_core-0.8.5.bazel b/third_party/remote/BUILD.parking_lot_core-0.8.5.bazel new file mode 100644 index 000000000..714921fd2 --- /dev/null +++ b/third_party/remote/BUILD.parking_lot_core-0.8.5.bazel @@ -0,0 +1,103 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! 
Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # Apache-2.0 from expression "Apache-2.0 OR MIT" +]) + +# Generated Targets +# buildifier: disable=out-of-order-load +# buildifier: disable=load-on-top +load( + "@rules_rust//cargo:cargo_build_script.bzl", + "cargo_build_script", +) + +cargo_build_script( + name = "parking_lot_core_build_script", + srcs = glob(["**/*.rs"]), + build_script_env = { + }, + crate_features = [ + ], + crate_root = "build.rs", + data = glob(["**"]), + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.8.5", + visibility = ["//visibility:private"], + deps = [ + ] + selects.with_or({ + # cfg(unix) + ( + "@rules_rust//rust/platform:x86_64-unknown-linux-gnu", + ): [ + ], + "//conditions:default": [], + }), +) + +rust_library( + name = "parking_lot_core", + srcs = glob(["**/*.rs"]), + aliases = { + }, + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "0.8.5", + # buildifier: leave-alone + deps = [ + ":parking_lot_core_build_script", + "@raze__cfg_if__1_0_0//:cfg_if", + "@raze__instant__0_1_12//:instant", + "@raze__smallvec__1_7_0//:smallvec", + ] + selects.with_or({ + # cfg(unix) + ( + "@rules_rust//rust/platform:x86_64-unknown-linux-gnu", + ): [ + "@raze__libc__0_2_106//:libc", + ], + "//conditions:default": [], + }), +) diff --git a/third_party/remote/BUILD.rusoto_credential-0.46.0.bazel b/third_party/remote/BUILD.rusoto_credential-0.46.0.bazel index e387d885a..fdb5574a1 100644 --- a/third_party/remote/BUILD.rusoto_credential-0.46.0.bazel +++ b/third_party/remote/BUILD.rusoto_credential-0.46.0.bazel @@ -60,7 +60,7 @@ rust_library( "@raze__serde_json__1_0_68//:serde_json", "@raze__shlex__0_1_1//:shlex", "@raze__tokio__1_13_0//:tokio", - "@raze__zeroize__1_4_2//:zeroize", + "@raze__zeroize__1_4_3//:zeroize", ], ) diff --git a/third_party/remote/BUILD.scopeguard-1.1.0.bazel b/third_party/remote/BUILD.scopeguard-1.1.0.bazel new file mode 100644 index 000000000..b63a722f9 --- /dev/null +++ b/third_party/remote/BUILD.scopeguard-1.1.0.bazel @@ -0,0 +1,55 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. 
+ "//visibility:public", +]) + +licenses([ + "notice", # MIT from expression "MIT OR Apache-2.0" +]) + +# Generated Targets + +# Unsupported target "readme" with type "example" omitted + +rust_library( + name = "scopeguard", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2015", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "1.1.0", + # buildifier: leave-alone + deps = [ + ], +) diff --git a/third_party/remote/BUILD.smallvec-1.7.0.bazel b/third_party/remote/BUILD.smallvec-1.7.0.bazel new file mode 100644 index 000000000..032cb002f --- /dev/null +++ b/third_party/remote/BUILD.smallvec-1.7.0.bazel @@ -0,0 +1,57 @@ +""" +@generated +cargo-raze crate build file. + +DO NOT EDIT! Replaced on runs of cargo-raze +""" + +# buildifier: disable=load +load("@bazel_skylib//lib:selects.bzl", "selects") + +# buildifier: disable=load +load( + "@rules_rust//rust:defs.bzl", + "rust_binary", + "rust_library", + "rust_proc_macro", + "rust_test", +) + +package(default_visibility = [ + # Public for visibility by "@raze__crate__version//" targets. + # + # Prefer access through "//third_party", which limits external + # visibility to explicit Cargo.toml dependencies. + "//visibility:public", +]) + +licenses([ + "notice", # MIT from expression "MIT OR Apache-2.0" +]) + +# Generated Targets + +# Unsupported target "bench" with type "bench" omitted + +rust_library( + name = "smallvec", + srcs = glob(["**/*.rs"]), + crate_features = [ + ], + crate_root = "src/lib.rs", + data = [], + edition = "2018", + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-raze", + "manual", + ], + version = "1.7.0", + # buildifier: leave-alone + deps = [ + ], +) + +# Unsupported target "macro" with type "test" omitted diff --git a/third_party/remote/BUILD.winapi-0.3.9.bazel b/third_party/remote/BUILD.winapi-0.3.9.bazel index 2c12e924e..af57845fa 100644 --- a/third_party/remote/BUILD.winapi-0.3.9.bazel +++ b/third_party/remote/BUILD.winapi-0.3.9.bazel @@ -62,6 +62,7 @@ cargo_build_script( "namedpipeapi", "ntdef", "ntsecapi", + "ntstatus", "objbase", "processenv", "profileapi", @@ -126,6 +127,7 @@ rust_library( "namedpipeapi", "ntdef", "ntsecapi", + "ntstatus", "objbase", "processenv", "profileapi", diff --git a/third_party/remote/BUILD.zeroize-1.4.2.bazel b/third_party/remote/BUILD.zeroize-1.4.3.bazel similarity index 97% rename from third_party/remote/BUILD.zeroize-1.4.2.bazel rename to third_party/remote/BUILD.zeroize-1.4.3.bazel index 7c7145d6f..47a68f351 100644 --- a/third_party/remote/BUILD.zeroize-1.4.2.bazel +++ b/third_party/remote/BUILD.zeroize-1.4.3.bazel @@ -48,7 +48,7 @@ rust_library( "cargo-raze", "manual", ], - version = "1.4.2", + version = "1.4.3", # buildifier: leave-alone deps = [ ], diff --git a/util/BUILD b/util/BUILD index 600cd6f6d..9263751b7 100644 --- a/util/BUILD +++ b/util/BUILD @@ -63,18 +63,6 @@ rust_library( visibility = ["//visibility:public"], ) -rust_library( - name = "async_read_taker", - srcs = ["async_read_taker.rs"], - deps = [ - "//third_party:fast_async_mutex", - "//third_party:futures", - "//third_party:pin_project_lite", - "//third_party:tokio", - ], - visibility = ["//visibility:public"], -) - rust_test( name = "async_fixed_buffer_test", srcs = ["tests/async_fixed_buffer_test.rs"], @@ -112,17 +100,3 @@ rust_test( ":retry", ], ) - -rust_test( - name = "async_read_taker_test", - srcs = ["tests/async_read_taker_test.rs"], - deps = [ - "//third_party:fast_async_mutex", - 
"//third_party:futures", - "//third_party:pretty_assertions", - "//third_party:tokio", - ":async_fixed_buffer", - ":async_read_taker", - ":error", - ], -) diff --git a/util/async_read_taker.rs b/util/async_read_taker.rs deleted file mode 100644 index f7071285f..000000000 --- a/util/async_read_taker.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved. - -use std::future::Future; -use std::marker::Send; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use fast_async_mutex::mutex::Mutex; -use futures::ready; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, ReadBuf}; - -pub type ArcMutexAsyncRead = Arc>>; - -// TODO(blaise.bruer) It does not look like this class is needed any more. Should consider removing it. -pin_project! { - /// Useful object that can be used to chunk an AsyncReader by a specific size. - /// This also requires the inner reader be sharable between threads. This allows - /// the caller to still "own" the underlying reader in a way that once `limit` is - /// reached the caller can keep using it, but still use this struct to read the data. - pub struct AsyncReadTaker { - inner: ArcMutexAsyncRead, - // Add '_' to avoid conflicts with `limit` method. - limit_: usize, - } -} - -impl AsyncReadTaker { - pub fn new(inner: ArcMutexAsyncRead, limit: usize) -> Self { - AsyncReadTaker { inner, limit_: limit } - } -} - -impl AsyncRead for AsyncReadTaker { - /// Note: This function is modeled after tokio::Take::poll_read. - /// see: https://docs.rs/tokio/1.12.0/src/tokio/io/util/take.rs.html#77 - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - if self.limit_ == 0 { - return Poll::Ready(Ok(())); - } - - let b = { - let mut inner_fut = self.inner.lock_owned(); - let mut inner = Pin::new(ready!(Pin::new(&mut inner_fut).poll(cx))); - // Now that we have all our locks lets begin changing our state. - let mut b = buf.take(self.limit_); - ready!(inner.as_mut().poll_read(cx, &mut b))?; - b - }; - let n = b.filled().len(); - // We need to update the original ReadBuf - unsafe { - buf.assume_init(n); - } - buf.advance(n); - self.limit_ -= n; - - Poll::Ready(Ok(())) - } -} diff --git a/util/tests/async_read_taker_test.rs b/util/tests/async_read_taker_test.rs deleted file mode 100644 index 54aa39921..000000000 --- a/util/tests/async_read_taker_test.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved. - -use std::sync::Arc; - -use fast_async_mutex::mutex::Mutex; -use futures::{poll, FutureExt}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -use async_fixed_buffer::AsyncFixedBuf; -use async_read_taker::{ArcMutexAsyncRead, AsyncReadTaker}; -use error::{make_err, Code, Error}; - -#[cfg(test)] -mod async_read_taker_tests { - use super::*; - use pretty_assertions::assert_eq; // Must be declared in every module. - - #[tokio::test] - async fn done_before_split() -> Result<(), Error> { - let raw_fixed_buffer = AsyncFixedBuf::new(vec![0u8; 100].into_boxed_slice()); - let (rx, mut tx) = tokio::io::split(raw_fixed_buffer); - - let mut taker = AsyncReadTaker::new(Arc::new(Mutex::new(Box::new(rx))), 1024); - let write_data = vec![97u8; 50]; - { - // Send our data. - tx.write_all(&write_data).await?; - tx.write(&vec![]).await?; // Write EOF. - } - { - // Receive and check our data. 
- let mut read_buffer = Vec::new(); - let read_sz = taker.read_to_end(&mut read_buffer).await?; - assert_eq!(read_sz, 50, "Expected sizes to match"); - assert_eq!(&read_buffer, &write_data, "Expected sizes to match"); - } - Ok(()) - } - - #[tokio::test] - async fn shutdown_during_read() -> Result<(), Error> { - let raw_fixed_buffer = AsyncFixedBuf::new(vec![0u8; 100].into_boxed_slice()); - let (rx, mut tx) = tokio::io::split(raw_fixed_buffer); - - const WRITE_DATA: &[u8] = &[97u8; 25]; - const READ_AMOUNT: usize = 50; - - let reader: ArcMutexAsyncRead = Arc::new(Mutex::new(Box::new(rx))); - - tx.write_all(&WRITE_DATA).await?; - - let mut taker = Box::pin(AsyncReadTaker::new(reader.clone(), READ_AMOUNT)); - - let mut read_buffer = Vec::new(); - let mut read_fut = taker.read_to_end(&mut read_buffer).boxed(); - { - // Poll the future to make sure it did start reading. Failing to do this step makes this test useless. - assert!( - poll!(&mut read_fut).is_pending(), - "Should not have received EOF. Should be pending" - ); - } - // Shutdown the sender. This should cause the futures to resolve. - tx.shutdown().await?; - { - // Ensure an appropriate error message was returned. - let err: Error = read_fut.await.unwrap_err().into(); - assert_eq!(err, make_err!(Code::Internal, "Sender disconnected")); - assert_eq!( - &read_buffer, &WRITE_DATA, - "Expected poll!() macro to have processed the data we wrote" - ); - } - - Ok(()) - } -}
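
Usage sketch (illustrative only; the `upload_blob` helper below is hypothetical and not part of this patch). Callers that know the exact payload length pass UploadSizeInfo::ExactSize, which lets stores such as S3Store stream small blobs in a single PutObject request without buffering; callers that only know an upper bound (e.g. the planned compression stores mentioned above) would pass UploadSizeInfo::MaxSize instead:

    use std::io::Cursor;
    use std::pin::Pin;

    use common::DigestInfo;
    use error::Error;
    use traits::{StoreTrait, UploadSizeInfo};

    // Hypothetical helper: uploads an in-memory blob. Because the exact length
    // is known we pass ExactSize; a streaming producer that only knows an upper
    // bound would pass UploadSizeInfo::MaxSize(upper_bound) instead.
    async fn upload_blob<S: StoreTrait>(
        store: Pin<&S>,
        digest: DigestInfo,
        data: Vec<u8>,
    ) -> Result<(), Error> {
        let size_info = UploadSizeInfo::ExactSize(data.len());
        store
            .update(digest, Box::new(Cursor::new(data)), size_info)
            .await
    }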