Skip to content

Commit

Permalink
Merge pull request #3369 from niklaslong/fix/3324-block-request-cache
Browse files Browse the repository at this point in the history
[Fix] Cache block requests on the gateway
  • Loading branch information
zosorock authored Oct 22, 2024
2 parents dae2509 + 6576f25 commit 2cb5444
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 41 deletions.
96 changes: 56 additions & 40 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,30 +547,35 @@ impl<N: Network> Gateway<N> {
bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
}
// Rate limit for duplicate requests.
if matches!(&event, &Event::CertificateRequest(_) | &Event::CertificateResponse(_)) {
// Retrieve the certificate ID.
let certificate_id = match &event {
Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
_ => unreachable!(),
};
// Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
match event {
Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
// Retrieve the certificate ID.
let certificate_id = match &event {
Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
_ => unreachable!(),
};
// Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
Event::TransmissionRequest(TransmissionRequest { transmission_id })
| Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
// Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
} else if matches!(&event, &Event::TransmissionRequest(_) | Event::TransmissionResponse(_)) {
// Retrieve the transmission ID.
let transmission_id = match &event {
Event::TransmissionRequest(TransmissionRequest { transmission_id }) => *transmission_id,
Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => *transmission_id,
_ => unreachable!(),
};
// Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
Event::BlockRequest(_) => {
let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
_ => {}
}
trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

Expand Down Expand Up @@ -632,6 +637,10 @@ impl<N: Network> Gateway<N> {
if let Some(sync_sender) = self.sync_sender.get() {
// Retrieve the block response.
let BlockResponse { request, blocks } = block_response;
// Check the response corresponds to a request.
if !self.cache.remove_outbound_block_request(peer_ip, &request) {
bail!("Unsolicited block response from '{peer_ip}'")
}
// Perform the deferred non-blocking deserialization of the blocks.
let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?;
// Ensure the block response is well-formed.
Expand Down Expand Up @@ -980,24 +989,30 @@ impl<N: Network> Transport<N> for Gateway<N> {
}};
}

// If the event type is a certificate request, increment the cache.
if matches!(event, Event::CertificateRequest(_)) | matches!(event, Event::CertificateResponse(_)) {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
}
// If the event type is a transmission request, increment the cache.
else if matches!(event, Event::TransmissionRequest(_)) | matches!(event, Event::TransmissionResponse(_)) {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
}
// Otherwise, employ a general rate limit.
else {
// Send the event to the peer.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
// Increment the cache for certificate, transmission and block events.
match event {
Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
}
Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
}
Event::BlockRequest(request) => {
// Insert the outbound request so we can match it to responses.
self.cache.insert_outbound_block_request(peer_ip, request);
// Send the event to the peer and updatet the outbound event cache, use the general rate limit.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
}
_ => {
// Send the event to the peer, use the general rate limit.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
}
}
}

Expand Down Expand Up @@ -1091,6 +1106,7 @@ impl<N: Network> Disconnect for Gateway<N> {
// This is sufficient to avoid infinite growth as the committee has a fixed number
// of members.
self.cache.clear_outbound_validators_requests(peer_ip);
self.cache.clear_outbound_block_requests(peer_ip);
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion node/bft/src/helpers/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::events::BlockRequest;
use snarkvm::{console::types::Field, ledger::narwhal::TransmissionID, prelude::Network};

use core::hash::Hash;
use parking_lot::RwLock;
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
net::{IpAddr, SocketAddr},
};
use time::OffsetDateTime;
Expand All @@ -33,6 +34,8 @@ pub struct Cache<N: Network> {
seen_inbound_certificates: RwLock<BTreeMap<i64, HashMap<Field<N>, u32>>>,
/// The ordered timestamp map of transmission IDs and cache hits.
seen_inbound_transmissions: RwLock<BTreeMap<i64, HashMap<TransmissionID<N>, u32>>>,
/// The ordered timestamp map of inbound block requests and cache hits.
seen_inbound_block_requests: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The ordered timestamp map of peer IPs and their cache hits on outbound events.
seen_outbound_events: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The ordered timestamp map of peer IPs and their cache hits on certificate requests.
Expand All @@ -41,6 +44,8 @@ pub struct Cache<N: Network> {
seen_outbound_transmissions: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The map of IPs to the number of validators requests.
seen_outbound_validators_requests: RwLock<HashMap<SocketAddr, u32>>,
/// The ordered timestamp map of outbound block requests and cache hits.
seen_outbound_block_requests: RwLock<HashMap<SocketAddr, HashSet<BlockRequest>>>,
}

impl<N: Network> Default for Cache<N> {
Expand All @@ -58,10 +63,12 @@ impl<N: Network> Cache<N> {
seen_inbound_events: Default::default(),
seen_inbound_certificates: Default::default(),
seen_inbound_transmissions: Default::default(),
seen_inbound_block_requests: Default::default(),
seen_outbound_events: Default::default(),
seen_outbound_certificates: Default::default(),
seen_outbound_transmissions: Default::default(),
seen_outbound_validators_requests: Default::default(),
seen_outbound_block_requests: Default::default(),
}
}
}
Expand All @@ -86,6 +93,11 @@ impl<N: Network> Cache<N> {
pub fn insert_inbound_transmission(&self, key: TransmissionID<N>, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_transmissions, key, interval_in_secs)
}

/// Inserts a block request into the cache, returning the number of recent events.
pub fn insert_inbound_block_request(&self, key: SocketAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_block_requests, key, interval_in_secs)
}
}

impl<N: Network> Cache<N> {
Expand Down Expand Up @@ -125,6 +137,25 @@ impl<N: Network> Cache<N> {
pub fn clear_outbound_validators_requests(&self, peer_ip: SocketAddr) {
self.seen_outbound_validators_requests.write().remove(&peer_ip);
}

/// Inserts the block request for the given peer.
pub fn insert_outbound_block_request(&self, peer_ip: SocketAddr, request: BlockRequest) {
self.seen_outbound_block_requests.write().entry(peer_ip).or_default().insert(request);
}

/// Removes the block request for the given peer. Returns whether the request was present.
pub fn remove_outbound_block_request(&self, peer_ip: SocketAddr, request: &BlockRequest) -> bool {
self.seen_outbound_block_requests
.write()
.get_mut(&peer_ip)
.map(|requests| requests.remove(request))
.unwrap_or(false)
}

/// Clears the peer's number of outbound block requests.
pub fn clear_outbound_block_requests(&self, peer_ip: SocketAddr) {
self.seen_outbound_block_requests.write().remove(&peer_ip);
}
}

impl<N: Network> Cache<N> {
Expand Down

0 comments on commit 2cb5444

Please sign in to comment.