Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix: block handling during certificate generation #471

Merged
merged 10 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions crates/topos-sequencer-subnet-runtime/src/certification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use topos_core::uci::{Certificate, CertificateId, SubnetId};
use topos_sequencer_subnet_client::{BlockInfo, SubnetEvent};
use tracing::debug;

pub struct Certification {
/// Last known certificate id for subnet
pub last_certificate_id: Option<CertificateId>,
/// Latest BLOCK_HISTORY_LENGTH blocks kept in memory
pub finalized_blocks: LinkedList<BlockInfo>,
/// Subnet id for which certificates are generated
pub subnet_id: SubnetId,
/// Type of verifier used
Expand All @@ -19,6 +18,10 @@ pub struct Certification {
signing_key: Vec<u8>,
/// Optional synchronization from particular block number
pub start_block: Option<u64>,
/// Blocks received from subnet, not yet certified. We keep them in memory until we can
/// generate certificate for them. They are kept as linked list to maintain
/// order of blocks, latest received blocks are at the end of the list
finalized_blocks: LinkedList<BlockInfo>,
}

impl Debug for Certification {
Expand Down Expand Up @@ -52,8 +55,11 @@ impl Certification {
let subnet_id = self.subnet_id;
let mut generated_certificates = Vec::new();

// Keep account of blocks with generated certificates so that we can remove them from
// finalized blocks
let mut certified_blocks: Vec<u64> = Vec::with_capacity(self.finalized_blocks.len());

// For every block, create one certificate
// This will change after MVP
for block_info in &self.finalized_blocks {
// Parse target subnets from events
let mut target_subnets: HashSet<SubnetId> = HashSet::new();
Expand Down Expand Up @@ -95,6 +101,7 @@ impl Certification {
.update_signature(self.get_signing_key())
.map_err(Error::CertificateSigningError)?;
generated_certificates.push(certificate);
certified_blocks.push(block_info.number);
}

// Check for inconsistencies
Expand Down Expand Up @@ -126,7 +133,24 @@ impl Certification {
}

// Remove processed blocks
self.finalized_blocks.clear();
for processed_block_number in certified_blocks {
let front_block_number = self.finalized_blocks.front().map(|front| front.number);

if front_block_number.is_some() {
if Some(processed_block_number) == front_block_number {
debug!(
"Block {processed_block_number} processed and removed from the block list"
);
self.finalized_blocks.pop_front();
} else {
panic!(
"Block history is inconsistent, this should not happen! \
processed_block_number: {processed_block_number}, front_number: {:?}",
front_block_number
);
}
}
}

Ok(generated_certificates)
}
Expand All @@ -138,8 +162,5 @@ impl Certification {
/// Expand short block history. Remove older blocks
pub fn append_blocks(&mut self, blocks: Vec<BlockInfo>) {
self.finalized_blocks.extend(blocks);
while self.finalized_blocks.len() > Self::BLOCK_HISTORY_LENGTH {
self.finalized_blocks.pop_front();
}
}
}
6 changes: 5 additions & 1 deletion crates/topos-sequencer-subnet-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub mod proxy;

use crate::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent};

// Optimal Size of event channel is yet to be determined. Now just putting a number
const EVENT_SUBSCRIBER_CHANNEL_SIZE: usize = 64;
atanmarko marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Error)]
pub enum Error {
#[error("Peers error: {err}")]
Expand Down Expand Up @@ -103,7 +106,8 @@ impl SubnetRuntimeProxyWorker {
signing_key: Vec<u8>,
) -> Result<Self, Error> {
let runtime_proxy = SubnetRuntimeProxy::spawn_new(config, signing_key)?;
let (events_sender, events_rcv) = mpsc::channel::<SubnetRuntimeProxyEvent>(256);
let (events_sender, events_rcv) =
mpsc::channel::<SubnetRuntimeProxyEvent>(EVENT_SUBSCRIBER_CHANNEL_SIZE);
let commands;
{
let mut runtime_proxy = runtime_proxy.lock().await;
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl SubnetRuntimeProxy {
certification.clone(),
block
).await {
error!("Failed to process next block: {}", e);
error!("Failed to process next block: {}, exit block production!", e);
break None;
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl TceClientBuilder {
let tce_endpoint = tce_endpoint.clone();
let tce_grpc_client = tce_grpc_client.clone();
let context_backoff = context.clone();
// TODO: Push certificates to the TCE one by one
atanmarko marked this conversation as resolved.
Show resolved Hide resolved
certificate_to_send.push_back(async move {
debug!("Submitting certificate {} to the TCE using backoff strategy...", &tce_endpoint);
let cert = cert.clone();
Expand Down
Loading