Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Block import optimization #2748

Merged
merged 2 commits into from
Oct 20, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/res/ethereum/tests
Submodule tests updated 0 files
25 changes: 12 additions & 13 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,16 +361,19 @@ impl Client {

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
let max_blocks_to_import = 4;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prevents flushing buffer from growing too large.

let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.drain(max_blocks_to_import);
if blocks.is_empty() {
return 0;
}
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);

for block in blocks {
let header = &block.header;
Expand All @@ -394,23 +397,19 @@ impl Client {
let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();

{
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&imported_blocks);
}
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
(imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty)
};

{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);

if self.queue_info().is_empty() {
if is_empty {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

Expand Down
1 change: 1 addition & 0 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl Miner {
trace!(target: "miner", "prepare_block: done recalibration.");
}

let _timer = PerfTimer::new("prepare_block");
let (transactions, mut open_block, original_work_hash) = {
let transactions = {self.transaction_queue.lock().top_transactions()};
let mut sealing_work = self.sealing_work.lock();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ fn can_handle_long_fork() {
push_blocks_to_client(client, 49, 1201, 800);
push_blocks_to_client(client, 53, 1201, 600);

for _ in 0..40 {
for _ in 0..400 {
client.import_verified_blocks();
}
assert_eq!(2000, client.chain_info().best_block_number);
Expand Down
53 changes: 37 additions & 16 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,21 @@ struct QueueSignal {

impl QueueSignal {
#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set(&self) {
fn set_sync(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
}

if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
}

#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set_async(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
Expand All @@ -128,8 +142,8 @@ impl QueueSignal {
struct Verification<K: Kind> {
// All locks must be captured in the order declared here.
unverified: Mutex<VecDeque<K::Unverified>>,
verified: Mutex<VecDeque<K::Verified>>,
verifying: Mutex<VecDeque<Verifying<K>>>,
verified: Mutex<VecDeque<K::Verified>>,
bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
Expand All @@ -140,8 +154,8 @@ impl<K: Kind> VerificationQueue<K> {
pub fn new(config: Config, engine: Arc<Engine>, message_channel: IoChannel<ClientIoMessage>) -> Self {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),
Expand Down Expand Up @@ -226,7 +240,7 @@ impl<K: Kind> VerificationQueue<K> {
};

let hash = item.hash();
match K::verify(item, &*engine) {
let is_ready = match K::verify(item, &*engine) {
Ok(verified) => {
let mut verifying = verification.verifying.lock();
let mut idx = None;
Expand All @@ -243,7 +257,9 @@ impl<K: Kind> VerificationQueue<K> {
let mut verified = verification.verified.lock();
let mut bad = verification.bad.lock();
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
},
Err(_) => {
Expand All @@ -256,9 +272,15 @@ impl<K: Kind> VerificationQueue<K> {

if verifying.front().map_or(false, |x| x.output.is_some()) {
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
}
};
if is_ready {
// Import the block immediately
ready.set_sync();
}
}
}
Expand Down Expand Up @@ -366,15 +388,17 @@ impl<K: Kind> VerificationQueue<K> {
*verified = new_verified;
}

/// Mark given item as processed
pub fn mark_as_good(&self, hashes: &[H256]) {
/// Mark given item as processed.
/// Returns true if the queue becomes empty.
pub fn mark_as_good(&self, hashes: &[H256]) -> bool {
if hashes.is_empty() {
return;
return self.processing.read().is_empty();
}
let mut processing = self.processing.write();
for hash in hashes {
processing.remove(hash);
}
processing.is_empty()
}

/// Removes up to `max` verified items from the queue
Expand All @@ -385,7 +409,7 @@ impl<K: Kind> VerificationQueue<K> {

self.ready_signal.reset();
if !verified.is_empty() {
self.ready_signal.set();
self.ready_signal.set_async();
}
result
}
Expand All @@ -411,12 +435,9 @@ impl<K: Kind> VerificationQueue<K> {
verified_queue_size: verified_len,
max_queue_size: self.max_queue_size,
max_mem_use: self.max_mem_use,
mem_used:
unverified_bytes
+ verifying_bytes
+ verified_bytes
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.processing.read().heap_size_of_children(),
mem_used: unverified_bytes
+ verifying_bytes
+ verified_bytes
}
}

Expand Down
Loading