From e870a4c21050961ac565430c9dd1de35a2e46b9d Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 28 Feb 2024 15:55:07 +0100 Subject: [PATCH 1/9] fix: update certificate generation --- .../src/certification.rs | 35 +++++++++++++++---- .../topos-sequencer-subnet-runtime/src/lib.rs | 5 ++- .../src/proxy.rs | 3 +- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index fa70e160b..4e3bc3959 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -9,8 +9,6 @@ use topos_sequencer_subnet_client::{BlockInfo, SubnetEvent}; pub struct Certification { /// Last known certificate id for subnet pub last_certificate_id: Option, - /// Latest BLOCK_HISTORY_LENGTH blocks kept in memory - pub finalized_blocks: LinkedList, /// Subnet id for which certificates are generated pub subnet_id: SubnetId, /// Type of verifier used @@ -19,6 +17,8 @@ pub struct Certification { signing_key: Vec, /// Optional synchronization from particular block number pub start_block: Option, + /// Latest BLOCK_HISTORY_LENGTH blocks kept in memory + finalized_blocks: LinkedList, } impl Debug for Certification { @@ -52,8 +52,11 @@ impl Certification { let subnet_id = self.subnet_id; let mut generated_certificates = Vec::new(); + // Keep account of blocks with generated certificates so that we can remove them from + // finalized blocks + let mut processed_blocks: Vec = Vec::with_capacity(self.finalized_blocks.len()); + // For every block, create one certificate - // This will change after MVP for block_info in &self.finalized_blocks { // Parse target subnets from events let mut target_subnets: HashSet = HashSet::new(); @@ -91,10 +94,13 @@ impl Certification { proof, ) .map_err(|e| Error::CertificateGenerationError(e.to_string()))?; + certificate .update_signature(self.get_signing_key()) .map_err(Error::CertificateSigningError)?; + generated_certificates.push(certificate); + processed_blocks.push(block_info.number); } // Check for inconsistencies @@ -126,7 +132,25 @@ impl Certification { } // Remove processed blocks - self.finalized_blocks.clear(); + for processed_block_number in processed_blocks { + let mut front_number = None; + if let Some(front) = self.finalized_blocks.front() { + front_number = Some(front.number); + } + + if front_number.is_some() { + if Some(processed_block_number) == front_number { + + self.finalized_blocks.pop_front(); + } else { + panic!( + "Block history is inconsistent, this should not happen! \ + processed_block_number: {processed_block_number}, front_number: \ + {front_number}" + ); + } + } + } Ok(generated_certificates) } @@ -138,8 +162,5 @@ impl Certification { /// Expand short block history. Remove older blocks pub fn append_blocks(&mut self, blocks: Vec) { self.finalized_blocks.extend(blocks); - while self.finalized_blocks.len() > Self::BLOCK_HISTORY_LENGTH { - self.finalized_blocks.pop_front(); - } } } diff --git a/crates/topos-sequencer-subnet-runtime/src/lib.rs b/crates/topos-sequencer-subnet-runtime/src/lib.rs index aabd1f670..4bb673a91 100644 --- a/crates/topos-sequencer-subnet-runtime/src/lib.rs +++ b/crates/topos-sequencer-subnet-runtime/src/lib.rs @@ -18,6 +18,8 @@ pub mod proxy; use crate::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent}; +const EVENT_SUBSCRIBER_CHANNEL_SIZE: usize = 64; + #[derive(Debug, Error)] pub enum Error { #[error("Peers error: {err}")] @@ -103,7 +105,8 @@ impl SubnetRuntimeProxyWorker { signing_key: Vec, ) -> Result { let runtime_proxy = SubnetRuntimeProxy::spawn_new(config, signing_key)?; - let (events_sender, events_rcv) = mpsc::channel::(256); + let (events_sender, events_rcv) = + mpsc::channel::(EVENT_SUBSCRIBER_CHANNEL_SIZE); let commands; { let mut runtime_proxy = runtime_proxy.lock().await; diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index dfc28d64f..162daf737 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -342,7 +342,8 @@ impl SubnetRuntimeProxy { let mut certification = certification.lock().await; - // Update certificate block history + // Update certificate block history. If block history is full, + // it will block until some of the blocks are processed certification.append_blocks(vec![block_info]); let new_certificates = match certification.generate_certificates().await { From b4f4961356dbf990bf1881ee4faaba3a9969a703 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Fri, 1 Mar 2024 16:57:45 +0100 Subject: [PATCH 2/9] fix: updates --- .../src/certification.rs | 17 ++++++++++------- .../topos-sequencer-subnet-runtime/src/proxy.rs | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index 4e3bc3959..06d851ad6 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use topos_core::uci::{Certificate, CertificateId, SubnetId}; use topos_sequencer_subnet_client::{BlockInfo, SubnetEvent}; +use tracing::debug; pub struct Certification { /// Last known certificate id for subnet @@ -133,20 +134,22 @@ impl Certification { // Remove processed blocks for processed_block_number in processed_blocks { - let mut front_number = None; + let mut front_block_number = None; if let Some(front) = self.finalized_blocks.front() { - front_number = Some(front.number); + front_block_number = Some(front.number); } - if front_number.is_some() { - if Some(processed_block_number) == front_number { - + if front_block_number.is_some() { + if Some(processed_block_number) == front_block_number { + debug!( + "Block {processed_block_number} processed and removed from the block list" + ); self.finalized_blocks.pop_front(); } else { panic!( "Block history is inconsistent, this should not happen! \ - processed_block_number: {processed_block_number}, front_number: \ - {front_number}" + processed_block_number: {processed_block_number}, front_number: {:?}", + front_block_number ); } } diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 162daf737..9bad20607 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -259,7 +259,7 @@ impl SubnetRuntimeProxy { certification.clone(), block ).await { - error!("Failed to process next block: {}", e); + error!("Failed to process next block: {}, exit block production!", e); break None; } } From fdb70ab1251a08cc756e9e57ff5c056d01311bf6 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Fri, 1 Mar 2024 17:33:06 +0100 Subject: [PATCH 3/9] fix: comments --- crates/topos-sequencer-subnet-runtime/src/proxy.rs | 3 +-- crates/topos-tce-proxy/src/client.rs | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 9bad20607..5039698bb 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -342,8 +342,7 @@ impl SubnetRuntimeProxy { let mut certification = certification.lock().await; - // Update certificate block history. If block history is full, - // it will block until some of the blocks are processed + // Update certificate block history certification.append_blocks(vec![block_info]); let new_certificates = match certification.generate_certificates().await { diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index 69f81adb9..a6682abb5 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -381,6 +381,7 @@ impl TceClientBuilder { let tce_endpoint = tce_endpoint.clone(); let tce_grpc_client = tce_grpc_client.clone(); let context_backoff = context.clone(); + // TODO: Push certificates to the TCE one by one certificate_to_send.push_back(async move { debug!("Submitting certificate {} to the TCE using backoff strategy...", &tce_endpoint); let cert = cert.clone(); From ee35451e5a5d07773e4fe0586773f042d747b2ab Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Fri, 1 Mar 2024 17:33:48 +0100 Subject: [PATCH 4/9] fix: format --- crates/topos-sequencer-subnet-runtime/src/certification.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index 06d851ad6..b871aa3cc 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -95,11 +95,9 @@ impl Certification { proof, ) .map_err(|e| Error::CertificateGenerationError(e.to_string()))?; - certificate .update_signature(self.get_signing_key()) .map_err(Error::CertificateSigningError)?; - generated_certificates.push(certificate); processed_blocks.push(block_info.number); } From 99cfa96ca729cbeabc203c329f5455fc70a36537 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Mon, 4 Mar 2024 11:15:47 +0100 Subject: [PATCH 5/9] fix: optimize --- .../topos-sequencer-subnet-runtime/src/certification.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index b871aa3cc..940aa68e4 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -132,10 +132,11 @@ impl Certification { // Remove processed blocks for processed_block_number in processed_blocks { - let mut front_block_number = None; - if let Some(front) = self.finalized_blocks.front() { - front_block_number = Some(front.number); - } + let mut front_block_number = if let Some(front) = self.finalized_blocks.front() { + Some(front.number) + } else { + None + }; if front_block_number.is_some() { if Some(processed_block_number) == front_block_number { From 9289ce0895fea6228a217624ecbcadc3806d8070 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Mon, 4 Mar 2024 11:48:01 +0100 Subject: [PATCH 6/9] fix: clippy --- crates/topos-sequencer-subnet-runtime/src/certification.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index 940aa68e4..63c278db9 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -132,7 +132,7 @@ impl Certification { // Remove processed blocks for processed_block_number in processed_blocks { - let mut front_block_number = if let Some(front) = self.finalized_blocks.front() { + let front_block_number = if let Some(front) = self.finalized_blocks.front() { Some(front.number) } else { None From 1a2b583453a9a7a00ef30b2c32596a371dcefc4c Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Mon, 4 Mar 2024 12:06:43 +0100 Subject: [PATCH 7/9] fix: optimize --- crates/topos-sequencer-subnet-runtime/src/certification.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index 63c278db9..9f9dce27a 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -132,11 +132,7 @@ impl Certification { // Remove processed blocks for processed_block_number in processed_blocks { - let front_block_number = if let Some(front) = self.finalized_blocks.front() { - Some(front.number) - } else { - None - }; + let front_block_number = self.finalized_blocks.front().map(|front| front.number); if front_block_number.is_some() { if Some(processed_block_number) == front_block_number { From 6af3f540a6aa34a823c0a7ba866bc31f486c5d3f Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 13 Mar 2024 10:22:13 +0100 Subject: [PATCH 8/9] fix: review --- crates/topos-sequencer-subnet-runtime/src/certification.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index 9f9dce27a..1fb83af86 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -55,7 +55,7 @@ impl Certification { // Keep account of blocks with generated certificates so that we can remove them from // finalized blocks - let mut processed_blocks: Vec = Vec::with_capacity(self.finalized_blocks.len()); + let mut certified_blocks: Vec = Vec::with_capacity(self.finalized_blocks.len()); // For every block, create one certificate for block_info in &self.finalized_blocks { @@ -99,7 +99,7 @@ impl Certification { .update_signature(self.get_signing_key()) .map_err(Error::CertificateSigningError)?; generated_certificates.push(certificate); - processed_blocks.push(block_info.number); + certified_blocks.push(block_info.number); } // Check for inconsistencies @@ -131,7 +131,7 @@ impl Certification { } // Remove processed blocks - for processed_block_number in processed_blocks { + for processed_block_number in certified_blocks { let front_block_number = self.finalized_blocks.front().map(|front| front.number); if front_block_number.is_some() { From 406f11ef3c94cb4c1507651c5c371aa1f2420783 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 13 Mar 2024 10:26:15 +0100 Subject: [PATCH 9/9] fix: review --- crates/topos-sequencer-subnet-runtime/src/certification.rs | 4 +++- crates/topos-sequencer-subnet-runtime/src/lib.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-sequencer-subnet-runtime/src/certification.rs index 1fb83af86..61e7f25de 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-sequencer-subnet-runtime/src/certification.rs @@ -18,7 +18,9 @@ pub struct Certification { signing_key: Vec, /// Optional synchronization from particular block number pub start_block: Option, - /// Latest BLOCK_HISTORY_LENGTH blocks kept in memory + /// Blocks received from subnet, not yet certified. We keep them in memory until we can + /// generate certificate for them. They are kept as linked list to maintain + /// order of blocks, latest received blocks are at the end of the list finalized_blocks: LinkedList, } diff --git a/crates/topos-sequencer-subnet-runtime/src/lib.rs b/crates/topos-sequencer-subnet-runtime/src/lib.rs index 4bb673a91..9a8c44d4f 100644 --- a/crates/topos-sequencer-subnet-runtime/src/lib.rs +++ b/crates/topos-sequencer-subnet-runtime/src/lib.rs @@ -18,6 +18,7 @@ pub mod proxy; use crate::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent}; +// Optimal Size of event channel is yet to be determined. Now just putting a number const EVENT_SUBSCRIBER_CHANNEL_SIZE: usize = 64; #[derive(Debug, Error)]