Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
refactor: improve delivery timing
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd committed Feb 29, 2024
1 parent c41a51a commit 707fe93
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 12 deletions.
40 changes: 39 additions & 1 deletion crates/topos-core/src/api/graphql/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ use super::{checkpoint::SourceStreamPosition, subnet::SubnetId};
#[derive(Serialize, Deserialize, Debug, NewType)]
pub struct CertificateId(String);

impl From<uci::CertificateId> for CertificateId {
fn from(value: uci::CertificateId) -> Self {
Self(value.to_string())
}
}

#[derive(Serialize, Deserialize, Debug, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct CertificatePositions {
Expand All @@ -30,6 +36,38 @@ pub struct Certificate {
pub positions: CertificatePositions,
}

#[derive(Debug, Serialize, Deserialize, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct UndeliveredCertificate {
pub id: CertificateId,
pub prev_id: CertificateId,
pub proof: String,
pub signature: String,
pub source_subnet_id: SubnetId,
pub state_root: String,
pub target_subnets: Vec<SubnetId>,
pub tx_root_hash: String,
pub receipts_root_hash: String,
pub verifier: u32,
}

impl From<&crate::uci::Certificate> for UndeliveredCertificate {
fn from(value: &crate::uci::Certificate) -> Self {
Self {
id: CertificateId(value.id.to_string()),
prev_id: CertificateId(value.prev_id.to_string()),
proof: hex::encode(&value.proof),
signature: hex::encode(&value.signature),
source_subnet_id: (&value.source_subnet_id).into(),
state_root: hex::encode(value.state_root),
target_subnets: value.target_subnets.iter().map(Into::into).collect(),
tx_root_hash: hex::encode(value.tx_root_hash),
receipts_root_hash: format!("0x{}", hex::encode(value.receipts_root_hash)),
verifier: value.verifier,
}
}
}

#[derive(Debug, Serialize, Deserialize, SimpleObject)]
pub struct Ready {
message: String,
Expand All @@ -52,7 +90,7 @@ impl From<&CertificateDelivered> for Certificate {
receipts_root_hash: format!("0x{}", hex::encode(uci_cert.receipts_root_hash)),
verifier: uci_cert.verifier,
positions: CertificatePositions {
source: (&value.proof_of_delivery.delivery_position).into(),
source: (&value.proof_of_delivery).into(),
},
}
}
Expand Down
12 changes: 7 additions & 5 deletions crates/topos-core/src/api/graphql/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_graphql::{InputObject, SimpleObject};
use serde::{Deserialize, Serialize};

use crate::types::stream::CertificateSourceStreamPosition;
use crate::types::ProofOfDelivery;

use super::{certificate::CertificateId, subnet::SubnetId};

Expand All @@ -17,13 +17,15 @@ pub struct SourceStreamPositionInput {
pub struct SourceStreamPosition {
pub source_subnet_id: SubnetId,
pub position: u64,
pub certificate_id: CertificateId,
}

impl From<&CertificateSourceStreamPosition> for SourceStreamPosition {
fn from(value: &CertificateSourceStreamPosition) -> Self {
impl From<&ProofOfDelivery> for SourceStreamPosition {
fn from(value: &ProofOfDelivery) -> Self {
Self {
source_subnet_id: (&value.subnet_id).into(),
position: *value.position,
certificate_id: value.certificate_id.into(),
source_subnet_id: (&value.delivery_position.subnet_id).into(),
position: *value.delivery_position.position,
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions crates/topos-tce-api/src/graphql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use crate::{
},
runtime::InternalRuntimeCommand,
};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::validator::ValidatorStore;

use super::query::SubscriptionRoot;

#[derive(Default)]
pub struct ServerBuilder {
store: Option<Arc<FullNodeStore>>,
store: Option<Arc<ValidatorStore>>,
serve_addr: Option<SocketAddr>,
runtime: Option<mpsc::Sender<InternalRuntimeCommand>>,
}
Expand All @@ -34,7 +34,7 @@ impl ServerBuilder {

self
}
pub(crate) fn store(mut self, store: Arc<FullNodeStore>) -> Self {
pub(crate) fn store(mut self, store: Arc<ValidatorStore>) -> Self {
self.store = Some(store);

self
Expand Down Expand Up @@ -62,13 +62,15 @@ impl ServerBuilder {
.take()
.expect("Cannot build GraphQL server without a FullNode store");

let fulltnode_store = store.get_fullnode_store();
let runtime = self
.runtime
.take()
.expect("Cannot build GraphQL server without the internal runtime channel");

let schema: ServiceSchema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot)
.data(store)
.data(fulltnode_store)
.data(runtime)
.finish();

Expand Down
76 changes: 75 additions & 1 deletion crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use tokio::sync::{mpsc, oneshot};
use topos_core::api::graphql::certificate::UndeliveredCertificate;
use topos_core::api::graphql::checkpoint::SourceStreamPosition;
use topos_core::api::graphql::errors::GraphQLServerError;
use topos_core::api::graphql::filter::SubnetFilter;
Expand All @@ -18,6 +19,7 @@ use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::store::ReadStore;

use topos_tce_storage::validator::ValidatorStore;
use tracing::debug;

use crate::runtime::InternalRuntimeCommand;
Expand Down Expand Up @@ -121,12 +123,44 @@ impl QueryRoot {
/// The values are estimated as having a precise count is costly.
async fn get_storage_pool_stats(
&self,
_ctx: &Context<'_>,
ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());

let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

stats.insert(
"pending_certificate_iter",
store
.get_pending_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.len() as i64,
);

stats.insert(
"iter_pending_certificate",
store
.iter_count_pending_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"iter_precedence_certificate",
store
.iter_count_precedence_pool_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

Ok(stats)
}

Expand All @@ -151,9 +185,49 @@ impl QueryRoot {
.map(|(subnet_id, head)| SourceStreamPosition {
source_subnet_id: subnet_id.into(),
position: *head.position,
certificate_id: head.certificate_id.into(),
})
.collect())
}

async fn get_pending_pool(
&self,
ctx: &Context<'_>,
) -> Result<HashMap<u64, CertificateId>, GraphQLServerError> {
let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

Ok(store
.get_pending_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.iter()
.map(|(id, certificate)| (*id, certificate.id.into()))
.collect())
}

async fn check_precedence(
&self,
ctx: &Context<'_>,
certificate_id: CertificateId,
) -> Result<Option<UndeliveredCertificate>, GraphQLServerError> {
let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

store
.check_precedence(
&certificate_id
.try_into()
.map_err(|_| GraphQLServerError::ParseCertificateId)?,
)
.map_err(|_| GraphQLServerError::StorageError)
.map(|certificate| certificate.as_ref().map(Into::into))
}
}

pub struct SubscriptionRoot;
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-api/src/graphql/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async fn open_watch_certificate_delivered() {
source {
sourceSubnetId
position
certificateId
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/topos-tce-api/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ impl RuntimeBuilder {
.store(
self.store
.take()
.map(|store| store.get_fullnode_store())
.expect("Unable to build GraphQL Server, Store is missing"),
)
.runtime(internal_runtime_command_sender.clone())
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ async fn can_query_graphql_endpoint_for_certificates(
source {{
sourceSubnetId
position
certificateId
}}
}}
}}
Expand Down
7 changes: 6 additions & 1 deletion crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ impl DoubleEcho {

command if self.subscriptions.is_some() => {
match command {
DoubleEchoCommand::Broadcast { cert, need_gossip } => {
_ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Broadcast { need_gossip, cert })
.await;
}
DoubleEchoCommand::Echo { certificate_id, validator_id, signature } => {
// Check if source is part of known_validators
if !self.validators.contains(&validator_id) {
Expand Down Expand Up @@ -173,7 +179,6 @@ impl DoubleEcho {

self.handle_ready(certificate_id, validator_id, signature).await
},
_ => {}
}

},
Expand Down
29 changes: 29 additions & 0 deletions crates/topos-tce-storage/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ impl ValidatorStore {
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

/// Returns the number of certificates in the pending pool (by iterating)
pub fn iter_count_pending_certificates(&self) -> Result<u64, StorageError> {
Ok(self
.pending_tables
.pending_pool
.iter()?
.count()
.try_into()
.unwrap_or(u64::MAX))
}

/// Returns the number of certificates in the precedence pool
pub fn count_precedence_pool_certificates(&self) -> Result<u64, StorageError> {
Ok(self
Expand All @@ -139,6 +150,17 @@ impl ValidatorStore {
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

/// Returns the number of certificates in the precedence pool (by iterating)
pub fn iter_count_precedence_pool_certificates(&self) -> Result<u64, StorageError> {
Ok(self
.pending_tables
.precedence_pool
.iter()?
.count()
.try_into()
.unwrap_or(u64::MAX))
}

/// Try to return the [`PendingCertificateId`] for a [`CertificateId`]
///
/// Return `Ok(None)` if the `certificate_id` is not found.
Expand Down Expand Up @@ -183,6 +205,13 @@ impl ValidatorStore {
.collect())
}

pub fn check_precedence(
&self,
certificate_id: &CertificateId,
) -> Result<Option<Certificate>, StorageError> {
Ok(self.pending_tables.precedence_pool.get(certificate_id)?)
}

// TODO: Performance issue on this one as we iter over all the pending certificates
// We need to improve how we request the pending certificates.
pub fn get_pending_certificates_for_subnets(
Expand Down
10 changes: 10 additions & 0 deletions crates/topos-tce/src/app_context/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use topos_core::uci::{Certificate, SubnetId};
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
use topos_tce_api::RuntimeError;
use topos_tce_api::RuntimeEvent as ApiEvent;
use topos_tce_broadcast::DoubleEchoCommand;
use topos_tce_storage::errors::{InternalStorageError, StorageError};
use topos_tce_storage::types::PendingResult;
use tracing::debug;
Expand All @@ -29,6 +30,15 @@ impl AppContext {
certificate.id, certificate.source_subnet_id
);

_ = self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: true,
cert: *certificate,
})
.await;

sender.send(Ok(PendingResult::InPending(pending_id)))
}
Ok(None) => {
Expand Down
9 changes: 9 additions & 0 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ impl AppContext {
"Certificate {} has been inserted into pending pool",
cert.id
);

_ = self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: false,
cert,
})
.await;
}
Ok(None) => {
debug!(
Expand Down

0 comments on commit 707fe93

Please sign in to comment.