diff --git a/crates/topos-tce-storage/README.md b/crates/topos-tce-storage/README.md
new file mode 100644
index 000000000..fede2c370
--- /dev/null
+++ b/crates/topos-tce-storage/README.md
@@ -0,0 +1,79 @@
+# topos-tce-storage
+
+The library provides the storage layer for the Topos TCE.
+It is responsible for storing and retrieving the [certificates](https://docs.topos.technology/content/module-1/4-protocol.html#certificates), managing the
+pending certificates pool and the certificate status, and storing various
+metadata related to the protocol and the internal state of the TCE.
+
+The storage layer is implemented using RocksDB.
+The library exposes multiple stores that are used by the TCE.
+
+
+### Architecture
+
+The storage layer is composed of multiple stores that are used by the TCE.
+Each store is described in detail in its own module.
+
+Those stores are mainly used in `topos-tce-broadcast`, `topos-tce-api` and
+`topos-tce-synchronizer`.
+
+As an overview, the storage layer is composed of the following stores:
+
+<picture>
+  <source media="(prefers-color-scheme: dark)" srcset="./assets/store-dark.png">
+  <img alt="Overview of the topos-tce-storage stores" src="./assets/store-light.png">
+</picture>
+
+#### Definitions and Responsibilities
+
+As illustrated above, multiple `stores` are exposed in the library, backed by various `tables`.
+
+The difference between a `store` and a `table` is that the `table` is responsible for storing
+the data while the `store` manages data access and behavior.
+
+Here's the list of the different stores and their responsibilities:
+
+- The [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) is responsible for managing the list of validators for each `epoch`.
+- The [`FullNodeStore`](struct@fullnode::FullNodeStore) is responsible for managing all persistent data, such as delivered [`Certificate`]s and their associated `streams`.
+- The [`IndexStore`](struct@index::IndexStore) is responsible for managing indexes and collecting information about the broadcast and the network.
+- The [`ValidatorStore`](struct@validator::ValidatorStore) is responsible for managing the pending data that a validator needs to keep track of, such as the certificates pool.
+
+For more information about a `store`, see the related doc.
+
+Next, here's the list of the different tables and their responsibilities:
+
+- The [`EpochValidatorsTables`](struct@epoch::EpochValidatorsTables) is responsible for storing the list of validators for each `epoch`.
+- The [`ValidatorPerpetualTables`](struct@validator::ValidatorPerpetualTables) is responsible for storing the delivered [`Certificate`]s and the persistent data related to the broadcast.
+- The [`ValidatorPendingTables`](struct@validator::ValidatorPendingTables) is responsible for storing pending data, such as the certificates pool.
+- The [`IndexTables`](struct@index::IndexTables) is responsible for storing indexes about the delivery of [`Certificate`]s, such as the `target subnet stream`.
+
+### Special Considerations
+
+When using the storage layer, be aware of the following:
+- The storage layer uses [rocksdb](https://rocksdb.org/) as the backend, which means you don't need an external service, as `rocksdb` is an embedded key-value store.
+- The storage layer uses [`Arc`](struct@std::sync::Arc) to share the stores between threads. It also means that a `store` is only instantiated once.
+- Some storage methods batch multiple writes into a single transaction.
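+
+As a minimal sketch of how these stores fit together (the `FullNodeStore::open`
+constructor below is an assumption for illustration; `ValidatorStore::open`,
+`count_pending_certificates` and the `ReadStore` methods are the crate's own API):
+
+```rust
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use topos_tce_storage::fullnode::FullNodeStore;
+use topos_tce_storage::store::ReadStore;
+use topos_tce_storage::validator::ValidatorStore;
+
+fn open_stores(path: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
+    // Hypothetical constructor call: the exact arguments of `FullNodeStore::open`
+    // are not part of this README. The store is instantiated once, behind an `Arc`.
+    let fullnode_store: Arc<FullNodeStore> = FullNodeStore::open(path.clone())?;
+
+    // The `ValidatorStore` wraps the shared `FullNodeStore` and adds the
+    // pending pools on top of it.
+    let validator_store = ValidatorStore::open(path, fullnode_store.clone())?;
+
+    // Reads go through the `ReadStore` trait: the local checkpoint maps each
+    // source subnet to its latest delivered `SourceHead`.
+    let checkpoint = fullnode_store.get_checkpoint()?;
+    println!(
+        "{} subnets in the local checkpoint, {} certificates pending",
+        checkpoint.len(),
+        validator_store.count_pending_certificates()?
+    );
+
+    Ok(())
+}
+```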
+
+### Design Philosophy
+
+The choice of using [rocksdb](https://rocksdb.org/) as a backend was made because it matches many of the conditions
+that we expected, such as being embedded and offering good performance when reading and
+writing our data.
+
+Splitting storage into multiple `stores` and `tables` allows us to have a strong separation of concerns directly at the storage level.
+
+However, `RocksDB` is not the best fit when it comes to composing or filtering data based on the data
+itself.
+
+As mentioned above, the different stores use [`Arc`](struct@std::sync::Arc), allowing a single store to be instantiated once
+and then shared between threads. This is very useful for the [`FullNodeStore`](struct@fullnode::FullNodeStore), as it is used
+in various places but should provide a single entry point to the data.
+
+It also means that the store is immutable and can thus be shared easily between threads,
+which is good for concurrency.
+However, some stores implement the [`WriteStore`](trait@store::WriteStore) trait in order to
+insert or mutate data, managing locks on resources and preventing any other query from mutating the data
+currently being processed. For more information about the locks, see [`locking`](module@fullnode::locking).
+
+The rest of the data mutations are handled by [rocksdb](https://rocksdb.org/) itself.
+
diff --git a/crates/topos-tce-storage/assets/store-dark.png b/crates/topos-tce-storage/assets/store-dark.png
new file mode 100644
index 000000000..c95fd52e1
Binary files /dev/null and b/crates/topos-tce-storage/assets/store-dark.png differ
diff --git a/crates/topos-tce-storage/assets/store-light.png b/crates/topos-tce-storage/assets/store-light.png
new file mode 100644
index 000000000..2b0ab5f6f
Binary files /dev/null and b/crates/topos-tce-storage/assets/store-light.png differ
diff --git a/crates/topos-tce-storage/src/fullnode/mod.rs b/crates/topos-tce-storage/src/fullnode/mod.rs
index 7b6012f7a..c72111e0a 100644
--- a/crates/topos-tce-storage/src/fullnode/mod.rs
+++ b/crates/topos-tce-storage/src/fullnode/mod.rs
@@ -24,8 +24,20 @@ use crate::{
 
 use self::locking::LockGuards;
 
-mod locking;
-
+pub mod locking;
+
+/// Store to manage FullNode data
+///
+/// The [`FullNodeStore`] is responsible for storing and exposing the data that is
+/// needed by a full node to perform its duties.
+///
+/// The responsibilities of the [`FullNodeStore`] are:
+///
+/// - Store and expose the certificates that are delivered
+/// - Store and expose the state of the certificate streams
+///
+/// To do so, it implements [`ReadStore`] / [`WriteStore`] using multiple tables and stores such
+/// as [`ValidatorPerpetualTables`], [`EpochValidatorsStore`] and [`IndexTables`].
 pub struct FullNodeStore {
     certificate_lock_guards: LockGuards,
     subnet_lock_guards: LockGuards,
diff --git a/crates/topos-tce-storage/src/index/mod.rs b/crates/topos-tce-storage/src/index/mod.rs
index 5bc6f5dd3..53d80c423 100644
--- a/crates/topos-tce-storage/src/index/mod.rs
+++ b/crates/topos-tce-storage/src/index/mod.rs
@@ -2,7 +2,7 @@ use std::{fs::create_dir_all, path::PathBuf};
 use rocksdb::ColumnFamilyDescriptor;
 use topos_core::{
-    types::stream::{CertificateTargetStreamPosition, Position},
+    types::stream::Position,
     uci::{CertificateId, SubnetId},
 };
 use tracing::warn;
@@ -13,9 +13,8 @@ use crate::{
         constants,
         db::{default_options, init_with_cfs},
         db_column::DBColumn,
-        TargetSourceListKey,
     },
-    types::CertificateSequenceNumber,
+    types::{CertificateSequenceNumber, TargetSourceListColumn, TargetStreamsColumn},
 };
 
 #[allow(unused)]
@@ -24,8 +23,8 @@ pub(crate) struct IndexStore {
 }
 
 pub struct IndexTables {
-    pub(crate) target_streams: DBColumn<CertificateTargetStreamPosition, CertificateId>,
-    pub(crate) target_source_list: DBColumn<TargetSourceListKey, CertificateSequenceNumber>,
+    pub(crate) target_streams: TargetStreamsColumn,
+    pub(crate) target_source_list: TargetSourceListColumn,
     pub(crate) source_list: DBColumn<SubnetId, (CertificateId, Position)>,
     pub(crate) source_list_per_target: DBColumn<(SubnetId, SubnetId), bool>,
 }
diff --git a/crates/topos-tce-storage/src/lib.rs b/crates/topos-tce-storage/src/lib.rs
index 6d17603a7..1df0b0913 100644
--- a/crates/topos-tce-storage/src/lib.rs
+++ b/crates/topos-tce-storage/src/lib.rs
@@ -1,11 +1,86 @@
-use errors::InternalStorageError;
-use rocks::iterator::ColumnIterator;
+//! The library provides the storage layer for the Topos TCE.
+//! It is responsible for storing and retrieving the [certificates](https://docs.topos.technology/content/module-1/4-protocol.html#certificates), managing the
+//! pending certificates pool and the certificate status, and storing various
+//! metadata related to the protocol and the internal state of the TCE.
+//!
+//! The storage layer is implemented using RocksDB.
+//! The library exposes multiple stores that are used by the TCE.
+//!
+//!
+//! ## Architecture
+//!
+//! The storage layer is composed of multiple stores that are used by the TCE.
+//! Each store is described in detail in its own module.
+//!
+//! Those stores are mainly used in `topos-tce-broadcast`, `topos-tce-api` and
+//! `topos-tce-synchronizer`.
+//!
+//! As an overview, the storage layer is composed of the following stores:
+//!
+//! <picture>
+//!   <source media="(prefers-color-scheme: dark)" srcset="./assets/store-dark.png">
+//!   <img alt="Overview of the topos-tce-storage stores" src="./assets/store-light.png">
+//! </picture>
+//!
+//! ### Definitions and Responsibilities
+//!
+//! As illustrated above, multiple `stores` are exposed in the library, backed by various `tables`.
+//!
+//! The difference between a `store` and a `table` is that the `table` is responsible for storing
+//! the data while the `store` manages data access and behavior.
+//!
+//! Here's the list of the different stores and their responsibilities:
+//!
+//! - The [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) is responsible for managing the list of validators for each `epoch`.
+//! - The [`FullNodeStore`](struct@fullnode::FullNodeStore) is responsible for managing all persistent data, such as delivered [`Certificate`]s and their associated `streams`.
+//! - The [`IndexStore`](struct@index::IndexStore) is responsible for managing indexes and collecting information about the broadcast and the network.
+//! - The [`ValidatorStore`](struct@validator::ValidatorStore) is responsible for managing the pending data that a validator needs to keep track of, such as the certificates pool.
+//!
+//! For more information about a `store`, see the related doc.
+//!
+//! Next, here's the list of the different tables and their responsibilities:
+//!
+//! - The [`EpochValidatorsTables`](struct@epoch::EpochValidatorsTables) is responsible for storing the list of validators for each `epoch`.
+//! - The [`ValidatorPerpetualTables`](struct@validator::ValidatorPerpetualTables) is responsible for storing the delivered [`Certificate`]s and the persistent data related to the broadcast.
+//! - The [`ValidatorPendingTables`](struct@validator::ValidatorPendingTables) is responsible for storing pending data, such as the certificates pool.
+//! - The [`IndexTables`](struct@index::IndexTables) is responsible for storing indexes about the delivery of [`Certificate`]s, such as the `target subnet stream`.
+//!
+//! ## Special Considerations
+//!
+//! When using the storage layer, be aware of the following:
+//! - The storage layer uses [rocksdb](https://rocksdb.org/) as the backend, which means you don't need an external service, as `rocksdb` is an embedded key-value store.
+//! - The storage layer uses [`Arc`](struct@std::sync::Arc) to share the stores between threads. It also means that a `store` is only instantiated once.
+//! - Some storage methods batch multiple writes into a single transaction.
+//!
+//! ## Design Philosophy
+//!
+//! The choice of using [rocksdb](https://rocksdb.org/) as a backend was made because it matches many of the conditions
+//! that we expected, such as being embedded and offering good performance when reading and
+//! writing our data.
+//!
+//! Splitting storage into multiple `stores` and `tables` allows us to have a strong separation of concerns directly at the storage level.
+//!
+//! However, `RocksDB` is not the best fit when it comes to composing or filtering data based on the data
+//! itself.
+//!
+//! As mentioned above, the different stores use [`Arc`](struct@std::sync::Arc), allowing a single store to be instantiated once
+//! and then shared between threads. This is very useful for the [`FullNodeStore`](struct@fullnode::FullNodeStore), as it is used
+//! in various places but should provide a single entry point to the data.
+//!
+//! It also means that the store is immutable and can thus be shared easily between threads,
+//! which is good for concurrency.
+//! However, some stores implement the [`WriteStore`](trait@store::WriteStore) trait in order to
+//! insert or mutate data, managing locks on resources and preventing any other query from mutating the data
+//! currently being processed. For more information about the locks, see [`locking`](module@fullnode::locking).
+//!
+//! The rest of the data mutations are handled by [rocksdb](https://rocksdb.org/) itself.
+//!
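+//! As a rough usage sketch (marked `ignore` since it is not compiled as a doctest: the
+//! construction of `store` is elided and the bindings are illustrative only), reads go
+//! through the [`ReadStore`](trait@store::ReadStore) trait:
+//!
+//! ```ignore
+//! use topos_tce_storage::store::ReadStore;
+//!
+//! // `store` is any value implementing `ReadStore`, e.g. an `Arc<FullNodeStore>`.
+//! if let Some(delivered) = store.get_certificate(&certificate_id)? {
+//!     // A `CertificateDelivered` wraps the `Certificate` together with its delivery proof.
+//!     println!("certificate {} is delivered", delivered.certificate.id);
+//! }
+//!
+//! // The local checkpoint maps each source subnet to its latest delivered `SourceHead`.
+//! let checkpoint = store.get_checkpoint()?;
+//! ```
+//!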
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use topos_core::{
     types::stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position},
-    uci::{Certificate, CertificateId, SubnetId},
+    uci::{CertificateId, SubnetId},
 };
 
 // v2
@@ -16,7 +91,6 @@ pub mod epoch;
 pub mod fullnode;
 pub mod index;
 pub mod types;
-/// Everything that is needed to participate to the protocol
 pub mod validator;
 
 // v1
@@ -72,108 +146,3 @@ pub struct SourceHead {
     /// Position of the Certificate
     pub position: Position,
 }
-
-/// Define possible status of a certificate
-#[derive(Debug, Deserialize, Serialize)]
-pub enum CertificateStatus {
-    Pending,
-    Delivered,
-}
-
-/// The `Storage` trait defines methods to interact and manage with the persistency layer
-#[async_trait::async_trait]
-pub trait Storage: Sync + Send + 'static {
-    async fn get_pending_certificate(
-        &self,
-        certificate_id: CertificateId,
-    ) -> Result<(PendingCertificateId, Certificate), InternalStorageError>;
-
-    /// Add a pending certificate to the pool
-    async fn add_pending_certificate(
-        &self,
-        certificate: &Certificate,
-    ) -> Result<PendingCertificateId, InternalStorageError>;
-
-    /// Persist the certificate with given status
-    async fn persist(
-        &self,
-        certificate: &Certificate,
-        pending_certificate_id: Option<PendingCertificateId>,
-    ) -> Result<CertificatePositions, InternalStorageError>;
-
-    /// Update the certificate entry with new status
-    async fn update(
-        &self,
-        certificate_id: &CertificateId,
-        status: CertificateStatus,
-    ) -> Result<(), InternalStorageError>;
-
-    /// Returns the source heads of given subnets
-    async fn get_source_heads(
-        &self,
-        subnets: Vec<SubnetId>,
-    ) -> Result<Vec<SourceHead>, InternalStorageError>;
-
-    /// Returns the certificate data given their id
-    async fn get_certificates(
-        &self,
-        certificate_ids: Vec<CertificateId>,
-    ) -> Result<Vec<Certificate>, InternalStorageError>;
-
-    /// Returns the certificate data given its id
-    async fn get_certificate(
-        &self,
-        certificate_id: CertificateId,
-    ) -> Result<Certificate, InternalStorageError>;
-
-    /// Returns the certificate emitted by given subnet
-    /// Ranged by position since emitted Certificate are totally ordered
-    async fn get_certificates_by_source(
-        &self,
-        source_subnet_id: SubnetId,
-        from: Position,
-        limit: usize,
-    ) -> Result<Vec<CertificateId>, InternalStorageError>;
-
-    /// Returns the certificate received by given subnet
-    /// Ranged by timestamps since received Certificate are not referrable by position
-    async fn get_certificates_by_target(
-        &self,
-        target_subnet_id: SubnetId,
-        source_subnet_id: SubnetId,
-        from: Position,
-        limit: usize,
-    ) -> Result<Vec<CertificateId>, InternalStorageError>;
-
-    /// Returns all the known Certificate that are not delivered yet
-    async fn get_pending_certificates(
-        &self,
-    ) -> Result<Vec<(PendingCertificateId, Certificate)>, InternalStorageError>;
-
-    /// Returns the next Certificate that are not delivered yet
-    async fn get_next_pending_certificate(
-        &self,
-        starting_at: Option<usize>,
-    ) -> Result<Option<(PendingCertificateId, Certificate)>, InternalStorageError>;
-
-    /// Remove a certificate from pending pool
-    async fn remove_pending_certificate(
-        &self,
-        index: PendingCertificateId,
-    ) -> Result<(), InternalStorageError>;
-
-    async fn get_target_stream_iterator(
-        &self,
-        target: SubnetId,
-        source: SubnetId,
-        position: Position,
-    ) -> Result<
-        ColumnIterator<'_, CertificateTargetStreamPosition, CertificateId>,
-        InternalStorageError,
-    >;
-
-    async fn get_source_list_by_target(
-        &self,
-        target: SubnetId,
-    ) -> Result<Vec<SubnetId>, InternalStorageError>;
-}
diff --git a/crates/topos-tce-storage/src/rocks.rs b/crates/topos-tce-storage/src/rocks.rs
index c451f5f68..51d2c77ab 100644
--- a/crates/topos-tce-storage/src/rocks.rs
+++ b/crates/topos-tce-storage/src/rocks.rs
@@ -1,20 +1,4 @@
-use std::collections::HashMap;
-use std::{
-    fmt::Debug,
-    sync::atomic::{AtomicU64, Ordering},
-};
-
-use topos_core::types::stream::CertificateSourceStreamPosition;
-use topos_core::uci::{Certificate, CertificateId, CERTIFICATE_ID_LENGTH};
-use tracing::warn;
-
-use crate::{
-    errors::InternalStorageError, CertificatePositions, CertificateTargetStreamPosition,
-    PendingCertificateId, Position, SourceHead, Storage, SubnetId,
-};
-
-use self::iterator::ColumnIterator;
-use self::{db::RocksDB, map::Map};
+use self::db::RocksDB;
 
 pub(crate) mod constants;
 pub(crate) mod db;
@@ -24,322 +8,3 @@ pub(crate) mod map;
 pub(crate) mod types;
 
 pub(crate) use types::*;
-
-pub const EMPTY_PREVIOUS_CERT_ID: [u8; CERTIFICATE_ID_LENGTH] = [0u8; CERTIFICATE_ID_LENGTH];
-
-#[derive(Debug)]
-pub struct RocksDBStorage {
-    pending_certificates: PendingCertificatesColumn,
-    certificates: CertificatesColumn,
-    source_streams: SourceStreamsColumn,
-    target_streams: TargetStreamsColumn,
-    target_source_list: TargetSourceListColumn,
-    next_pending_id: AtomicU64,
-}
-
-impl RocksDBStorage {
-    #[cfg(test)]
-    #[allow(dead_code)]
-    pub(crate) fn new(
-        pending_certificates: PendingCertificatesColumn,
-        certificates: CertificatesColumn,
-        source_streams: SourceStreamsColumn,
-        target_streams: TargetStreamsColumn,
-        target_source_list: TargetSourceListColumn,
-        next_pending_id: AtomicU64,
-    ) -> Self {
-        Self {
-            pending_certificates,
-            certificates,
-            source_streams,
-            target_streams,
-            target_source_list,
-            next_pending_id,
-        }
-    }
-}
-
-#[async_trait::async_trait]
-impl Storage for RocksDBStorage {
-    async fn get_pending_certificate(
-        &self,
-        certificate_id: CertificateId,
-    ) -> Result<(PendingCertificateId, Certificate), InternalStorageError> {
-        self.pending_certificates
-            .iter()?
-            .filter(|(_pending_id, cert)| cert.id == certificate_id)
-            .collect::<Vec<_>>()
-            .first()
-            .cloned()
-            .ok_or(InternalStorageError::CertificateNotFound(certificate_id))
-    }
-
-    async fn add_pending_certificate(
-        &self,
-        certificate: &Certificate,
-    ) -> Result<PendingCertificateId, InternalStorageError> {
-        let key = self.next_pending_id.fetch_add(1, Ordering::Relaxed);
-
-        self.pending_certificates.insert(&key, certificate)?;
-
-        Ok(key)
-    }
-
-    async fn persist(
-        &self,
-        certificate: &Certificate,
-        pending_certificate_id: Option<PendingCertificateId>,
-    ) -> Result<CertificatePositions, InternalStorageError> {
-        let mut batch = self.certificates.batch();
-
-        // Inserting the certificate data into the CERTIFICATES cf
-        batch = batch.insert_batch(&self.certificates, [(&certificate.id, certificate)])?;
-
-        if let Some(pending_id) = pending_certificate_id {
-            match self.pending_certificates.get(&pending_id) {
-                Ok(Some(ref pending_certificate)) if pending_certificate == certificate => {
-                    batch = batch.delete(&self.pending_certificates, pending_id)?;
-                }
-                Ok(_) => {
-                    warn!(
-                        "PendingCertificateId {} ignored during persist execution: Difference in \
-                         certificates",
-                        pending_id
-                    );
-                }
-
-                _ => {
-                    warn!(
-                        "PendingCertificateId {} ignored during persist execution: Not Found",
-                        pending_id
-                    );
-                }
-            }
-        }
-
-        let source_subnet_position = if certificate.prev_id.as_array() == &EMPTY_PREVIOUS_CERT_ID {
-            Position::ZERO
-        } else if let Some((CertificateSourceStreamPosition { position, .. }, _)) = self
-            .source_streams
-            .prefix_iter(&certificate.source_subnet_id)?
-            .last()
-        {
-            position.increment().map_err(|error| {
-                InternalStorageError::PositionError(error, certificate.source_subnet_id.into())
-            })?
-        } else {
-            // TODO: Need to be fixed when dealing with order of delivery
-            Position::ZERO
-            // TODO: Better error to define that we were expecting a previous defined position
-            // return Err(InternalStorageError::CertificateNotFound(
-            //     certificate.prev_id,
-            // ));
-        };
-
-        // Return from function as info
-        let source_subnet_stream_position = CertificateSourceStreamPosition {
-            subnet_id: certificate.source_subnet_id,
-            position: source_subnet_position,
-        };
-
-        // Adding the certificate to the stream
-        batch = batch.insert_batch(
-            &self.source_streams,
-            [(
-                CertificateSourceStreamPosition {
-                    subnet_id: certificate.source_subnet_id,
-                    position: source_subnet_position,
-                },
-                certificate.id,
-            )],
-        )?;
-
-        // Return list of new target stream positions of certificate that will be persisted
-        // Information is needed by sequencer/subnet contract to know from
-        // where to continue with streaming on restart
-        let mut target_subnet_stream_positions: HashMap<SubnetId, CertificateTargetStreamPosition> =
-            HashMap::new();
-
-        // Adding certificate to target_streams
-        // TODO: Add expected position instead of calculating on the go
-        let mut targets = Vec::new();
-
-        for target_subnet_id in &certificate.target_subnets {
-            let target = match self
-                .target_streams
-                .prefix_iter(&TargetSourceListKey(
-                    *target_subnet_id,
-                    certificate.source_subnet_id,
-                ))?
-                .last()
-            {
-                Some((mut target_stream_position, _)) => {
-                    target_stream_position.position = target_stream_position
-                        .position
-                        .increment()
-                        .map_err(|error| {
-                            InternalStorageError::PositionError(
-                                error,
-                                certificate.source_subnet_id.into(),
-                            )
-                        })?;
-                    target_stream_position
-                }
-                None => CertificateTargetStreamPosition::new(
-                    *target_subnet_id,
-                    certificate.source_subnet_id,
-                    Position::ZERO,
-                ),
-            };
-
-            target_subnet_stream_positions.insert(*target_subnet_id, target);
-
-            batch = batch.insert_batch(
-                &self.target_source_list,
-                [(
-                    TargetSourceListKey(*target_subnet_id, certificate.source_subnet_id),
-                    *target.position,
-                )],
-            )?;
-
-            targets.push((target, certificate.id));
-        }
-
-        batch = batch.insert_batch(&self.target_streams, targets)?;
-
-        batch.write()?;
-
-        Ok(CertificatePositions {
-            targets: target_subnet_stream_positions,
-            source: source_subnet_stream_position,
-        })
-    }
-
-    async fn update(
-        &self,
-        _certificate_id: &CertificateId,
-        _status: crate::CertificateStatus,
-    ) -> Result<(), InternalStorageError> {
-        unimplemented!();
-    }
-
-    async fn get_source_heads(
-        &self,
-        subnets: Vec<SubnetId>,
-    ) -> Result<Vec<SourceHead>, InternalStorageError> {
-        let mut result: Vec<SourceHead> = Vec::new();
-        for source_subnet_id in subnets {
-            let (position, certificate_id) = self
-                .source_streams
-                .prefix_iter(&source_subnet_id)?
-                .last()
-                .map(|(source_stream_position, cert_id)| (source_stream_position.position, cert_id))
-                .ok_or(InternalStorageError::MissingHeadForSubnet(source_subnet_id))?;
-            result.push(SourceHead {
-                position,
-                certificate_id,
-                subnet_id: source_subnet_id,
-            });
-        }
-        Ok(result)
-    }
-
-    async fn get_certificates(
-        &self,
-        certificate_ids: Vec<CertificateId>,
-    ) -> Result<Vec<Certificate>, InternalStorageError> {
-        let mut result = Vec::new();
-
-        for certificate_id in certificate_ids {
-            result.push(self.get_certificate(certificate_id).await?);
-        }
-
-        Ok(result)
-    }
-
-    async fn get_certificate(
-        &self,
-        certificate_id: CertificateId,
-    ) -> Result<Certificate, InternalStorageError> {
-        let res = self.certificates.get(&certificate_id)?;
-        res.ok_or(InternalStorageError::CertificateNotFound(certificate_id))
-    }
-
-    async fn get_certificates_by_source(
-        &self,
-        source_subnet_id: SubnetId,
-        from: crate::Position,
-        limit: usize,
-    ) -> Result<Vec<CertificateId>, InternalStorageError> {
-        Ok(self
-            .source_streams
-            .prefix_iter(&source_subnet_id)?
-            // TODO: Find a better way to convert u64 to usize
-            .skip(from.try_into().unwrap())
-            .take(limit)
-            .map(|(_, certificate_id)| certificate_id)
-            .collect())
-    }
-
-    async fn get_certificates_by_target(
-        &self,
-        target_subnet_id: SubnetId,
-        source_subnet_id: SubnetId,
-        from: Position,
-        limit: usize,
-    ) -> Result<Vec<CertificateId>, InternalStorageError> {
-        Ok(self
-            .target_streams
-            .prefix_iter(&(&target_subnet_id, &source_subnet_id))?
-            // TODO: Find a better way to convert u64 to usize
-            .skip(from.try_into().unwrap())
-            .take(limit)
-            .map(|(_, certificate_id)| certificate_id)
-            .collect())
-    }
-
-    async fn get_pending_certificates(
-        &self,
-    ) -> Result<Vec<(PendingCertificateId, Certificate)>, InternalStorageError> {
-        Ok(self.pending_certificates.iter()?.collect())
-    }
-    async fn get_next_pending_certificate(
-        &self,
-        starting_at: Option<usize>,
-    ) -> Result<Option<(PendingCertificateId, Certificate)>, InternalStorageError> {
-        Ok(self
-            .pending_certificates
-            .iter()?
-            .nth(starting_at.map(|v| v + 1).unwrap_or(0)))
-    }
-
-    async fn remove_pending_certificate(&self, index: u64) -> Result<(), InternalStorageError> {
-        self.pending_certificates.delete(&index)
-    }
-
-    async fn get_target_stream_iterator(
-        &self,
-        target: SubnetId,
-        source: SubnetId,
-        position: Position,
-    ) -> Result<
-        ColumnIterator<'_, CertificateTargetStreamPosition, CertificateId>,
-        InternalStorageError,
-    > {
-        Ok(self.target_streams.prefix_iter_at(
-            &(&target, &source),
-            &CertificateTargetStreamPosition::new(target, source, position),
-        )?)
-    }
-
-    async fn get_source_list_by_target(
-        &self,
-        target: SubnetId,
-    ) -> Result<Vec<SubnetId>, InternalStorageError> {
-        Ok(self
-            .target_source_list
-            .prefix_iter(&target)?
-            .map(|(TargetSourceListKey(_, k), _)| k)
-            .collect())
-    }
-}
diff --git a/crates/topos-tce-storage/src/rocks/db_column.rs b/crates/topos-tce-storage/src/rocks/db_column.rs
index 0a56ca3d0..a011b30de 100644
--- a/crates/topos-tce-storage/src/rocks/db_column.rs
+++ b/crates/topos-tce-storage/src/rocks/db_column.rs
@@ -176,24 +176,6 @@ impl DBBatch {
         }
     }
 
-    pub(crate) fn delete<K, V, Key>(
-        mut self,
-        db: &DBColumn<K, V>,
-        key: Key,
-    ) -> Result<Self, InternalStorageError>
-    where
-        K: Serialize,
-        V: Serialize,
-        Key: Borrow<K>,
-    {
-        check_cross_batch(&self.rocksdb, &db.rocksdb)?;
-
-        let key_buffer = be_fix_int_ser(key.borrow())?;
-        self.batch.delete_cf(&db.cf()?, key_buffer);
-
-        Ok(self)
-    }
-
     pub(crate) fn insert_batch<K, V>(
         mut self,
         db: &DBColumn<K, V>,
diff --git a/crates/topos-tce-storage/src/rocks/types.rs b/crates/topos-tce-storage/src/rocks/types.rs
index 4dad00dc1..edef13ef5 100644
--- a/crates/topos-tce-storage/src/rocks/types.rs
+++ b/crates/topos-tce-storage/src/rocks/types.rs
@@ -1,13 +1,7 @@
 use serde::{Deserialize, Serialize};
-use topos_core::{
-    types::stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition},
-    uci::{Certificate, CertificateId},
-};
 
 use crate::SubnetId;
 
-use super::db_column::DBColumn;
-
 #[derive(Debug, Serialize, Deserialize)]
 pub(crate) struct TargetSourceListKey(
     // Target subnet id
@@ -15,17 +9,3 @@ pub(crate) struct TargetSourceListKey(
     // Source subnet id
     pub(crate) SubnetId,
 );
-
-/// Column that keeps certificates that are not yet delivered
-pub(crate) type PendingCertificatesColumn = DBColumn<u64, Certificate>;
-/// Column that keeps list of all certificates retrievable by their id
-pub(crate) type CertificatesColumn = DBColumn<CertificateId, Certificate>;
-/// Column that keeps list of certificates received from particular subnet and
-/// maps (source subnet id, source certificate position) to certificate id
-pub(crate) type SourceStreamsColumn = DBColumn<CertificateSourceStreamPosition, CertificateId>;
-/// Column that keeps list of certificates that are delivered to target subnet,
-/// and maps their target (target subnet, source subnet and position/count per source subnet)
-/// to certificate id
-pub(crate) type TargetStreamsColumn = DBColumn<CertificateTargetStreamPosition, CertificateId>;
-/// Keeps position for particular target subnet id <- source subnet id column in TargetStreamsColumn
-pub(crate) type TargetSourceListColumn = DBColumn<TargetSourceListKey, u64>;
diff --git a/crates/topos-tce-storage/src/store.rs b/crates/topos-tce-storage/src/store.rs
index 8ffc9e8c9..356bc12c7 100644
--- a/crates/topos-tce-storage/src/store.rs
+++ b/crates/topos-tce-storage/src/store.rs
@@ -10,30 +10,56 @@ use crate::{
     errors::StorageError, CertificatePositions, CertificateTargetStreamPosition, SourceHead,
 };
 
+/// This trait exposes common methods between
+/// [`ValidatorStore`](struct@super::validator::ValidatorStore) and
+/// [`FullNodeStore`](struct@super::fullnode::FullNodeStore) to write data.
+///
+/// All methods are `async` to allow the implementation to deal with write concurrency.
 #[async_trait]
 pub trait WriteStore: Send {
-    /// Insert a CertificateDelivered in the differents tables. Removing pending if needed.
+    /// Insert a [`CertificateDelivered`] in the storage. Returns its positions
+    /// in the source and target streams.
+    ///
+    /// The [`ValidatorStore`](struct@super::validator::ValidatorStore) implementation
+    /// checks for a [`PendingCertificateId`](type@super::PendingCertificateId) and removes it if
+    /// the certificate is successfully inserted.
     async fn insert_certificate_delivered(
         &self,
         certificate: &CertificateDelivered,
     ) -> Result<CertificatePositions, StorageError>;
 
-    /// Insert multiple CertificateDelivered
+    /// Insert multiple [`CertificateDelivered`] in the storage.
+    ///
+    /// See [`insert_certificate_delivered`](fn@WriteStore::insert_certificate_delivered) for more
+    /// details.
     async fn insert_certificates_delivered(
        &self,
        certificates: &[CertificateDelivered],
    ) -> Result<(), StorageError>;
 }
 
+/// This trait exposes common methods between
+/// [`ValidatorStore`](struct@super::validator::ValidatorStore) and
+/// [`FullNodeStore`](struct@super::fullnode::FullNodeStore) to read data.
 pub trait ReadStore: Send {
+    /// Try to get the [`SourceHead`] of a subnet
+    ///
+    /// Returns `Ok(None)` if the subnet is not found, meaning that no certificates are currently
+    /// delivered for this particular subnet.
     fn get_source_head(&self, subnet_id: &SubnetId) -> Result<Option<SourceHead>, StorageError>;
+
-    /// Try to get a Certificate
+    /// Try to get a [`CertificateDelivered`]
+    ///
+    /// Returns `Ok(None)` if the certificate is not found, meaning that the certificate is either
+    /// unknown or not yet delivered.
     fn get_certificate(
         &self,
         certificate_id: &CertificateId,
     ) -> Result<Option<CertificateDelivered>, StorageError>;
 
-    /// Try to get multiple certificates at once
+    /// Try to get multiple [`CertificateDelivered`] at once.
+    ///
+    /// See [`get_certificate`](fn@ReadStore::get_certificate)
     fn get_certificates(
         &self,
         certificate_ids: &[CertificateId],
@@ -46,6 +72,9 @@
     ) -> Result<Vec<CertificateDelivered>, StorageError>;
 
     /// Returns the local checkpoint
+    ///
+    /// A `Checkpoint` is the representation of the state of delivery: a list of [`SubnetId`]s
+    /// with their associated [`SourceHead`].
     fn get_checkpoint(&self) -> Result<HashMap<SubnetId, SourceHead>, StorageError>;
 
     /// Returns the certificates delivered by a source subnet from a position.
@@ -55,12 +84,14 @@
         limit: usize,
     ) -> Result<Vec<(CertificateDelivered, CertificateSourceStreamPosition)>, StorageError>;
 
+    /// Returns the certificates delivered to a target subnet from a position.
     fn get_target_stream_certificates_from_position(
         &self,
         position: CertificateTargetStreamPosition,
         limit: usize,
     ) -> Result<Vec<(CertificateDelivered, CertificateTargetStreamPosition)>, StorageError>;
 
+    /// Returns the list of source subnets that delivered certificates to a particular target subnet.
     fn get_target_source_subnet_list(
         &self,
         target_subnet_id: &SubnetId,
diff --git a/crates/topos-tce-storage/src/tests/db_columns.rs b/crates/topos-tce-storage/src/tests/db_columns.rs
index 877e5ac61..8378489ad 100644
--- a/crates/topos-tce-storage/src/tests/db_columns.rs
+++ b/crates/topos-tce-storage/src/tests/db_columns.rs
@@ -2,13 +2,13 @@ use rstest::rstest;
 use test_log::test;
 use topos_core::types::stream::CertificateSourceStreamPosition;
 use topos_core::uci::Certificate;
+use topos_test_sdk::certificates::create_certificate_at_position;
 use topos_test_sdk::constants::SOURCE_SUBNET_ID_1;
 
+use crate::rocks::map::Map;
 use crate::tests::{PREV_CERTIFICATE_ID, SOURCE_STORAGE_SUBNET_ID};
-use crate::{
-    rocks::{map::Map, CertificatesColumn, PendingCertificatesColumn, SourceStreamsColumn},
-    Position,
-};
+use crate::types::{CertificatesColumn, PendingCertificatesColumn, StreamsColumn};
+use crate::Position;
 
 use super::support::columns::{certificates_column, pending_column, source_streams_column};
 
@@ -29,11 +29,14 @@ async fn can_persist_a_delivered_certificate(certificates_column: CertificatesColumn) {
         Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &Vec::new())
             .unwrap();
 
+    let certificate = create_certificate_at_position(Position::ZERO, certificate);
     assert!(certificates_column
-        .insert(&certificate.id, &certificate)
+        .insert(&certificate.certificate.id, &certificate)
         .is_ok());
     assert_eq!(
-        certificates_column.get(&certificate.id).unwrap(),
+        certificates_column
+            .get(&certificate.certificate.id)
+            .unwrap(),
         Some(certificate)
     );
 }
@@ -42,25 +45,26 @@ async fn can_persist_a_delivered_certificate(certificates_column: CertificatesColumn) {
 #[test(tokio::test)]
 async fn delivered_certificate_position_are_incremented(
     certificates_column: CertificatesColumn,
-    source_streams_column: SourceStreamsColumn,
+    source_streams_column: StreamsColumn,
 ) {
     let certificate =
         Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[]).unwrap();
 
+    let certificate = create_certificate_at_position(Position::ZERO, certificate);
     assert!(certificates_column
-        .insert(&certificate.id, &certificate)
+        .insert(&certificate.certificate.id, &certificate)
        .is_ok());
 
     assert!(source_streams_column
         .insert(
             &CertificateSourceStreamPosition::new(SOURCE_STORAGE_SUBNET_ID, Position::ZERO),
-            &certificate.id
+            &certificate.certificate.id
         )
         .is_ok());
 }
 
 #[rstest]
 #[test(tokio::test)]
-async fn position_can_be_fetch_for_one_subnet(source_streams_column: SourceStreamsColumn) {
+async fn position_can_be_fetch_for_one_subnet(source_streams_column: StreamsColumn) {
     let certificate =
         Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[]).unwrap();
 
diff --git a/crates/topos-tce-storage/src/tests/support/columns.rs b/crates/topos-tce-storage/src/tests/support/columns.rs
index dc1fd02be..e64043f48 100644
--- a/crates/topos-tce-storage/src/tests/support/columns.rs
+++ b/crates/topos-tce-storage/src/tests/support/columns.rs
@@ -1,9 +1,9 @@
 use rstest::fixture;
 
-use crate::rocks::TargetSourceListColumn;
-use crate::rocks::{
-    constants, db_column::DBColumn, CertificatesColumn, PendingCertificatesColumn,
-    SourceStreamsColumn, TargetStreamsColumn,
+use crate::rocks::{constants, db_column::DBColumn};
+use crate::types::{
+    CertificatesColumn, PendingCertificatesColumn, StreamsColumn, TargetSourceListColumn,
+    TargetStreamsColumn,
 };
 
 use super::database_name;
 
@@ -20,7 +20,7 @@ pub(crate) fn certificates_column(database_name: &'static str) -> CertificatesColumn {
 }
 
 #[fixture]
-pub(crate) fn source_streams_column(database_name: &'static str) -> SourceStreamsColumn {
+pub(crate) fn source_streams_column(database_name: &'static str) -> StreamsColumn {
     DBColumn::reopen(&rocks_db(database_name), constants::SOURCE_STREAMS)
 }
 
diff --git a/crates/topos-tce-storage/src/types.rs b/crates/topos-tce-storage/src/types.rs
index 69c161ed4..631f6db8f 100644
--- a/crates/topos-tce-storage/src/types.rs
+++ b/crates/topos-tce-storage/src/types.rs
@@ -1,9 +1,16 @@
 use topos_core::{
     api::grpc::checkpoints::SourceStreamPosition,
-    types::{CertificateDelivered, Ready, Signature},
+    types::{
+        stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position},
+        CertificateDelivered, Ready, Signature,
+    },
+    uci::{Certificate, CertificateId},
 };
 
-use crate::{CertificatePositions, PendingCertificateId};
+use crate::{
+    rocks::{db_column::DBColumn, TargetSourceListKey},
+    CertificatePositions, PendingCertificateId,
+};
 
 pub type Echo = String;
 
@@ -11,6 +18,20 @@ pub type CertificateSequenceNumber = u64;
 pub type EpochId = u64;
 pub type Validators = Vec<ValidatorId>;
 
+/// Column that keeps certificates that are not yet delivered
+pub(crate) type PendingCertificatesColumn = DBColumn<PendingCertificateId, Certificate>;
+/// Column that keeps list of all certificates retrievable by their id
+pub(crate) type CertificatesColumn = DBColumn<CertificateId, CertificateDelivered>;
+/// Column that keeps list of certificates received from particular subnet and
+/// maps (source subnet id, source certificate position) to certificate id
+pub(crate) type StreamsColumn = DBColumn<CertificateSourceStreamPosition, CertificateId>;
+/// Column that keeps list of certificates that are delivered to target subnet,
+/// and maps their target (target subnet, source subnet and position/count per source subnet)
+/// to certificate id
+pub(crate) type TargetStreamsColumn = DBColumn<CertificateTargetStreamPosition, CertificateId>;
+/// Keeps position for particular target subnet id <- source subnet id column in TargetStreamsColumn
+pub(crate) type TargetSourceListColumn = DBColumn<TargetSourceListKey, Position>;
+
 #[derive(Debug, Clone)]
 pub enum PendingResult {
     AlreadyDelivered,
diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs
index ac9dbcf91..3449b7124 100644
--- a/crates/topos-tce-storage/src/validator/mod.rs
+++ b/crates/topos-tce-storage/src/validator/mod.rs
@@ -1,3 +1,19 @@
+//! Validator's context store and storage
+//!
+//! The [`ValidatorStore`] is responsible for managing the various kinds of data that are required by the
+//! TCE network in order to broadcast certificates. It is composed of two main parts:
+//!
+//! - a [`FullNodeStore`]
+//! - a [`ValidatorPendingTables`]
+//!
+//! ## Responsibilities
+//!
+//! This store is used in places where the [`FullNodeStore`] is not enough: it gives access to the
+//! different pending pools and manages them, and it also provides access to the [`FullNodeStore`]
+//! in order to persist or update [`Certificate`]s or `streams`.
+//!
+//! Pending pools and their behavior are described in the [`ValidatorPendingTables`] documentation.
+//!
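+//! As a rough sketch (marked `ignore` since it is not compiled as a doctest;
+//! `validator_store` and `certificate_id` are illustrative bindings), the pending pool
+//! can be queried as follows:
+//!
+//! ```ignore
+//! // How many certificates are currently waiting in the pending pool?
+//! let pending_count = validator_store.count_pending_certificates()?;
+//!
+//! // Map a `CertificateId` back to its `PendingCertificateId`, if any...
+//! if let Some(pending_id) = validator_store.get_pending_id(&certificate_id)? {
+//!     // ...and fetch the pending `Certificate` itself.
+//!     let pending_certificate = validator_store.get_pending_certificate(&pending_id)?;
+//! }
+//! ```
+//!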
 use std::{
     collections::HashMap,
     path::PathBuf,
@@ -23,18 +39,30 @@ use crate::{
     CertificatePositions, CertificateTargetStreamPosition, PendingCertificateId, SourceHead,
 };
 
-pub(crate) use self::tables::ValidatorPendingTables;
+pub use self::tables::ValidatorPendingTables;
 pub use self::tables::ValidatorPerpetualTables;
 
 mod tables;
 
-/// Contains all persistent data about the validator
+/// Store to manage Validator data
+///
+/// The [`ValidatorStore`] is composed of a [`FullNodeStore`] and a [`ValidatorPendingTables`].
+///
+/// As the [`FullNodeStore`] is responsible for keeping and managing persistent data,
+/// the [`ValidatorStore`] delegates much of the [`WriteStore`] and [`ReadStore`]
+/// functionality to it.
+///
+/// The key point is that the [`ValidatorStore`] manages the different pending pools using a [`ValidatorPendingTables`].
+///
+/// Pending pools and how they behave are described in the [`ValidatorPendingTables`] documentation.
+///
 pub struct ValidatorStore {
     pub(crate) pending_tables: ValidatorPendingTables,
     pub(crate) fullnode_store: Arc<FullNodeStore>,
 }
 
 impl ValidatorStore {
+    /// Open a [`ValidatorStore`] at the given `path`, using the given [`FullNodeStore`]
     pub fn open(
         path: PathBuf,
         fullnode_store: Arc<FullNodeStore>,
@@ -48,14 +76,19 @@ impl ValidatorStore {
         Ok(store)
     }
 
+    /// Returns the [`FullNodeStore`] used by the [`ValidatorStore`]
     pub fn get_fullnode_store(&self) -> Arc<FullNodeStore> {
         self.fullnode_store.clone()
     }
 
+    /// Returns the number of certificates in the pending pool
     pub fn count_pending_certificates(&self) -> Result<usize, StorageError> {
         Ok(self.pending_tables.pending_pool.iter()?.count())
     }
 
+    /// Try to return the [`PendingCertificateId`] for a [`CertificateId`]
+    ///
+    /// Returns `Ok(None)` if the `certificate_id` is not found.
     pub fn get_pending_id(
         &self,
         certificate_id: &CertificateId,
@@ -63,6 +96,9 @@ impl ValidatorStore {
         Ok(self.pending_tables.pending_pool_index.get(certificate_id)?)
     }
 
+    /// Try to return the [`Certificate`] for a [`PendingCertificateId`]
+    ///
+    /// Returns `Ok(None)` if the `pending_id` is not found.
     pub fn get_pending_certificate(
         &self,
         pending_id: &PendingCertificateId,
@@ -70,6 +106,7 @@ impl ValidatorStore {
         Ok(self.pending_tables.pending_pool.get(pending_id)?)
     }
 
+    /// Returns the entire pending pool
     pub fn get_pending_certificates(
         &self,
     ) -> Result<Vec<(PendingCertificateId, Certificate)>, StorageError> {
diff --git a/crates/topos-tce-storage/src/validator/tables.rs b/crates/topos-tce-storage/src/validator/tables.rs
index 9b89b98ef..b310b3ab0 100644
--- a/crates/topos-tce-storage/src/validator/tables.rs
+++ b/crates/topos-tce-storage/src/validator/tables.rs
@@ -1,8 +1,8 @@
-use std::{collections::BTreeSet, fs::create_dir_all, path::PathBuf, sync::atomic::AtomicU64};
+use std::{fs::create_dir_all, path::PathBuf, sync::atomic::AtomicU64};
 
 use rocksdb::ColumnFamilyDescriptor;
 use topos_core::{
-    types::{stream::CertificateSourceStreamPosition, CertificateDelivered, ProofOfDelivery},
+    types::ProofOfDelivery,
     uci::{Certificate, CertificateId},
 };
 use tracing::warn;
@@ -14,23 +14,49 @@ use crate::{
     db::{default_options, init_with_cfs},
     db_column::DBColumn,
 },
-    types::{EpochId, EpochSummary},
+    types::{CertificatesColumn, EpochId, EpochSummary, PendingCertificatesColumn, StreamsColumn},
     PendingCertificateId,
 };
 
-/// Volatile and pending data
+/// Pending data used by Validator
+///
+/// It contains data that is not yet delivered.
+///
+/// When a [`Certificate`] is received, it can either be added to the pending
+/// pool or to the precedence pool.
+///
+/// Prior to being inserted into either the pending or precedence pool, a [`Certificate`]
+/// needs to be validated. A validated certificate means that the proof of the certificate
+/// has been verified using FROST.
+///
+/// ## Pending pool
+///
+/// The pending pool stores certificates that are ready to be broadcast.
+/// A [`Certificate`] is ready to be broadcast when it has been validated and its previous [`Certificate`] is
+/// already delivered.
+///
+/// The pending pool is ordered as a FIFO queue: each [`Certificate`] in the pool gets
+/// assigned a unique [`PendingCertificateId`](type@crate::PendingCertificateId).
+///
+/// ## Precedence pool
+///
+/// The precedence pool stores certificates that are not yet ready to be broadcast,
+/// typically because their previous [`Certificate`] has not been delivered yet, even
+/// though the [`Certificate`] itself is already validated.
+///
+/// When a [`Certificate`] is delivered, the [`ValidatorStore`](struct@super::ValidatorStore) will
+/// check for any child [`Certificate`] in the precedence pool waiting to be promoted to the
+/// pending pool in order to be broadcast.
+///
 pub struct ValidatorPendingTables {
     pub(crate) next_pending_id: AtomicU64,
-    #[allow(unused)]
-    fetching_pool: BTreeSet<CertificateId>, // Not sure to keep it
-    pub(crate) pending_pool: DBColumn<PendingCertificateId, Certificate>,
+    pub(crate) pending_pool: PendingCertificatesColumn,
     pub(crate) pending_pool_index: DBColumn<CertificateId, PendingCertificateId>,
     pub(crate) precedence_pool: DBColumn<CertificateId, Certificate>,
-    #[allow(unused)]
-    expiration_tracker: (), // Unknown
 }
 
 impl ValidatorPendingTables {
+    /// Open the [`ValidatorPendingTables`] at the given path.
     pub fn open(mut path: PathBuf) -> Self {
         path.push("pending");
         if !path.exists() {
@@ -49,19 +75,18 @@ impl ValidatorPendingTables {
         Self {
             // TODO: Fetch it from the storage
             next_pending_id: AtomicU64::new(0),
-            fetching_pool: BTreeSet::new(),
             pending_pool: DBColumn::reopen(&db, cfs::PENDING_POOL),
             pending_pool_index: DBColumn::reopen(&db, cfs::PENDING_POOL_INDEX),
             precedence_pool: DBColumn::reopen(&db, cfs::PRECEDENCE_POOL),
-            expiration_tracker: (),
         }
     }
 }
 
 /// Data that shouldn't be purged at all.
+// TODO: TP-774: Rename and move to FullNode domain
 pub struct ValidatorPerpetualTables {
-    pub(crate) certificates: DBColumn<CertificateId, CertificateDelivered>,
-    pub(crate) streams: DBColumn<CertificateSourceStreamPosition, CertificateId>,
+    pub(crate) certificates: CertificatesColumn,
+    pub(crate) streams: StreamsColumn,
     #[allow(unused)]
     epoch_chain: DBColumn<EpochId, EpochSummary>,
     pub(crate) unverified: DBColumn<CertificateId, ProofOfDelivery>,
diff --git a/scripts/check_readme.sh b/scripts/check_readme.sh
index 9ab3ff37a..6d7e76463 100755
--- a/scripts/check_readme.sh
+++ b/scripts/check_readme.sh
@@ -13,3 +13,4 @@ function check {
 }
 
 check crates/topos-tce-broadcast
+check crates/topos-tce-storage