diff --git a/ethcore/src/verification/queue/kind.rs b/ethcore/src/verification/queue/kind.rs index fbc6346c9c0..9735187265a 100644 --- a/ethcore/src/verification/queue/kind.rs +++ b/ethcore/src/verification/queue/kind.rs @@ -113,6 +113,7 @@ pub mod blocks { } /// An unverified block. + #[derive(PartialEq, Debug)] pub struct Unverified { /// Unverified block header. pub header: Header, diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 588bfc0c711..4c229cd87ae 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -23,12 +23,11 @@ use std::cmp; use heapsize::HeapSizeOf; use ethereum_types::H256; use rlp::{self, Rlp}; -use ethcore::header::{BlockNumber, Header as BlockHeader}; +use ethcore::header::BlockNumber; use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind}; use ethcore::error::{ImportErrorKind, BlockError}; -use ethcore::verification::queue::kind::blocks::Unverified; use sync_io::SyncIo; -use blocks::BlockCollection; +use blocks::{BlockCollection, SyncBody, SyncHeader}; const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 32; @@ -236,45 +235,39 @@ impl BlockDownloader { let mut valid_response = item_count == 0; //empty response is valid let mut any_known = false; for i in 0..item_count { - let info: BlockHeader = r.val_at(i).map_err(|e| { - trace!(target: "sync", "Error decoding block header RLP: {:?}", e); - BlockDownloaderImportError::Invalid - })?; - let number = BlockNumber::from(info.number()); + let info = SyncHeader::from_rlp(r.at(i)?.as_raw().to_vec())?; + let number = BlockNumber::from(info.header.number()); + let hash = info.header.hash(); // Check if any of the headers matches the hash we requested if !valid_response { if let Some(expected) = expected_hash { - valid_response = expected == info.hash() + valid_response = expected == hash; } } - any_known = any_known || self.blocks.contains_head(&info.hash()); - if self.blocks.contains(&info.hash()) { - trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash()); + any_known = any_known || self.blocks.contains_head(&hash); + if self.blocks.contains(&hash) { + trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash); continue; } if self.highest_block.as_ref().map_or(true, |n| number > *n) { self.highest_block = Some(number); } - let hash = info.hash(); - let hdr = r.at(i).map_err(|e| { - trace!(target: "sync", "Error decoding block header RLP: {:?}", e); - BlockDownloaderImportError::Invalid - })?; + match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { match self.state { State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), _ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), } - headers.push(hdr.as_raw().to_vec()); + headers.push(info); hashes.push(hash); }, BlockStatus::Bad => { return Err(BlockDownloaderImportError::Invalid); }, BlockStatus::Unknown | BlockStatus::Pending => { - headers.push(hdr.as_raw().to_vec()); + headers.push(info); hashes.push(hash); } } @@ -325,19 +318,15 @@ impl BlockDownloader { let item_count = r.item_count().unwrap_or(0); if item_count == 0 { return Err(BlockDownloaderImportError::Useless); - } - else if self.state != State::Blocks { + } else if self.state != State::Blocks { trace!(target: "sync", "Ignored unexpected block bodies"); - } - else { + } else { let mut bodies = Vec::with_capacity(item_count); for i in 0..item_count { - let body = r.at(i).map_err(|e| { - trace!(target: "sync", "Error decoding block boides RLP: {:?}", e); - BlockDownloaderImportError::Invalid - })?; - bodies.push(body.as_raw().to_vec()); + let body = SyncBody::from_rlp(r.at(i)?.as_raw())?; + bodies.push(body); } + if self.blocks.insert_bodies(bodies) != item_count { trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); @@ -483,15 +472,6 @@ impl BlockDownloader { let block = block_and_receipts.block; let receipts = block_and_receipts.receipts; - let block = match Unverified::from_rlp(block) { - Ok(block) => block, - Err(_) => { - debug!(target: "sync", "Bad block rlp"); - bad = true; - break; - } - }; - let h = block.header.hash(); let number = block.header.number(); let parent = *block.header.parent_hash(); diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 248180b281d..a502cee9c55 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -23,36 +23,116 @@ use triehash_ethereum::ordered_trie_root; use bytes::Bytes; use rlp::{Rlp, RlpStream, DecoderError}; use network; -use ethcore::encoded::Block; -use ethcore::views::{HeaderView, BodyView}; use ethcore::header::Header as BlockHeader; +use ethcore::verification::queue::kind::blocks::Unverified; +use transaction::UnverifiedTransaction; known_heap_size!(0, HeaderId); type SmallHashVec = SmallVec<[H256; 1]>; +pub struct SyncHeader { + pub bytes: Bytes, + pub header: BlockHeader, +} + +impl HeapSizeOf for SyncHeader { + fn heap_size_of_children(&self) -> usize { + self.bytes.heap_size_of_children() + + self.header.heap_size_of_children() + } +} + +impl SyncHeader { + pub fn from_rlp(bytes: Bytes) -> Result { + let result = SyncHeader { + header: ::rlp::decode(&bytes)?, + bytes, + }; + + Ok(result) + } +} + +pub struct SyncBody { + pub transactions_bytes: Bytes, + pub transactions: Vec, + pub uncles_bytes: Bytes, + pub uncles: Vec, +} + +impl SyncBody { + pub fn from_rlp(bytes: &[u8]) -> Result { + let rlp = Rlp::new(bytes); + let transactions_rlp = rlp.at(0)?; + let uncles_rlp = rlp.at(1)?; + + let result = SyncBody { + transactions_bytes: transactions_rlp.as_raw().to_vec(), + transactions: transactions_rlp.as_list()?, + uncles_bytes: uncles_rlp.as_raw().to_vec(), + uncles: uncles_rlp.as_list()?, + }; + + Ok(result) + } + + fn empty_body() -> Self { + SyncBody { + transactions_bytes: ::rlp::EMPTY_LIST_RLP.to_vec(), + transactions: Vec::with_capacity(0), + uncles_bytes: ::rlp::EMPTY_LIST_RLP.to_vec(), + uncles: Vec::with_capacity(0), + } + } +} + +impl HeapSizeOf for SyncBody { + fn heap_size_of_children(&self) -> usize { + self.transactions_bytes.heap_size_of_children() + + self.transactions.heap_size_of_children() + + self.uncles_bytes.heap_size_of_children() + + self.uncles.heap_size_of_children() + } +} + /// Block data with optional body. struct SyncBlock { - header: Bytes, - body: Option, + header: SyncHeader, + body: Option, receipts: Option, receipts_root: H256, } +impl HeapSizeOf for SyncBlock { + fn heap_size_of_children(&self) -> usize { + self.header.heap_size_of_children() + self.body.heap_size_of_children() + } +} + +fn unverified_from_sync(header: SyncHeader, body: Option) -> Unverified { + let mut stream = RlpStream::new_list(3); + stream.append_raw(&header.bytes, 1); + let body = body.unwrap_or_else(SyncBody::empty_body); + stream.append_raw(&body.transactions_bytes, 1); + stream.append_raw(&body.uncles_bytes, 1); + + Unverified { + header: header.header, + transactions: body.transactions, + uncles: body.uncles, + bytes: stream.out().to_vec(), + } +} + /// Block with optional receipt pub struct BlockAndReceipts { /// Block data. - pub block: Bytes, + pub block: Unverified, /// Block receipts RLP list. pub receipts: Option, } -impl HeapSizeOf for SyncBlock { - fn heap_size_of_children(&self) -> usize { - self.header.heap_size_of_children() + self.body.heap_size_of_children() - } -} - /// Used to identify header by transactions and uncles hashes #[derive(Eq, PartialEq, Hash)] struct HeaderId { @@ -124,7 +204,7 @@ impl BlockCollection { } /// Insert a set of headers into collection and advance subchain head pointers. - pub fn insert_headers(&mut self, headers: Vec) { + pub fn insert_headers(&mut self, headers: Vec) { for h in headers { if let Err(e) = self.insert_header(h) { trace!(target: "sync", "Ignored invalid header: {:?}", e); @@ -134,7 +214,7 @@ impl BlockCollection { } /// Insert a collection of block bodies for previously downloaded headers. - pub fn insert_bodies(&mut self, bodies: Vec) -> usize { + pub fn insert_bodies(&mut self, bodies: Vec) -> usize { let mut inserted = 0; for b in bodies { if let Err(e) = self.insert_body(b) { @@ -278,30 +358,33 @@ impl BlockCollection { while let Some(h) = head { head = self.parents.get(&h).cloned(); if let Some(head) = head { - match self.blocks.get(&head) { - Some(block) if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) => { - blocks.push(block); - hashes.push(head); - self.head = Some(head); - } - _ => break, + match self.blocks.remove(&head) { + Some(block) => { + if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) { + blocks.push(block); + hashes.push(head); + self.head = Some(head); + } else { + self.blocks.insert(head, block); + break; + } + }, + _ => { + break; + }, } } } - for block in blocks { - let body = view!(BodyView, block.body.as_ref().expect("blocks contains only full blocks; qed")); - let header = view!(HeaderView, &block.header); - let block_view = Block::new_from_header_and_body(&header, &body); + for block in blocks.into_iter() { + let unverified = unverified_from_sync(block.header, block.body); drained.push(BlockAndReceipts { - block: block_view.into_inner(), + block: unverified, receipts: block.receipts.clone(), }); } } - for h in hashes { - self.blocks.remove(&h); - } + trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); drained } @@ -337,26 +420,23 @@ impl BlockCollection { self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash) } - fn insert_body(&mut self, b: Bytes) -> Result<(), network::Error> { + fn insert_body(&mut self, body: SyncBody) -> Result<(), network::Error> { let header_id = { - let body = Rlp::new(&b); - let tx = body.at(0)?; - let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw())); - let uncles = keccak(body.at(1)?.as_raw()); + let tx_root = ordered_trie_root(Rlp::new(&body.transactions_bytes).iter().map(|r| r.as_raw())); + let uncles = keccak(&body.uncles_bytes); HeaderId { transactions_root: tx_root, uncles: uncles } }; - match self.header_ids.get(&header_id).cloned() { + match self.header_ids.remove(&header_id) { Some(h) => { - self.header_ids.remove(&header_id); self.downloading_bodies.remove(&h); match self.blocks.get_mut(&h) { Some(ref mut block) => { trace!(target: "sync", "Got body {}", h); - block.body = Some(b); + block.body = Some(body); Ok(()) }, None => { @@ -401,54 +481,63 @@ impl BlockCollection { } } - fn insert_header(&mut self, header: Bytes) -> Result { - let info: BlockHeader = Rlp::new(&header).as_val()?; - let hash = info.hash(); + fn insert_header(&mut self, info: SyncHeader) -> Result { + let hash = info.header.hash(); if self.blocks.contains_key(&hash) { return Ok(hash); } + match self.head { None if hash == self.heads[0] => { trace!(target: "sync", "New head {}", hash); - self.head = Some(info.parent_hash().clone()); + self.head = Some(info.header.parent_hash().clone()); }, _ => () } - let mut block = SyncBlock { - header: header, - body: None, - receipts: None, - receipts_root: H256::new(), - }; let header_id = HeaderId { - transactions_root: info.transactions_root().clone(), - uncles: info.uncles_hash().clone(), + transactions_root: *info.header.transactions_root(), + uncles: *info.header.uncles_hash(), }; - if header_id.transactions_root == KECCAK_NULL_RLP && header_id.uncles == KECCAK_EMPTY_LIST_RLP { + + let body = if header_id.transactions_root == KECCAK_NULL_RLP && header_id.uncles == KECCAK_EMPTY_LIST_RLP { // empty body, just mark as downloaded - let mut body_stream = RlpStream::new_list(2); - body_stream.append_raw(&::rlp::EMPTY_LIST_RLP, 1); - body_stream.append_raw(&::rlp::EMPTY_LIST_RLP, 1); - block.body = Some(body_stream.out()); - } - else { - trace!("Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}", header_id.transactions_root, header_id.uncles, hash, info.number()); - self.header_ids.insert(header_id, hash.clone()); - } - if self.need_receipts { - let receipt_root = info.receipts_root().clone(); + Some(SyncBody::empty_body()) + } else { + trace!( + "Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}", + header_id.transactions_root, + header_id.uncles, + hash, + info.header.number() + ); + self.header_ids.insert(header_id, hash); + None + }; + + let (receipts, receipts_root) = if self.need_receipts { + let receipt_root = *info.header.receipts_root(); if receipt_root == KECCAK_NULL_RLP { let receipts_stream = RlpStream::new_list(0); - block.receipts = Some(receipts_stream.out()); + (Some(receipts_stream.out()), receipt_root) } else { - self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash.clone()); + self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash); + (None, receipt_root) } - block.receipts_root = receipt_root; - } + } else { + (None, H256::new()) + }; + + self.parents.insert(*info.header.parent_hash(), hash); + + let block = SyncBlock { + header: info, + body, + receipts, + receipts_root, + }; - self.parents.insert(info.parent_hash().clone(), hash.clone()); - self.blocks.insert(hash.clone(), block); + self.blocks.insert(hash, block); trace!(target: "sync", "New header: {:x}", hash); Ok(hash) } @@ -485,10 +574,11 @@ impl BlockCollection { #[cfg(test)] mod test { - use super::BlockCollection; + use super::{BlockCollection, SyncHeader}; use ethcore::client::{TestBlockChainClient, EachBlockWith, BlockId, BlockChainClient}; - use ethcore::views::HeaderView; use ethcore::header::BlockNumber; + use ethcore::verification::queue::kind::blocks::Unverified; + use ethcore::views::HeaderView; use rlp::*; fn is_empty(bc: &BlockCollection) -> bool { @@ -541,7 +631,7 @@ mod test { assert_eq!(bc.downloading_headers.len(), 1); assert!(bc.drain().is_empty()); - bc.insert_headers(headers[0..6].to_vec()); + bc.insert_headers(headers[0..6].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); assert_eq!(hashes[5], bc.heads[0]); for h in &hashes[0..6] { bc.clear_header_download(h) @@ -550,7 +640,10 @@ mod test { assert!(!bc.is_downloading(&hashes[0])); assert!(bc.contains(&hashes[0])); - assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::>()[..], &blocks[0..6]); + assert_eq!( + bc.drain().into_iter().map(|b| b.block).collect::>(), + blocks[0..6].iter().map(|b| Unverified::from_rlp(b.to_vec()).unwrap()).collect::>() + ); assert!(!bc.contains(&hashes[0])); assert_eq!(hashes[5], bc.head.unwrap()); @@ -558,13 +651,17 @@ mod test { assert_eq!(hashes[5], h); let (h, _) = bc.needed_headers(6, false).unwrap(); assert_eq!(hashes[20], h); - bc.insert_headers(headers[10..16].to_vec()); + bc.insert_headers(headers[10..16].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); assert!(bc.drain().is_empty()); - bc.insert_headers(headers[5..10].to_vec()); - assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::>()[..], &blocks[6..16]); + bc.insert_headers(headers[5..10].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); + assert_eq!( + bc.drain().into_iter().map(|b| b.block).collect::>(), + blocks[6..16].iter().map(|b| Unverified::from_rlp(b.to_vec()).unwrap()).collect::>() + ); + assert_eq!(hashes[15], bc.heads[0]); - bc.insert_headers(headers[15..].to_vec()); + bc.insert_headers(headers[15..].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); bc.drain(); assert!(bc.is_empty()); } @@ -584,11 +681,11 @@ mod test { let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect(); bc.reset_to(heads); - bc.insert_headers(headers[2..22].to_vec()); + bc.insert_headers(headers[2..22].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); assert_eq!(hashes[0], bc.heads[0]); assert_eq!(hashes[21], bc.heads[1]); assert!(bc.head.is_none()); - bc.insert_headers(headers[0..2].to_vec()); + bc.insert_headers(headers[0..2].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); assert!(bc.head.is_some()); assert_eq!(hashes[21], bc.heads[0]); } @@ -608,9 +705,9 @@ mod test { let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect(); bc.reset_to(heads); - bc.insert_headers(headers[1..2].to_vec()); + bc.insert_headers(headers[1..2].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); assert!(bc.drain().is_empty()); - bc.insert_headers(headers[0..1].to_vec()); + bc.insert_headers(headers[0..1].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect()); assert_eq!(bc.drain().len(), 2); } }