improved data structure for storing backoffs + slack backoff #38

Merged · 1 commit · Jul 30, 2020
294 changes: 200 additions & 94 deletions protocols/gossipsub/src/behaviour.rs
@@ -18,29 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::config::{GossipsubConfig, ValidationMode};
use crate::error::PublishError;
use crate::handler::GossipsubHandler;
use crate::mcache::MessageCache;
use crate::protocol::{
GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, PeerInfo, SIGNING_PREFIX,
};
use crate::rpc_proto;
use crate::topic::{Hasher, Topic, TopicHash};
use futures::StreamExt;
use libp2p_core::{
connection::ConnectionId, identity::error::SigningError, identity::Keypair, Multiaddr, PeerId,
};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
ProtocolsHandler,
};
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
use prost::Message;
use rand;
use rand::{seq::SliceRandom, thread_rng};
use std::collections::hash_map::Entry;
use std::iter::FromIterator;
use std::time::Duration;
@@ -52,13 +29,35 @@ use std::{
sync::Arc,
task::{Context, Poll},
};

use futures::StreamExt;
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
use prost::Message;
use rand;
use rand::{seq::SliceRandom, thread_rng};
use wasm_timer::{Instant, Interval};

mod tests;
use libp2p_core::{
connection::ConnectionId, identity::error::SigningError, identity::Keypair, Multiaddr, PeerId,
};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
ProtocolsHandler,
};

use crate::config::{GossipsubConfig, ValidationMode};
use crate::error::PublishError;
use crate::handler::GossipsubHandler;
use crate::mcache::MessageCache;
use crate::protocol::{
GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, PeerInfo, SIGNING_PREFIX,
};
use crate::rpc_proto;
use crate::topic::{Hasher, Topic, TopicHash};

/// The number of heartbeat ticks until we clean up the backoff dictionary. Does only affect
/// memory performance and nothing external.
const BACKOFF_CLEAN_UP_TICKS: u64 = 15;
mod tests;

/// Determines if published messages should be signed or not.
///
@@ -148,6 +147,156 @@ impl From<MessageAuthenticity> for PublishConfig {
}
}

/// Stores backoffs efficiently, indexed both per (topic, peer) and per heartbeat slot.
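///
/// A rough usage sketch (illustrative only, not part of this change; `topic` and `peer`
/// stand for an existing `TopicHash` and `PeerId`):
///
/// ```ignore
/// // prune backoff of 60s, heartbeat interval of 1s, slack of 1 heartbeat
/// let mut backoffs = BackoffStorage::new(&Duration::from_secs(60), Duration::from_secs(1), 1);
/// backoffs.update_backoff(&topic, &peer, Duration::from_secs(60));
/// assert!(backoffs.is_backoff(&topic, &peer));
/// // called once per heartbeat; entries are dropped once backoff + slack has elapsed
/// backoffs.heartbeat();
/// ```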
struct BackoffStorage {
/// Stores the backoff expiry and the index into `backoffs_by_heartbeat` per peer per topic.
backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, usize)>>,
/// Stores (topic, peer) pairs per heartbeat slot (cyclic; the current slot is `heartbeat_index`).
backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
/// The index into `backoffs_by_heartbeat` corresponding to the current heartbeat.
heartbeat_index: usize,
/// The heartbeat interval duration from the config.
heartbeat_interval: Duration,
/// The `backoff_slack` from the config.
backoff_slack: u32,
}

impl BackoffStorage {
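/// Computes the number of heartbeat intervals needed to cover the duration `d`, i.e. the
/// ceiling of `d / heartbeat_interval`.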
fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
((d.as_nanos() + heartbeat_interval.as_nanos() - 1) / heartbeat_interval.as_nanos())
as usize
}

pub fn new(
prune_backoff: &Duration,
heartbeat_interval: Duration,
backoff_slack: u32,
) -> BackoffStorage {
// Add one additional slot for a partially elapsed heartbeat.
let max_heartbeats =
Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
BackoffStorage {
backoffs: HashMap::new(),
backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
heartbeat_index: 0,
heartbeat_interval,
backoff_slack,
}
}

/// Updates the backoff for a peer (if there is already a more restrictive backoff this call
/// doesn't change anything).
pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
let instant = Instant::now() + time;
let insert_into_backoffs_by_heartbeat =
|heartbeat_index: usize,
backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
heartbeat_interval,
backoff_slack| {
let pair = (topic.clone(), peer.clone());
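// Target slot: advance from the given heartbeat slot by the number of heartbeats
// covering `time` plus the slack, wrapping around the cyclic buffer.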
let index = (heartbeat_index
+ Self::heartbeats(&time, heartbeat_interval)
+ backoff_slack as usize)
% backoffs_by_heartbeat.len();
backoffs_by_heartbeat[index].insert(pair);
index
};
match self
.backoffs
.entry(topic.clone())
.or_insert_with(HashMap::new)
.entry(peer.clone())
{
Entry::Occupied(mut o) => {
let &(backoff, index) = o.get();
if backoff < instant {
let pair = (topic.clone(), peer.clone());
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index) {
s.remove(&pair);
}
let index = insert_into_backoffs_by_heartbeat(
self.heartbeat_index,
&mut self.backoffs_by_heartbeat,
&self.heartbeat_interval,
self.backoff_slack,
);
o.insert((instant, index));
}
}
Entry::Vacant(v) => {
let index = insert_into_backoffs_by_heartbeat(
self.heartbeat_index,
&mut self.backoffs_by_heartbeat,
&self.heartbeat_interval,
self.backoff_slack,
);
v.insert((instant, index));
}
};
}

/// Checks if a given peer is backoffed for the given topic. This method respects the
/// configured backoff_slack and may return true even if the backoff is already over.
/// It is guaranteed to return true while the backoff is not over and, once enough time has
/// passed, false after the backoff is over.
///
/// This method should be used for deciding if we can already send a GRAFT to a previously
/// backoffed peer.
pub fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
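// Entries are only removed in `heartbeat()` once the backoff plus the configured slack
// has elapsed, so the mere presence of an entry counts as "still backed off".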
self.backoffs
.get(topic)
.map_or(false, |m| m.contains_key(peer))
}

/// Checks if a given peer is backoffed for the given topic. This method ignores the
/// configured backoff_slack and returns true exactly while the backoff is still in effect.
/// It should be used for deciding if an incoming GRAFT is allowed.
pub fn is_backoff(&self, topic: &TopicHash, peer: &PeerId) -> bool {
Self::is_backoff_from_backoffs(&self.backoffs, topic, peer, Duration::from_secs(0))
}

fn is_backoff_from_backoffs(
backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, usize)>>,
topic: &TopicHash,
peer: &PeerId,
slack: Duration,
) -> bool {
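// A peer counts as backed off while its recorded expiry plus `slack` lies in the future.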
backoffs.get(topic).map_or(false, |m| {
m.get(peer)
.map_or(false, |(i, _)| *i + slack > Instant::now())
})
}

/// Applies a heartbeat. This should be called regularly at intervals of length
/// `heartbeat_interval`.
///
/// TODO: Should we use our own instance of `wasm_timer::Interval` with its own interval
/// length instead of relying on regular heartbeat calls?
pub fn heartbeat(&mut self) {
//clean up backoffs_by_heartbeat
if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index) {
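// The slot for the current heartbeat is about to be reused: drop every pair whose
// backoff (plus slack) has already expired; pairs still within their backoff window
// stay and are re-checked on the next pass over this slot.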
let backoffs = &mut self.backoffs;
let slack = self.heartbeat_interval * self.backoff_slack;
s.retain(|(topic, peer)| {
let keep = Self::is_backoff_from_backoffs(backoffs, topic, peer, slack);
if !keep {
//remove from backoffs
if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) {
if m.get_mut().remove(peer).is_some() && m.get().is_empty() {
m.remove();
}
}
}

keep
});
}

// Advance the heartbeat index, wrapping around the cyclic buffer.
self.heartbeat_index = (self.heartbeat_index + 1) % self.backoffs_by_heartbeat.len();
}
}

/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If message signing is
@@ -188,9 +337,8 @@ pub struct Gossipsub {
/// The last publish time for fanout topics.
fanout_last_pub: HashMap<TopicHash, Instant>,

/// The backoff times for each topic and peer. We do not graft to peers for some topic until
/// the given backoff instant. Values in the past get cleaned up regularly.
backoff: HashMap<TopicHash, HashMap<PeerId, Instant>>,
/// Storage for backoffs.
backoffs: BackoffStorage,

/// Message cache for the last few heartbeats.
mcache: MessageCache,
@@ -225,7 +373,11 @@ impl Gossipsub {
mesh: HashMap::new(),
fanout: HashMap::new(),
fanout_last_pub: HashMap::new(),
backoff: HashMap::new(),
backoffs: BackoffStorage::new(
&config.prune_backoff,
config.heartbeat_interval,
config.backoff_slack,
),
mcache: MessageCache::new(
config.history_gossip,
config.history_length,
@@ -548,12 +700,8 @@ impl Gossipsub {
};

//update backoff
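// (Recording our own prune backoff ensures we do not attempt to re-GRAFT this peer
// before prune_backoff plus the backoff slack has elapsed.)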
Self::add_backoff(
&mut self.backoff,
peer,
topic_hash,
self.config.prune_backoff,
);
self.backoffs
.update_backoff(topic_hash, peer, self.config.prune_backoff);

GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
@@ -659,29 +807,6 @@ impl Gossipsub {
debug!("Completed IWANT handling for peer: {:?}", peer_id);
}

fn add_backoff(
backoff: &mut HashMap<TopicHash, HashMap<PeerId, Instant>>,
peer_id: &PeerId,
topic: &TopicHash,
time: Duration,
) {
let instant = Instant::now() + time;
match backoff
.entry(topic.clone())
.or_insert_with(HashMap::new)
.entry(peer_id.clone())
{
Entry::Occupied(mut o) => {
if o.get() < &instant {
o.insert(instant);
}
}
Entry::Vacant(v) => {
v.insert(instant);
}
}
}

/// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
/// responds with PRUNE messages.
fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
@@ -701,22 +826,16 @@ impl Gossipsub {
} else {
for topic_hash in topics {
// make sure we are not backing off that peer
if let Some(backoff_time) = self
.backoff
.get(&topic_hash)
.and_then(|peers| peers.get(peer_id))
{
if backoff_time > &Instant::now() {
debug!("GRAFT: ignoring backed off peer {:?}", peer_id);
//TODO add penalty
//no PX
do_px = false;
//TODO extra penalty if the graft is coming too fast (see
// GossipSubGraftFloodThreshold)

to_prune_topics.insert(topic_hash.clone());
continue;
}
if self.backoffs.is_backoff(&topic_hash, peer_id) {
debug!("GRAFT: ignoring backed off peer {:?}", peer_id);
//TODO add penalty
//no PX
do_px = false;
//TODO extra penalty if the graft is coming too fast (see
// GossipSubGraftFloodThreshold)

to_prune_topics.insert(topic_hash.clone());
continue;
}

//TODO check score of peer
@@ -780,10 +899,9 @@
}

// is there a backoff specified by the peer? if so obey it.
Self::add_backoff(
&mut self.backoff,
peer_id,
self.backoffs.update_backoff(
&topic_hash,
peer_id,
if let Some(backoff) = backoff {
Duration::from_secs(backoff)
} else {
@@ -996,12 +1114,6 @@ impl Gossipsub {
);
}

fn do_backoff(backoff_peers: Option<&HashMap<PeerId, Instant>>, peer: &PeerId) -> bool {
//We do not consider the instants here, as long as there is an entry we backoff
//In the heartbeat we clean up to backoff map to stay up to date.
backoff_peers.map_or(false, |m| m.contains_key(peer))
}

/// Heartbeat function which shifts the memcache and updates the mesh.
fn heartbeat(&mut self) {
debug!("Starting heartbeat");
@@ -1012,13 +1124,7 @@
let mut to_prune = HashMap::new();

//clean up expired backoffs
if self.heartbeat_ticks % BACKOFF_CLEAN_UP_TICKS == 0 {
let cutoff = Instant::now();
self.backoff.retain(|_, m| {
m.retain(|_, backoff| &*backoff > &cutoff);
!m.is_empty()
});
}
self.backoffs.heartbeat();

//check connections to explicit peers
if self.heartbeat_ticks % self.config.check_explicit_peers_ticks == 0 {
@@ -1040,13 +1146,13 @@
);
// not enough peers - get mesh_n - current_length more
let desired_peers = self.config.mesh_n - peers.len();
let backoff_peers = self.backoff.get(topic_hash);
let backoffs = &self.backoffs;
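// GRAFT candidates must not be backed off; the slack-respecting check ensures a peer's
// backoff (plus backoff_slack) has expired before we consider re-GRAFTing it.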
let peer_list =
Self::get_random_peers(&self.topic_peers, topic_hash, desired_peers, {
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !Self::do_backoff(backoff_peers, peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
}
});
for peer in &peer_list {