Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Incrementally calculate verification queue heap size #2749

Merged
merged 3 commits into from
Oct 20, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Sorts them ready for blockchain insertion.

use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*;
use io::*;
Expand Down Expand Up @@ -83,6 +83,13 @@ pub enum Status {
Unknown,
}

// the internal queue sizes.
struct Sizes {
unverified: AtomicUsize,
verifying: AtomicUsize,
verified: AtomicUsize,
}

/// A queue of items to be verified. Sits between network or other I/O and the `BlockChain`.
/// Keeps them in the same order as inserted, minus invalid items.
pub struct VerificationQueue<K: Kind> {
Expand Down Expand Up @@ -147,6 +154,7 @@ struct Verification<K: Kind> {
bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
sizes: Sizes,
}

impl<K: Kind> VerificationQueue<K> {
Expand All @@ -159,7 +167,11 @@ impl<K: Kind> VerificationQueue<K> {
bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),

sizes: Sizes {
unverified: AtomicUsize::new(0),
verifying: AtomicUsize::new(0),
verified: AtomicUsize::new(0),
}
});
let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -235,6 +247,7 @@ impl<K: Kind> VerificationQueue<K> {
None => continue,
};

verification.sizes.unverified.fetch_sub(item.heap_size_of_children(), AtomicOrdering::SeqCst);
verifying.push_back(Verifying { hash: item.hash(), output: None });
item
};
Expand All @@ -247,6 +260,8 @@ impl<K: Kind> VerificationQueue<K> {
for (i, e) in verifying.iter_mut().enumerate() {
if e.hash == hash {
idx = Some(i);

verification.sizes.verifying.fetch_add(verified.heap_size_of_children(), AtomicOrdering::SeqCst);
e.output = Some(verified);
break;
}
Expand All @@ -256,7 +271,7 @@ impl<K: Kind> VerificationQueue<K> {
// we're next!
let mut verified = verification.verified.lock();
let mut bad = verification.bad.lock();
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
true
} else {
false
Expand All @@ -271,7 +286,7 @@ impl<K: Kind> VerificationQueue<K> {
verifying.retain(|e| e.hash != hash);

if verifying.front().map_or(false, |x| x.output.is_some()) {
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
true
} else {
false
Expand All @@ -285,16 +300,24 @@ impl<K: Kind> VerificationQueue<K> {
}
}

fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>) {
fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>, sizes: &Sizes) {
let mut removed_size = 0;
let mut inserted_size = 0;
while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
assert!(verifying.pop_front().is_some());
let size = output.heap_size_of_children();
removed_size += size;

if bad.contains(&output.parent_hash()) {
bad.insert(output.hash());
} else {
inserted_size += size;
verified.push_back(output);
}
}

sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst);
sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst);
}

/// Clear the queue and stop verification activity.
Expand All @@ -305,6 +328,12 @@ impl<K: Kind> VerificationQueue<K> {
unverified.clear();
verifying.clear();
verified.clear();

let sizes = &self.verification.sizes;
sizes.unverified.store(0, AtomicOrdering::Release);
sizes.verifying.store(0, AtomicOrdering::Release);
sizes.verified.store(0, AtomicOrdering::Release);

self.processing.write().clear();
}

Expand Down Expand Up @@ -348,6 +377,8 @@ impl<K: Kind> VerificationQueue<K> {

match K::create(input, &*self.engine) {
Ok(item) => {
self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst);

self.processing.write().insert(h.clone());
self.verification.unverified.lock().push_back(item);
self.more_to_verify.notify_all();
Expand Down Expand Up @@ -377,14 +408,18 @@ impl<K: Kind> VerificationQueue<K> {
}

let mut new_verified = VecDeque::new();
let mut removed_size = 0;
for output in verified.drain(..) {
if bad.contains(&output.parent_hash()) {
removed_size += output.heap_size_of_children();
bad.insert(output.hash());
processing.remove(&output.hash());
} else {
new_verified.push_back(output);
}
}

self.verification.sizes.verified.fetch_sub(removed_size, AtomicOrdering::SeqCst);
*verified = new_verified;
}

Expand All @@ -407,6 +442,9 @@ impl<K: Kind> VerificationQueue<K> {
let count = min(max, verified.len());
let result = verified.drain(..count).collect::<Vec<_>>();

let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c);
self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst);

self.ready_signal.reset();
if !verified.is_empty() {
self.ready_signal.set_async();
Expand All @@ -416,17 +454,23 @@ impl<K: Kind> VerificationQueue<K> {

/// Get queue status.
pub fn queue_info(&self) -> QueueInfo {
use std::mem::size_of;

let (unverified_len, unverified_bytes) = {
let v = self.verification.unverified.lock();
(v.len(), v.heap_size_of_children())
let len = self.verification.unverified.lock().len();
let size = self.verification.sizes.unverified.load(AtomicOrdering::Acquire);

(len, size + len * size_of::<K::Unverified>())
};
let (verifying_len, verifying_bytes) = {
let v = self.verification.verifying.lock();
(v.len(), v.heap_size_of_children())
let len = self.verification.verifying.lock().len();
let size = self.verification.sizes.verifying.load(AtomicOrdering::Acquire);
(len, size + len * size_of::<Verifying<K>>())
};
let (verified_len, verified_bytes) = {
let v = self.verification.verified.lock();
(v.len(), v.heap_size_of_children())
let len = self.verification.verified.lock().len();
let size = self.verification.sizes.verified.load(AtomicOrdering::Acquire);
(len, size + len * size_of::<K::Verified>())
};

QueueInfo {
Expand Down