removes redundant allocations when generating new gossip pull/push messages #4882
…ssages Instead of collecting the intermediate values into vectors: https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/cluster_info.rs#L1311-L1318 https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/cluster_info.rs#L1257-L1268 https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/crds_gossip_pull.rs#L292-L302 gossip new_{push,pull}_requests can just return an iterator. This iterator can then be written directly to a PacketBatch, bypassing the unnecessary vector allocations.
let addr = get_node_addr(
    pubkey,
    ContactInfo::gossip,
    &gossip_crds,
    &self.socket_addr_space,
)?;
Some((addr, messages))
are we adding an additional check here that the gossip addr is valid?
ahh, i take it back. we are moving the check here from `if ContactInfo::is_valid_address(&address, &self.socket_addr_space) {` in `handle_batch_push_messages()`.
+let entries = Rc::new(entries);
 push_messages
     .into_iter()
-    .flat_map(|(peer, msgs)| {
-        let msgs = msgs.into_iter().map(|k| entries[k].clone());
-        split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
-            .map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
+    .flat_map(move |(peer, msgs): (SocketAddr, Vec<usize>)| {
+        let entries = Rc::clone(&entries);
+        let msgs = msgs.into_iter().map(move |k| entries[k].clone());
+        let msgs = split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
+            .map(move |msgs| Protocol::PushMessage(self_id, msgs));
+        repeat(peer).zip(msgs)
     })
just so i understand, we need `Rc` here so that we don't drop `entries` when `new_push_requests()` returns the iterator with a closure: `Iterator<Item = (SocketAddr, Protocol)>`. and we don't want `entries` to get dropped because then the execution of the closure would fail when we actually want to use the return value of `new_push_requests()`? basically `Rc` keeps `entries` alive until we actually execute the closure?
It is just that Rust's lifetime checks cannot handle this despite having `move` there.
Without the `Rc`, this won't compile unless I clone `entries`, which would be expensive.
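For readers following the `Rc` exchange above, here is a minimal, self-contained sketch of the same shape. It is not the code from this PR: `fan_out`, the `u16` peer ids and the `String` entries are hypothetical stand-ins. One plausible reading of the compile error is that the outer `flat_map` closure runs once per peer, so an inner `move` closure cannot take ownership of `entries` out of it, and a plain borrow cannot outlive the outer closure either. Giving each inner iterator its own `Rc` handle avoids both problems at the cost of a reference-count bump.

```rust
use std::iter::repeat;
use std::rc::Rc;

/// Hypothetical stand-in for an iterator-returning request builder:
/// for each peer, lazily yields (peer, entry) pairs looked up by index.
fn fan_out(
    entries: Vec<String>,
    targets: Vec<(u16, Vec<usize>)>,
) -> impl Iterator<Item = (u16, String)> {
    // The outer closure owns one handle to the shared entries.
    let entries = Rc::new(entries);
    targets.into_iter().flat_map(move |(peer, indices)| {
        // Each inner iterator gets its own handle: a refcount bump,
        // not a deep copy of the entries vector.
        let entries = Rc::clone(&entries);
        let msgs = indices.into_iter().map(move |k| entries[k].clone());
        // Pair every message with the peer it is addressed to.
        repeat(peer).zip(msgs)
    })
}
```

Nothing is looked up or cloned until the caller drives the returned iterator, and the entries stay alive for as long as any handle does.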
    Either::Right(out.chain(reqs))
} else {
    Either::Left(out)
question: the return values of `new_push_requests` and `new_pull_requests` are the same type: `Iterator<Item = (SocketAddr, Protocol)>`. but the iterators of each are built from different closures, so we need the `Either` wrapper here?
Right.
They are different types implementing the same trait.
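To illustrate the point, here is a small sketch that uses a hand-rolled enum in place of the `Either` type used in the PR, so that it compiles with the standard library alone. The function `requests` and its arguments are hypothetical. The two branches produce `Chain<..>` and a plain `IntoIter`, which are different concrete types, so they are wrapped in one enum that forwards `next()` to whichever variant is present.

```rust
/// Minimal stand-in for an `Either` type, limited to the iterator case.
enum Either<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Iterator for Either<L, R>
where
    L: Iterator,
    R: Iterator<Item = L::Item>,
{
    type Item = L::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Forward to whichever concrete iterator this value holds.
        match self {
            Either::Left(it) => it.next(),
            Either::Right(it) => it.next(),
        }
    }
}

/// Hypothetical example: always yield `base`, optionally followed by `extra`.
fn requests(base: Vec<u32>, extra: Option<Vec<u32>>) -> impl Iterator<Item = u32> {
    let out = base.into_iter();
    match extra {
        // Chained iterator and plain iterator are distinct types, so both
        // arms are wrapped to give the function a single return type.
        Some(extra) => Either::Right(out.chain(extra)),
        None => Either::Left(out),
    }
}
```

Which branch goes in `Left` and which in `Right` has no effect on behavior in this sketch; the variants only need to be used consistently.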
-out.extend(pull_requests);
-out.extend(pings);
+let reqs = self.new_pull_requests(thread_pool, gossip_validators, stakes);
+Either::Right(out.chain(reqs))
looked it up, but to make sure: does rust convention use `Either::Right` for the extended use case and `Either::Left` for the base case?
I am not aware of any distinction between Either::{Left,Right}.
I don't think it makes any difference.
-.flat_map(|(_, filters)| filters)
-.cloned()
-.collect()
+Either::Right(pulls.clone().map(|(_, filter)| filter))
i know this logic hasn't changed in this PR, but why are we using filters from already existing `pull`s to use with `entrypoint`? aren't we then asking both the entrypoint and some pull addr for the same info because they have the same filter?
I am guessing the other node might not respond to pull requests, or it may send an invalid response.
So we pull from the entrypoint as well, which is supposed to be more trusted.
so kinda just like a redundancy check for pull requests?
I guess so.
impl ClusterInfo {
    // Wrapper for ClusterInfo.new_pull_requests replicating old return
    // type for legacy tests.
    #[allow(clippy::type_complexity)]
    fn old_pull_requests(
        &self,
        thread_pool: &ThreadPool,
        gossip_validators: Option<&HashSet<Pubkey>>,
        stakes: &HashMap<Pubkey, u64>,
    ) -> (
        Vec<(SocketAddr, Ping)>,     // Ping packets
        Vec<(SocketAddr, Protocol)>, // Pull requests
    ) {
        self.new_pull_requests(thread_pool, gossip_validators, stakes)
            .partition_map(|(addr, protocol)| {
                if let Protocol::PingMessage(ping) = protocol {
                    Either::Left((addr, ping))
                } else {
                    Either::Right((addr, protocol))
                }
            })
    }
}
worth adding tests that verify the new behavior of `new_pull_requests` explicitly, instead of relying solely on `partition_map` in this wrapper?
It is a pretty trivial change though.
Also local-cluster tests sufficiently rely on pull requests to be functioning.
So I feel like we are good here.
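For context on what the test wrapper does, the sketch below splits a single mixed stream back into the two vectors the legacy tests expect, using only the standard library instead of `partition_map`. The `Message` enum, `split_pings` and the `u16` addresses are hypothetical simplifications, not types from the codebase.

```rust
/// Hypothetical, simplified message type with a ping and a non-ping variant.
#[derive(Debug, PartialEq)]
enum Message {
    Ping(u8),
    Pull(String),
}

/// Splits one mixed stream into (pings, everything else), preserving the
/// relative order within each output vector.
fn split_pings(
    msgs: impl Iterator<Item = (u16, Message)>,
) -> (Vec<(u16, u8)>, Vec<(u16, Message)>) {
    let mut pings = Vec::new();
    let mut others = Vec::new();
    for (addr, msg) in msgs {
        match msg {
            // Unwrap pings into their payload, as the legacy return type did.
            Message::Ping(token) => pings.push((addr, token)),
            other => others.push((addr, other)),
        }
    }
    (pings, others)
}
```

A test written against the unified iterator could collect it once and assert on both halves, which is roughly what the wrapper arranges for the existing tests.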
have you run this PR (and previous ones) against testnet to see that we are getting the expected performance improvements? or at least not increasing processing times?
yeah, I have a node running with all these patches continuously.
lgtm!
Problem
Instead of collecting the intermediate values into vectors:
https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/cluster_info.rs#L1311-L1318
https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/cluster_info.rs#L1257-L1268
https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/crds_gossip_pull.rs#L292-L302
gossip `new_{push,pull}_requests` can just return an iterator. This iterator can then be written directly to a PacketBatch, bypassing the unnecessary vector allocations.
Summary of Changes
The commit reduces allocation for new gossip pull/push messages by holding on to iterators and bypassing intermediate vector allocations.
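As a rough illustration of the consuming side, the sketch below feeds a lazily built request iterator straight into a destination buffer. `Batch`, `write_all` and `pull_requests` are hypothetical names chosen for this example; the real `PacketBatch` API is not shown in this conversation and is not modeled here.

```rust
/// Hypothetical stand-in for a packet batch: destination plus encoded bytes.
#[derive(Default)]
struct Batch {
    packets: Vec<(u16, Vec<u8>)>,
}

impl Batch {
    /// Encodes and stores each (destination, payload) pair as it is produced,
    /// so the caller never builds an intermediate Vec of requests.
    fn write_all<'a>(&mut self, reqs: impl Iterator<Item = (u16, &'a str)>) {
        self.packets
            .extend(reqs.map(|(dest, payload)| (dest, payload.as_bytes().to_vec())));
    }
}

/// Lazily pairs every peer with every filter; nothing is allocated here.
fn pull_requests<'a>(
    peers: &'a [u16],
    filters: &'a [&'a str],
) -> impl Iterator<Item = (u16, &'a str)> + 'a {
    peers
        .iter()
        .flat_map(move |&peer| filters.iter().map(move |&f| (peer, f)))
}
```

The only allocations left in this sketch are the ones the destination buffer needs anyway.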