Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: adding filter on message when non validator (#405)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd authored Dec 14, 2023
1 parent b424aa9 commit b096482
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/topos-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::convert::Infallible;
pub(crate) use behaviour::Behaviour;
pub use client::NetworkClient;
pub use client::RetryPolicy;
pub(crate) use command::Command;
pub use command::Command;
pub use event::Event;
use http::Request;
use http::Response;
Expand Down Expand Up @@ -102,6 +102,10 @@ pub mod utils {
}

impl GrpcOverP2P {
pub fn new(proxy_sender: mpsc::Sender<Command>) -> Self {
Self { proxy_sender }
}

pub async fn create<C, S>(&self, peer: PeerId) -> Result<C::Output, P2PError>
where
C: GrpcClient<Output = C>,
Expand Down
4 changes: 1 addition & 3 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,7 @@ impl<'a> NetworkBuilder<'a> {
.build();
let (shutdown_channel, shutdown) = mpsc::channel::<oneshot::Sender<()>>(1);

let grpc_over_p2p = GrpcOverP2P {
proxy_sender: command_sender.clone(),
};
let grpc_over_p2p = GrpcOverP2P::new(command_sender.clone());

Ok((
NetworkClient {
Expand Down
15 changes: 15 additions & 0 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
//! Everything related to the double_echo implementation
//!
//! ## Messages and roles
//!
//! In order to prevent many non validator's messages to be published on the
//! gossip topics, messages are filtered when the [`DoubleEcho`] is producing events.
//!
//! For `validator` nothing changed, for `fullnode` and `sentry` node, their `Echo` and
//! `Ready` messages are filtered, they still produce `Gossip` messages tho.
//!
//! It doesn't mean that a `fullnode` will stop propagate messages from
//! `validators`, it only prevents a non validator to publish messages that will
//! be ignored by others. `fullnode` still consumes Echo and Ready coming from
//! validators and use those messages to build their state.
use crate::TaskStatus;
use crate::{DoubleEchoCommand, SubscriptionsView};
use std::collections::HashSet;
Expand Down
3 changes: 1 addition & 2 deletions crates/topos-tce-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ pub enum ProtocolEvents {
AlreadyDelivered {
certificate_id: CertificateId,
},
/// Emitted to get peers list, expected that Commands.ApplyPeers will come as reaction
NeedPeers,

/// (pb.Broadcast)
Broadcast {
certificate_id: CertificateId,
Expand Down
2 changes: 2 additions & 0 deletions crates/topos-tce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ axum = "0.6.18"
axum-prometheus = "0.3.3"

[dev-dependencies]
topos-test-sdk = { path = "../topos-test-sdk/" }
async-stream.workspace = true
async-trait.workspace = true
hyper.workspace = true
libp2p.workspace = true
rand.workspace = true
rand_core.workspace = true
rand_distr.workspace = true
rstest.workspace = true
tonic.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
tracing.workspace = true
Expand Down
5 changes: 4 additions & 1 deletion crates/topos-tce/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tracing::{error, info, warn};

mod api;
mod network;
mod protocol;
pub(crate) mod protocol;

/// Top-level transducer main app context & driver (alike)
///
Expand All @@ -35,6 +35,7 @@ mod protocol;
/// config+data as input and runs app returning data as output
///
pub struct AppContext {
pub is_validator: bool,
pub events: mpsc::Sender<Events>,
pub tce_cli: ReliableBroadcastClient,
pub network_client: NetworkClient,
Expand All @@ -55,6 +56,7 @@ impl AppContext {

/// Factory
pub fn new(
is_validator: bool,
pending_storage: StorageClient,
tce_cli: ReliableBroadcastClient,
network_client: NetworkClient,
Expand All @@ -65,6 +67,7 @@ impl AppContext {
let (events, receiver) = mpsc::channel(100);
(
Self {
is_validator,
events,
tce_cli,
network_client,
Expand Down
18 changes: 12 additions & 6 deletions crates/topos-tce/src/app_context/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use tce_transport::ProtocolEvents;
use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::events::Events;
use crate::AppContext;
Expand Down Expand Up @@ -43,7 +43,7 @@ impl AppContext {
certificate_id,
signature,
validator_id,
} => {
} if self.is_validator => {
// Send echo message
let request = DoubleEchoRequest {
request: Some(double_echo_request::Request::Echo(Echo {
Expand All @@ -66,7 +66,7 @@ impl AppContext {
certificate_id,
signature,
validator_id,
} => {
} if self.is_validator => {
let request = DoubleEchoRequest {
request: Some(double_echo_request::Request::Ready(Ready {
certificate_id: Some(certificate_id.into()),
Expand All @@ -83,10 +83,16 @@ impl AppContext {
error!("Unable to send Ready due to error: {e}");
}
}

evt => {
debug!("Unhandled event: {:?}", evt);
ProtocolEvents::BroadcastFailed { certificate_id } => {
warn!("Broadcast failed for certificate {certificate_id}")
}
ProtocolEvents::AlreadyDelivered { certificate_id } => {
info!("Certificate {certificate_id} already delivered")
}
ProtocolEvents::Die => {
error!("The DoubleEcho unexpectedly died, this is unrecoverable")
}
_ => {}
}
}
}
17 changes: 13 additions & 4 deletions crates/topos-tce/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use config::TceConfiguration;
use futures::StreamExt;
use opentelemetry::global;
use std::{future::IntoFuture, sync::Arc};
use std::{
future::IntoFuture,
sync::{atomic::AtomicBool, Arc},
};
use tokio::{
spawn,
sync::{broadcast, mpsc},
};
use tokio_stream::wrappers::BroadcastStream;
use tokio_util::sync::CancellationToken;
use topos_core::api::grpc::tce::v1::synchronizer_service_server::SynchronizerServiceServer;
use topos_crypto::messages::MessageSigner;
use topos_crypto::{messages::MessageSigner, validator_id::ValidatorId};
use topos_p2p::{
utils::{local_key_pair, local_key_pair_from_slice},
GrpcContext, GrpcRouter, Multiaddr,
Expand All @@ -24,9 +27,12 @@ use topos_tce_storage::{
};
use topos_tce_synchronizer::SynchronizerService;
use tracing::{debug, warn};

mod app_context;
pub mod config;
pub mod events;
#[cfg(test)]
mod tests;

pub use app_context::AppContext;

Expand All @@ -52,7 +58,8 @@ pub async fn run(
_ => return Err(Box::try_from("Error, no singing key".to_string()).unwrap()),
};

let public_address = message_signer.public_address.to_string();
let validator_id: ValidatorId = message_signer.public_address.into();
let public_address = validator_id.to_string();

warn!("Public node address: {public_address}");

Expand All @@ -70,6 +77,7 @@ pub async fn run(
// Remove myself from the bootnode list
let mut boot_peers = config.boot_peers.clone();
boot_peers.retain(|(p, _)| *p != peer_id);
let is_validator = config.validators.contains(&validator_id);

debug!("Starting the Storage");
let path = if let StorageConfiguration::RocksDB(Some(ref path)) = config.storage {
Expand Down Expand Up @@ -144,7 +152,7 @@ pub async fn run(
let (tce_cli, tce_stream) = ReliableBroadcastClient::new(
ReliableBroadcastConfig {
tce_params: config.tce_params.clone(),
validator_id: message_signer.public_address.into(),
validator_id,
validators: config.validators.clone(),
message_signer,
},
Expand Down Expand Up @@ -182,6 +190,7 @@ pub async fn run(

// setup transport-tce-storage-api connector
let (app_context, _tce_stream) = AppContext::new(
is_validator,
storage_client,
tce_cli,
network_client,
Expand Down
148 changes: 148 additions & 0 deletions crates/topos-tce/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use libp2p::PeerId;
use rstest::{fixture, rstest};
use std::{
collections::{HashMap, HashSet},
future::IntoFuture,
net::SocketAddr,
sync::Arc,
};
use tce_transport::ProtocolEvents;
use tokio_stream::Stream;
use topos_tce_api::RuntimeEvent;
use topos_tce_gatekeeper::{Gatekeeper, GatekeeperClient};

use tokio::sync::{broadcast, mpsc};
use topos_crypto::messages::MessageSigner;
use topos_p2p::{utils::GrpcOverP2P, NetworkClient};
use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig};
use topos_tce_storage::{validator::ValidatorStore, StorageClient};
use topos_test_sdk::{
certificates::create_certificate_chain,
constants::{CERTIFICATE_ID_1, SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1},
storage::create_validator_store,
tce::public_api::{create_public_api, PublicApiContext},
};

use crate::AppContext;

#[rstest]
#[tokio::test]
async fn non_validator_publish_gossip(
#[future] setup_test: (
AppContext,
mpsc::Receiver<topos_p2p::Command>,
Arc<MessageSigner>,
),
) {
let (mut context, mut p2p_receiver, _) = setup_test.await;
let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 1);
context
.on_protocol_event(ProtocolEvents::Gossip {
cert: certificates[0].certificate.clone(),
})
.await;

assert!(matches!(
p2p_receiver.try_recv(),
Ok(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip"
));
}

#[rstest]
#[tokio::test]
async fn non_validator_do_not_publish_echo(
#[future] setup_test: (
AppContext,
mpsc::Receiver<topos_p2p::Command>,
Arc<MessageSigner>,
),
) {
let (mut context, mut p2p_receiver, message_signer) = setup_test.await;
context
.on_protocol_event(ProtocolEvents::Echo {
certificate_id: CERTIFICATE_ID_1,
signature: message_signer.sign_message(&[]).ok().unwrap(),
validator_id: message_signer.public_address.into(),
})
.await;

assert!(p2p_receiver.try_recv().is_err(),);
}

#[rstest]
#[tokio::test]
async fn non_validator_do_not_publish_ready(
#[future] setup_test: (
AppContext,
mpsc::Receiver<topos_p2p::Command>,
Arc<MessageSigner>,
),
) {
let (mut context, mut p2p_receiver, message_signer) = setup_test.await;
context
.on_protocol_event(ProtocolEvents::Ready {
certificate_id: CERTIFICATE_ID_1,
signature: message_signer.sign_message(&[]).ok().unwrap(),
validator_id: message_signer.public_address.into(),
})
.await;

assert!(p2p_receiver.try_recv().is_err(),);
}

#[fixture]
async fn setup_test(
#[future] create_validator_store: Arc<ValidatorStore>,
#[future] create_public_api: (PublicApiContext, impl Stream<Item = RuntimeEvent>),
) -> (
AppContext,
mpsc::Receiver<topos_p2p::Command>,
Arc<MessageSigner>,
) {
let validator_store = create_validator_store.await;
let is_validator = false;
let mut message_signer = Arc::new(MessageSigner::new(&[5u8; 32]).unwrap());
let validator_id = message_signer.public_address.into();

let (broadcast_sender, broadcast_receiver) = broadcast::channel(1);

let (tce_cli, tce_stream) = ReliableBroadcastClient::new(
ReliableBroadcastConfig {
tce_params: tce_transport::ReliableBroadcastParams::default(),
validator_id,
validators: HashSet::new(),
message_signer: message_signer.clone(),
},
validator_store.clone(),
broadcast_sender,
)
.await;

let (shutdown_p2p, _) = mpsc::channel(1);
let (p2p_sender, mut p2p_receiver) = mpsc::channel(1);
let grpc_over_p2p = GrpcOverP2P::new(p2p_sender.clone());
let network_client = NetworkClient {
retry_ttl: 10,
local_peer_id: PeerId::random(),
sender: p2p_sender,
grpc_over_p2p,
shutdown_channel: shutdown_p2p,
};

let (api_context, _api_stream) = create_public_api.await;
let api_client = api_context.client;

let (gatekeeper_client, gatekeeper) = Gatekeeper::builder().into_future().await.unwrap();

let (mut context, _) = AppContext::new(
is_validator,
StorageClient::new(validator_store.clone()),
tce_cli,
network_client,
api_client,
gatekeeper_client,
validator_store,
);

(context, p2p_receiver, message_signer)
}
Loading

0 comments on commit b096482

Please sign in to comment.