Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: add push certificate instrumentation event (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko authored Mar 28, 2023
1 parent 4461eae commit 865d375
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 64 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/topos-sequencer-subnet-runtime-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ tokio = { workspace = true, features = [
] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
opentelemetry.workspace = true

topos-core = { workspace = true, features = ["uci"] }
topos-sequencer-subnet-client = { package = "topos-sequencer-subnet-client", path = "../topos-sequencer-subnet-client" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
use crate::{Error, SubnetRuntimeProxyConfig};
use opentelemetry::trace::FutureExt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -11,7 +12,8 @@ use topos_core::api::checkpoints::TargetStreamPosition;
use topos_core::uci::{Certificate, SubnetId};
use topos_sequencer_subnet_client::{self, SubnetClient};
use topos_sequencer_types::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, info_span, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Arbitrary tick duration for fetching new finalized blocks
const SUBNET_BLOCK_TIME: Duration = Duration::new(2, 0);
Expand Down Expand Up @@ -192,13 +194,13 @@ impl SubnetRuntimeProxy {
runtime_proxy_config: &SubnetRuntimeProxyConfig,
subnet_client: &mut SubnetClient,
cert: &Certificate,
cert_position: u64,
position: u64,
) -> Result<String, Error> {
debug!(
"Pushing certificate with id {} to target subnet {}, tcc {}",
cert.id, runtime_proxy_config.subnet_id, runtime_proxy_config.subnet_contract_address,
);
let receipt = subnet_client.push_certificate(cert, cert_position).await?;
let receipt = subnet_client.push_certificate(cert, position).await?;
debug!("Push certificate transaction receipt: {:?}", &receipt);
Ok("0x".to_string() + &hex::encode(receipt.transaction_hash))
}
Expand All @@ -211,55 +213,71 @@ impl SubnetRuntimeProxy {
match mb_cmd {
Some(cmd) => match cmd {
// Process certificate retrieved from TCE node
SubnetRuntimeProxyCommand::OnNewDeliveredCertificate((cert, cert_position)) => {
info!(
SubnetRuntimeProxyCommand::OnNewDeliveredCertificate {
certificate,
position,
ctx,
} => {
let span_subnet_runtime_proxy = info_span!("Subnet Runtime Proxy");
span_subnet_runtime_proxy.set_parent(ctx);

async {
info!(
"Processing certificate received from TCE, cert_id={}",
&cert.id
);
&certificate.id
);

// Verify certificate signature
// Well known subnet id is public key for certificate verification
// Public key of secp256k1 is 33 bytes, we are keeping last 32 bytes as subnet id
// Add manually first byte 0x02
let public_key = cert.source_subnet_id.to_secp256k1_public_key();
// Verify certificate signature
// Well known subnet id is public key for certificate verification
// Public key of secp256k1 is 33 bytes, we are keeping last 32 bytes as subnet id
// Add manually first byte 0x02
let public_key = certificate.source_subnet_id.to_secp256k1_public_key();

// Verify signature of the certificate
match topos_crypto::signatures::verify(
&public_key,
cert.get_payload().as_slice(),
cert.signature.as_slice(),
) {
Ok(()) => {
info!("Certificate {} passed verification", cert.id)
}
Err(e) => {
error!("Failed to verify certificate id {}: {e}", cert.id);
return;
// Verify signature of the certificate
match topos_crypto::signatures::verify(
&public_key,
certificate.get_payload().as_slice(),
certificate.signature.as_slice(),
) {
Ok(()) => {
info!("Certificate {} passed verification", certificate.id)
}
Err(e) => {
error!("Failed to verify certificate id {}: {e}", certificate.id);
return;
}
}
}

// Push the Certificate to the ToposCore contract on the target subnet
match SubnetRuntimeProxy::push_certificate(
runtime_proxy_config,
subnet_client,
&cert,
cert_position,
)
.await
{
Ok(tx_hash) => {
debug!(
"Successfully pushed the Certificate {} to target subnet with tx hash {}",
&cert.id, &tx_hash
);
}
Err(e) => {
error!(
let span_push_certificate = info_span!("Subnet push certificate call");

// Push the Certificate to the ToposCore contract on the target subnet
match SubnetRuntimeProxy::push_certificate(
runtime_proxy_config,
subnet_client,
&certificate,
position,
)
.with_context(span_push_certificate.context())
.instrument(span_push_certificate)
.await
{
Ok(tx_hash) => {
debug!(
"Successfully pushed the Certificate {} to target subnet with tx hash {}",
&certificate.id, &tx_hash
);
}
Err(e) => {
error!(
"Failed to push the Certificate {} to target subnet: {e}",
&cert.id
);
&certificate.id
);
}
}
}
.with_context(span_subnet_runtime_proxy.context())
.instrument(span_subnet_runtime_proxy)
.await
}
},
_ => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use test_log::test;
use tokio::sync::oneshot;
use topos_core::uci::{Certificate, CertificateId};
use topos_sequencer_types::SubnetId;
use tracing::{error, info};
use tracing::{error, info, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use web3::contract::tokens::Tokenize;
use web3::ethabi::Token;
use web3::transports::Http;
Expand Down Expand Up @@ -584,10 +585,11 @@ async fn test_subnet_certificate_push_call(

info!("Sending mock certificate to subnet smart contract...");
if let Err(e) = runtime_proxy_worker.eval(
topos_sequencer_types::SubnetRuntimeProxyCommand::OnNewDeliveredCertificate((
mock_cert.clone(),
0,
)),
topos_sequencer_types::SubnetRuntimeProxyCommand::OnNewDeliveredCertificate {
certificate: mock_cert.clone(),
position: 0,
ctx: Span::current().context(),
},
) {
error!("Failed to send OnNewDeliveredTxns command: {}", e);
return Err(Box::from(e));
Expand Down
1 change: 1 addition & 0 deletions crates/topos-sequencer-tce-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter", "json", "ansi
tracing.workspace = true
uuid.workspace = true
tracing-opentelemetry.workspace = true
opentelemetry.workspace = true

[dev-dependencies]
topos-tce-transport = { path = "../topos-tce-transport" }
Expand Down
25 changes: 20 additions & 5 deletions crates/topos-sequencer-tce-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//!
use crate::Error::InvalidChannelError;
use opentelemetry::trace::FutureExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tonic::transport::channel;
Expand All @@ -17,7 +18,7 @@ use topos_core::{
uci::{Certificate, SubnetId},
};
use topos_sequencer_types::*;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100;
Expand Down Expand Up @@ -500,7 +501,10 @@ impl TceProxyWorker {
if let Err(e) = tce_client.send_certificate(*cert).await {
error!("Failure on the submission of the Certificate to the TCE client: {e}");
}
}.instrument(span).await;
}
.with_context(span.context())
.instrument(span)
.await;
}
TceProxyCommand::Shutdown(sender) => {
info!("Received TceProxyCommand::Shutdown command, closing tce client...");
Expand All @@ -512,12 +516,23 @@ impl TceProxyWorker {
}
}
}

// Process certificates received from the TCE node
Some((cert, target_stream_position)) = receiving_certificate_stream.next() => {
info!("Received certificate from TCE {:?}, target stream position {}", cert, target_stream_position.position);
if let Err(e) = evt_sender.send(TceProxyEvent::NewDeliveredCerts(vec![(cert, target_stream_position.position)])).await {
error!("Unable to send NewDeliveredCerts event {e}");
let span = info_span!("PushCertificate");
async {
info!("Received certificate from TCE {:?}, target stream position {}", cert, target_stream_position.position);
if let Err(e) = evt_sender.send(TceProxyEvent::NewDeliveredCerts {
certificates: vec![(cert, target_stream_position.position)],
ctx: Span::current().context()}
)
.await {
error!("Unable to send NewDeliveredCerts event {e}");
}
}
.with_context(span.context())
.instrument(span)
.await;
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions crates/topos-sequencer-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ pub enum SubnetRuntimeProxyEvent {
#[derive(Debug)]
pub enum SubnetRuntimeProxyCommand {
/// Upon receiving a new delivered Certificate from the TCE
OnNewDeliveredCertificate((Certificate, u64)),
OnNewDeliveredCertificate {
certificate: Certificate,
position: u64,
ctx: Context,
},
}

#[derive(Debug)]
Expand All @@ -107,7 +111,10 @@ pub enum TceProxyCommand {
#[derive(Debug, Clone)]
pub enum TceProxyEvent {
/// New delivered certificate (and its position) fetched from the TCE network
NewDeliveredCerts(Vec<(Certificate, u64)>),
NewDeliveredCerts {
certificates: Vec<(Certificate, u64)>,
ctx: Context,
},
/// Failed watching certificates channel
/// Requires restart of sequencer tce proxy
WatchCertificatesChannelFailed,
Expand Down
1 change: 1 addition & 0 deletions crates/topos-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tokio = { workspace = true, features = ["full"] }
tracing-subscriber = {workspace = true, features = ["fmt", "std", "env-filter",]}
tracing.workspace = true
tracing-opentelemetry.workspace = true
opentelemetry.workspace = true

topos-crypto.workspace = true
topos-core = { workspace = true, features = ["uci"] }
Expand Down
27 changes: 19 additions & 8 deletions crates/topos-sequencer/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
//! Application logic glue
//!
use crate::SequencerConfiguration;
use opentelemetry::trace::FutureExt;
use topos_sequencer_certification::CertificationWorker;
use topos_sequencer_subnet_runtime_proxy::SubnetRuntimeProxyWorker;
use topos_sequencer_tce_proxy::TceProxyWorker;
use topos_sequencer_types::*;
use tracing::{debug, error, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Top-level transducer sequencer app context & driver (alike)
Expand Down Expand Up @@ -87,6 +88,7 @@ impl AppContext {
cert: Box::new(cert),
ctx: span.context(),
})
.with_context(span.context())
.instrument(span)
.await
{
Expand All @@ -98,13 +100,22 @@ impl AppContext {

async fn on_tce_proxy_event(&mut self, evt: TceProxyEvent) {
match evt {
TceProxyEvent::NewDeliveredCerts(certs) => {
// New certificates acquired from TCE
for cert in certs {
self.runtime_proxy_worker
.eval(SubnetRuntimeProxyCommand::OnNewDeliveredCertificate(cert))
.expect("Send cross transactions to the runtime");
}
TceProxyEvent::NewDeliveredCerts { certificates, ctx } => {
let span = info_span!("Sequencer app context");
span.set_parent(ctx);

span.in_scope(|| {
// New certificates acquired from TCE
for (cert, cert_position) in certificates {
self.runtime_proxy_worker
.eval(SubnetRuntimeProxyCommand::OnNewDeliveredCertificate {
certificate: cert,
position: cert_position,
ctx: Span::current().context(),
})
.expect("Propagate new delivered Certificate to the runtime");
}
});
}
TceProxyEvent::WatchCertificatesChannelFailed => {
warn!("Restarting tce proxy worker...");
Expand Down

0 comments on commit 865d375

Please sign in to comment.