diff --git a/.github/workflows/multichain-integration.yml b/.github/workflows/multichain-integration.yml index 2cefeeb59..a9bd804bc 100644 --- a/.github/workflows/multichain-integration.yml +++ b/.github/workflows/multichain-integration.yml @@ -84,5 +84,5 @@ jobs: working-directory: ./integration-tests/chain-signatures run: cargo test --jobs 1 -- --test-threads 1 env: - RUST_LOG: INFO + RUST_LOG: info,workspaces=warn RUST_BACKTRACE: 1 diff --git a/chain-signatures/node/src/gcp/mod.rs b/chain-signatures/node/src/gcp/mod.rs index 6efa35087..84a6f984f 100644 --- a/chain-signatures/node/src/gcp/mod.rs +++ b/chain-signatures/node/src/gcp/mod.rs @@ -280,31 +280,40 @@ impl DatastoreService { ) })?; - batch.entity_results.ok_or_else(|| { - DatastoreStorageError::FetchEntitiesError( - "Could not retrieve entity results while fetching entities".to_string(), - ) - }) + // NOTE: if entity_results is None, we return an empty Vec since the fetch query + // could not find any entities in the DB. + Ok(batch.entity_results.unwrap_or_default()) } #[tracing::instrument(level = "debug", skip_all)] pub async fn delete(&self, keyable: T) -> DatastoreResult<()> { - let mut key = keyable.key(); - if let Some(path) = key.path.as_mut().and_then(|p| p.first_mut()) { - path.kind = Some(format!("{}-{}", T::kind(), self.env)); - } + self.delete_many(&[keyable]).await + } + + #[tracing::instrument(level = "debug", skip_all)] + pub async fn delete_many(&self, keyables: &[T]) -> DatastoreResult<()> { + let mutations = keyables + .iter() + .map(|keyable| { + let mut key = keyable.key(); + if let Some(path) = key.path.as_mut().and_then(|p| p.first_mut()) { + path.kind = Some(format!("{}-{}", T::kind(), self.env)); + } + Mutation { + insert: None, + delete: Some(key), + update: None, + base_version: None, + upsert: None, + update_time: None, + } + }) + .collect::>(); let request = CommitRequest { database_id: Some("".to_string()), mode: Some(String::from("NON_TRANSACTIONAL")), - mutations: Some(vec![Mutation { - insert: None, - delete: Some(key), - update: None, - base_version: None, - upsert: None, - update_time: None, - }]), + mutations: Some(mutations), single_use_transaction: None, transaction: None, }; diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index a08088921..71e49d44e 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -358,6 +358,12 @@ impl ConsensusProtocol for WaitingForConsensusState { .find_participant(ctx.my_account_id()) .unwrap(); + // Clear triples from storage before starting the new epoch. This is necessary if the node has accumulated + // triples from previous epochs. If it was not able to clear the previous triples, we'll leave them as-is + if let Err(err) = ctx.triple_storage().write().await.clear().await { + tracing::warn!(?err, "failed to clear triples from storage"); + } + let triple_manager = TripleManager::new( me, self.threshold, @@ -725,8 +731,7 @@ async fn load_triples( let mut retries = 3; let mut error = None; while retries > 0 { - let read_lock = triple_storage.read().await; - match read_lock.load().await { + match triple_storage.read().await.load().await { Err(DatastoreStorageError::FetchEntitiesError(_)) => { tracing::info!("There are no triples persisted."); return Ok(vec![]); diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index 94bcc373a..a4d7292f0 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -298,8 +298,14 @@ impl MessageHandler for RunningState { leftover_messages.push(message) } Err(presignature::GenerationError::TripleIsMissing(_)) => { - // Store the message until triple is ready - leftover_messages.push(message) + // If a triple is missing, that means our system cannot process this presignature. We will have to bin + // this message and have the other node timeout on that generation. + tracing::warn!( + presignature_id = id, + triple0 = message.triple0, + triple1 = message.triple1, + "unable to process presignature: one or more triples are missing", + ); } Err(presignature::GenerationError::CaitSithInitializationError(error)) => { // ignore the message since the generation had bad parameters. Also have the other node who @@ -311,10 +317,6 @@ impl MessageHandler for RunningState { ); continue; } - Err(presignature::GenerationError::DatastoreStorageError(_)) => { - // Store the message until we are ready to process it - leftover_messages.push(message) - } } } if !leftover_messages.is_empty() { diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 81ed3e100..9cd01c92c 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -1,6 +1,5 @@ use super::message::PresignatureMessage; use super::triple::{Triple, TripleConfig, TripleId, TripleManager}; -use crate::gcp::error::DatastoreStorageError; use crate::protocol::contract::primitives::Participants; use crate::types::{PresignatureProtocol, SecretKeyShare}; use crate::util::AffinePointExt; @@ -87,8 +86,6 @@ pub enum GenerationError { TripleIsMissing(TripleId), #[error("cait-sith initialization error: {0}")] CaitSithInitializationError(#[from] InitializationError), - #[error("datastore storage error: {0}")] - DatastoreStorageError(#[from] DatastoreStorageError), #[error("triple {0} is generating")] TripleIsGenerating(TripleId), } diff --git a/chain-signatures/node/src/storage/triple_storage.rs b/chain-signatures/node/src/storage/triple_storage.rs index 4101296ba..f01d6d0eb 100644 --- a/chain-signatures/node/src/storage/triple_storage.rs +++ b/chain-signatures/node/src/storage/triple_storage.rs @@ -55,6 +55,19 @@ impl KeyKind for TripleData { } } +impl Keyable for TripleData { + fn key(&self) -> Key { + Key { + path: Some(vec![PathElement { + kind: None, + name: Some(format!("{}/{}", self.account_id, self.triple.id)), + id: None, + }]), + partition_id: None, + } + } +} + impl IntoValue for TripleData { fn into_value(self) -> Value { let triple_key = TripleKey { @@ -147,6 +160,7 @@ type TripleResult = std::result::Result; pub trait TripleNodeStorage { async fn insert(&mut self, triple: Triple, mine: bool) -> TripleResult<()>; async fn delete(&mut self, id: TripleId) -> TripleResult<()>; + async fn clear(&mut self) -> TripleResult>; async fn load(&self) -> TripleResult>; fn account_id(&self) -> &AccountId; } @@ -174,6 +188,13 @@ impl TripleNodeStorage for MemoryTripleNodeStorage { Ok(()) } + async fn clear(&mut self) -> TripleResult> { + let res = self.load().await?; + self.triples.clear(); + self.mine.clear(); + Ok(res) + } + async fn load(&self) -> TripleResult> { let mut res: Vec = vec![]; for (triple_id, triple) in self.triples.clone() { @@ -232,6 +253,12 @@ impl TripleNodeStorage for DataStoreTripleNodeStorage { Ok(()) } + async fn clear(&mut self) -> TripleResult> { + let triples = self.load().await?; + self.datastore.delete_many(&triples).await?; + Ok(triples) + } + async fn load(&self) -> TripleResult> { tracing::debug!("loading triples using datastore"); let filter = if self.datastore.is_emulator() {