mononoke: make sure multiplexed blobstore write succeeds if all underlying

Summary:
We had somewhat inconsistent behaviour in the multiplexed blobstore:
1) If the on_put handlers are too slow (i.e. slower than all blobstores), then we
succeed as soon as all blobstores have succeeded, regardless of the value of
minimum_successful_writes. It doesn't matter whether the on_put handlers fail or
succeed - we've already returned success to our user.
2) However, if all writes to the queue fail quickly, then we return a failure
even if the writes to all blobstores were successful.

#2 seems like a change in behaviour from an old diff D17421208 (9de1de2), and not a
desirable one: if the blobstore sync queue is unavailable and responds with
failures quickly, then blobstore writes will always fail even if all blobstores
are healthy.

So this diff makes it so that we always succeed if all blobstore puts were
successful, regardless of whether the on_put handlers succeed or fail.
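
In other words, after this diff the success criterion for a multiplexed put can be
sketched as follows. This is a minimal illustration of the decision rule, not the
actual Mononoke code; the function and its parameter names are made up for the
example:

    // Hypothetical sketch of the success rule, not part of the Mononoke codebase.
    // `pairs_ok`: number of (`put`, `on_put`) pairs where both succeeded.
    // `puts_ok`: number of blobstore `put`s that succeeded, ignoring `on_put`.
    fn put_is_successful(
        pairs_ok: usize,
        puts_ok: usize,
        blobstore_count: usize,
        minimum_successful_writes: usize,
    ) -> bool {
        // Succeed once enough put/on_put pairs completed, or once every
        // underlying blobstore accepted the write - even if all on_put
        // handlers (e.g. the blobstore sync queue writes) failed.
        pairs_ok >= minimum_successful_writes || puts_ok == blobstore_count
    }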

Reviewed By: liubov-dmitrieva

Differential Revision: D29985084

fbshipit-source-id: 64338d552be45a70d9b1d16dfbe7d10346ab539c
StanislavGlebik authored and facebook-github-bot committed Jul 29, 2021
1 parent e2a7bc8 commit eb55bb4
Showing 2 changed files with 76 additions and 7 deletions.
16 changes: 10 additions & 6 deletions eden/mononoke/blobstore/multiplexedblob/src/base.rs
@@ -105,7 +105,8 @@ pub struct MultiplexedBlobstoreBase {
     /// 2. When we're recording blobstore stats to Scuba on a `get` - in this case, the read executes
     /// solely to gather statistics, and the result is discarded
     write_mostly_blobstores: Arc<[(BlobstoreId, Arc<dyn BlobstorePutOps>)]>,
-    /// At least this many `put` and `on_put` pairs have to succeed before we consider a `put` successful
+    /// `put` is considered successful if either this many `put` and `on_put` pairs succeeded or all puts were
+    /// successful (regardless of whether `on_put`s were successful).
     /// This is meant to ensure that `put` fails if the data could end up lost (e.g. if a buggy experimental
     /// blobstore wins the `put` race).
     /// Note that if this is bigger than the number of blobstores, we will always fail writes
@@ -656,6 +657,7 @@ impl MultiplexedBlobstoreBase {
                 .increment_counter(PerfCounterType::BlobPuts);

             let mut put_errors = HashMap::new();
+            let mut handler_errors = HashMap::new();
             let mut handlers = FuturesUnordered::new();

             while let Some(result) = select_next(
@@ -696,15 +698,17 @@
                         }
                     }
                     Right(Err((blobstore_id, e))) => {
-                        put_errors.insert(blobstore_id, e);
+                        handler_errors.insert(blobstore_id, e);
                     }
                 }
             }
-            if put_errors.len() == 1 {
-                let (_, put_error) = put_errors.drain().next().unwrap();
-                Err(put_error)
+            let mut errors = put_errors;
+            errors.extend(handler_errors.into_iter());
+            if errors.len() == 1 {
+                let (_, error) = errors.drain().next().unwrap();
+                Err(error)
             } else {
-                Err(ErrorKind::MultiplePutFailures(Arc::new(put_errors)).into())
+                Err(ErrorKind::MultiplePutFailures(Arc::new(errors)).into())
             }
         }
         .timed()
67 changes: 66 additions & 1 deletion eden/mononoke/blobstore/multiplexedblob/src/test.rs
@@ -19,7 +19,7 @@ use crate::queue::MultiplexedBlobstore;
 use crate::scrub::{
     LoggingScrubHandler, ScrubAction, ScrubBlobstore, ScrubHandler, ScrubOptions, ScrubWriteMostly,
 };
-use anyhow::{bail, Result};
+use anyhow::{anyhow, bail, Result};
 use async_trait::async_trait;
 use blobstore::{
     Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstoreMetadata, BlobstorePutOps,
@@ -236,6 +236,25 @@ impl MultiplexedBlobstorePutHandler for LogHandler {
     }
 }

+struct FailingPutHandler {}
+
+#[async_trait]
+impl MultiplexedBlobstorePutHandler for FailingPutHandler {
+    async fn on_put<'out>(
+        &'out self,
+        _ctx: &'out CoreContext,
+        mut _scuba: MononokeScubaSampleBuilder,
+        _blobstore_id: BlobstoreId,
+        _blobstore_type: String,
+        _multiplex_id: MultiplexId,
+        _operation_key: &'out OperationKey,
+        _key: &'out str,
+        _blob_size: Option<u64>,
+    ) -> Result<()> {
+        Err(anyhow!("failed on_put"))
+    }
+}
+
 fn make_value(value: &str) -> BlobstoreBytes {
     BlobstoreBytes::from_bytes(Bytes::copy_from_slice(value.as_bytes()))
 }
@@ -1791,3 +1810,49 @@ async fn no_handlers(fb: FacebookInit) {
         clear();
     }
 }
+
+#[fbinit::test]
+async fn failing_put_handler(fb: FacebookInit) {
+    let bs0 = Arc::new(Tickable::new());
+    let bs1 = Arc::new(Tickable::new());
+    let bs2 = Arc::new(Tickable::new());
+    let failing_put_handler = Arc::new(FailingPutHandler {});
+    let bs = MultiplexedBlobstoreBase::new(
+        MultiplexId::new(1),
+        vec![
+            (BlobstoreId::new(0), bs0.clone()),
+            (BlobstoreId::new(1), bs1.clone()),
+            (BlobstoreId::new(2), bs2.clone()),
+        ],
+        vec![],
+        // 1 minimum successful write
+        nonzero!(1usize),
+        failing_put_handler,
+        MononokeScubaSampleBuilder::with_discard(),
+        nonzero!(1u64),
+    );
+    let ctx = CoreContext::test_mock(fb);
+
+    let k = String::from("k");
+    let v = make_value("v");
+
+    // Put succeeds in all blobstores, so failures in the put handler shouldn't matter.
+    {
+        let mut fut = bs
+            .put(&ctx, k.to_owned(), v.clone())
+            .map_err(|_| ())
+            .boxed();
+
+        assert_eq!(PollOnce::new(Pin::new(&mut fut)).await, Poll::Pending);
+
+        bs0.tick(None);
+        // Poll the future to trigger a handler, which will fail
+        assert_eq!(PollOnce::new(Pin::new(&mut fut)).await, Poll::Pending);
+
+        bs1.tick(None);
+        bs2.tick(None);
+
+        // Make sure the put is successful
+        fut.await.expect("Put should have succeeded");
+    }
+}
