Restore retries for stores to remote REAPI byte stores (#19737)
This fixes #19732 by restoring retries when a store to the REAPI remote
cache server hits retryable server failures; those retries were lost in
the #19050 refactoring.
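
For context, the restored behaviour follows the usual retry-with-predicate pattern: re-run the store call only while the failure is classified as retryable. The sketch below illustrates that pattern in a self-contained form; the function name, the fixed attempt budget, and the lack of backoff are illustrative assumptions, not the actual `grpc_util` `retry_call` implementation.

```rust
// Illustrative retry-with-predicate helper (an assumed shape; the real
// grpc_util retry helper also applies a backoff between attempts).
async fn retry_with_predicate<T, E, Fut>(
    attempts: usize,
    mut call: impl FnMut() -> Fut,
    is_retryable: impl Fn(&E) -> bool,
) -> Result<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut last_err = None;
    for _ in 0..attempts {
        match call().await {
            Ok(value) => return Ok(value),
            // Retryable failure: remember it and run the call again.
            Err(err) if is_retryable(&err) => last_err = Some(err),
            // Non-retryable failure: give up immediately.
            Err(err) => return Err(err),
        }
    }
    // All attempts failed with retryable errors; surface the last one.
    Err(last_err.expect("attempts must be at least 1"))
}
```

In the `reapi.rs` hunk below, the closure wraps `store_bytes_source_batch` / `store_bytes_source_stream`, and the predicate treats `ByteStoreError::Grpc` statuses as retryable via `status_is_retryable` while other errors fail immediately.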

This also adds explicit tests for the retries. To support them, `StubCAS` is
refactored to generalise `read_request_count` into per-request-type counts
rather than reads only, and to consistently return a `Status::internal(...)`
for its simulated errors.
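
The `StubCAS` generalisation replaces a single read counter with a per-request-type map. The sketch below shows that shape in miniature; it uses `std::sync::Mutex` and only two variants for illustration, rather than the mock crate's actual types.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Which RPC a stub server handled; more variants can be added as needed.
#[derive(PartialEq, Eq, Hash, Debug, Copy, Clone)]
enum RequestType {
    BSRead,
    BSWrite,
}

#[derive(Default)]
struct RequestCounter(Mutex<HashMap<RequestType, usize>>);

impl RequestCounter {
    // A request handler calls this to record that it served one request.
    fn record(&self, request_type: RequestType) {
        *self.0.lock().unwrap().entry(request_type).or_insert(0) += 1;
    }

    // Tests read the count back for assertions; 0 if never recorded.
    fn get(&self, request_type: RequestType) -> usize {
        self.0.lock().unwrap().get(&request_type).copied().unwrap_or(0)
    }
}

fn main() {
    let counts = RequestCounter::default();
    counts.record(RequestType::BSRead);
    counts.record(RequestType::BSRead);
    assert_eq!(counts.get(RequestType::BSRead), 2);
    assert_eq!(counts.get(RequestType::BSWrite), 0);
}
```

Tests can then assert on the per-type counts to check how many attempts were made, as the `Some(&3)` assertions in the test hunks below do.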

I think #19050 fortunately landed just after 2.17 was cut, so this
regression only affects the 2.18 pre-releases.
huonw authored Sep 7, 2023
1 parent 5a2328c commit b1d9e14
Showing 7 changed files with 137 additions and 60 deletions.
22 changes: 16 additions & 6 deletions src/rust/engine/fs/store/src/remote/reapi.rs
@@ -224,12 +224,22 @@ impl ByteStoreProvider for Provider {
let batch_api_allowed_by_server_config =
max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes;

let result = if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
self.store_bytes_source_batch(digest, bytes).await
} else {
self.store_bytes_source_stream(digest, bytes).await
};
result.map_err(|e| e.to_string())
retry_call(
bytes,
move |bytes| async move {
if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
self.store_bytes_source_batch(digest, bytes).await
} else {
self.store_bytes_source_stream(digest, bytes).await
}
},
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
ByteStoreError::Other(_) => false,
},
)
.await
.map_err(|e| e.to_string())
}

async fn load(
60 changes: 57 additions & 3 deletions src/rust/engine/fs/store/src/remote/reapi_tests.rs
@@ -7,7 +7,7 @@ use std::time::Duration;
use bytes::Bytes;
use grpc_util::tls;
use hashing::{Digest, Fingerprint};
use mock::StubCAS;
use mock::{RequestType, StubCAS};
use testutil::data::TestData;

use crate::remote::{ByteStoreProvider, RemoteOptions};
@@ -118,7 +118,12 @@ async fn load_grpc_error() {
assert!(
error.contains("StubCAS is configured to always fail"),
"Bad error message, got: {error}"
)
);
// retries:
assert_eq!(
cas.request_counts.lock().get(&RequestType::BSRead),
Some(&3)
);
}

#[tokio::test]
@@ -219,7 +224,7 @@ async fn store_bytes_empty_file() {
}

#[tokio::test]
async fn store_bytes_grpc_error() {
async fn store_bytes_batch_grpc_error() {
let testdata = TestData::roland();
let cas = StubCAS::cas_always_errors();
let provider = new_provider(&cas).await;
@@ -232,6 +237,47 @@ async fn store_bytes_grpc_error() {
error.contains("StubCAS is configured to always fail"),
"Bad error message, got: {error}"
);

// retries:
assert_eq!(
cas
.request_counts
.lock()
.get(&RequestType::CASBatchUpdateBlobs),
Some(&3)
);
}

#[tokio::test]
async fn store_bytes_write_stream_grpc_error() {
let cas = StubCAS::cas_always_errors();
let chunk_size = 10 * 1024;
let provider = Provider::new(remote_options(
cas.address(),
chunk_size,
0, // disable batch API, force streaming API
))
.await
.unwrap();

let all_the_henries = big_file_bytes();
let fingerprint = big_file_fingerprint();
let digest = Digest::new(fingerprint, all_the_henries.len());

let error = provider
.store_bytes(digest, byte_source(all_the_henries))
.await
.expect_err("Want err");
assert!(
error.contains("StubCAS is configured to always fail"),
"Bad error message, got: {error}"
);

// retries:
assert_eq!(
cas.request_counts.lock().get(&RequestType::BSWrite),
Some(&3)
);
}

#[tokio::test]
@@ -300,4 +346,12 @@ async fn list_missing_digests_grpc_error() {
error.contains("StubCAS is configured to always fail"),
"Bad error message, got: {error}"
);
// retries:
assert_eq!(
cas
.request_counts
.lock()
.get(&RequestType::CASFindMissingBlobs),
Some(&3)
);
}
24 changes: 12 additions & 12 deletions src/rust/engine/fs/store/src/tests.rs
@@ -17,7 +17,7 @@ use fs::{
use grpc_util::prost::MessageExt;
use grpc_util::tls;
use hashing::{Digest, Fingerprint};
use mock::StubCAS;
use mock::{RequestType, StubCAS};
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use workunit_store::WorkunitStore;

@@ -151,7 +151,7 @@ async fn load_file_prefers_local() {
.await,
Ok(testdata.bytes())
);
assert_eq!(0, cas.read_request_count());
assert_eq!(0, cas.request_count(RequestType::BSRead));
}

#[tokio::test]
@@ -179,7 +179,7 @@ async fn load_directory_prefers_local() {
.unwrap(),
testdir.directory()
);
assert_eq!(0, cas.read_request_count());
assert_eq!(0, cas.request_count(RequestType::BSRead));
}

#[tokio::test]
@@ -198,7 +198,7 @@ async fn load_file_falls_back_and_backfills() {
Ok(testdata.bytes()),
"Read from CAS"
);
assert_eq!(1, cas.read_request_count());
assert_eq!(1, cas.request_count(RequestType::BSRead));
assert_eq!(
crate::local_tests::load_file_bytes(
&crate::local_tests::new_store(dir.path()),
@@ -232,7 +232,7 @@ async fn load_file_falls_back_and_backfills_for_huge_file() {
.unwrap(),
testdata.bytes()
);
assert_eq!(1, cas.read_request_count());
assert_eq!(1, cas.request_count(RequestType::BSRead));
assert!(
crate::local_tests::load_file_bytes(
&crate::local_tests::new_store(dir.path()),
@@ -260,7 +260,7 @@ async fn load_directory_falls_back_and_backfills() {
.unwrap(),
testdir.directory()
);
assert_eq!(1, cas.read_request_count());
assert_eq!(1, cas.request_count(RequestType::BSRead));
assert_eq!(
crate::local_tests::load_directory_proto_bytes(
&crate::local_tests::new_store(dir.path()),
@@ -336,7 +336,7 @@ async fn load_file_missing_is_none() {
)
.await;
assert!(matches!(result, Err(StoreError::MissingDigest { .. })),);
assert_eq!(1, cas.read_request_count());
assert_eq!(1, cas.request_count(RequestType::BSRead));
}

#[tokio::test]
@@ -349,7 +349,7 @@ async fn load_directory_missing_errors() {
.load_directory(TestDirectory::containing_roland().digest())
.await;
assert!(matches!(result, Err(StoreError::MissingDigest { .. })),);
assert_eq!(1, cas.read_request_count());
assert_eq!(1, cas.request_count(RequestType::BSRead));
}

#[tokio::test]
@@ -365,9 +365,9 @@ async fn load_file_remote_error_is_error() {
.await
.expect_err("Want error");
assert!(
cas.read_request_count() > 0,
cas.request_count(RequestType::BSRead) > 0,
"Want read_request_count > 0 but got {}",
cas.read_request_count()
cas.request_count(RequestType::BSRead)
);
assert!(
error
@@ -389,9 +389,9 @@ async fn load_directory_remote_error_is_error() {
.await
.expect_err("Want error");
assert!(
cas.read_request_count() > 0,
cas.request_count(RequestType::BSRead) > 0,
"Want read_request_count > 0 but got {}",
cas.read_request_count()
cas.request_count(RequestType::BSRead)
);
assert!(
error
32 changes: 26 additions & 6 deletions src/rust/engine/testutil/mock/src/cas.rs
@@ -36,7 +36,7 @@ use crate::cas_service::StubCASResponder;
pub struct StubCAS {
// CAS fields.
// TODO: These are inlined (rather than namespaced) for backwards compatibility.
read_request_count: Arc<Mutex<usize>>,
pub request_counts: Arc<RequestCounter>,
pub write_message_sizes: Arc<Mutex<Vec<usize>>>,
pub blobs: Arc<Mutex<HashMap<Fingerprint, Bytes>>>,
// AC fields.
@@ -46,6 +46,26 @@ pub struct StubCAS {
shutdown_sender: Option<tokio::sync::oneshot::Sender<()>>,
}

pub type RequestCounter = Mutex<HashMap<RequestType, usize>>;

#[derive(PartialEq, Eq, Hash, Debug, Copy, Clone)]
pub enum RequestType {
// ByteStream
BSRead,
BSWrite,
// ContentAddressableStorage
CASFindMissingBlobs,
CASBatchUpdateBlobs,
CASBatchReadBlobs,
// add others of interest as required
}

impl RequestType {
pub fn record(self, request_counts: &RequestCounter) {
*request_counts.lock().entry(self).or_insert(0) += 1;
}
}

impl Drop for StubCAS {
fn drop(&mut self) {
if let Some(s) = self.shutdown_sender.take() {
@@ -158,15 +178,15 @@ impl StubCASBuilder {
}

pub fn build(self) -> StubCAS {
let read_request_count = Arc::new(Mutex::new(0));
let request_counts = Arc::new(Mutex::new(HashMap::new()));
let write_message_sizes = Arc::new(Mutex::new(Vec::new()));
let blobs = Arc::new(Mutex::new(self.content));
let cas_responder = StubCASResponder {
chunk_size_bytes: self.chunk_size_bytes.unwrap_or(1024),
instance_name: self.instance_name,
blobs: blobs.clone(),
always_errors: self.cas_always_errors,
read_request_count: read_request_count.clone(),
request_counts: request_counts.clone(),
write_message_sizes: write_message_sizes.clone(),
required_auth_header: self.required_auth_token.map(|t| format!("Bearer {t}")),
};
@@ -204,7 +224,7 @@ impl StubCASBuilder {
});

StubCAS {
read_request_count,
request_counts,
write_message_sizes,
blobs,
action_cache: ActionCacheHandle {
@@ -237,8 +257,8 @@ impl StubCAS {
format!("http://{}", self.local_addr)
}

pub fn read_request_count(&self) -> usize {
*self.read_request_count.lock()
pub fn request_count(&self, request_type: RequestType) -> usize {
*self.request_counts.lock().get(&request_type).unwrap_or(&0)
}

pub fn remove(&self, fingerprint: Fingerprint) -> bool {