Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Detect obsolete block responses (#6077)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar authored May 19, 2020
1 parent 9891eab commit 812d94d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 38 deletions.
4 changes: 2 additions & 2 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,13 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
build_time: total_handling_time,
});
},
block_requests::Event::Response { peer, original_request, response, request_duration } => {
block_requests::Event::Response { peer, original_request: _, response, request_duration } => {
self.events.push_back(BehaviourOut::RequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
let ev = self.substrate.on_block_response(peer, original_request, response);
let ev = self.substrate.on_block_response(peer, response);
self.inject_event(ev);
}
block_requests::Event::RequestCancelled { peer, request_duration, .. } |
Expand Down
97 changes: 61 additions & 36 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,30 +548,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.sync.update_chain_info(&info.best_hash, info.best_number);
}

/// Accepts a response from the legacy substream and determines what the corresponding
/// request was.
fn handle_response(
&mut self,
who: PeerId,
response: &message::BlockResponse<B>
) -> Option<message::BlockRequest<B>> {
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
if peer.obsolete_requests.remove(&response.id).is_some() {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id);
return None;
}
// Clear the request. If the response is invalid peer will be disconnected anyway.
let request = peer.block_request.take();
if request.as_ref().map_or(false, |(_, r)| r.id == response.id) {
return request.map(|(_, r)| r)
}
trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id);
self.peerset_handle.report_peer(who.clone(), rep::UNEXPECTED_RESPONSE);
self.behaviour.disconnect_peer(&who);
}
None
}

fn update_peer_info(&mut self, who: &PeerId) {
if let Some(info) = self.sync.peer_info(who) {
if let Some(ref mut peer) = self.context_data.peers.get_mut(who) {
Expand Down Expand Up @@ -609,11 +585,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
GenericMessage::Status(s) => return self.on_status_message(who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
if let Some(request) = self.handle_response(who.clone(), &r) {
let outcome = self.on_block_response(who.clone(), request, r);
self.update_peer_info(&who);
return outcome
}
let outcome = self.on_block_response(who.clone(), r);
self.update_peer_info(&who);
return outcome
},
GenericMessage::BlockAnnounce(announce) => {
let outcome = self.on_block_announce(who.clone(), announce);
Expand Down Expand Up @@ -705,6 +679,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
}

fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest<B>) {
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
}

/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
Expand Down Expand Up @@ -844,9 +822,34 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn on_block_response(
&mut self,
peer: PeerId,
request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) -> CustomMessageOutcome<B> {
let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
if p.obsolete_requests.remove(&response.id).is_some() {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
return CustomMessageOutcome::None;
}
// Clear the request. If the response is invalid peer will be disconnected anyway.
match p.block_request.take() {
Some((_, request)) if request.id == response.id => request,
Some(_) => {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
return CustomMessageOutcome::None;
}
None => {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
return CustomMessageOutcome::None;
}
}
} else {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
return CustomMessageOutcome::None;
};

let blocks_range = || match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
Expand Down Expand Up @@ -891,8 +894,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
match self.sync.on_block_data(&peer, Some(request), response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, req)) => {
Ok(sync::OnBlockData::Request(peer, mut req)) => {
if self.use_new_block_requests_protocol {
self.update_peer_request(&peer, &mut req);
CustomMessageOutcome::BlockRequest {
target: peer,
request: req,
Expand Down Expand Up @@ -1076,8 +1080,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
Ok(Some(req)) => {
Ok(Some(mut req)) => {
if self.use_new_block_requests_protocol {
self.update_peer_request(&who, &mut req);
self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
target: who.clone(),
request: req,
Expand Down Expand Up @@ -1413,8 +1418,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
Ok(sync::OnBlockData::Import(origin, blocks)) => {
CustomMessageOutcome::BlockImport(origin, blocks)
},
Ok(sync::OnBlockData::Request(peer, req)) => {
Ok(sync::OnBlockData::Request(peer, mut req)) => {
if self.use_new_block_requests_protocol {
self.update_peer_request(&peer, &mut req);
CustomMessageOutcome::BlockRequest {
target: peer,
request: req,
Expand Down Expand Up @@ -1520,8 +1526,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
for result in results {
match result {
Ok((id, req)) => {
Ok((id, mut req)) => {
if self.use_new_block_requests_protocol {
update_peer_request(&mut self.context_data.peers, &id, &mut req);
self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
target: id,
request: req,
Expand Down Expand Up @@ -1935,6 +1942,22 @@ fn send_request<B: BlockT, H: ExHashT>(
send_message::<B>(behaviour, stats, who, None, message)
}

fn update_peer_request<B: BlockT, H: ExHashT>(
peers: &mut HashMap<PeerId, Peer<B, H>>,
who: &PeerId,
request: &mut message::BlockRequest<B>,
) {
if let Some(ref mut peer) = peers.get_mut(who) {
request.id = peer.next_request_id;
peer.next_request_id += 1;
if let Some((timestamp, request)) = peer.block_request.take() {
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
peer.obsolete_requests.insert(request.id, timestamp);
}
peer.block_request = Some((Instant::now(), request.clone()));
}
}

fn send_message<B: BlockT>(
behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>,
Expand Down Expand Up @@ -2012,8 +2035,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
self.propagate_extrinsics();
}

for (id, r) in self.sync.block_requests() {
for (id, mut r) in self.sync.block_requests() {
if self.use_new_block_requests_protocol {
update_peer_request(&mut self.context_data.peers, &id, &mut r);
let event = CustomMessageOutcome::BlockRequest {
target: id.clone(),
request: r,
Expand All @@ -2029,8 +2053,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
)
}
}
for (id, r) in self.sync.justification_requests() {
for (id, mut r) in self.sync.justification_requests() {
if self.use_new_block_requests_protocol {
update_peer_request(&mut self.context_data.peers, &id, &mut r);
let event = CustomMessageOutcome::BlockRequest {
target: id,
request: r,
Expand Down

0 comments on commit 812d94d

Please sign in to comment.