Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/finality-grandpa/src/until_imported: Refactor BlockGlobalMessage #5390

Merged
merged 4 commits into from
Mar 25, 2020
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 77 additions & 24 deletions client/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use sp_finality_grandpa::AuthorityId;
Expand Down Expand Up @@ -308,7 +308,7 @@ pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester,
/// This is used for compact commits and catch up messages which have already
/// been checked for structural soundness (e.g. valid signatures).
mxinden marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) struct BlockGlobalMessage<Block: BlockT> {
inner: Arc<(AtomicUsize, Mutex<Option<CommunicationIn<Block>>>)>,
inner: Arc<Mutex<Option<CommunicationIn<Block>>>>,
target_number: NumberFor<Block>,
}

Expand Down Expand Up @@ -416,7 +416,7 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
return Ok(())
}

let locked_global = Arc::new((AtomicUsize::new(unknown_count), Mutex::new(Some(input))));
let locked_global = Arc::new(Mutex::new(Some(input)));

// schedule waits for all unknown messages.
// when the last one of these has `wait_completed` called on it,
Expand All @@ -438,30 +438,20 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {

fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
if self.target_number != canon_number {
// if we return without deducting the counter, then none of the other
// handles can return the commit message.
// Delete the inner message so it won't ever be forwarded. Future calls to
// `wait_completed` on the same `inner` will ignore it.
*self.inner.lock() = None;
return None;
}

let mut last_count = self.inner.0.load(Ordering::Acquire);

// CAS loop to ensure that we always have a last reader.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case we decide against this, we should at least replace this CAS loop with a single fetch_sub call.

loop {
if last_count == 1 { // we are the last one left.
return self.inner.1.lock().take();
}

let prev_value = self.inner.0.compare_and_swap(
last_count,
last_count - 1,
Ordering::SeqCst,
);

if prev_value == last_count {
return None;
} else {
last_count = prev_value;
}
match Arc::try_unwrap(self.inner) {
// This is the last reference and thus the last outstanding block to be awaited. `inner`
// is either `Some(_)` or `None`. The latter implies that a previous `wait_completed`
// call witnessed a block number mismatch (see above).
Ok(inner) => Mutex::into_inner(inner),
// There are still other strong references to this `Arc`, thus the message is blocked on
// other blocks to be imported.
Err(_self) => None,
mxinden marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -941,4 +931,67 @@ mod tests {

futures::executor::block_on(test);
}

fn test_catch_up() -> Arc<Mutex<Option<CommunicationIn<Block>>>> {
let header = make_header(5);

let unknown_catch_up = finality_grandpa::CatchUp {
round_number: 1,
precommits: vec![],
prevotes: vec![],
base_hash: header.hash(),
base_number: *header.number(),
};

let catch_up = voter::CommunicationIn::CatchUp(
unknown_catch_up.clone(),
voter::Callback::Blank,
);

Arc::new(Mutex::new(Some(catch_up)))
}

#[test]
fn block_global_message_wait_completed_return_when_all_awaited() {
let msg_inner = test_catch_up();

let waiting_block_1 = BlockGlobalMessage::<Block> {
inner: msg_inner.clone(),
target_number: 1,
};

let waiting_block_2 = BlockGlobalMessage::<Block> {
inner: msg_inner,
target_number: 2,
};

// waiting_block_2 is still waiting for block 2, thus this should return `None`.
assert!(waiting_block_1.wait_completed(1).is_none());

// Message only depended on block 1 and 2. Both have been imported, thus this should yield
// the message.
assert!(waiting_block_2.wait_completed(2).is_some());
}

#[test]
fn block_global_message_wait_completed_return_none_on_block_number_missmatch() {
let msg_inner = test_catch_up();

let waiting_block_1 = BlockGlobalMessage::<Block> {
inner: msg_inner.clone(),
target_number: 1,
};

let waiting_block_2 = BlockGlobalMessage::<Block> {
inner: msg_inner,
target_number: 2,
};

// Calling wait_completed with wrong block number should yield None.
assert!(waiting_block_1.wait_completed(1234).is_none());

// All blocks, that the message depended on, have been imported. Still, given the above
// block number mismatch this should return None.
assert!(waiting_block_2.wait_completed(2).is_none());
}
}