
Skip gossip requests with different shred version (bp #10240) #10297

Merged: 1 commit, merged on May 28, 2020
254 changes: 224 additions & 30 deletions core/src/cluster_info.rs
@@ -218,7 +218,10 @@ struct GossipStats
process_pull_response_count: Counter,
process_pull_response_len: Counter,
process_pull_response_timeout: Counter,
process_pull_response_fail: Counter,
process_pull_response_success: Counter,
process_pull_requests: Counter,
generate_pull_responses: Counter,
process_prune: Counter,
process_push_message: Counter,
prune_received_cache: Counter,
@@ -228,6 +231,11 @@ struct GossipStats
push_message: Counter,
new_pull_requests: Counter,
mark_pull_request: Counter,
skip_pull_response_shred_version: Counter,
skip_pull_shred_version: Counter,
skip_push_message_shred_version: Counter,
push_message_count: Counter,
push_message_value_count: Counter,
}
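A note on the counters above: the `add_relaxed` / `clear` calls throughout this diff suggest `Counter` is a relaxed atomic counter, since gossip metrics need cheap increments rather than cross-thread ordering. A minimal sketch of a counter with that interface (an assumption about the implementation, not code from this PR):

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Counter(AtomicU64);

impl Counter {
    // Relaxed ordering is enough for metrics: increments may race with
    // reads, but no synchronization between threads is required.
    fn add_relaxed(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    // Reporting swaps the value out, so each datapoint covers one interval.
    fn clear(&self) -> u64 {
        self.0.swap(0, Ordering::Relaxed)
    }
}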

pub struct ClusterInfo {
@@ -1526,12 +1534,17 @@ impl ClusterInfo
if contact_info.id == me.id() {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
-            } else {
+            } else if contact_info.shred_version == 0
+                || contact_info.shred_version == me.my_shred_version()
+                || me.my_shred_version() == 0
+            {
                 gossip_pull_data.push(PullData {
                     from_addr,
                     caller,
                     filter,
                 });
+            } else {
+                me.stats.skip_pull_shred_version.add_relaxed(1);
}
}
datapoint_debug!(
@@ -1620,6 +1633,26 @@ impl ClusterInfo
}
}

fn update_data_budget(&self, stakes: &HashMap<Pubkey, u64>) {
let mut w_outbound_budget = self.outbound_budget.write().unwrap();

let now = timestamp();
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default

if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS {
let len = std::cmp::max(stakes.len(), 2);
w_outbound_budget.bytes += len * BYTES_PER_INTERVAL;
w_outbound_budget.bytes = std::cmp::min(
w_outbound_budget.bytes,
MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
);
w_outbound_budget.last_timestamp_ms = now;
}
}
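The extracted helper is a token-bucket rate limiter: roughly every 100 ms the outbound budget grows by 5,000 bytes per staked validator (with a floor of two validators), and unused budget can accumulate up to five intervals' worth. A standalone sketch of the same shape; the `DataBudget` fields mirror the diff, while the `spend` method is a hypothetical consumer added for illustration:

// Token-bucket sketch mirroring update_data_budget above.
struct DataBudget {
    bytes: usize,         // budget currently available
    last_timestamp_ms: u64,
}

const INTERVAL_MS: u64 = 100;
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5;

impl DataBudget {
    fn update(&mut self, now_ms: u64, num_staked: usize) {
        if now_ms - self.last_timestamp_ms > INTERVAL_MS {
            let len = num_staked.max(2);
            self.bytes = (self.bytes + len * BYTES_PER_INTERVAL)
                .min(MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL);
            self.last_timestamp_ms = now_ms;
        }
    }
    // Hypothetical consumer: try to spend `amount` bytes; the caller
    // would drop the outbound packet if this returns false.
    fn spend(&mut self, amount: usize) -> bool {
        if self.bytes >= amount {
            self.bytes -= amount;
            true
        } else {
            false
        }
    }
}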

// Pull requests take an incoming bloom filter of contained entries from a node
// and try to send back the values it detects are missing.
fn handle_pull_requests(
@@ -1632,33 +1665,19 @@ impl ClusterInfo
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
-        {
-            let mut w_outbound_budget = me.outbound_budget.write().unwrap();
-
-            let now = timestamp();
-            const INTERVAL_MS: u64 = 100;
-            // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
-            const BYTES_PER_INTERVAL: usize = 5000;
-            const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default
-
-            if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS {
-                let len = std::cmp::max(stakes.len(), 2);
-                w_outbound_budget.bytes += len * BYTES_PER_INTERVAL;
-                w_outbound_budget.bytes = std::cmp::min(
-                    w_outbound_budget.bytes,
-                    MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
-                );
-                w_outbound_budget.last_timestamp_ms = now;
-            }
-        }
+        me.update_data_budget(stakes);
for pull_data in requests {
caller_and_filters.push((pull_data.caller, pull_data.filter));
addrs.push(pull_data.from_addr);
}
let now = timestamp();
let self_id = me.id();

             let pull_responses = me
-                .time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
+                .time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
+                .generate_pull_responses(&caller_and_filters);
+
+            me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
                 .process_pull_requests(caller_and_filters, now);

// Filter out bad destination addresses
@@ -1755,37 +1774,94 @@ impl ClusterInfo
Some(packets)
}

+    // Returns (failed, timeout, success)
     fn handle_pull_response(
         me: &Self,
         from: &Pubkey,
-        data: Vec<CrdsValue>,
+        mut crds_values: Vec<CrdsValue>,
         timeouts: &HashMap<Pubkey, u64>,
-    ) {
-        let len = data.len();
+    ) -> (usize, usize, usize) {
+        let len = crds_values.len();
         trace!("PullResponse me: {} from: {} len={}", me.id, from, len);
-        let (_fail, timeout_count) = me
+
+        if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
+            Self::filter_by_shred_version(
+                from,
+                &mut crds_values,
+                shred_version,
+                me.my_shred_version(),
+            );
+        }
+        let filtered_len = crds_values.len();
+
+        let (fail, timeout_count, success) = me
             .time_gossip_write_lock("process_pull", &me.stats.process_pull_response)
-            .process_pull_response(from, timeouts, data, timestamp());
+            .process_pull_response(from, timeouts, crds_values, timestamp());
+
+        me.stats
+            .skip_pull_response_shred_version
+            .add_relaxed((len - filtered_len) as u64);
         me.stats.process_pull_response_count.add_relaxed(1);
-        me.stats.process_pull_response_len.add_relaxed(len as u64);
+        me.stats
+            .process_pull_response_len
+            .add_relaxed(filtered_len as u64);
         me.stats
             .process_pull_response_timeout
             .add_relaxed(timeout_count as u64);
+        me.stats.process_pull_response_fail.add_relaxed(fail as u64);
+        me.stats
+            .process_pull_response_success
+            .add_relaxed(success as u64);
+
+        (fail, timeout_count, success)
     }

+    fn filter_by_shred_version(
+        from: &Pubkey,
+        crds_values: &mut Vec<CrdsValue>,
+        shred_version: u16,
+        my_shred_version: u16,
+    ) {
+        if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version {
+            // Allow someone to update their own ContactInfo so they
+            // can change shred versions if needed.
+            crds_values.retain(|crds_value| match &crds_value.data {
+                CrdsData::ContactInfo(contact_info) => contact_info.id == *from,
+                _ => false,
+            });
+        }
+    }
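In short: if both sides report a nonzero shred version and the versions disagree, everything in the message is dropped except the sender's own ContactInfo, so a node that changes shred version can still announce the change. A toy, self-contained model of the retain rule (simplified types, not the real CrdsValue):

// Simplified model of filter_by_shred_version: on a shred-version
// mismatch, only the sender's own ContactInfo survives.
#[derive(Debug, PartialEq)]
enum Value {
    ContactInfo { id: u64 },
    Other,
}

fn filter(from: u64, values: &mut Vec<Value>, their_shred: u16, my_shred: u16) {
    if my_shred != 0 && their_shred != 0 && their_shred != my_shred {
        values.retain(|v| matches!(v, Value::ContactInfo { id } if *id == from));
    }
}

fn main() {
    let mut values = vec![Value::ContactInfo { id: 7 }, Value::Other];
    filter(7, &mut values, 2, 1); // mismatched shred versions
    assert_eq!(values, vec![Value::ContactInfo { id: 7 }]);
}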

fn handle_push_message(
me: &Self,
recycler: &PacketsRecycler,
from: &Pubkey,
-        data: Vec<CrdsValue>,
+        mut crds_values: Vec<CrdsValue>,
         stakes: &HashMap<Pubkey, u64>,
     ) -> Option<Packets> {
         let self_id = me.id();
-        inc_new_counter_debug!("cluster_info-push_message", 1);
+        me.stats.push_message_count.add_relaxed(1);
+        let len = crds_values.len();
+
+        if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
+            Self::filter_by_shred_version(
+                from,
+                &mut crds_values,
+                shred_version,
+                me.my_shred_version(),
+            );
+        }
+        let filtered_len = crds_values.len();
+        me.stats
+            .push_message_value_count
+            .add_relaxed(filtered_len as u64);
+        me.stats
+            .skip_push_message_shred_version
+            .add_relaxed((len - filtered_len) as u64);
+
         let updated: Vec<_> = me
             .time_gossip_write_lock("process_push", &me.stats.process_push_message)
-            .process_push_message(from, data, timestamp());
+            .process_push_message(from, crds_values, timestamp());

let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
@@ -1945,6 +2021,11 @@ impl ClusterInfo
self.stats.process_pull_requests.clear(),
i64
),
(
"generate_pull_responses",
self.stats.generate_pull_responses.clear(),
i64
),
("process_prune", self.stats.process_prune.clear(), i64),
(
"process_push_message",
@@ -1974,6 +2055,34 @@ impl ClusterInfo
i64
),
);
datapoint_info!(
"cluster_info_shred_version_skip",
(
"skip_push_message_shred_version",
self.stats.skip_push_message_shred_version.clear(),
i64
),
(
"skip_pull_response_shred_version",
self.stats.skip_pull_response_shred_version.clear(),
i64
),
(
"skip_pull_shred_version",
self.stats.skip_pull_shred_version.clear(),
i64
),
(
"push_message_count",
self.stats.push_message_count.clear(),
i64
),
(
"push_message_value_count",
self.stats.push_message_value_count.clear(),
i64
),
);

*last_print = Instant::now();
}
@@ -2289,6 +2398,91 @@ mod tests
assert!(ClusterInfo::is_spy_node(&node));
}

#[test]
fn test_handle_pull() {
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));

let entrypoint_pubkey = Pubkey::new_rand();
let data = test_crds_values(entrypoint_pubkey);
let timeouts = HashMap::new();
assert_eq!(
(0, 0, 1),
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
data.clone(),
&timeouts
)
);

let entrypoint_pubkey2 = Pubkey::new_rand();
assert_eq!(
(1, 0, 0),
ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts)
);
}

fn test_crds_values(pubkey: Pubkey) -> Vec<CrdsValue> {
let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp());
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
vec![entrypoint_crdsvalue]
}

#[test]
fn test_filter_shred_version() {
let from = Pubkey::new_rand();
let my_shred_version = 1;
let other_shred_version = 1;

// Allow same shred_version
let mut values = test_crds_values(from);
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Allow shred_version=0.
let other_shred_version = 0;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: Pubkey::new_rand(),
hashes: vec![],
wallclock: 0,
}));
values.push(snapshot_hash_data);
// With mismatched shred versions, only the sender's own ContactInfo
// survives; the SnapshotHashes value is dropped.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
}

#[test]
fn test_cluster_spy_gossip() {
//check that gossip doesn't try to push to invalid addresses
18 changes: 11 additions & 7 deletions core/src/crds_gossip.rs
@@ -158,22 +158,26 @@ impl CrdsGossip
self.pull.mark_pull_request_creation_time(from, now)
}
/// process a pull request and create a response
-    pub fn process_pull_requests(
-        &mut self,
-        filters: Vec<(CrdsValue, CrdsFilter)>,
-        now: u64,
-    ) -> Vec<Vec<CrdsValue>> {
+    pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) {
         self.pull
-            .process_pull_requests(&mut self.crds, filters, now)
+            .process_pull_requests(&mut self.crds, filters, now);
     }

+    pub fn generate_pull_responses(
+        &self,
+        filters: &[(CrdsValue, CrdsFilter)],
+    ) -> Vec<Vec<CrdsValue>> {
+        self.pull.generate_pull_responses(&self.crds, filters)
+    }

/// process a pull response
pub fn process_pull_response(
&mut self,
from: &Pubkey,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
-    ) -> (usize, usize) {
+    ) -> (usize, usize, usize) {
self.pull
.process_pull_response(&mut self.crds, from, timeouts, response, now)
}
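Splitting `generate_pull_responses` out of `process_pull_requests` is what lets the cluster_info side build responses under `time_gossip_read_lock` and take the write lock only to record the callers, as in the handle_pull_requests change above. A toy sketch of that two-phase read/write pattern using `std::sync::RwLock` (illustrative types, not the PR's):

use std::sync::RwLock;

struct Gossip {
    crds: Vec<u64>,
}

impl Gossip {
    // Read-only: can run under a shared lock, so concurrent readers
    // are not blocked while responses are generated.
    fn generate_pull_responses(&self, filter: u64) -> Vec<u64> {
        self.crds.iter().copied().filter(|v| *v != filter).collect()
    }
    // Mutating: needs the exclusive lock, but only briefly.
    fn process_pull_requests(&mut self, caller: u64) {
        self.crds.push(caller);
    }
}

fn main() {
    let gossip = RwLock::new(Gossip { crds: vec![1, 2, 3] });
    let responses = gossip.read().unwrap().generate_pull_responses(2); // shared
    gossip.write().unwrap().process_pull_requests(9); // exclusive, brief
    assert_eq!(responses, vec![1, 3]);
}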