diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 5efa73f55c..967acd3af9 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -547,30 +547,35 @@ impl Gateway { bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})") } // Rate limit for duplicate requests. - if matches!(&event, &Event::CertificateRequest(_) | &Event::CertificateResponse(_)) { - // Retrieve the certificate ID. - let certificate_id = match &event { - Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id, - Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(), - _ => unreachable!(), - }; - // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate). - let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL); - if num_events >= self.max_cache_duplicates() { - return Ok(()); + match event { + Event::CertificateRequest(_) | Event::CertificateResponse(_) => { + // Retrieve the certificate ID. + let certificate_id = match &event { + Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id, + Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(), + _ => unreachable!(), + }; + // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate). + let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL); + if num_events >= self.max_cache_duplicates() { + return Ok(()); + } + } + Event::TransmissionRequest(TransmissionRequest { transmission_id }) + | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => { + // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate). + let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL); + if num_events >= self.max_cache_duplicates() { + return Ok(()); + } } - } else if matches!(&event, &Event::TransmissionRequest(_) | Event::TransmissionResponse(_)) { - // Retrieve the transmission ID. - let transmission_id = match &event { - Event::TransmissionRequest(TransmissionRequest { transmission_id }) => *transmission_id, - Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => *transmission_id, - _ => unreachable!(), - }; - // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate). - let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL); - if num_events >= self.max_cache_duplicates() { - return Ok(()); + Event::BlockRequest(_) => { + let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL); + if num_events >= self.max_cache_duplicates() { + return Ok(()); + } } + _ => {} } trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name()); @@ -632,6 +637,10 @@ impl Gateway { if let Some(sync_sender) = self.sync_sender.get() { // Retrieve the block response. let BlockResponse { request, blocks } = block_response; + // Check the response corresponds to a request. + if !self.cache.remove_outbound_block_request(peer_ip, &request) { + bail!("Unsolicited block response from '{peer_ip}'") + } // Perform the deferred non-blocking deserialization of the blocks. let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?; // Ensure the block response is well-formed. @@ -980,24 +989,30 @@ impl Transport for Gateway { }}; } - // If the event type is a certificate request, increment the cache. - if matches!(event, Event::CertificateRequest(_)) | matches!(event, Event::CertificateResponse(_)) { - // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events. - self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL); - // Send the event to the peer. - send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates) - } - // If the event type is a transmission request, increment the cache. - else if matches!(event, Event::TransmissionRequest(_)) | matches!(event, Event::TransmissionResponse(_)) { - // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events. - self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL); - // Send the event to the peer. - send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions) - } - // Otherwise, employ a general rate limit. - else { - // Send the event to the peer. - send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events) + // Increment the cache for certificate, transmission and block events. + match event { + Event::CertificateRequest(_) | Event::CertificateResponse(_) => { + // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events. + self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL); + // Send the event to the peer. + send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates) + } + Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => { + // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events. + self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL); + // Send the event to the peer. + send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions) + } + Event::BlockRequest(request) => { + // Insert the outbound request so we can match it to responses. + self.cache.insert_outbound_block_request(peer_ip, request); + // Send the event to the peer and updatet the outbound event cache, use the general rate limit. + send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events) + } + _ => { + // Send the event to the peer, use the general rate limit. + send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events) + } } } @@ -1091,6 +1106,7 @@ impl Disconnect for Gateway { // This is sufficient to avoid infinite growth as the committee has a fixed number // of members. self.cache.clear_outbound_validators_requests(peer_ip); + self.cache.clear_outbound_block_requests(peer_ip); } } } diff --git a/node/bft/src/helpers/cache.rs b/node/bft/src/helpers/cache.rs index 3d4cb9b66b..24a0db119f 100644 --- a/node/bft/src/helpers/cache.rs +++ b/node/bft/src/helpers/cache.rs @@ -13,12 +13,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::events::BlockRequest; use snarkvm::{console::types::Field, ledger::narwhal::TransmissionID, prelude::Network}; use core::hash::Hash; use parking_lot::RwLock; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, SocketAddr}, }; use time::OffsetDateTime; @@ -33,6 +34,8 @@ pub struct Cache { seen_inbound_certificates: RwLock, u32>>>, /// The ordered timestamp map of transmission IDs and cache hits. seen_inbound_transmissions: RwLock, u32>>>, + /// The ordered timestamp map of inbound block requests and cache hits. + seen_inbound_block_requests: RwLock>>, /// The ordered timestamp map of peer IPs and their cache hits on outbound events. seen_outbound_events: RwLock>>, /// The ordered timestamp map of peer IPs and their cache hits on certificate requests. @@ -41,6 +44,8 @@ pub struct Cache { seen_outbound_transmissions: RwLock>>, /// The map of IPs to the number of validators requests. seen_outbound_validators_requests: RwLock>, + /// The ordered timestamp map of outbound block requests and cache hits. + seen_outbound_block_requests: RwLock>>, } impl Default for Cache { @@ -58,10 +63,12 @@ impl Cache { seen_inbound_events: Default::default(), seen_inbound_certificates: Default::default(), seen_inbound_transmissions: Default::default(), + seen_inbound_block_requests: Default::default(), seen_outbound_events: Default::default(), seen_outbound_certificates: Default::default(), seen_outbound_transmissions: Default::default(), seen_outbound_validators_requests: Default::default(), + seen_outbound_block_requests: Default::default(), } } } @@ -86,6 +93,11 @@ impl Cache { pub fn insert_inbound_transmission(&self, key: TransmissionID, interval_in_secs: i64) -> usize { Self::retain_and_insert(&self.seen_inbound_transmissions, key, interval_in_secs) } + + /// Inserts a block request into the cache, returning the number of recent events. + pub fn insert_inbound_block_request(&self, key: SocketAddr, interval_in_secs: i64) -> usize { + Self::retain_and_insert(&self.seen_inbound_block_requests, key, interval_in_secs) + } } impl Cache { @@ -125,6 +137,25 @@ impl Cache { pub fn clear_outbound_validators_requests(&self, peer_ip: SocketAddr) { self.seen_outbound_validators_requests.write().remove(&peer_ip); } + + /// Inserts the block request for the given peer. + pub fn insert_outbound_block_request(&self, peer_ip: SocketAddr, request: BlockRequest) { + self.seen_outbound_block_requests.write().entry(peer_ip).or_default().insert(request); + } + + /// Removes the block request for the given peer. Returns whether the request was present. + pub fn remove_outbound_block_request(&self, peer_ip: SocketAddr, request: &BlockRequest) -> bool { + self.seen_outbound_block_requests + .write() + .get_mut(&peer_ip) + .map(|requests| requests.remove(request)) + .unwrap_or(false) + } + + /// Clears the peer's number of outbound block requests. + pub fn clear_outbound_block_requests(&self, peer_ip: SocketAddr) { + self.seen_outbound_block_requests.write().remove(&peer_ip); + } } impl Cache {