Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

chore: backport fix of 0.0.11 #453

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ strip = true
[workspace.dependencies]
topos-core = { path = "./crates/topos-core", default-features = false }
topos-crypto = { path = "./crates/topos-crypto", default-features = false }
topos-metrics = { path = "./crates/topos-metrics/", default-features = false }

# Various utility crates
clap = { version = "4.0", features = ["derive", "env", "string"] }
Expand Down
6 changes: 6 additions & 0 deletions crates/topos-config/src/tce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ use topos_p2p::{Multiaddr, PeerId};

use self::broadcast::ReliableBroadcastParams;
use self::p2p::P2PConfig;
use self::synchronization::SynchronizationConfig;

pub mod broadcast;
pub mod p2p;
pub mod synchronization;

const DEFAULT_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::new(0, 0, 0, 0);

Expand Down Expand Up @@ -68,6 +70,10 @@ pub struct TceConfig {
#[serde(default)]
pub p2p: P2PConfig,

/// Synchronization configuration
#[serde(default)]
pub synchronization: SynchronizationConfig,

/// gRPC API Addr
#[serde(default = "default_grpc_api_addr")]
pub grpc_api_addr: SocketAddr,
Expand Down
36 changes: 36 additions & 0 deletions crates/topos-config/src/tce/synchronization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use serde::{Deserialize, Serialize};

/// Configuration for the TCE synchronization
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SynchronizationConfig {
/// Interval in seconds to synchronize the TCE
#[serde(default = "SynchronizationConfig::default_interval_seconds")]
pub interval_seconds: u64,

/// Maximum number of Proof of delivery per query per subnet
#[serde(default = "SynchronizationConfig::default_limit_per_subnet")]
pub limit_per_subnet: usize,
}

impl Default for SynchronizationConfig {
fn default() -> Self {
Self {
interval_seconds: SynchronizationConfig::INTERVAL_SECONDS,
limit_per_subnet: SynchronizationConfig::LIMIT_PER_SUBNET,
}
}
}

impl SynchronizationConfig {
pub const INTERVAL_SECONDS: u64 = 10;
pub const LIMIT_PER_SUBNET: usize = 100;

const fn default_interval_seconds() -> u64 {
Self::INTERVAL_SECONDS
}

const fn default_limit_per_subnet() -> usize {
Self::LIMIT_PER_SUBNET
}
}
2 changes: 2 additions & 0 deletions crates/topos-core/proto/topos/tce/v1/synchronization.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ message CheckpointRequest {
topos.shared.v1.UUID request_id = 1;

repeated ProofOfDelivery checkpoint = 2;

uint64 limit_per_subnet = 3;
}

message CheckpointResponse {
Expand Down
Binary file modified crates/topos-core/src/api/grpc/generated/topos.bin
Binary file not shown.
2 changes: 2 additions & 0 deletions crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub struct CheckpointRequest {
pub request_id: ::core::option::Option<super::super::shared::v1::Uuid>,
#[prost(message, repeated, tag = "2")]
pub checkpoint: ::prost::alloc::vec::Vec<ProofOfDelivery>,
#[prost(uint64, tag = "3")]
pub limit_per_subnet: u64,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
16 changes: 14 additions & 2 deletions crates/topos-metrics/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use prometheus::{
self, register_histogram_with_registry, register_int_counter_with_registry, Histogram,
IntCounter,
self, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
};

use lazy_static::lazy_static;
Expand Down Expand Up @@ -31,4 +31,16 @@ lazy_static! {
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref STORAGE_PENDING_POOL_COUNT: IntGauge = register_int_gauge_with_registry!(
"storage_pending_pool_count",
"Number of certificates in the pending pool.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref STORAGE_PRECEDENCE_POOL_COUNT: IntGauge = register_int_gauge_with_registry!(
"storage_precedence_pool_count",
"Number of certificates in the precedence pool.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
}
19 changes: 10 additions & 9 deletions crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,18 +321,19 @@ async fn deploy_test_token(
"Deploying new token {} with symbol {}",
token_name, token_symbol
);

let deploy_query = ierc20_messaging
let deploy_outcome = ierc20_messaging
.deploy_token(token_encoded_params)
.legacy()
.gas(DEFAULT_GAS);

let deploy_result = deploy_query.send().await.map_err(|e| {
error!("Unable deploy token: {e}");
e
})?;
.gas(DEFAULT_GAS)
.send()
.await
.map_err(|e| {
error!("Unable deploy token: {e}");
e
})?
.await;

match deploy_result.await {
match deploy_outcome {
Ok(r) => {
info!("Token deployed: {:?}", r);
}
Expand Down
16 changes: 16 additions & 0 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
Expand All @@ -13,6 +14,7 @@ use topos_core::api::graphql::{
query::CertificateQuery,
};
use topos_core::types::stream::CertificateSourceStreamPosition;
use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::store::ReadStore;

Expand Down Expand Up @@ -114,6 +116,20 @@ impl QueryRoot {
Self::certificate_by_id(ctx, certificate_id).await
}

/// This endpoint is used to get the current storage pool stats.
/// It returns the number of certificates in the pending and precedence pools.
/// The values are estimated as having a precise count is costly.
async fn get_storage_pool_stats(
&self,
_ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());

Ok(stats)
}

/// This endpoint is used to get the current checkpoint of the source streams.
/// The checkpoint is the position of the last certificate delivered for each source stream.
async fn get_checkpoint(
Expand Down
71 changes: 71 additions & 0 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use topos_core::{
},
uci::Certificate,
};
use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_api::{Runtime, RuntimeEvent};
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::StorageClient;
Expand Down Expand Up @@ -621,3 +622,73 @@ async fn can_query_graphql_endpoint_for_certificates(
graphql_certificate.source_subnet_id
);
}

#[rstest]
#[timeout(Duration::from_secs(4))]
#[test(tokio::test)]
async fn check_storage_pool_stats(
broadcast_stream: broadcast::Receiver<CertificateDeliveredWithPositions>,
) {
let addr = get_available_addr();
let graphql_addr = get_available_addr();
let metrics_addr = get_available_addr();

let fullnode_store = create_fullnode_store::default().await;

let store =
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;
STORAGE_PENDING_POOL_COUNT.set(10);
STORAGE_PRECEDENCE_POOL_COUNT.set(200);

let storage_client = StorageClient::new(store.clone());

let (_runtime_client, _launcher, _ctx) = Runtime::builder()
.with_broadcast_stream(broadcast_stream)
.storage(storage_client)
.store(store)
.serve_grpc_addr(addr)
.serve_graphql_addr(graphql_addr)
.serve_metrics_addr(metrics_addr)
.build_and_launch()
.await;

// Wait for server to boot
tokio::time::sleep(Duration::from_millis(100)).await;

let query = "query {getStoragePoolStats}";

#[derive(Debug, Deserialize)]
struct Response {
// data: HashMap<String, serde_json::Value>,
data: Stats,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Stats {
get_storage_pool_stats: PoolStats,
}

#[derive(Debug, Deserialize)]
struct PoolStats {
pending_pool: u64,
precedence_pool: u64,
}

let client = reqwest::Client::new();

let response = client
.post(format!("http://{}", graphql_addr))
.json(&serde_json::json!({
"query": query,
}))
.send()
.await
.unwrap()
.json::<Response>()
.await
.unwrap();

assert_eq!(response.data.get_storage_pool_stats.pending_pool, 10);
assert_eq!(response.data.get_storage_pool_stats.precedence_pool, 200);
}
1 change: 1 addition & 0 deletions crates/topos-tce-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ workspace = true

[dependencies]
topos-core = { workspace = true, features = ["uci", "api"] }
topos-metrics = { workspace = true }

async-stream.workspace = true
async-trait.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-storage/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum InternalStorageError {
#[error("Invalid query argument: {0}")]
InvalidQueryArgument(&'static str),

#[error("Unexpected DB state: {0}")]
UnexpectedDBState(&'static str),

#[error(transparent)]
Bincode(#[from] Box<bincode::ErrorKind>),

Expand Down
8 changes: 6 additions & 2 deletions crates/topos-tce-storage/src/fullnode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use arc_swap::ArcSwap;
use async_trait::async_trait;

use rocksdb::properties::ESTIMATE_NUM_KEYS;
use topos_core::{
types::{
stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position},
Expand Down Expand Up @@ -233,8 +234,11 @@ impl WriteStore for FullNodeStore {
}

impl ReadStore for FullNodeStore {
fn count_certificates_delivered(&self) -> Result<usize, StorageError> {
Ok(self.perpetual_tables.certificates.iter()?.count())
fn count_certificates_delivered(&self) -> Result<u64, StorageError> {
Ok(self
.perpetual_tables
.certificates
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

fn get_source_head(&self, subnet_id: &SubnetId) -> Result<Option<SourceHead>, StorageError> {
Expand Down
17 changes: 14 additions & 3 deletions crates/topos-tce-storage/src/rocks/db_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::path::Path;
#[cfg(test)]
use rocksdb::ColumnFamilyDescriptor;
use rocksdb::{
BoundColumnFamily, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode,
MultiThreaded, ReadOptions, WriteBatch,
BoundColumnFamily, CStrLike, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction,
IteratorMode, MultiThreaded, ReadOptions, WriteBatch,
};

use bincode::Options;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<K, V> DBColumn<K, V> {
}

/// Returns the CF of the DBColumn, used to build queries.
fn cf(&self) -> Result<Arc<BoundColumnFamily<'_>>, InternalStorageError> {
pub(crate) fn cf(&self) -> Result<Arc<BoundColumnFamily<'_>>, InternalStorageError> {
self.rocksdb
.cf_handle(self.cf)
.ok_or(InternalStorageError::InvalidColumnFamily(self.cf))
Expand All @@ -78,6 +78,17 @@ where
K: DeserializeOwned + Serialize + std::fmt::Debug,
V: DeserializeOwned + Serialize + std::fmt::Debug,
{
pub(crate) fn property_int_value(
&self,
property: impl CStrLike,
) -> Result<u64, InternalStorageError> {
self.rocksdb
.property_int_value_cf(&self.cf()?, property)?
.ok_or(InternalStorageError::UnexpectedDBState(
"Property not found",
))
}

/// Insert a record into the storage by passing a Key and a Value.
///
/// Key are fixed length bincode serialized.
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub trait WriteStore: Send {
/// [`FullNodeStore`](struct@super::fullnode::FullNodeStore) to read data.
pub trait ReadStore: Send {
/// Returns the number of certificates delivered
fn count_certificates_delivered(&self) -> Result<usize, StorageError>;
fn count_certificates_delivered(&self) -> Result<u64, StorageError>;

/// Try to get a SourceHead of a subnet
///
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-storage/src/tests/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn get_checkpoint_diff_with_no_input(store: Arc<ValidatorStore>) {
}

let checkpoint = store
.get_checkpoint_diff(&[])
.get_checkpoint_diff(&[], 100)
.unwrap()
.into_iter()
.map(|(subnet, proofs)| {
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn get_checkpoint_diff_with_input(store: Arc<ValidatorStore>) {
}

let checkpoint = store
.get_checkpoint_diff(&[checkpoint])
.get_checkpoint_diff(&[checkpoint], 100)
.unwrap()
.into_iter()
.map(|(subnet, proofs)| {
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-storage/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,15 @@ async fn get_source_head_for_subnet(store: Arc<ValidatorStore>) {
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_2], 10);

store
.insert_certificates_delivered(&expected_certificates_for_source_subnet_1)
.insert_certificates_delivered(&expected_certificates_for_source_subnet_1[..])
.await
.unwrap();

let expected_certificates_for_source_subnet_2 =
create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_2], 10);

store
.insert_certificates_delivered(&expected_certificates_for_source_subnet_2)
.insert_certificates_delivered(&expected_certificates_for_source_subnet_2[..])
.await
.unwrap();

Expand Down
Loading
Loading