Conversation
/// May return false negative for longer queues
pub fn processing_fork(&self, best_block_hash: &H256) -> bool {
    let processing = self.processing.read();
    if processing.is_empty() || processing.len() > MAX_QUEUE_WITH_FORK {
So this optimization is introduced to avoid spending too much time traversing the entire processing map, right?
If we introduce a different mapping of ParentHash -> count_of_blocks_in_processing_that_build_on_top, we could do it in constant time for any parent hash.
I implemented this approach and tested it, and found one drawback: it doesn't work well when the node is catching up on blocks. In that case imported blocks are inserted into the queue one by one, so the suggested data structure ends up containing N parent hashes with count 1, and N can be pretty huge. I considered that a significant overhead. Since I didn't find a good solution that addresses both situations, I've decided to revert it and keep the current approach for now.
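For illustration only, here is a minimal sketch of the counter-map approach discussed above (all names and types are hypothetical, not openethereum's actual ones). During catch-up, every queued block would insert its own parent with count 1, which is exactly the overhead described:

use std::collections::HashMap;

// `H256` stands in for the real hash type here.
type H256 = [u8; 32];

#[derive(Default)]
struct ForkIndex {
    // ParentHash -> number of queued blocks that build on top of it.
    children_of: HashMap<H256, usize>,
}

impl ForkIndex {
    // Called when a block enters the processing queue.
    fn insert(&mut self, parent: H256) {
        *self.children_of.entry(parent).or_insert(0) += 1;
    }

    // Called when a block leaves the processing queue.
    fn remove(&mut self, parent: &H256) {
        if let Some(count) = self.children_of.get_mut(parent) {
            *count -= 1;
            if *count == 0 {
                self.children_of.remove(parent);
            }
        }
    }

    // Constant-time check: does any queued block build on top of `best_block_hash`?
    fn processing_fork(&self, best_block_hash: &H256) -> bool {
        self.children_of.contains_key(best_block_hash)
    }
}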
any updates on this?
I prepared the changes requested by @tomusdrw, but I need to test them first before returning to the review phase.
Force-pushed from c1b5893 to 9fe53d6 (compare)
self.reset();
if n == 0 {
    debug_sync!(self, "Header not found, bottom line reached, resetting, last imported: {}", self.last_imported_hash);
    self.reset_to_block(&best_hash, best);
Unfortunately, calling the simple reset is not enough here, because it only resets counters (last_imported_block) to the current round. In this case the rounds have gone too far and a simple rollback within the round doesn't help. reset_to_block actually resets block sync to its initial state.
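Roughly, the distinction can be sketched as follows (a hypothetical simplification with made-up fields, not the real BlockDownloader):

// reset() only rolls the download position back to the start of the current
// round, while reset_to_block() discards round state and restarts from a
// known-good block.
struct DownloaderSketch {
    last_imported_block: u64,
    last_imported_hash: u64, // placeholder for a real H256
    round_start: u64,
}

impl DownloaderSketch {
    fn reset(&mut self) {
        // Roll back within the current round only.
        self.last_imported_block = self.round_start;
    }

    fn reset_to_block(&mut self, best_hash: u64, best_number: u64) {
        // Forget rounds that went too far and restart from the best block.
        self.last_imported_hash = best_hash;
        self.last_imported_block = best_number;
        self.round_start = best_number;
    }
}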
let block = Unverified::from_rlp(r.at(0)?.as_raw().to_vec())?;
let hash = block.header.hash();
let number = block.header.number();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, hash);
if number > sync.highest_block.unwrap_or(0) {
    sync.highest_block = Some(number);
}
let parent_hash = block.header.parent_hash();
let difficulty: U256 = r.val_at(1)?;
// Most probably the sent block is being imported by peer right now
See item 2 in the addressed issues in the PR's description.
Force-pushed from f1e0856 to 062cbec (compare)
Rebased onto the new master. cc @vorot93
self.imported_this_round = None;
self.round_parents = VecDeque::new();
self.target_hash = None;
self.retract_step = 1;
What about self.blocks? It seems that reset_to does reset them too.
It's being reset inside the self.reset() call.
let block = Unverified::from_rlp(r.at(0)?.as_raw().to_vec())?;
let hash = block.header.hash();
let number = block.header.number();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, hash);
if number > sync.highest_block.unwrap_or(0) {
    sync.highest_block = Some(number);
}
let parent_hash = block.header.parent_hash();
let difficulty: U256 = r.val_at(1)?;
Why read that from RLP again if we have the decoded block already? Isn't that simply part of block.header?
EDIT: Ah, it seems it's expected to be the Total Difficulty; isn't that part of the decoded struct anyway?
No, it's not. There is only the usual Header there. Also see some details in the next question.
ethcore/sync/src/chain/handler.rs (Outdated)
let difficulty: U256 = r.val_at(1)?;
// Most probably the sent block is being imported by peer right now
// Use td and hash, that peer must have for now
let parent_td = difficulty - block.header.difficulty();
- Would be good to add overflow protection.
- How is that non-zero? Does the RLP really contain the total difficulty while the header carries just this block's difficulty? I thought we don't really send TD over the wire.
- Added.
- Please refer to your code for sending the NewBlock packet: https://github.com/openethereum/openethereum/pull/9954/files#diff-4cbf12eb53ed0891acc41ffd155a8930R478
As I've written in the description, I've looked through geth's logic for it as well, and they do exactly the same (send the TD and extract it).
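A minimal sketch of what such overflow protection can look like, assuming a U256 with the usual checked_sub API from the primitive-types/uint crates (the actual change may differ):

use primitive_types::U256;

// Guard against underflow if a peer sends a total difficulty smaller than the
// block's own difficulty; fall back to zero instead of panicking.
fn parent_total_difficulty(total_difficulty: U256, block_difficulty: U256) -> U256 {
    total_difficulty
        .checked_sub(block_difficulty)
        .unwrap_or_else(U256::zero)
}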
ethcore/sync/src/chain/mod.rs (Outdated)
@@ -672,6 +703,8 @@ pub struct ChainSync {
    /// Connected peers pending Status message.
    /// Value is request timestamp.
    handshaking_peers: HashMap<PeerId, Instant>,
    /// Requests, that can not be processed at the moment
    delayed_requests: HashSet<(PeerId, u8, Vec<u8>)>,
Shouldn't we maintain ordering of the requests? With HashSet they will get randomized.
Reworked to a Vec. The standard option with linked_hash_map doesn't suit because of the MallocSizeOf bound.
ethcore/sync/src/chain/supplier.rs (Outdated)
match response {
    Err(e) => Err(e),
    Ok(Some((packet_id, rlp_stream))) => {
        io.respond(packet_id.id(), rlp_stream.out()).unwrap_or_else(
            |e| debug!(target: "sync", "{:?}", error_func(e)));
        Ok(())
    }
    _ => Ok(())
}
Suggested change:

if let Some((packet_id, rlp_stream)) = response? {
    io.respond(packet_id.id(), rlp_stream.out()).unwrap_or_else(
        |e| debug!(target: "sync", "{:?}", error_func(e)));
}
Ok(())

nit: That's a tiny bit shorter.
No )) It's shorter because you omitted Err processing.
The error is returned with ? after response, but TBH it makes the code harder to read, so I would be in favor of keeping it as is.
Aaa, sorry I missed this ?
}

/// Dispatch delayed requests
/// The main difference with dispatch packet is the direct send of the responses to the peer
Should we check if the peer is still connected? Not sure if it's worth it but it seems possible they went away?
Added cleanup for disconnected peer
_ => {
    debug!(target:"sync", "Unexpected packet {} was dispatched for delayed processing", packet_id);
    Ok(())
}
Is this case really possible?
Right now, no indeed. But I've left this in for future needs, in case some other packet is dispatched in a postponed manner.
})
match result {
    Err(PacketProcessError::Decoder(e)) => debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e),
    Err(PacketProcessError::ClientBusy) => sync.write().add_delayed_request(peer, packet_id, data),
Maybe we can ensure that only GetBlockHeadersPackets get added here instead of performing that check when processing delayed requests?
I've tried to create a generic mechanism for packets with delayed responses. IMO it's pretty useful and may be used in the future.
@@ -672,6 +700,8 @@ pub struct ChainSync {
    /// Connected peers pending Status message.
    /// Value is request timestamp.
    handshaking_peers: HashMap<PeerId, Instant>,
    /// Requests, that can not be processed at the moment
    delayed_requests: Vec<(PeerId, u8, Vec<u8>)>,
I am a bit worried that this is unbounded. How do we know it doesn't grow too big?
I addressed this by cleaning up requests for disconnected peers (as per your other question).
ethcore/sync/src/chain/mod.rs (Outdated)
@@ -821,6 +852,21 @@ impl ChainSync {
    self.active_peers = self.peers.keys().cloned().collect();
}

/// Add a request for later processing
pub fn add_delayed_request(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) {
    if let Some(position) = self.delayed_requests.iter().position(|request| request.0 == peer && request.1 == packet_id) {
Would it be more efficient to use binary_search_by() here, to keep the vector sorted at all times?
I've made it a vector following @tomusdrw's very good suggestion to keep the order of insertions, so sorting won't help (if I understand your suggestion correctly).
Why is the deduplication needed anyway? Is data expected to be different? I'd just push unconditionally and rather restrict the size of delayed_requests to make sure we don't use up too many resources.
ethcore/sync/src/chain/mod.rs (Outdated)
@@ -821,6 +852,21 @@ impl ChainSync {
    self.active_peers = self.peers.keys().cloned().collect();
}

/// Add a request for later processing
pub fn add_delayed_request(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) {
    if let Some(position) = self.delayed_requests.iter().position(|request| request.0 == peer && request.1 == packet_id) {
Why is the deduplication needed anyway? Is data expected to be different? I'd just push unconditionally and rather restrict the size of delayed_requests to make sure we don't use up too many resources.
@tomusdrw I thought about spam prevention here. If the node receives too many requests in this mode (processing a fork), some requests would be lost if we simply limited the size of this container. It would be a weird attack, of course, but nevertheless.
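For illustration, a rough sketch of the deduplicating insert plus the disconnect cleanup discussed in this thread (names are hypothetical and the real implementation may differ):

// At most one delayed request per (peer, packet_id), preserving insertion
// order, so a single peer cannot grow the queue without bound.
type PeerId = usize;

#[derive(Default)]
struct DelayedRequests {
    requests: Vec<(PeerId, u8, Vec<u8>)>,
}

impl DelayedRequests {
    fn add(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) {
        match self.requests.iter().position(|r| r.0 == peer && r.1 == packet_id) {
            // Replace the stale payload in place, keeping the original position.
            Some(i) => self.requests[i].2 = data.to_vec(),
            None => self.requests.push((peer, packet_id, data.to_vec())),
        }
    }

    // Drop everything queued for a peer that disconnected.
    fn clear_peer(&mut self, peer: PeerId) {
        self.requests.retain(|r| r.0 != peer);
    }
}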
Right, with your solution you can still spam the node, but it's limited. Apart from that, it looks okay to me.
Force-pushed from e7f253a to 4968d14 (compare)
looks good!
Purpose
This PR aims to fix issues in the following scenario during block sync:
There are two connected nodes, A and F.
Sync has progressed up to block N on both nodes.
Node A receives a notification from F about a new block with an unknown parent. This notification can have multiple causes:
-- A fork happened and a new subchain started from block N; in this case F sends a notification about block N'+1 (with parent N').
-- There were several "light" (in terms of difficulty and import speed) blocks N+1, N+2, .. and one "heavy" block N+x after them in F's block import queue; in this case F may send a notification only about the "heavy" block N+x.
-- Notifications about previous blocks were simply lost somehow.
-- Some combination of all three.
NOTE: The current block sync was designed with the assumption that a node sends the NewBlock message only when the import of that block has completed. This assumption was changed by @tomusdrw in Improve block and transaction propagation #9954: now the node sends this notification before importing the block.
Node A starts retraction in order to find the beginning of the possible fork, requesting headers for blocks N, N-1, N-2, N-3, etc. from F.
Meanwhile node F has not yet completed the import of the block it notified A about, so it responds with old headers that do not include the new blocks being imported.
Node A quickly runs down to block N - 128 (128 is the limit on the number of requested headers).
Node F finishes importing the new blocks.
Unfortunately, retraction on node A has already gone too far: A requests 128 headers starting from block N - M, where M > 128. As a result, F returns 128 old headers without the new N' and N'+1.
Also, A doesn't receive the notifications about the new blocks from F (which could break the cycle), because A is constantly requesting from F, therefore considers node F "busy" and accordingly doesn't start a new sync with it.
Node A continues retraction down to 0 and sync stops.
Addressed issues
This PR fixes several problems in the handling of this scenario.
Achieved results
So with all fixes in place, the likelihood of the described scenario is significantly decreased and node A doesn't request blocks prematurely. Almost. Unfortunately, during testing we still observed situations where retraction on node A started and went past the point of no return (block N - 128). This happened in pretty weird cases of several forks in a row, which the current block sync algorithm simply cannot handle. In these cases I considered the described retractions legitimate, in spite of their predetermined result and the time they take (several minutes). I just fixed them so that sync is properly restarted at the end, when no fork is found (item 3 in the list of addressed issues). As a result, sync pauses for several minutes, but automatically recovers by itself and continues.
Comments
This problem is reproduced in the configuration from #10724, where A and F run locally on one machine and A is only connected to F (F is connected to the whole mainnet). As a result, network latency between A and F is low and A can quickly descend down to block N - M and get stuck. Also, node A doesn't have any source of blocks other than node F. For regular configurations the stuck state is less likely, but it still causes overhead from the sequences of GetBlock requests emitted by nodes trying to find the common parent in such a situation.
PS: Huge shoutout to @iFA88 for the invaluable help in reproducing the scenarios and debugging.