From 865d37574c161b5b83530343fb9b3bd88246c658 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Tue, 28 Mar 2023 16:45:46 +0200 Subject: [PATCH] feat: add push certificate instrumentation event (#195) --- Cargo.lock | 4 + .../Cargo.toml | 2 + .../src/subnet_runtime_proxy.rs | 106 ++++++++++-------- .../tests/subnet_contract.rs | 12 +- crates/topos-sequencer-tce-proxy/Cargo.toml | 1 + crates/topos-sequencer-tce-proxy/src/lib.rs | 25 ++++- crates/topos-sequencer-types/src/lib.rs | 11 +- crates/topos-sequencer/Cargo.toml | 1 + crates/topos-sequencer/src/app_context.rs | 27 +++-- 9 files changed, 125 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20e2ad4b3..e3e18e678 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5635,6 +5635,7 @@ version = "0.1.0" dependencies = [ "bincode", "hex", + "opentelemetry", "rpassword", "serde", "tokio", @@ -5707,6 +5708,7 @@ dependencies = [ "env_logger 0.10.0", "fs_extra", "hex", + "opentelemetry", "rand 0.8.5", "rand_core 0.6.4", "rand_distr", @@ -5725,6 +5727,7 @@ dependencies = [ "topos-sequencer-types", "topos-test-sdk", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "web3", ] @@ -5741,6 +5744,7 @@ dependencies = [ "futures", "hex", "hyper", + "opentelemetry", "rstest 0.16.0", "serde", "serde_json", diff --git a/crates/topos-sequencer-subnet-runtime-proxy/Cargo.toml b/crates/topos-sequencer-subnet-runtime-proxy/Cargo.toml index b916b4fe3..c54dc79aa 100644 --- a/crates/topos-sequencer-subnet-runtime-proxy/Cargo.toml +++ b/crates/topos-sequencer-subnet-runtime-proxy/Cargo.toml @@ -24,6 +24,8 @@ tokio = { workspace = true, features = [ ] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } tracing.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry.workspace = true topos-core = { workspace = true, features = ["uci"] } topos-sequencer-subnet-client = { package = "topos-sequencer-subnet-client", path = "../topos-sequencer-subnet-client" } diff --git a/crates/topos-sequencer-subnet-runtime-proxy/src/subnet_runtime_proxy.rs b/crates/topos-sequencer-subnet-runtime-proxy/src/subnet_runtime_proxy.rs index d6b648bb7..8a4347d42 100644 --- a/crates/topos-sequencer-subnet-runtime-proxy/src/subnet_runtime_proxy.rs +++ b/crates/topos-sequencer-subnet-runtime-proxy/src/subnet_runtime_proxy.rs @@ -2,6 +2,7 @@ //! use crate::{Error, SubnetRuntimeProxyConfig}; +use opentelemetry::trace::FutureExt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use tokio::sync::Mutex; @@ -11,7 +12,8 @@ use topos_core::api::checkpoints::TargetStreamPosition; use topos_core::uci::{Certificate, SubnetId}; use topos_sequencer_subnet_client::{self, SubnetClient}; use topos_sequencer_types::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; /// Arbitrary tick duration for fetching new finalized blocks const SUBNET_BLOCK_TIME: Duration = Duration::new(2, 0); @@ -192,13 +194,13 @@ impl SubnetRuntimeProxy { runtime_proxy_config: &SubnetRuntimeProxyConfig, subnet_client: &mut SubnetClient, cert: &Certificate, - cert_position: u64, + position: u64, ) -> Result { debug!( "Pushing certificate with id {} to target subnet {}, tcc {}", cert.id, runtime_proxy_config.subnet_id, runtime_proxy_config.subnet_contract_address, ); - let receipt = subnet_client.push_certificate(cert, cert_position).await?; + let receipt = subnet_client.push_certificate(cert, position).await?; debug!("Push certificate transaction receipt: {:?}", &receipt); Ok("0x".to_string() + &hex::encode(receipt.transaction_hash)) } @@ -211,55 +213,71 @@ impl SubnetRuntimeProxy { match mb_cmd { Some(cmd) => match cmd { // Process certificate retrieved from TCE node - SubnetRuntimeProxyCommand::OnNewDeliveredCertificate((cert, cert_position)) => { - info!( + SubnetRuntimeProxyCommand::OnNewDeliveredCertificate { + certificate, + position, + ctx, + } => { + let span_subnet_runtime_proxy = info_span!("Subnet Runtime Proxy"); + span_subnet_runtime_proxy.set_parent(ctx); + + async { + info!( "Processing certificate received from TCE, cert_id={}", - &cert.id - ); + &certificate.id + ); - // Verify certificate signature - // Well known subnet id is public key for certificate verification - // Public key of secp256k1 is 33 bytes, we are keeping last 32 bytes as subnet id - // Add manually first byte 0x02 - let public_key = cert.source_subnet_id.to_secp256k1_public_key(); + // Verify certificate signature + // Well known subnet id is public key for certificate verification + // Public key of secp256k1 is 33 bytes, we are keeping last 32 bytes as subnet id + // Add manually first byte 0x02 + let public_key = certificate.source_subnet_id.to_secp256k1_public_key(); - // Verify signature of the certificate - match topos_crypto::signatures::verify( - &public_key, - cert.get_payload().as_slice(), - cert.signature.as_slice(), - ) { - Ok(()) => { - info!("Certificate {} passed verification", cert.id) - } - Err(e) => { - error!("Failed to verify certificate id {}: {e}", cert.id); - return; + // Verify signature of the certificate + match topos_crypto::signatures::verify( + &public_key, + certificate.get_payload().as_slice(), + certificate.signature.as_slice(), + ) { + Ok(()) => { + info!("Certificate {} passed verification", certificate.id) + } + Err(e) => { + error!("Failed to verify certificate id {}: {e}", certificate.id); + return; + } } - } - // Push the Certificate to the ToposCore contract on the target subnet - match SubnetRuntimeProxy::push_certificate( - runtime_proxy_config, - subnet_client, - &cert, - cert_position, - ) - .await - { - Ok(tx_hash) => { - debug!( - "Successfully pushed the Certificate {} to target subnet with tx hash {}", - &cert.id, &tx_hash - ); - } - Err(e) => { - error!( + let span_push_certificate = info_span!("Subnet push certificate call"); + + // Push the Certificate to the ToposCore contract on the target subnet + match SubnetRuntimeProxy::push_certificate( + runtime_proxy_config, + subnet_client, + &certificate, + position, + ) + .with_context(span_push_certificate.context()) + .instrument(span_push_certificate) + .await + { + Ok(tx_hash) => { + debug!( + "Successfully pushed the Certificate {} to target subnet with tx hash {}", + &certificate.id, &tx_hash + ); + } + Err(e) => { + error!( "Failed to push the Certificate {} to target subnet: {e}", - &cert.id - ); + &certificate.id + ); + } } } + .with_context(span_subnet_runtime_proxy.context()) + .instrument(span_subnet_runtime_proxy) + .await } }, _ => { diff --git a/crates/topos-sequencer-subnet-runtime-proxy/tests/subnet_contract.rs b/crates/topos-sequencer-subnet-runtime-proxy/tests/subnet_contract.rs index 3fc60470a..68080f70d 100644 --- a/crates/topos-sequencer-subnet-runtime-proxy/tests/subnet_contract.rs +++ b/crates/topos-sequencer-subnet-runtime-proxy/tests/subnet_contract.rs @@ -10,7 +10,8 @@ use test_log::test; use tokio::sync::oneshot; use topos_core::uci::{Certificate, CertificateId}; use topos_sequencer_types::SubnetId; -use tracing::{error, info}; +use tracing::{error, info, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use web3::contract::tokens::Tokenize; use web3::ethabi::Token; use web3::transports::Http; @@ -584,10 +585,11 @@ async fn test_subnet_certificate_push_call( info!("Sending mock certificate to subnet smart contract..."); if let Err(e) = runtime_proxy_worker.eval( - topos_sequencer_types::SubnetRuntimeProxyCommand::OnNewDeliveredCertificate(( - mock_cert.clone(), - 0, - )), + topos_sequencer_types::SubnetRuntimeProxyCommand::OnNewDeliveredCertificate { + certificate: mock_cert.clone(), + position: 0, + ctx: Span::current().context(), + }, ) { error!("Failed to send OnNewDeliveredTxns command: {}", e); return Err(Box::from(e)); diff --git a/crates/topos-sequencer-tce-proxy/Cargo.toml b/crates/topos-sequencer-tce-proxy/Cargo.toml index 24fdc8c59..dcdfcfac1 100644 --- a/crates/topos-sequencer-tce-proxy/Cargo.toml +++ b/crates/topos-sequencer-tce-proxy/Cargo.toml @@ -32,6 +32,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter", "json", "ansi tracing.workspace = true uuid.workspace = true tracing-opentelemetry.workspace = true +opentelemetry.workspace = true [dev-dependencies] topos-tce-transport = { path = "../topos-tce-transport" } diff --git a/crates/topos-sequencer-tce-proxy/src/lib.rs b/crates/topos-sequencer-tce-proxy/src/lib.rs index 8afd5cb82..8675aa775 100644 --- a/crates/topos-sequencer-tce-proxy/src/lib.rs +++ b/crates/topos-sequencer-tce-proxy/src/lib.rs @@ -3,6 +3,7 @@ //! use crate::Error::InvalidChannelError; +use opentelemetry::trace::FutureExt; use tokio::sync::{mpsc, oneshot}; use tokio_stream::StreamExt; use tonic::transport::channel; @@ -17,7 +18,7 @@ use topos_core::{ uci::{Certificate, SubnetId}, }; use topos_sequencer_types::*; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100; @@ -500,7 +501,10 @@ impl TceProxyWorker { if let Err(e) = tce_client.send_certificate(*cert).await { error!("Failure on the submission of the Certificate to the TCE client: {e}"); } - }.instrument(span).await; + } + .with_context(span.context()) + .instrument(span) + .await; } TceProxyCommand::Shutdown(sender) => { info!("Received TceProxyCommand::Shutdown command, closing tce client..."); @@ -512,12 +516,23 @@ impl TceProxyWorker { } } } + // Process certificates received from the TCE node Some((cert, target_stream_position)) = receiving_certificate_stream.next() => { - info!("Received certificate from TCE {:?}, target stream position {}", cert, target_stream_position.position); - if let Err(e) = evt_sender.send(TceProxyEvent::NewDeliveredCerts(vec![(cert, target_stream_position.position)])).await { - error!("Unable to send NewDeliveredCerts event {e}"); + let span = info_span!("PushCertificate"); + async { + info!("Received certificate from TCE {:?}, target stream position {}", cert, target_stream_position.position); + if let Err(e) = evt_sender.send(TceProxyEvent::NewDeliveredCerts { + certificates: vec![(cert, target_stream_position.position)], + ctx: Span::current().context()} + ) + .await { + error!("Unable to send NewDeliveredCerts event {e}"); + } } + .with_context(span.context()) + .instrument(span) + .await; } } } diff --git a/crates/topos-sequencer-types/src/lib.rs b/crates/topos-sequencer-types/src/lib.rs index 9dfa510d9..988916fad 100644 --- a/crates/topos-sequencer-types/src/lib.rs +++ b/crates/topos-sequencer-types/src/lib.rs @@ -89,7 +89,11 @@ pub enum SubnetRuntimeProxyEvent { #[derive(Debug)] pub enum SubnetRuntimeProxyCommand { /// Upon receiving a new delivered Certificate from the TCE - OnNewDeliveredCertificate((Certificate, u64)), + OnNewDeliveredCertificate { + certificate: Certificate, + position: u64, + ctx: Context, + }, } #[derive(Debug)] @@ -107,7 +111,10 @@ pub enum TceProxyCommand { #[derive(Debug, Clone)] pub enum TceProxyEvent { /// New delivered certificate (and its position) fetched from the TCE network - NewDeliveredCerts(Vec<(Certificate, u64)>), + NewDeliveredCerts { + certificates: Vec<(Certificate, u64)>, + ctx: Context, + }, /// Failed watching certificates channel /// Requires restart of sequencer tce proxy WatchCertificatesChannelFailed, diff --git a/crates/topos-sequencer/Cargo.toml b/crates/topos-sequencer/Cargo.toml index 5edcfd7e6..4e95a7f91 100644 --- a/crates/topos-sequencer/Cargo.toml +++ b/crates/topos-sequencer/Cargo.toml @@ -13,6 +13,7 @@ tokio = { workspace = true, features = ["full"] } tracing-subscriber = {workspace = true, features = ["fmt", "std", "env-filter",]} tracing.workspace = true tracing-opentelemetry.workspace = true +opentelemetry.workspace = true topos-crypto.workspace = true topos-core = { workspace = true, features = ["uci"] } diff --git a/crates/topos-sequencer/src/app_context.rs b/crates/topos-sequencer/src/app_context.rs index ff5f98448..9af3fa626 100644 --- a/crates/topos-sequencer/src/app_context.rs +++ b/crates/topos-sequencer/src/app_context.rs @@ -2,11 +2,12 @@ //! Application logic glue //! use crate::SequencerConfiguration; +use opentelemetry::trace::FutureExt; use topos_sequencer_certification::CertificationWorker; use topos_sequencer_subnet_runtime_proxy::SubnetRuntimeProxyWorker; use topos_sequencer_tce_proxy::TceProxyWorker; use topos_sequencer_types::*; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; /// Top-level transducer sequencer app context & driver (alike) @@ -87,6 +88,7 @@ impl AppContext { cert: Box::new(cert), ctx: span.context(), }) + .with_context(span.context()) .instrument(span) .await { @@ -98,13 +100,22 @@ impl AppContext { async fn on_tce_proxy_event(&mut self, evt: TceProxyEvent) { match evt { - TceProxyEvent::NewDeliveredCerts(certs) => { - // New certificates acquired from TCE - for cert in certs { - self.runtime_proxy_worker - .eval(SubnetRuntimeProxyCommand::OnNewDeliveredCertificate(cert)) - .expect("Send cross transactions to the runtime"); - } + TceProxyEvent::NewDeliveredCerts { certificates, ctx } => { + let span = info_span!("Sequencer app context"); + span.set_parent(ctx); + + span.in_scope(|| { + // New certificates acquired from TCE + for (cert, cert_position) in certificates { + self.runtime_proxy_worker + .eval(SubnetRuntimeProxyCommand::OnNewDeliveredCertificate { + certificate: cert, + position: cert_position, + ctx: Span::current().context(), + }) + .expect("Propagate new delivered Certificate to the runtime"); + } + }); } TceProxyEvent::WatchCertificatesChannelFailed => { warn!("Restarting tce proxy worker...");