Skip to content

Commit

Permalink
protocols/kad: Enable filtering of (provider) records (#2163)
Browse files Browse the repository at this point in the history
Introduce the `KademliaStoreInserts` option, which allows filtering of (provider) records.

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
rubdos and mxinden authored Aug 17, 2021
1 parent a5b6a0b commit c58f697
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 35 deletions.
5 changes: 5 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

- Update dependencies.

- Introduce the `KademliaStoreInserts` option, which allows filtering of
  records (see [PR 2163]).

[PR 2163]: https://github.com/libp2p/rust-libp2p/pull/2163

# 0.31.0 [2021-07-12]

- Update dependencies.
Expand Down
107 changes: 90 additions & 17 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub struct Kademlia<TStore> {
/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,

/// Configuration of [`RecordStore`] filtering.
record_filtering: KademliaStoreInserts,

/// The currently active (i.e. in-progress) queries.
queries: QueryPool<QueryInner>,

Expand Down Expand Up @@ -131,6 +134,29 @@ pub enum KademliaBucketInserts {
Manual,
}

/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaStoreInserts {
/// Whenever a (provider) record is received,
/// the record is forwarded immediately to the [`RecordStore`].
///
/// This is the default strategy.
Unfiltered,
/// Whenever a (provider) record is received, an event is emitted
/// instead of the record being stored directly.
/// Provider records generate a [`KademliaEvent::InboundAddProviderRequest`],
/// normal records generate a [`KademliaEvent::InboundPutRecordRequest`].
///
/// When deemed valid, a (provider) record needs to be explicitly stored in
/// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
/// whichever is applicable. A mutable reference to the [`RecordStore`] can
/// be retrieved via [`Kademlia::store_mut`].
FilterBoth,
}

/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Kademlia::new`].
Expand All @@ -142,6 +168,7 @@ pub struct KademliaConfig {
record_ttl: Option<Duration>,
record_replication_interval: Option<Duration>,
record_publication_interval: Option<Duration>,
record_filtering: KademliaStoreInserts,
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
Expand Down Expand Up @@ -175,6 +202,7 @@ impl Default for KademliaConfig {
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
record_replication_interval: Some(Duration::from_secs(60 * 60)),
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
record_filtering: KademliaStoreInserts::Unfiltered,
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
Expand Down Expand Up @@ -259,6 +287,15 @@ impl KademliaConfig {
self
}

/// Configures the strategy used for incoming (provider) records:
/// either stored directly, or first surfaced as events for manual
/// validation before storage.
///
/// Defaults to [`KademliaStoreInserts::Unfiltered`]; see
/// [`KademliaStoreInserts`] for the available strategies.
pub fn set_record_filtering(&mut self, strategy: KademliaStoreInserts) -> &mut Self {
    self.record_filtering = strategy;
    self
}

/// Sets the (re-)replication interval for stored records.
///
/// Periodic replication of stored records ensures that the records
Expand Down Expand Up @@ -393,6 +430,7 @@ where
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config,
record_filtering: config.record_filtering,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
connected_peers: Default::default(),
Expand Down Expand Up @@ -1572,22 +1610,33 @@ where
// The record is cloned because of the weird libp2p protocol
// requirement to send back the value in the response, although this
// is a waste of resources.
match self.store.put(record.clone()) {
Ok(()) => debug!(
"Record stored: {:?}; {} bytes",
record.key,
record.value.len()
),
Err(e) => {
info!("Record not stored: {:?}", e);
match self.record_filtering {
KademliaStoreInserts::Unfiltered => match self.store.put(record.clone()) {
Ok(()) => debug!(
"Record stored: {:?}; {} bytes",
record.key,
record.value.len()
),
Err(e) => {
info!("Record not stored: {:?}", e);
self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::Reset(request_id),
});
return;
}
},
KademliaStoreInserts::FilterBoth => {
self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::Reset(request_id),
});

return;
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundPutRecordRequest {
source,
connection,
record: record.clone(),
},
));
}
}
}
Expand Down Expand Up @@ -1620,8 +1669,18 @@ where
expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
addresses: provider.multiaddrs,
};
if let Err(e) = self.store.add_provider(record) {
info!("Provider record not stored: {:?}", e);
match self.record_filtering {
KademliaStoreInserts::Unfiltered => {
if let Err(e) = self.store.add_provider(record) {
info!("Provider record not stored: {:?}", e);
}
}
KademliaStoreInserts::FilterBoth => {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundAddProviderRequest { record },
));
}
}
}
}
Expand Down Expand Up @@ -2257,6 +2316,20 @@ pub struct PeerRecord {
/// See [`NetworkBehaviour::poll`].
#[derive(Debug)]
pub enum KademliaEvent {
/// A peer sent a [`KademliaHandlerIn::PutRecord`] request and filtering is enabled.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`].
InboundPutRecordRequest {
source: PeerId,
connection: ConnectionId,
record: Record,
},

/// A peer sent a [`KademliaHandlerIn::AddProvider`] request and filtering [`KademliaStoreInserts::FilterBoth`] is enabled.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`] for details.
InboundAddProviderRequest { record: ProviderRecord },

/// An inbound request has been received and handled.
//
// Note on the difference between 'request' and 'query': A request is a
Expand Down
64 changes: 47 additions & 17 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ fn get_record_not_found() {
/// is equal to the configured replication factor.
#[test]
fn put_record() {
fn prop(records: Vec<Record>, seed: Seed) {
fn prop(records: Vec<Record>, seed: Seed, filter_records: bool, drop_records: bool) {
let mut rng = StdRng::from_seed(seed.0);
let replication_factor =
NonZeroUsize::new(rng.gen_range(1, (K_VALUE.get() / 2) + 1)).unwrap();
Expand All @@ -501,6 +501,10 @@ fn put_record() {
config.disjoint_query_paths(true);
}

if filter_records {
config.set_record_filtering(KademliaStoreInserts::FilterBoth);
}

let mut swarms = {
let mut fully_connected_swarms =
build_fully_connected_nodes_with_config(num_total - 1, config.clone());
Expand Down Expand Up @@ -596,6 +600,22 @@ fn put_record() {
}
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::InboundPutRecordRequest { record, .. },
))) => {
assert_ne!(
swarm.behaviour().record_filtering,
KademliaStoreInserts::Unfiltered
);
if !drop_records {
// Accept the record
swarm
.behaviour_mut()
.store_mut()
.put(record)
.expect("record is stored");
}
}
// Ignore any other event.
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Expand Down Expand Up @@ -650,21 +670,29 @@ fn put_record() {
})
.collect::<HashSet<_>>();

assert_eq!(actual.len(), replication_factor.get());

let actual_not_expected = actual.difference(&expected).collect::<Vec<&PeerId>>();
assert!(
actual_not_expected.is_empty(),
"Did not expect records to be stored on nodes {:?}.",
actual_not_expected,
);

let expected_not_actual = expected.difference(&actual).collect::<Vec<&PeerId>>();
assert!(
expected_not_actual.is_empty(),
"Expected record to be stored on nodes {:?}.",
expected_not_actual,
);
if swarms[0].behaviour().record_filtering != KademliaStoreInserts::Unfiltered
&& drop_records
{
assert_eq!(actual.len(), 0);
} else {
assert_eq!(actual.len(), replication_factor.get());

let actual_not_expected =
actual.difference(&expected).collect::<Vec<&PeerId>>();
assert!(
actual_not_expected.is_empty(),
"Did not expect records to be stored on nodes {:?}.",
actual_not_expected,
);

let expected_not_actual =
expected.difference(&actual).collect::<Vec<&PeerId>>();
assert!(
expected_not_actual.is_empty(),
"Expected record to be stored on nodes {:?}.",
expected_not_actual,
);
}
}

if republished {
Expand Down Expand Up @@ -692,7 +720,9 @@ fn put_record() {
}))
}

QuickCheck::new().tests(3).quickcheck(prop as fn(_, _) -> _)
QuickCheck::new()
.tests(4)
.quickcheck(prop as fn(_, _, _, _) -> _)
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub use behaviour::{
QueryStats,
};
pub use behaviour::{
Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent, Quorum,
Kademlia, KademliaCaching, KademliaConfig, KademliaEvent, KademliaStoreInserts, Quorum,
};
pub use protocol::KadConnectionType;
pub use query::QueryId;
Expand Down

0 comments on commit c58f697

Please sign in to comment.