Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Remove unneeded locking #499

Merged
merged 12 commits into from
Mar 11, 2016
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ travis-nightly = ["ethcore/json-tests", "dev-clippy", "dev"]
[[bin]]
path = "parity/main.rs"
name = "parity"

[profile.release]
debug = false
lto = false
178 changes: 99 additions & 79 deletions ethcore/src/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub struct BlockQueue {
panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>,
more_to_verify: Arc<Condvar>,
verification: Arc<Mutex<Verification>>,
verification: Arc<Verification>,
verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
Expand Down Expand Up @@ -132,18 +132,23 @@ impl QueueSignal {
}
}

#[derive(Default)]
struct Verification {
unverified: VecDeque<UnverifiedBlock>,
verified: VecDeque<PreverifiedBlock>,
verifying: VecDeque<VerifyingBlock>,
bad: HashSet<H256>,
// All locks must be captured in the order declared here.
unverified: Mutex<VecDeque<UnverifiedBlock>>,
verified: Mutex<VecDeque<PreverifiedBlock>>,
verifying: Mutex<VecDeque<VerifyingBlock>>,
bad: Mutex<HashSet<H256>>,
}

impl BlockQueue {
/// Creates a new queue instance.
pub fn new(config: BlockQueueConfig, engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
let verification = Arc::new(Mutex::new(Verification::default()));
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
});
let more_to_verify = Arc::new(Condvar::new());
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -186,17 +191,17 @@ impl BlockQueue {
}
}

fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
while !deleting.load(AtomicOrdering::Acquire) {
{
let mut lock = verification.lock().unwrap();
let mut unverified = verification.unverified.lock().unwrap();

if lock.unverified.is_empty() && lock.verifying.is_empty() {
if unverified.is_empty() && verification.verifying.lock().unwrap().is_empty() {
empty.notify_all();
}

while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
lock = wait.wait(lock).unwrap();
while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
unverified = wait.wait(unverified).unwrap();
}

if deleting.load(AtomicOrdering::Acquire) {
Expand All @@ -205,39 +210,42 @@ impl BlockQueue {
}

let block = {
let mut v = verification.lock().unwrap();
if v.unverified.is_empty() {
let mut unverified = verification.unverified.lock().unwrap();
if unverified.is_empty() {
continue;
}
let block = v.unverified.pop_front().unwrap();
v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None });
let mut verifying = verification.verifying.lock().unwrap();
let block = unverified.pop_front().unwrap();
verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None });
block
};

let block_hash = block.header.hash();
match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) {
Ok(verified) => {
let mut v = verification.lock().unwrap();
for e in &mut v.verifying {
let mut verifying = verification.verifying.lock().unwrap();
for e in verifying.iter_mut() {
if e.hash == block_hash {
e.block = Some(verified);
break;
}
}
if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash {
if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash {
// we're next!
let mut vref = v.deref_mut();
BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad);
let mut verified = verification.verified.lock().unwrap();
let mut bad = verification.bad.lock().unwrap();
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
}
},
Err(err) => {
let mut v = verification.lock().unwrap();
let mut verifying = verification.verifying.lock().unwrap();
let mut verified = verification.verified.lock().unwrap();
let mut bad = verification.bad.lock().unwrap();
warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err);
v.bad.insert(block_hash.clone());
v.verifying.retain(|e| e.hash != block_hash);
let mut vref = v.deref_mut();
BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad);
bad.insert(block_hash.clone());
verifying.retain(|e| e.hash != block_hash);
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
}
}
Expand All @@ -257,19 +265,21 @@ impl BlockQueue {
}

/// Clear the queue and stop verification activity.
pub fn clear(&mut self) {
let mut verification = self.verification.lock().unwrap();
verification.unverified.clear();
verification.verifying.clear();
verification.verified.clear();
pub fn clear(&self) {
let mut unverified = self.verification.unverified.lock().unwrap();
let mut verifying = self.verification.verifying.lock().unwrap();
let mut verified = self.verification.verified.lock().unwrap();
unverified.clear();
verifying.clear();
verified.clear();
self.processing.write().unwrap().clear();
}

/// Wait for queue to be empty
pub fn flush(&mut self) {
let mut verification = self.verification.lock().unwrap();
while !verification.unverified.is_empty() || !verification.verifying.is_empty() {
verification = self.empty.wait(verification).unwrap();
/// Wait for unverified queue to be empty
pub fn flush(&self) {
let mut unverified = self.verification.unverified.lock().unwrap();
while !unverified.is_empty() || !self.verification.verifying.lock().unwrap().is_empty() {
unverified = self.empty.wait(unverified).unwrap();
}
}

Expand All @@ -278,76 +288,76 @@ impl BlockQueue {
if self.processing.read().unwrap().contains(&hash) {
return BlockStatus::Queued;
}
if self.verification.lock().unwrap().bad.contains(&hash) {
if self.verification.bad.lock().unwrap().contains(&hash) {
return BlockStatus::Bad;
}
BlockStatus::Unknown
}

/// Add a block to the queue.
pub fn import_block(&mut self, bytes: Bytes) -> ImportResult {
pub fn import_block(&self, bytes: Bytes) -> ImportResult {
let header = BlockView::new(&bytes).header();
let h = header.hash();
if self.processing.read().unwrap().contains(&h) {
return Err(x!(ImportError::AlreadyQueued));
}
{
let mut verification = self.verification.lock().unwrap();
if verification.bad.contains(&h) {
if self.processing.read().unwrap().contains(&h) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any rational explanation of introducing this block around if expression?

return Err(x!(ImportError::AlreadyQueued));
}

let mut bad = self.verification.bad.lock().unwrap();
if bad.contains(&h) {
return Err(x!(ImportError::KnownBad));
}

if verification.bad.contains(&header.parent_hash) {
verification.bad.insert(h.clone());
if bad.contains(&header.parent_hash) {
bad.insert(h.clone());
return Err(x!(ImportError::KnownBad));
}
}

match verify_block_basic(&header, &bytes, self.engine.deref().deref()) {
Ok(()) => {
self.processing.write().unwrap().insert(h.clone());
self.verification.lock().unwrap().unverified.push_back(UnverifiedBlock { header: header, bytes: bytes });
self.verification.unverified.lock().unwrap().push_back(UnverifiedBlock { header: header, bytes: bytes });
self.more_to_verify.notify_all();
Ok(h)
},
Err(err) => {
warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err);
self.verification.lock().unwrap().bad.insert(h.clone());
self.verification.bad.lock().unwrap().insert(h.clone());
Err(err)
}
}
}

/// Mark given block and all its children as bad. Stops verification.
pub fn mark_as_bad(&mut self, block_hashes: &[H256]) {
pub fn mark_as_bad(&self, block_hashes: &[H256]) {
if block_hashes.is_empty() {
return;
}
let mut verification_lock = self.verification.lock().unwrap();
let mut verified_lock = self.verification.verified.lock().unwrap();
let mut verified = verified_lock.deref_mut();
let mut bad = self.verification.bad.lock().unwrap();
let mut processing = self.processing.write().unwrap();

let mut verification = verification_lock.deref_mut();

verification.bad.reserve(block_hashes.len());
bad.reserve(block_hashes.len());
for hash in block_hashes {
verification.bad.insert(hash.clone());
bad.insert(hash.clone());
processing.remove(&hash);
}

let mut new_verified = VecDeque::new();
for block in verification.verified.drain(..) {
if verification.bad.contains(&block.header.parent_hash) {
verification.bad.insert(block.header.hash());
for block in verified.drain(..) {
if bad.contains(&block.header.parent_hash) {
bad.insert(block.header.hash());
processing.remove(&block.header.hash());
} else {
new_verified.push_back(block);
}
}
verification.verified = new_verified;
*verified = new_verified;
}

/// Mark given block as processed
pub fn mark_as_good(&mut self, block_hashes: &[H256]) {
pub fn mark_as_good(&self, block_hashes: &[H256]) {
if block_hashes.is_empty() {
return;
}
Expand All @@ -358,45 +368,55 @@ impl BlockQueue {
}

/// Removes up to `max` verified blocks from the queue
pub fn drain(&mut self, max: usize) -> Vec<PreverifiedBlock> {
let mut verification = self.verification.lock().unwrap();
let count = min(max, verification.verified.len());
pub fn drain(&self, max: usize) -> Vec<PreverifiedBlock> {
let mut verified = self.verification.verified.lock().unwrap();
let count = min(max, verified.len());
let mut result = Vec::with_capacity(count);
for _ in 0..count {
let block = verification.verified.pop_front().unwrap();
let block = verified.pop_front().unwrap();
result.push(block);
}
self.ready_signal.reset();
if !verification.verified.is_empty() {
if !verified.is_empty() {
self.ready_signal.set();
}
result
}

/// Get queue status.
pub fn queue_info(&self) -> BlockQueueInfo {
let verification = self.verification.lock().unwrap();
let (unverified_len, unverified_bytes) = {
let v = self.verification.unverified.lock().unwrap();
(v.len(), v.heap_size_of_children())
};
let (verifying_len, verifying_bytes) = {
let v = self.verification.verifying.lock().unwrap();
(v.len(), v.heap_size_of_children())
};
let (verified_len, verified_bytes) = {
let v = self.verification.verified.lock().unwrap();
(v.len(), v.heap_size_of_children())
};
BlockQueueInfo {
verified_queue_size: verification.verified.len(),
unverified_queue_size: verification.unverified.len(),
verifying_queue_size: verification.verifying.len(),
unverified_queue_size: unverified_len,
verifying_queue_size: verifying_len,
verified_queue_size: verified_len,
max_queue_size: self.max_queue_size,
max_mem_use: self.max_mem_use,
mem_used:
verification.unverified.heap_size_of_children()
+ verification.verifying.heap_size_of_children()
+ verification.verified.heap_size_of_children(),
unverified_bytes
+ verifying_bytes
+ verified_bytes
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.processing.read().unwrap().heap_size_of_children(),
}
}

pub fn collect_garbage(&self) {
{
let mut verification = self.verification.lock().unwrap();
verification.unverified.shrink_to_fit();
verification.verifying.shrink_to_fit();
verification.verified.shrink_to_fit();
self.verification.unverified.lock().unwrap().shrink_to_fit();
self.verification.verifying.lock().unwrap().shrink_to_fit();
self.verification.verified.lock().unwrap().shrink_to_fit();
}
self.processing.write().unwrap().shrink_to_fit();
}
Expand Down Expand Up @@ -444,15 +464,15 @@ mod tests {

#[test]
fn can_import_blocks() {
let mut queue = get_test_queue();
let queue = get_test_queue();
if let Err(e) = queue.import_block(get_good_dummy_block()) {
panic!("error importing block that is valid by definition({:?})", e);
}
}

#[test]
fn returns_error_for_duplicates() {
let mut queue = get_test_queue();
let queue = get_test_queue();
if let Err(e) = queue.import_block(get_good_dummy_block()) {
panic!("error importing block that is valid by definition({:?})", e);
}
Expand All @@ -471,7 +491,7 @@ mod tests {

#[test]
fn returns_ok_for_drained_duplicates() {
let mut queue = get_test_queue();
let queue = get_test_queue();
let block = get_good_dummy_block();
let hash = BlockView::new(&block).header().hash().clone();
if let Err(e) = queue.import_block(block) {
Expand All @@ -488,7 +508,7 @@ mod tests {

#[test]
fn returns_empty_once_finished() {
let mut queue = get_test_queue();
let queue = get_test_queue();
queue.import_block(get_good_dummy_block()).expect("error importing block that is valid by definition");
queue.flush();
queue.drain(1);
Expand Down
Loading