Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache published message ids #61

Merged
merged 3 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 84 additions & 30 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ pub struct Gossipsub {

/// Counts the number of `IWANT` that we sent the each peer since the last heartbeat.
count_iasked: HashMap<PeerId, usize>,

/// short term cache for published messsage ids
published_message_ids: DuplicateCache<MessageId>,
}

impl Gossipsub {
Expand Down Expand Up @@ -321,13 +324,14 @@ impl Gossipsub {
config.heartbeat_interval(),
),
heartbeat_ticks: 0,
config,
px_peers: HashSet::new(),
outbound_peers: HashSet::new(),
peer_score: None,
count_peer_have: HashMap::new(),
count_iasked: HashMap::new(),
peer_protocols: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
})
}

Expand Down Expand Up @@ -507,6 +511,12 @@ impl Gossipsub {

debug!("Publishing message: {:?}", msg_id);

// If the message is anonymous or has a random author add it to the published message ids
// cache.
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
self.published_message_ids.insert(msg_id.clone());
}

// If we are not flood publishing forward the message to mesh peers.
let mesh_peers_sent =
!self.config.flood_publish() && self.forward_msg(message.clone(), None)?;
Expand Down Expand Up @@ -713,8 +723,11 @@ impl Gossipsub {
}

let interval = Interval::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, self.config
.message_id_fn(), callback);
let peer_score = PeerScore::new_with_message_delivery_time_callback(
params,
self.config.message_id_fn(),
callback,
);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
Ok(())
}
Expand Down Expand Up @@ -1025,8 +1038,7 @@ impl Gossipsub {
}
debug!(
"IHAVE: Asking for the following messages from {}: {:?}",
peer_id,
message_ids
peer_id, message_ids
);

Self::control_pool_add(
Expand Down Expand Up @@ -1340,8 +1352,7 @@ impl Gossipsub {
if self.blacklisted_peers.contains(source) {
debug!(
"Rejecting message from peer {} because of blacklisted source: {}",
propagation_source,
source
propagation_source, source
);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
Expand All @@ -1363,21 +1374,24 @@ impl Gossipsub {
}

// reject messages claiming to be from ourselves but not locally published
if let Some(own_id) = self.publish_config.get_own_id() {
if !self.config.allow_self_origin()
let self_published = if let Some(own_id) = self.publish_config.get_own_id() {
!self.config.allow_self_origin()
&& own_id != propagation_source
&& msg.source.as_ref().map_or(false, |s| s == own_id)
{
debug!(
"Dropping message {} claiming to be from self but forwarded from {}",
msg_id, propagation_source
);
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin);
gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin);
}
return;
} else {
self.published_message_ids.contains(&msg_id)
};

if self_published {
debug!(
"Dropping message {} claiming to be from self but forwarded from {}",
msg_id, propagation_source
);
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin);
gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin);
}
return;
}

// Add the message to the duplication cache and memcache.
Expand All @@ -1388,7 +1402,10 @@ impl Gossipsub {
}
return;
}
debug!("Put message {:?} in duplication_cache and resolve promises", &msg_id);
debug!(
"Put message {:?} in duplication_cache and resolve promises",
&msg_id
);

// Tells score that message arrived (but is maybe not fully validated yet)
// Consider message as delivered for gossip promises
Expand Down Expand Up @@ -1911,14 +1928,28 @@ impl Gossipsub {
scores
});
trace!("Mesh message deliveries: {:?}", {
self.mesh.iter().map(|(t, peers)| {
(t.clone(), peers.iter().map(|p| {
(p.clone(),
peer_score.as_ref().expect("peer_score.is_some()").0
.mesh_message_deliveries(p, t)
.unwrap_or(0.0))
}).collect::<HashMap<PeerId, f64>>())
}).collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
self.mesh
.iter()
.map(|(t, peers)| {
(
t.clone(),
peers
.iter()
.map(|p| {
(
p.clone(),
peer_score
.as_ref()
.expect("peer_score.is_some()")
.0
.mesh_message_deliveries(p, t)
.unwrap_or(0.0),
)
})
.collect::<HashMap<PeerId, f64>>(),
)
})
.collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
})
}

Expand All @@ -1936,7 +1967,6 @@ impl Gossipsub {
self.mcache.shift();

debug!("Completed Heartbeat");

}

/// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
Expand Down Expand Up @@ -2649,6 +2679,12 @@ impl NetworkBehaviour for Gossipsub {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) {
peer_score.add_ip(&peer_id, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint
)
}
}
}
Expand All @@ -2663,6 +2699,12 @@ impl NetworkBehaviour for Gossipsub {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) {
peer_score.remove_ip(peer, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint
)
}
}
}
Expand All @@ -2678,9 +2720,21 @@ impl NetworkBehaviour for Gossipsub {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_old)) {
peer_score.remove_ip(peer, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint_old
)
}
if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_new)) {
peer_score.add_ip(&peer, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint_new
)
}
}
}
Expand Down Expand Up @@ -2727,7 +2781,7 @@ impl NetworkBehaviour for Gossipsub {

// Check if peer is graylisted in which case we ignore the event
if let (true, _) =
self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
{
debug!("RPC Dropped from greylisted peer {}", propagation_source);
return;
Expand Down
6 changes: 3 additions & 3 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2737,9 +2737,9 @@ mod tests {
assert!(match &gs.events[0] {
NetworkBehaviourAction::GenerateEvent(event) => match event {
GossipsubEvent::Subscribed { .. } => true,
_ => false
}
_ => false
_ => false,
},
_ => false,
});

let control_action = GossipsubControlAction::IHave {
Expand Down
21 changes: 21 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ pub struct GossipsubConfig {

/// Enable support for flooodsub peers. Default false.
support_floodsub: bool,

/// Published message ids time cache duration. The default is 10 seconds.
published_message_ids_cache_time: Duration,
}

impl GossipsubConfig {
Expand Down Expand Up @@ -455,6 +458,11 @@ impl GossipsubConfig {
pub fn support_floodsub(&self) -> bool {
self.support_floodsub
}

/// Published message ids time cache duration. The default is 10 seconds.
pub fn published_message_ids_cache_time(&self) -> Duration {
self.published_message_ids_cache_time
}
}

impl Default for GossipsubConfig {
Expand Down Expand Up @@ -536,6 +544,7 @@ impl GossipsubConfigBuilder {
max_ihave_messages: 10,
iwant_followup_time: Duration::from_secs(3),
support_floodsub: false,
published_message_ids_cache_time: Duration::from_secs(10),
},
}
}
Expand Down Expand Up @@ -783,6 +792,14 @@ impl GossipsubConfigBuilder {
self
}

pub fn published_message_ids_cache_time(
&mut self,
published_message_ids_cache_time: Duration,
) -> &mut Self {
self.config.published_message_ids_cache_time = published_message_ids_cache_time;
self
}

/// Constructs a `GossipsubConfig` from the given configuration and validates the settings.
pub fn build(&self) -> Result<GossipsubConfig, &str> {
// check all constraints on config
Expand Down Expand Up @@ -848,6 +865,10 @@ impl std::fmt::Debug for GossipsubConfig {
let _ = builder.field("max_ihave_messages", &self.max_ihave_messages);
let _ = builder.field("iwant_followup_time", &self.iwant_followup_time);
let _ = builder.field("support_floodsub", &self.support_floodsub);
let _ = builder.field(
"published_message_ids_cache_time",
&self.published_message_ids_cache_time,
);
builder.finish()
}
}
Expand Down
40 changes: 30 additions & 10 deletions protocols/gossipsub/src/peer_score/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
use crate::time_cache::TimeCache;
use crate::{GossipsubMessage, MessageId, TopicHash};
use libp2p_core::PeerId;
use log::{debug, warn};
use log::{debug, trace, warn};
use std::collections::{hash_map, HashMap, HashSet};
use std::net::IpAddr;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -203,10 +203,11 @@ impl PeerScore {
Self::new_with_message_delivery_time_callback(params, msg_id, None)
}

pub fn new_with_message_delivery_time_callback(params: PeerScoreParams,
msg_id: fn(&GossipsubMessage) -> MessageId,
callback: Option<fn(&PeerId, &TopicHash, f64)>)
-> Self {
pub fn new_with_message_delivery_time_callback(
params: PeerScoreParams,
msg_id: fn(&GossipsubMessage) -> MessageId,
callback: Option<fn(&PeerId, &TopicHash, f64)>,
) -> Self {
PeerScore {
params,
peer_stats: HashMap::new(),
Expand Down Expand Up @@ -438,6 +439,7 @@ impl PeerScore {

/// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it
pub fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
trace!("Add ip for peer {}, ip: {}", peer_id, ip);
let peer_stats = self.peer_stats.entry(peer_id.clone()).or_default();

// Mark the peer as connected (currently the default is connected, but we don't want to
Expand All @@ -457,8 +459,21 @@ impl PeerScore {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
peer_stats.known_ips.remove(ip);
if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
trace!("Remove ip for peer {}, ip: {}", peer_id, ip);
peer_ids.remove(peer_id);
} else {
trace!(
"No entry in peer_ips for ip {} which should get removed for peer {}",
ip,
peer_id
);
}
} else {
trace!(
"No peer_stats for peer {} which should remove the ip {}",
peer_id,
ip
);
}
}

Expand Down Expand Up @@ -555,7 +570,8 @@ impl PeerScore {
.get(_from)
.and_then(|s| s.topics.get(topic))
.map(|ts| ts.in_mesh())
.unwrap_or(false) {
.unwrap_or(false)
{
callback(_from, topic, 0.0);
}
}
Expand Down Expand Up @@ -591,6 +607,10 @@ impl PeerScore {
match reason {
// these messages are not tracked, but the peer is penalized as they are invalid
RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
debug!(
"Message from {} rejected because of ValidationError or SelfOrigin",
from
);
self.mark_invalid_message_delivery(from, msg);
return;
}
Expand Down Expand Up @@ -656,7 +676,8 @@ impl PeerScore {
.get(from)
.and_then(|s| s.topics.get(topic))
.map(|ts| ts.in_mesh())
.unwrap_or(false) {
.unwrap_or(false)
{
callback(from, topic, time);
}
}
Expand Down Expand Up @@ -722,7 +743,7 @@ impl PeerScore {
}
}
}
},
}
Vacant(entry) => {
entry.insert(params);
}
Expand Down Expand Up @@ -844,8 +865,7 @@ impl PeerScore {
}

pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
self
.peer_stats
self.peer_stats
.get(peer)
.and_then(|s| s.topics.get(topic))
.map(|t| t.mesh_message_deliveries)
Expand Down