Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: expand GetLastPendingCertificates (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko authored Sep 6, 2023
1 parent 06a8e1c commit 631f168
Show file tree
Hide file tree
Showing 17 changed files with 119 additions and 53 deletions.
2 changes: 1 addition & 1 deletion crates/topos-api/proto/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@topos-protocol/topos-grpc-client-stub",
"version": "0.1.3",
"version": "0.1.4",
"description": "JavaScript gRPC client stub for topos-api",
"files": [
"generated"
Expand Down
9 changes: 8 additions & 1 deletion crates/topos-api/proto/topos/tce/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "topos/shared/v1/subnet.proto";
import "topos/shared/v1/uuid.proto";
import "topos/tce/v1/synchronization.proto";
import "topos/uci/v1/certification.proto";
import "topos/shared/v1/certificate.proto";


service APIService {
Expand Down Expand Up @@ -44,10 +45,16 @@ message GetLastPendingCertificatesRequest {
repeated topos.shared.v1.SubnetId subnet_ids = 1;
}

message LastPendingCertificate {
topos.uci.v1.Certificate value = 1;
// Pending certificate index (effectively total number of pending certificates)
int32 index = 2;
}

message GetLastPendingCertificatesResponse {
// Bytes and array types (SubnetId) could not be key in the map type according to specifications,
// so we use SubnetId hex encoded string with 0x prefix as key
map<string, topos.uci.v1.OptionalCertificate> last_pending_certificate = 1;
map<string, LastPendingCertificate> last_pending_certificate = 1;
}

message WatchCertificatesRequest {
Expand Down
Binary file modified crates/topos-api/src/grpc/generated/topos.bin
Binary file not shown.
11 changes: 10 additions & 1 deletion crates/topos-api/src/grpc/generated/topos.tce.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,22 @@ pub struct GetLastPendingCertificatesRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LastPendingCertificate {
#[prost(message, optional, tag = "1")]
pub value: ::core::option::Option<super::super::uci::v1::Certificate>,
/// Pending certificate index (effectively total number of pending certificates)
#[prost(int32, tag = "2")]
pub index: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetLastPendingCertificatesResponse {
/// Bytes and array types (SubnetId) could not be key in the map type according to specifications,
/// so we use SubnetId hex encoded string with 0x prefix as key
#[prost(map = "string, message", tag = "1")]
pub last_pending_certificate: ::std::collections::HashMap<
::prost::alloc::string::String,
super::super::uci::v1::OptionalCertificate,
LastPendingCertificate,
>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
10 changes: 6 additions & 4 deletions crates/topos-api/tests/tce_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use topos_api::grpc::tce::v1::api_service_server::{ApiService, ApiServiceServer}
use topos_api::grpc::tce::v1::watch_certificates_request::{Command, OpenStream};
use topos_api::grpc::tce::v1::{
GetLastPendingCertificatesRequest, GetLastPendingCertificatesResponse, GetSourceHeadRequest,
GetSourceHeadResponse, SourceStreamPosition, SubmitCertificateRequest,
GetSourceHeadResponse, LastPendingCertificate, SourceStreamPosition, SubmitCertificateRequest,
SubmitCertificateResponse, WatchCertificatesRequest, WatchCertificatesResponse,
};
use topos_api::grpc::uci::v1::{Certificate, OptionalCertificate};
use topos_api::grpc::uci::v1::Certificate;
use uuid::Uuid;

use topos_test_sdk::constants::*;
Expand Down Expand Up @@ -76,14 +76,15 @@ async fn create_tce_layer() {
for subnet_id in subnet_ids {
map.insert(
base64::engine::general_purpose::STANDARD.encode(&subnet_id.value),
OptionalCertificate {
LastPendingCertificate {
value: Some(Certificate {
source_subnet_id: subnet_id.into(),
id: Some(return_certificate_id.clone()),
prev_id: Some(return_prev_certificate_id.clone()),
target_subnets: Vec::new(),
..Default::default()
}),
index: 0,
},
);
}
Expand Down Expand Up @@ -200,8 +201,9 @@ async fn create_tce_layer() {
let mut expected_last_pending_certificate_ids = HashMap::new();
expected_last_pending_certificate_ids.insert(
base64::engine::general_purpose::STANDARD.encode(&source_subnet_id.value),
OptionalCertificate {
LastPendingCertificate {
value: Some(original_certificate.clone()),
index: 0,
},
);

Expand Down
2 changes: 1 addition & 1 deletion crates/topos-sequencer-subnet-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl SubnetRuntimeProxyWorker {

pub async fn set_source_head_certificate_id(
&self,
source_head_certificate_id: Option<CertificateId>,
source_head_certificate_id: Option<(CertificateId, u64)>,
) -> Result<(), Error> {
let mut runtime_proxy = self.runtime_proxy.lock().await;
runtime_proxy
Expand Down
11 changes: 6 additions & 5 deletions crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct SubnetRuntimeProxy {
pub certification: Arc<Mutex<Certification>>,
command_task_shutdown: mpsc::Sender<oneshot::Sender<()>>,
block_task_shutdown: mpsc::Sender<oneshot::Sender<()>>,
source_head_certificate_id_sender: Option<oneshot::Sender<Option<CertificateId>>>,
source_head_certificate_id_sender: Option<oneshot::Sender<Option<(CertificateId, u64)>>>,
}

impl Debug for SubnetRuntimeProxy {
Expand Down Expand Up @@ -111,13 +111,14 @@ impl SubnetRuntimeProxy {
info!("Waiting for the source head certificate id to continue with certificate generation");
// Wait for last_certificate_id retrieved on TCE component setup
match source_head_certificate_id_received.await {
Ok(last_certificate_id) => {
Ok(certificate_and_position) => {
info!(
"Source head certificate id received {:?}",
last_certificate_id.map(|id| id.to_string())
certificate_and_position
);
let cert_id = certificate_and_position.map(|(id, _position)| id);
// Certificate generation is now ready to run
certification.last_certificate_id = last_certificate_id;
certification.last_certificate_id = cert_id;
}
Err(e) => {
panic!("Failed to get source head certificate, unable to proceed with certificate generation: {e}")
Expand Down Expand Up @@ -396,7 +397,7 @@ impl SubnetRuntimeProxy {

pub async fn set_source_head_certificate_id(
&mut self,
source_head_certificate_id: Option<CertificateId>,
source_head_certificate_id: Option<(CertificateId, u64)>,
) -> Result<(), Error> {
self.source_head_certificate_id_sender
.take()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ async fn test_subnet_send_token_processing(
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
info!("Set source head certificate to 0");
if let Err(e) = runtime_proxy_worker
.set_source_head_certificate_id(Some(CERTIFICATE_ID_1))
.set_source_head_certificate_id(Some((CERTIFICATE_ID_1, 0)))
.await
{
panic!("Unable to set source head certificate id: {e}");
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-sequencer/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl AppContext {
Ok((tce_proxy_worker, source_head_certificate)) => {
info!("TCE proxy client is restarted for the source subnet {:?} from the head {:?}",config.subnet_id, source_head_certificate);
let source_head_certificate_id =
source_head_certificate.map(|cert| cert.id);
source_head_certificate.map(|cert| cert.0.id);
(tce_proxy_worker, source_head_certificate_id)
}
Err(e) => {
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ pub async fn launch(
"TCE proxy client is starting for the source subnet {:?} from the head {:?}",
subnet_id, source_head_certificate
);
let source_head_certificate_id = source_head_certificate.map(|cert| cert.id);
let source_head_certificate_id =
source_head_certificate.map(|(cert, position)| (cert.id, position));
(tce_proxy_worker, source_head_certificate_id)
}
Err(e) => {
Expand Down
17 changes: 13 additions & 4 deletions crates/topos-tce-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::pin::Pin;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use topos_api::grpc::tce::v1::LastPendingCertificate;
use topos_core::api::grpc::tce::v1::{
api_service_server::ApiService, GetLastPendingCertificatesRequest,
GetLastPendingCertificatesResponse, GetSourceHeadRequest, GetSourceHeadResponse,
SubmitCertificateRequest, SubmitCertificateResponse, WatchCertificatesRequest,
WatchCertificatesResponse,
};
use topos_core::api::grpc::uci::v1::OptionalCertificate;
use topos_core::uci::SubnetId;
use topos_metrics::API_GRPC_CERTIFICATE_RECEIVED_TOTAL;
use tracing::{error, info, Span};
Expand Down Expand Up @@ -223,12 +223,21 @@ impl ApiService for TceGrpcService {
Ok(Ok(map)) => Ok(Response::new(GetLastPendingCertificatesResponse {
last_pending_certificate: map
.into_iter()
.map(|(subnet_id, cert)| {
.map(|(subnet_id, last_pending_certificate)| {
(
base64::engine::general_purpose::STANDARD
.encode(subnet_id.as_array()),
OptionalCertificate {
value: cert.map(Into::into),
{
LastPendingCertificate {
index: last_pending_certificate
.as_ref()
.map(|(_cert, index)| *index)
.unwrap_or_default()
as i32,
value: last_pending_certificate
.map(|(cert, _index)| cert)
.map(Into::into),
}
},
)
})
Expand Down
6 changes: 4 additions & 2 deletions crates/topos-tce-api/src/runtime/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ pub(crate) enum InternalRuntimeCommand {
sender: oneshot::Sender<Result<(u64, Certificate), RuntimeError>>,
},

/// Get source head certificate by source subnet id
/// Get source head certificate and its index (basically number pending certificates) by source subnet id
GetLastPendingCertificates {
subnet_ids: Vec<SubnetId>,
sender: oneshot::Sender<Result<HashMap<SubnetId, Option<Certificate>>, RuntimeError>>,
#[allow(clippy::type_complexity)]
sender:
oneshot::Sender<Result<HashMap<SubnetId, Option<(Certificate, u64)>>, RuntimeError>>,
},
}
4 changes: 3 additions & 1 deletion crates/topos-tce-api/src/runtime/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum RuntimeEvent {

GetLastPendingCertificates {
subnet_ids: HashSet<SubnetId>,
sender: oneshot::Sender<Result<HashMap<SubnetId, Option<Certificate>>, RuntimeError>>,
#[allow(clippy::type_complexity)]
sender:
oneshot::Sender<Result<HashMap<SubnetId, Option<(Certificate, u64)>>, RuntimeError>>,
},
}
45 changes: 29 additions & 16 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use tokio_stream::StreamExt;

use tonic::IntoRequest;
use topos_core::api::grpc::checkpoints::{TargetCheckpoint, TargetStreamPosition};
use topos_core::api::grpc::tce::v1::{GetLastPendingCertificatesRequest, GetSourceHeadRequest};
use topos_core::api::grpc::tce::v1::{
GetLastPendingCertificatesRequest, GetSourceHeadRequest, GetSourceHeadResponse,
};
use topos_core::{
api::grpc::tce::v1::{
watch_certificates_request, watch_certificates_response, SubmitCertificateRequest,
Expand All @@ -27,12 +29,13 @@ pub(crate) enum TceClientCommand {
// Get head certificate that was sent to the TCE node for this subnet
GetSourceHead {
subnet_id: SubnetId,
sender: oneshot::Sender<Result<Certificate, Error>>,
sender: oneshot::Sender<Result<(Certificate, u64), Error>>,
},
// Get map of subnet id->last pending certificate
GetLastPendingCertificates {
subnet_ids: Vec<SubnetId>,
sender: oneshot::Sender<Result<HashMap<SubnetId, Option<Certificate>>, Error>>,
#[allow(clippy::type_complexity)]
sender: oneshot::Sender<Result<HashMap<SubnetId, Option<(Certificate, u64)>>, Error>>,
},
// Open the stream to the TCE node
// Mark the position from which TCE node certificates should be retrieved
Expand Down Expand Up @@ -86,11 +89,12 @@ impl TceClient {
Ok(())
}

pub async fn get_source_head(&mut self) -> Result<Certificate, Error> {
// Return source head and position of the certificate
pub async fn get_source_head(&mut self) -> Result<(Certificate, u64), Error> {
#[allow(clippy::type_complexity)]
let (sender, receiver): (
oneshot::Sender<Result<Certificate, Error>>,
oneshot::Receiver<Result<Certificate, Error>>,
oneshot::Sender<Result<(Certificate, u64), Error>>,
oneshot::Receiver<Result<(Certificate, u64), Error>>,
) = oneshot::channel();
self.command_sender
.send(TceClientCommand::GetSourceHead {
Expand All @@ -106,7 +110,7 @@ impl TceClient {
pub async fn get_last_pending_certificates(
&mut self,
subnet_ids: Vec<SubnetId>,
) -> Result<HashMap<SubnetId, Option<Certificate>>, Error> {
) -> Result<HashMap<SubnetId, Option<(Certificate, u64)>>, Error> {
#[allow(clippy::type_complexity)]
let (sender, receiver) = oneshot::channel();
self.command_sender
Expand Down Expand Up @@ -400,20 +404,26 @@ impl TceClientBuilder {
break;
}
Some(TceClientCommand::GetSourceHead {subnet_id, sender}) => {
let result: Result<Certificate, Error> = match tce_grpc_client
let result: Result<(Certificate, u64), Error> = match tce_grpc_client
.get_source_head(GetSourceHeadRequest {
subnet_id: Some(subnet_id.into())
})
.await
.map(|r| r.into_inner().certificate) {
Ok(Some(certificate)) => Ok(certificate.try_into().map_err(|_| Error::InvalidCertificate)?),
Ok(None) => {
.map(|r| r.into_inner()) {
Ok(GetSourceHeadResponse {
position: Some(pos),
certificate: Some(cert),
}) => {
Ok((cert.try_into().map_err(|_| Error::InvalidCertificate)?,
pos.position))
},
Ok(_) => {
Err(Error::SourceHeadEmpty{subnet_id})
},
Err(e) => {
Err(Error::UnableToGetSourceHeadCertificate{subnet_id, details: e.to_string()})
},
};
}
};

if sender.send(result).is_err() {
error!("Unable to pass result of the source head, channel failed");
Expand All @@ -440,10 +450,12 @@ impl TceClientBuilder {
)
.map_err(|_| Error::InvalidSubnetId)?;

let certificate: Option<Certificate> =
let index: u64 = last_pending_certificate.index as u64;
let certificate_and_index: Option<(Certificate, u64)> =
match last_pending_certificate.value {
Some(certificate) => Some(
Certificate::try_from(certificate)
.map(|certificate| (certificate, index))
.map_err(
|e| Error::UnableToGetLastPendingCertificates {
details: e.to_string(),
Expand All @@ -454,12 +466,13 @@ impl TceClientBuilder {
None => None,
};


Ok((
subnet_id,
certificate,
certificate_and_index
))
})
.collect::<Result<HashMap<SubnetId, Option<Certificate>>, Error>>()?;
.collect::<Result<HashMap<SubnetId, Option<(Certificate, u64)>>, Error>>()?;
Ok(result)
}
Err(e) => Err(Error::UnableToGetLastPendingCertificates {
Expand Down
Loading

0 comments on commit 631f168

Please sign in to comment.