Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sc-beefy-consensus: Remove unneeded stream. #4015

Merged
merged 8 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 132 additions & 30 deletions substrate/client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@

use std::{collections::BTreeSet, sync::Arc, time::Duration};

use sc_network::{PeerId, ReputationChange};
use sc_network::{NetworkPeers, PeerId, ReputationChange};
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
use sp_runtime::traits::{Block, Hash, Header, NumberFor};

use codec::{Decode, DecodeAll, Encode};
use log::{debug, trace};
use parking_lot::{Mutex, RwLock};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use wasm_timer::Instant;

use crate::{
Expand Down Expand Up @@ -223,7 +222,7 @@ impl<B: Block> Filter<B> {
/// rejected/expired.
///
///All messaging is handled in a single BEEFY global topic.
pub(crate) struct GossipValidator<B>
pub(crate) struct GossipValidator<B, N>
where
B: Block,
{
Expand All @@ -232,26 +231,22 @@ where
gossip_filter: RwLock<Filter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
report_sender: TracingUnboundedSender<PeerReport>,
network: Arc<N>,
}

impl<B> GossipValidator<B>
impl<B, N> GossipValidator<B, N>
where
B: Block,
{
pub(crate) fn new(
known_peers: Arc<Mutex<KnownPeers<B>>>,
) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000);
let val = GossipValidator {
pub(crate) fn new(known_peers: Arc<Mutex<KnownPeers<B>>>, network: Arc<N>) -> Self {
Self {
votes_topic: votes_topic::<B>(),
justifs_topic: proofs_topic::<B>(),
gossip_filter: RwLock::new(Filter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
report_sender: tx,
};
(val, rx)
network,
}
}

/// Update gossip validator filter.
Expand All @@ -265,9 +260,15 @@ where
);
self.gossip_filter.write().update(filter);
}
}

impl<B, N> GossipValidator<B, N>
where
B: Block,
N: NetworkPeers,
{
fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
self.network.report_peer(who, cost_benefit);
}

fn validate_vote(
Expand Down Expand Up @@ -370,9 +371,10 @@ where
}
}

impl<B> Validator<B> for GossipValidator<B>
impl<B, N> Validator<B> for GossipValidator<B, N>
where
B: Block,
N: NetworkPeers + Send + Sync,
{
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
self.known_peers.lock().remove(who);
Expand Down Expand Up @@ -495,6 +497,95 @@ pub(crate) mod tests {
};
use sp_keystore::{testing::MemoryKeystore, Keystore};

pub(crate) struct TestNetwork {
report_sender: futures::channel::mpsc::UnboundedSender<PeerReport>,
}

impl TestNetwork {
pub fn new() -> (Self, futures::channel::mpsc::UnboundedReceiver<PeerReport>) {
let (tx, rx) = futures::channel::mpsc::unbounded();

(Self { report_sender: tx }, rx)
}
}

impl NetworkPeers for TestNetwork {
fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
todo!()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: what about unreachable!()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust analyzer filed them out automatically. In the end it is a panic :D

Copy link
Member

@davxy davxy Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. Functionally is the same soup, but maybe not the semantic. I was suggesting it just because I'd use todo! macro where I intend to eventually add something in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is you ser :)

c5f616b

}

fn set_authorized_only(&self, _: bool) {
todo!()
}

fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
todo!()
}

fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
let _ = self.report_sender.unbounded_send(PeerReport { who: peer_id, cost_benefit });
}

fn peer_reputation(&self, _: &PeerId) -> i32 {
todo!()
}

fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) {
todo!()
}

fn accept_unreserved_peers(&self) {
todo!()
}

fn deny_unreserved_peers(&self) {
todo!()
}

fn add_reserved_peer(
&self,
_: sc_network::config::MultiaddrWithPeerId,
) -> Result<(), String> {
todo!()
}

fn remove_reserved_peer(&self, _: PeerId) {
todo!()
}

fn set_reserved_peers(
&self,
_: sc_network::ProtocolName,
_: std::collections::HashSet<sc_network::Multiaddr>,
) -> Result<(), String> {
todo!()
}

fn add_peers_to_reserved_set(
&self,
_: sc_network::ProtocolName,
_: std::collections::HashSet<sc_network::Multiaddr>,
) -> Result<(), String> {
todo!()
}

fn remove_peers_from_reserved_set(
&self,
_: sc_network::ProtocolName,
_: Vec<PeerId>,
) -> Result<(), String> {
todo!()
}

fn sync_num_connected(&self) -> usize {
todo!()
}

fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
todo!()
}
}

struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
Expand Down Expand Up @@ -560,8 +651,13 @@ pub(crate) mod tests {
fn should_validate_messages() {
let keys = vec![Keyring::<AuthorityId>::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let (gv, mut report_stream) =
GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));

let (network, mut report_stream) = TestNetwork::new();

let gv = GossipValidator::<Block, _>::new(
Arc::new(Mutex::new(KnownPeers::new())),
Arc::new(network),
);
let sender = PeerId::random();
let mut context = TestContext;

Expand All @@ -574,7 +670,7 @@ pub(crate) mod tests {
let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost };
let res = gv.validate(&mut context, &sender, bad_encoding);
assert!(matches!(res, ValidationResult::Discard));
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// verify votes validation

Expand All @@ -585,14 +681,14 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
// nothing reported
assert!(report_stream.try_recv().is_err());
assert!(report_stream.try_next().is_err());

gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
// nothing in cache first time
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VOTE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject vote, voter not in validator set
let mut bad_vote = vote.clone();
Expand All @@ -601,7 +697,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &bad_vote);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::UNKNOWN_VOTER;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject if the round is not GRANDPA finalized
gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set });
Expand All @@ -611,7 +707,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject if the round is not live anymore
gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
Expand All @@ -621,7 +717,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// now verify proofs validation

Expand All @@ -631,23 +727,23 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// accept next proof with good set_id
let proof = dummy_proof(7, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// accept future proof with good set_id
let proof = dummy_proof(20, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject proof, future set_id
let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys, 1).unwrap();
Expand All @@ -656,7 +752,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject proof, bad signatures (Bob instead of Alice)
let bad_validator_set =
Expand All @@ -667,14 +763,17 @@ pub(crate) mod tests {
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::INVALID_PROOF;
expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
}

#[test]
fn messages_allowed_and_expired() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let gv = GossipValidator::<Block, _>::new(
Arc::new(Mutex::new(KnownPeers::new())),
Arc::new(TestNetwork::new().0),
);
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let topic = Default::default();
Expand Down Expand Up @@ -751,7 +850,10 @@ pub(crate) mod tests {
fn messages_rebroadcast() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let gv = GossipValidator::<Block, _>::new(
Arc::new(Mutex::new(KnownPeers::new())),
Arc::new(TestNetwork::new().0),
);
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let topic = Default::default();
Expand Down
Loading
Loading