Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: add push cert command to node
Browse files Browse the repository at this point in the history
Signed-off-by: Jawad Tariq <sjcool420@hotmail.co.uk>
JDawg287 committed Oct 30, 2023

Unverified

The committer email address is not verified.
1 parent 6533bec commit 4925ec8
Showing 7 changed files with 280 additions and 2 deletions.
3 changes: 3 additions & 0 deletions crates/topos/src/components/node/commands.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,12 @@ use clap::{Args, Subcommand};

mod init;
mod peer_id;
mod push_certificate;
mod up;

pub(crate) use init::Init;
pub(crate) use peer_id::PeerId;
pub(crate) use push_certificate::PushCertificate;
pub(crate) use up::Up;

/// Utility to manage your nodes in the Topos network
@@ -32,6 +34,7 @@ pub(crate) enum NodeCommands {
Up(Box<Up>),
Init(Box<Init>),
PeerId(Box<PeerId>),
PushCertificate(PushCertificate),
}

#[cfg(test)]
22 changes: 22 additions & 0 deletions crates/topos/src/components/node/commands/push_certificate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use clap::Args;

use crate::options::input_format::InputFormat;

#[derive(Args, Debug)]
pub(crate) struct PushCertificate {
#[arg(short, long="format", value_enum, default_value_t = InputFormat::Plain)]
pub(crate) format: InputFormat,

/// Global timeout for the command
#[arg(short, long = "timeout", default_value_t = 60)]
pub(crate) timeout: u64,

/// Seconds to wait before asserting the broadcast
#[arg(long = "timeout-broadcast", default_value_t = 30)]
pub(crate) timeout_broadcast: u64,

/// The node list to be used, can be a file path or a comma separated list of Uri. If
/// not provided, stdin is listened.
#[arg(short, long = "nodes", env = "TARGET_NODES_PATH")]
pub(crate) nodes: Option<String>,
}
24 changes: 24 additions & 0 deletions crates/topos/src/components/node/mod.rs
Original file line number Diff line number Diff line change
@@ -220,6 +220,30 @@ pub(crate) async fn handle_command(

Ok(())
}
Some(NodeCommands::PushCertificate(cmd)) => {
match services::push_certificate::check_delivery(
cmd.timeout_broadcast,
cmd.format,
cmd.nodes,
cmd.timeout,
)
.await
.map_err(Box::<dyn std::error::Error>::from)
{
Err(_) => {
error!("Check failed due to timeout");
std::process::exit(1);
}
Ok(Err(errors)) => {
error!("Check failed due to errors: {:?}", errors);
std::process::exit(1);
}
_ => {
info!("Check passed");
Ok(())
}
}
}
None => Ok(()),
}
}
2 changes: 2 additions & 0 deletions crates/topos/src/components/node/services.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) mod push_certificate;

use crate::config::sequencer::SequencerConfig;
use crate::config::tce::TceConfig;
use crate::edge::CommandConfig;
205 changes: 205 additions & 0 deletions crates/topos/src/components/node/services/push_certificate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use futures::{future::join_all, StreamExt};
use rand::seq::SliceRandom;
use serde::Deserialize;
use std::time::Duration;
use std::{
fs::File,
io::{self, Read},
path::Path,
};
use tokio::time::error::Elapsed;
use tonic::transport::Uri;
use topos_core::{
api::grpc::{
shared::v1::checkpoints::TargetCheckpoint,
tce::v1::{
api_service_client::ApiServiceClient,
console_service_client::ConsoleServiceClient,
watch_certificates_request::OpenStream,
watch_certificates_response::{CertificatePushed, Event},
StatusRequest, SubmitCertificateRequest,
},
},
uci::{Certificate, CERTIFICATE_ID_LENGTH, SUBNET_ID_LENGTH},
};
use tracing::{debug, info};

use crate::options::input_format::{InputFormat, Parser};

pub(crate) async fn check_delivery(
timeout_broadcast: u64,
format: InputFormat,
peers: Option<String>,
timeout: u64,
) -> Result<Result<(), Vec<String>>, Elapsed> {
tokio::time::timeout(Duration::from_secs(timeout), async move {
info!("peers: {:?}", peers);
let peers: Vec<Uri> = format
.parse(NodeList(peers))
.map_err(|_| vec![format!("Unable to parse node list")])?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| vec![format!("Unable to parse node list")])?;

let random_peer: Uri = peers
.choose(&mut rand::thread_rng())
.ok_or_else(|| {
vec![format!(
"Unable to select a random peer from the list: {peers:?}"
)]
})?
.try_into()
.map_err(|_| vec![format!("Unable to parse the peer address")])?;

let pushed_certificate = Certificate::new_with_default_fields(
[0u8; CERTIFICATE_ID_LENGTH],
[1u8; SUBNET_ID_LENGTH].into(),
&[[2u8; SUBNET_ID_LENGTH].into()],
)
.map_err(|_| vec![format!("Unable to create the certificate")])?;

let certificate_id = pushed_certificate.id;
let mut join_handlers = Vec::new();

// check that every nodes delivered the certificate
for peer in peers {
join_handlers.push(tokio::spawn(async move {
let peer_string = peer.clone();
let mut client = ConsoleServiceClient::connect(peer_string.clone())
.await
.map_err(|_| (peer_string.clone(), "Unable to connect to the api console"))?;

let result = client.status(StatusRequest {}).await.map_err(|_| {
(
peer_string.clone(),
"Unable to get the status from the api console",
)
})?;

let status = result.into_inner();
if !status.has_active_sample {
return Err((peer_string, "didn't succeed in the sample phase"));
}

let mut client = ApiServiceClient::connect(peer_string.clone())
.await
.map_err(|_| (peer_string.clone(), "Unable to connect to the TCE api"))?;

let in_stream = async_stream::stream! {
yield OpenStream {
target_checkpoint: Some(TargetCheckpoint {
target_subnet_ids: vec![[2u8; SUBNET_ID_LENGTH].into()],
positions: vec![]
}),
source_checkpoint: None
}.into()
};

let response = client.watch_certificates(in_stream).await.map_err(|_| {
(
peer_string.clone(),
"Unable to execute the watch_certificates on TCE api",
)
})?;
let mut resp_stream = response.into_inner();
async move {
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
if let Some(Event::CertificatePushed(CertificatePushed {
certificate: Some(certificate),
..
})) = received.event
{
// unwrap is safe because we are sure that the certificate is present
if certificate_id == certificate.id.unwrap() {
debug!("Received the certificate on {}", peer_string);
return Ok(());
}
}
}

Err((peer_string.clone(), "didn't receive any certificate"))
}
.await
}));
}

let mut client = ApiServiceClient::connect(random_peer.clone())
.await
.map_err(|_| vec![format!("Unable to connect to the TCE api on {random_peer}")])?;

// submit a certificate to one node
_ = client
.submit_certificate(SubmitCertificateRequest {
certificate: Some(pushed_certificate.into()),
})
.await
.map_err(|_| {
vec![format!(
"Unable to submit the certificate to the TCE api on {random_peer}"
)]
})?;

tokio::time::sleep(Duration::from_secs(timeout_broadcast)).await;
let mut errors = vec![];

join_all(join_handlers)
.await
.iter()
.for_each(|result| match result {
Err(_) => {
errors.push("Unable to properly execute command".to_string());
}
Ok(Err((peer, error))) => {
errors.push(format!("{peer} {error}"));
}
_ => {}
});

if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
})
.await
.map_err(|error| {
info!("Timeout reached: {:?}", error);
error
})
}

pub(crate) struct NodeList(pub(crate) Option<String>);

#[derive(Deserialize)]
struct FileNodes {
nodes: Vec<String>,
}

impl Parser<NodeList> for InputFormat {
type Result = Result<Vec<String>, io::Error>;

fn parse(&self, NodeList(input): NodeList) -> Self::Result {
let mut input_string = String::new();
_ = match input {
Some(path) if Path::new(&path).is_file() => {
File::open(path)?.read_to_string(&mut input_string)?
}
Some(string) => {
input_string = string;
0
}
None => io::stdin().read_to_string(&mut input_string)?,
};

match self {
InputFormat::Json => Ok(serde_json::from_str::<FileNodes>(&input_string)?.nodes),
InputFormat::Plain => Ok(input_string
.trim()
.split(&[',', '\n'])
.map(|s| s.trim().to_string())
.collect()),
}
}
}
4 changes: 2 additions & 2 deletions crates/topos/tests/push-certificate.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use topos_test_sdk::tce::create_network;
#[test]
fn help_display() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::cargo_bin("topos")?;
cmd.arg("tce").arg("push-certificate").arg("-h");
cmd.arg("node").arg("push-certificate").arg("-h");

let output = cmd.assert().success();

@@ -56,7 +56,7 @@ async fn assert_delivery() -> Result<(), Box<dyn std::error::Error>> {
cmd.env("TOPOS_LOG_FORMAT", "json");
cmd.env("RUST_LOG", "topos=debug");

cmd.arg("tce")
cmd.arg("node")
.arg("push-certificates")
.args(["-f", "plain"])
.arg("-n")
22 changes: 22 additions & 0 deletions crates/topos/tests/snapshots/push_certificate__help_display.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
source: crates/topos/tests/push-certificate.rs
expression: result
---
Usage: topos tce push-certificate [OPTIONS]

Options:
-f, --format <FORMAT>
[default: plain] [possible values: json, plain]
-v, --verbose...
Defines the verbosity level
--home <HOME>
Home directory for the configuration [env: TOPOS_HOME=] [default: /home/runner/.config/topos]
-t, --timeout <TIMEOUT>
Global timeout for the command [default: 60]
--timeout-broadcast <TIMEOUT_BROADCAST>
Seconds to wait before asserting the broadcast [default: 30]
-n, --nodes <NODES>
The node list to be used, can be a file path or a comma separated list of Uri. If not provided, stdin is listened [env: TARGET_NODES_PATH=]
-h, --help
Print help

0 comments on commit 4925ec8

Please sign in to comment.